To load data incrementally into DynamoDB, we need a date/id column to filter on. Because DynamoDB supports only limited query features, we will create a Global Secondary Index with a date column (create_datetime) as the sort key and a static helper column (gsi_sk) as the partition key, holding the constant value 1 on every item. Our table name for this demo is "test_inc_tble".
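If the table is being created from scratch, the index can be defined at creation time. The snippet below is a minimal sketch: the base table's own partition key (scan_id) is a hypothetical placeholder, while the attribute types follow the rest of this post (gsi_sk as a number, create_datetime as a string timestamp).

import boto3

dynamodb = boto3.client("dynamodb")
dynamodb.create_table(
    TableName="test_inc_tble",
    AttributeDefinitions=[
        {"AttributeName": "scan_id", "AttributeType": "S"},          # hypothetical base-table key
        {"AttributeName": "gsi_sk", "AttributeType": "N"},           # static value 1 on every item
        {"AttributeName": "create_datetime", "AttributeType": "S"},  # timestamp string
    ],
    KeySchema=[{"AttributeName": "scan_id", "KeyType": "HASH"}],
    GlobalSecondaryIndexes=[
        {
            "IndexName": "gsi_sk-create_datetime-index",
            "KeySchema": [
                {"AttributeName": "gsi_sk", "KeyType": "HASH"},
                {"AttributeName": "create_datetime", "KeyType": "RANGE"},
            ],
            # The incremental query only needs the index keys, so KEYS_ONLY is enough
            "Projection": {"ProjectionType": "KEYS_ONLY"},
        }
    ],
    BillingMode="PAY_PER_REQUEST",
)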
This Global Secondary Index is what makes the incremental-load query possible. With create_datetime as the sort key and gsi_sk as the partition key, we can fetch the latest create_datetime with the boto3 Query operation: sort the items in the index in descending order of the sort key (create_datetime) and fetch only the top item, which holds the maximum create_datetime.
dynamodb = boto3.client("dynamodb")
response = dynamodb.query(
    TableName="test_inc_tble",
    IndexName="gsi_sk-create_datetime-index",
    Select="SPECIFIC_ATTRIBUTES",
    ExpressionAttributeValues={":v1": {"N": "1"}},
    KeyConditionExpression="gsi_sk = :v1",
    ProjectionExpression="create_datetime",
    Limit=1,
    ScanIndexForward=False,
)
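The maximum create_datetime is then read from the single returned item. On the very first run the index is empty, so a minimal sketch with an assumed fallback timestamp (any value older than all source rows will do) turns that first run into a full load; the full script later in this post simply assumes the item exists.

items = response.get("Items", [])
if items:
    max_create_datetime_value = items[0]["create_datetime"]["S"]
else:
    # Assumed default for the first run: forces a full load
    max_create_datetime_value = "1900-01-01 00:00:00"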
Once we have the maximum create_datetime value, we build a Redshift query that fetches only the rows with a create_datetime greater than that maximum.
f"""SELECT *,1 as GSI_SK FROM peds_ops_dataservices_nonprod_dc_db.public.analytics_ops_allscans where pe_create_datetime > '{max_pe_create_datetime_value}' """
The last step is to write the incremental data into DynamoDB.
glueContext.write_dynamic_frame_from_options(
    frame=AmazonRedshift_node1725042760130,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_inc_tble",
        "dynamodb.throughput.write.percent": "1.0",
    },
)
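The dynamodb.throughput.write.percent option controls what share of the table's write capacity the Glue job may consume; 1.0 lets the job use all of it, so consider lowering it if other applications write to the same table.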
Now, we will go to AWS Glue and start building our pipeline.
Paste the code below into the Glue script editor:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
logger.info('Hello Glue')
# Use the Query operation to retrieve the first item in the index in descending order
dynamodb = boto3.client("dynamodb")
response = dynamodb.query(
    TableName="test_inc_tble",
    IndexName="gsi_sk-create_datetime-index",
    Select="SPECIFIC_ATTRIBUTES",
    ExpressionAttributeValues={":v1": {"N": "1"}},
    KeyConditionExpression="gsi_sk = :v1",
    ProjectionExpression="create_datetime",
    Limit=1,
    ScanIndexForward=False,
)
# Extract the max(create_datetime) from the response
max_create_datetime_value = response['Items'][0]["create_datetime"].get('S')
logger.info("max_partition_key_value is:"+ max_create_datetime_value)
# fetch incremental data from redshift data share based on max(create_datetime)
AmazonRedshift_node1725042760130 = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "sampleQuery": f"""SELECT *, 1 AS gsi_sk FROM public.allscans_tbl WHERE create_datetime > '{max_create_datetime_value}' """,
        "redshiftTmpDir": "s3://aws-glue-assets-782783949364-us-east-1/temporary/",
        "useConnectionProperties": "true",
        "connectionName": "Redshift-connection-datashare-2",
    },
    transformation_ctx="AmazonRedshift_getmaxtimestamp",
)
#write incremental data to dynamodb
glueContext.write_dynamic_frame_from_options(
    frame=AmazonRedshift_node1725042760130,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_inc_tble",
        "dynamodb.throughput.write.percent": "1.0",
    },
)
job.commit()