
Incremental Load of Redshift Data into a DynamoDB Table Using AWS Glue

In today's post, we will load incremental data from Redshift into Amazon DynamoDB using AWS Glue.


To load data incrementally into DynamoDB, we need a date/ID column to filter on. Since DynamoDB supports only limited query features, we will create a Global Secondary Index with a date column (create_datetime) as the sort key and a static column (gsi_sk) as the partition key, holding the constant value "1". Our table name for this demo is "test_inc_tble".
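As a sketch, the GSI described above could be added to an existing table with boto3's update_table operation. The helper below only builds the request kwargs, so it runs without AWS credentials; the actual call is shown commented out (note that a table in provisioned-capacity mode would additionally need a ProvisionedThroughput block in the Create element):

```python
def build_gsi_create_update(index_name, pk_name, pk_type, sk_name, sk_type):
    """Build the kwargs for dynamodb.update_table() that add a new GSI."""
    return {
        "AttributeDefinitions": [
            {"AttributeName": pk_name, "AttributeType": pk_type},
            {"AttributeName": sk_name, "AttributeType": sk_type},
        ],
        "GlobalSecondaryIndexUpdates": [
            {
                "Create": {
                    "IndexName": index_name,
                    "KeySchema": [
                        {"AttributeName": pk_name, "KeyType": "HASH"},   # partition key
                        {"AttributeName": sk_name, "KeyType": "RANGE"},  # sort key
                    ],
                    "Projection": {"ProjectionType": "ALL"},
                }
            }
        ],
    }

# gsi_sk is a Number (we query it with {'N': '1'}); create_datetime is a String.
kwargs = build_gsi_create_update(
    "gsi_sk-create_datetime-index", "gsi_sk", "N", "create_datetime", "S"
)
# import boto3
# dynamodb = boto3.client("dynamodb")
# dynamodb.update_table(TableName="test_inc_tble", **kwargs)
```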

This Global Secondary Index lets us define the incremental-load query. With create_datetime as the sort key and gsi_sk as the partition key, we can fetch the latest create_datetime with the boto3 Query operation: we sort the items in descending order on the index's sort key (create_datetime) and fetch only the top item, which gives us the maximum create_datetime.

import boto3

dynamodb = boto3.client("dynamodb")
response = dynamodb.query(
    TableName="test_inc_tble",
    IndexName="gsi_sk-create_datetime-index",
    Select='SPECIFIC_ATTRIBUTES',
    ExpressionAttributeValues={
        ':v1': {
            'N': '1',
        },
    },
    KeyConditionExpression="gsi_sk = :v1",
    ProjectionExpression='create_datetime',
    Limit=1,
    ScanIndexForward=False 
)
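The response's Items list holds at most one element, and on the very first run the table (and therefore the index) is empty, so a fallback timestamp is needed before the extraction shown later in the script. A small helper sketch (the helper name and default value are assumptions, not part of the boto3 API):

```python
def extract_max_create_datetime(response, default="1900-01-01 00:00:00"):
    """Pull the max create_datetime out of a DynamoDB Query response.

    Falls back to `default` when the table/index is still empty,
    e.g. on the very first incremental run.
    """
    items = response.get("Items", [])
    if not items:
        return default
    # create_datetime is stored as a DynamoDB String attribute
    return items[0]["create_datetime"]["S"]

sample = {"Items": [{"create_datetime": {"S": "2024-08-30 12:34:56"}}]}
extract_max_create_datetime(sample)          # -> "2024-08-30 12:34:56"
extract_max_create_datetime({"Items": []})   # -> "1900-01-01 00:00:00"
```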

Once we have the maximum create_datetime value, we build a Redshift query to fetch rows with a create_datetime greater than that value.

f"""SELECT *, 1 AS GSI_SK FROM public.allscans_tbl WHERE create_datetime > '{max_create_datetime_value}' """
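Interpolating the timestamp directly into the SQL string works here because the value comes from our own DynamoDB table, but wrapping the construction in a small helper keeps the query in one place (build_incremental_query is a hypothetical helper for illustration, not part of the Glue API). Note that we also select a constant 1 as GSI_SK so every written item carries the static partition key the index expects:

```python
def build_incremental_query(table, ts_column, max_ts):
    """Build the Redshift query fetching rows newer than max_ts."""
    return (
        f"SELECT *, 1 AS GSI_SK FROM {table} "
        f"WHERE {ts_column} > '{max_ts}'"
    )

q = build_incremental_query("public.allscans_tbl", "create_datetime",
                            "2024-08-30 12:34:56")
# q == "SELECT *, 1 AS GSI_SK FROM public.allscans_tbl WHERE create_datetime > '2024-08-30 12:34:56'"
```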

The last step is to write the incremental data into DynamoDB.

glueContext.write_dynamic_frame_from_options(
    frame=AmazonRedshift_node1725042760130,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_inc_tble",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

Now we will go to AWS Glue and build our pipeline.

Paste the code below into the Glue script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger = glueContext.get_logger()
logger.info('Hello Glue')

# Use the Query operation to retrieve the first item in the index in descending order
dynamodb = boto3.client("dynamodb")
response = dynamodb.query(
    TableName="test_inc_tble",
    IndexName="gsi_sk-create_datetime-index",
    Select='SPECIFIC_ATTRIBUTES',
    ExpressionAttributeValues={
        ':v1': {
            'N': '1',
        },
    },
    KeyConditionExpression="gsi_sk = :v1",
    ProjectionExpression='create_datetime',
    Limit=1,
    ScanIndexForward=False 
)

# Extract the max(create_datetime) from the response
# (assumes the table already holds at least one item; the first run
# would need a fallback timestamp instead)
max_create_datetime_value = response['Items'][0]["create_datetime"].get('S')

logger.info("max_create_datetime_value is: " + max_create_datetime_value)

# Fetch incremental data from the Redshift data share based on max(create_datetime)
AmazonRedshift_node1725042760130 = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "sampleQuery": f"""SELECT *, 1 AS GSI_SK FROM public.allscans_tbl WHERE create_datetime > '{max_create_datetime_value}' """,
        "redshiftTmpDir": "s3://aws-glue-assets-782783949364-us-east-1/temporary/",
        "useConnectionProperties": "true",
        "connectionName": "Redshift-connection-datashare-2"
    },
    transformation_ctx="AmazonRedshift_getmaxtimestamp"
)

# Write incremental data to DynamoDB
glueContext.write_dynamic_frame_from_options(
    frame=AmazonRedshift_node1725042760130,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_inc_tble",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()

