

Event-Driven Ingestion and Processing
Oftentimes, we want to ingest and process data as it is being collected. Say, for instance, that we are working with the city of Chicago to study noise levels around the city (and help them develop a rapid response system for potential noise pollution issues).

We have installed thousands of sensors in different neighborhoods throughout the city that record the number of decibels measured by the sensor at a particular point in time. This data is being collected and sent into the cloud on up to a second-by-second basis (note, though, that the data stream is bursty; no information is sent from the sensor if the decibels are below a certain threshold) and needs to be processed and entered into a database as quickly as possible for further analysis downstream (e.g. identifying locations that are overly noisy for a duration that is deemed unacceptable).

The data being sent from the sensors into our cloud ecosystem looks like so (from sensor 1 within the Hyde Park location and sensor 1 within the Woodlawn location):

hyde_park_test = {
    "location": "HydePark",
    "sensor": "01",
    "timestamp": "0909092022",
    "db": 85
}

woodlawn_test = {
    "location": "Woodlawn",
    "sensor": "01",
    "timestamp": "0101012022",
    "db": 70
}

Employing an SQS Queue in front of our Lambda Functions

We could simply invoke a Lambda function from each of the sensors, but as discussed in lecture, we may quickly run into issues with Lambda concurrency as we scale into the thousands of sensors if we're not careful. It would be better to establish a buffer that automatically scales our Lambda function invocations based on the number of items waiting to be processed, such as an SQS queue.

Let’s load the packages we need for today (and create all of the boto3 client objects we will need to architect our event-driven pipelines):

import boto3
import json

sqs = boto3.client('sqs')
sns = boto3.client('sns')
aws_lambda = boto3.client('lambda')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')
dynamodb = boto3.resource('dynamodb')
dynamo_client = boto3.client('dynamodb')
kinesis = boto3.client('kinesis')

Let’s deploy such an architecture, where data from our sensors is sent into an SQS queue, which automatically invokes a Lambda function (scaling out to whatever concurrency is necessary to process the events in the queue), which then writes these results into a DynamoDB database table:
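The Lambda handler itself lives inside deployment-packages/6M-1.zip and isn't reproduced in these notes. As a rough, hypothetical sketch (assuming it simply parses each SQS message body and writes one item to the 6M_DB table, using a concatenation of location, sensor, and timestamp as the id and the decibel reading as an attribute), it might look something like this:

# Hypothetical sketch of the handler packaged in 6M-1.zip (the real file is not
# shown here). Each SQS-triggered invocation receives a batch of records; we
# parse each JSON message body and write a corresponding item to DynamoDB.
import json
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('6M_DB')

def lambda_handler(event, context):
    for record in event['Records']:
        data = json.loads(record['body'])  # SQS places the message text in 'body'
        table.put_item(Item={
            'id': data['location'] + data['sensor'] + data['timestamp'],
            'db': data['db']
        })
    return {'statusCode': 200}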

# Create Lambda Function
with open('deployment-packages/6M-1.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='6M',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip)
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip
    # file contents
    response = aws_lambda.update_function_code(
        FunctionName='6M',
        ZipFile=lambda_zip
    )

lambda_arn = response['FunctionArn']

# Create SQS Queue
try:
    queue_url = sqs.create_queue(QueueName='6M')['QueueUrl']
except sqs.exceptions.QueueNameExists:
    queue_url = [url
                 for url in sqs.list_queues()['QueueUrls']
                 if '6M' in url][0]

sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url,
                                    AttributeNames=['QueueArn'])
sqs_arn = sqs_info['Attributes']['QueueArn']

# Trigger Lambda Function when new messages enter SQS Queue
try:
    response = aws_lambda.create_event_source_mapping(
        EventSourceArn=sqs_arn,
        FunctionName='6M',
        Enabled=True,
        BatchSize=10
    )
except aws_lambda.exceptions.ResourceConflictException:
    es_id = aws_lambda.list_event_source_mappings(
        EventSourceArn=sqs_arn,
        FunctionName='6M'
    )['EventSourceMappings'][0]['UUID']

    response = aws_lambda.update_event_source_mapping(
        UUID=es_id,
        FunctionName='6M',
        Enabled=True,
        BatchSize=10
    )

# Create DynamoDB Table for storing processed data
# 'id' will just be a concatenated version of
# location + sensor + date for sake of demo
try:
    table = dynamodb.create_table(
        TableName='6M_DB',
        KeySchema=[
            {
                'AttributeName': 'id',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'id',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 1,
            'WriteCapacityUnits': 1
        }
    )
except dynamo_client.exceptions.ResourceInUseException:
    table = dynamodb.Table('6M_DB')

# Wait until AWS confirms that table exists before moving on
table.meta.client.get_waiter('table_exists').wait(TableName='6M_DB')

# get data about table (should currently be no items in table)
print("SQS -> Lambda -> DynamoDB Architecture has been Launched")

SQS -> Lambda -> DynamoDB Architecture has been Launched

Imagining that some of our sensors have collected data and are ready to send it to the cloud, let’s send our test data into our SQS queue to see how this works.

for test in [hyde_park_test, woodlawn_test]:
    response = sqs.send_message(QueueUrl=queue_url,
                                MessageBody=json.dumps(test))

# Go to console to show that Lambda was invoked and DB shows SQS as source
# of data; e.g. followed the pipeline as expected
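For reference, each batched invocation of our Lambda function receives the queued messages in an event shaped roughly like the following (a trimmed schematic with hypothetical values; only the fields a handler would typically rely on are shown):

# Trimmed schematic of the SQS event delivered to Lambda (values hypothetical)
sample_sqs_event = {
    'Records': [
        {
            'messageId': '...',          # assigned by SQS
            'body': '{"location": "HydePark", "sensor": "01", "timestamp": "0909092022", "db": 85}',
            'eventSource': 'aws:sqs',
            'eventSourceARN': sqs_arn    # identifies which queue the message came from
        }
        # ...up to BatchSize records per invocation
    ]
}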

Event-Driven Domain Decomposition (via SNS and SQS)

We might run into situations where our SQS queue becomes a bottleneck (remember that each queue supports a maximum of 1,000 concurrent Lambda invocations). To scale further, we can perform an event-driven form of domain decomposition, where we filter data by some variable (for instance, by neighborhood) and send data to multiple queues based on the value of this variable. This multiplies our potential concurrent Lambda invocations by the number of queues in the system, up to the maximum Lambda concurrency available in our account.

We can perform such filtering by first publishing all new data to an SNS topic and then subscribing multiple SQS queues to the topic. To perform domain decomposition, we can apply a filter policy to each subscription, so that each SQS queue receives a different subset of the data (e.g. only the data from a particular neighborhood, or only readings above some numerical threshold).

Let's launch such an architecture, where we send data from each neighborhood to its own queue (e.g. Woodlawn data is published to the SNS topic -> the Woodlawn-specific SQS queue receives it -> a Lambda function is triggered to process it -> the result is written to DynamoDB).

# Write function that will give SNS permission to write to SQS queues
def allow_sns_to_write_to_sqs(topicarn, queuearn):
    policy_document = """{{
  "Version": "2008-10-17",
  "Statement": [
    {{
      "Effect": "Allow",
      "Principal": {{
        "Service": "sns.amazonaws.com"
      }},
      "Action": "sqs:SendMessage",
      "Resource": "{}",
      "Condition": {{
        "ArnEquals": {{
          "aws:SourceArn": "{}"
        }}
      }}
    }}
  ]
}}""".format(queuearn, topicarn)

    return policy_document

# Create SNS Topic if it doesn't already exist (create_topic is idempotent,
# so it returns the existing topic's ARN if the name is already taken)
sns_topic_arn = sns.create_topic(Name='6M_DomainDecomp')['TopicArn']

# Equivalently, we could look the topic ARN up by name:
sns_topic_arn = [t['TopicArn']
                 for page in sns.get_paginator('list_topics').paginate()
                 for t in page['Topics']
                 if '6M_DomainDecomp' in t['TopicArn']][0]

for neighborhood in ['HydePark', 'Woodlawn']:
    # Create an SQS queue for each neighborhood (domain decomposition):
    try:
        queue_url = sqs.create_queue(QueueName=neighborhood)['QueueUrl']
    except sqs.exceptions.QueueNameExists:
        queue_url = [url for url in sqs.list_queues()['QueueUrls'] if neighborhood in url][0]

    sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
    sqs_arn = sqs_info['Attributes']['QueueArn']

    # Trigger the same Lambda Function when new messages enter either SQS Queue
    try:
        response = aws_lambda.create_event_source_mapping(
            EventSourceArn=sqs_arn,
            FunctionName='6M',
            Enabled=True,
            BatchSize=10
        )
    except aws_lambda.exceptions.ResourceConflictException:
        es_id = aws_lambda.list_event_source_mappings(
            EventSourceArn=sqs_arn,
            FunctionName='6M'
        )['EventSourceMappings'][0]['UUID']

        response = aws_lambda.update_event_source_mapping(
            UUID=es_id,
            FunctionName='6M',
            Enabled=True,
            BatchSize=10
        )

    # Subscribe each queue to SNS Topic, filtering message attributes based on location
    # e.g. sensor data from Woodlawn -> Woodlawn SQS queue
    response = sns.subscribe(TopicArn=sns_topic_arn,
                             Protocol='sqs',
                             Endpoint=sqs_arn,
                             Attributes={'RawMessageDelivery': 'true',
                                         'FilterPolicy': json.dumps({'location': [neighborhood]})
                                         })

    # Allow SNS to write to SQS queue
    policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_arn)
    response = sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            'Policy': policy_json
        }
    )

print('SNS -> SQS (2) -> Lambda -> DynamoDB Architecture Launched')

SNS -> SQS (2) -> Lambda -> DynamoDB Architecture Launched

# Publish test data to SNS Topic
# 'MessageAttributes' section is what SNS uses to filter
for test in [hyde_park_test, woodlawn_test]:
    sns.publish(TopicArn=sns_topic_arn,
                Message=json.dumps(test),
                MessageAttributes={
                    'location': {
                        'DataType': 'String',
                        'StringValue': test['location']
                    }
                })

# Look in DynamoDB to confirm that we used different queues for different neighborhoods

Event-Driven Functional Decomposition

We can also use this SNS approach to send the same data to different Lambda functions that perform different processing tasks at the same time (i.e. functional decomposition). This could mean writing to multiple databases at once, or just performing two separate, intensive calculations. Let's imagine, for instance, that for each sensor reading ingested into the cloud, we need to perform two transformations (for the sake of this demo, let's say we need to add 1 to the number of decibels recorded and also subtract 1) before writing those values into our database. We could perform both transformations at once by placing each transformation in its own Lambda function. Let's create such a pipeline:
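As with 6M-1.zip, the contents of 6M-2a.zip and 6M-2b.zip aren't reproduced here. A hypothetical sketch of one of the two handlers (assuming raw message delivery, so the original JSON lands directly in the SQS record body, and assuming each function writes its transformed value under its own attribute in the same item) might look like:

# Hypothetical sketch of the handler in 6M-2a.zip (the real file is not shown).
# The 6M-2b handler would be identical apart from the transformation it applies
# and the attribute name it writes to.
import json
import boto3

table = boto3.resource('dynamodb').Table('6M_DB')

def lambda_handler(event, context):
    for record in event['Records']:
        # With RawMessageDelivery enabled, the SQS body is the original JSON payload
        data = json.loads(record['body'])
        table.update_item(
            Key={'id': data['location'] + data['sensor'] + data['timestamp']},
            UpdateExpression='SET db_plus_one = :val',           # 6M-2b: db_minus_one
            ExpressionAttributeValues={':val': data['db'] + 1}   # 6M-2b: data['db'] - 1
        )
    return {'statusCode': 200}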

# Create SNS Topic if it doesn't already exist (create_topic is idempotent,
# so it returns the existing topic's ARN if the name is already taken)
sns_topic_arn = sns.create_topic(Name='6M_FunctionalDecomp')['TopicArn']

# Equivalently, we could look the topic ARN up by name:
sns_topic_arn = [t['TopicArn']
                 for page in sns.get_paginator('list_topics').paginate()
                 for t in page['Topics']
                 if '6M_FunctionalDecomp' in t['TopicArn']][0]

dpkgs = ['6M-2a', '6M-2b']
for pkg in dpkgs:
    # Create Lambda Function (two different functions that will transform decibels)
    with open('deployment-packages/{}.zip'.format(pkg), 'rb') as f:
        lambda_zip = f.read()

    try:
        # If function hasn't yet been created, create it
        response = aws_lambda.create_function(
            FunctionName=pkg,
            Runtime='python3.9',
            Role=role['Role']['Arn'],
            Handler='lambda_function.lambda_handler',
            Code=dict(ZipFile=lambda_zip)
        )
    except aws_lambda.exceptions.ResourceConflictException:
        # If function already exists, update it based on zip
        # file contents
        response = aws_lambda.update_function_code(
            FunctionName=pkg,
            ZipFile=lambda_zip
        )

    # Create an SQS queue to stand in front of each function:
    try:
        queue_url = sqs.create_queue(QueueName=pkg)['QueueUrl']
    except sqs.exceptions.QueueNameExists:
        queue_url = [url for url in sqs.list_queues()['QueueUrls'] if pkg in url][0]

    sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
    sqs_arn = sqs_info['Attributes']['QueueArn']

    # Trigger different Lambda Functions for all new data (functional decomp)
    try:
        response = aws_lambda.create_event_source_mapping(
            EventSourceArn=sqs_arn,
            FunctionName=pkg,
            Enabled=True,
            BatchSize=10
        )
    except aws_lambda.exceptions.ResourceConflictException:
        es_id = aws_lambda.list_event_source_mappings(
            EventSourceArn=sqs_arn,
            FunctionName=pkg
        )['EventSourceMappings'][0]['UUID']

        response = aws_lambda.update_event_source_mapping(
            UUID=es_id,
            FunctionName=pkg,
            Enabled=True,
            BatchSize=10
        )

    # Subscribe each queue to SNS Topic, no filter; each queue receives all messages
    response = sns.subscribe(TopicArn=sns_topic_arn,
                             Protocol='sqs',
                             Endpoint=sqs_arn,
                             Attributes={'RawMessageDelivery': 'true'})

    # Allow SNS to write to SQS queue
    policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_arn)
    response = sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            'Policy': policy_json
        }
    )

print('SNS -> SQS (2) -> Lambda (2) -> DynamoDB Architecture Launched')


SNS -> SQS (2) -> Lambda (2) -> DynamoDB Architecture Launched

Imagining that some sensors publish data into our new SNS topic, we can take a look at how this approach can allow us to update two different elements in our database at once:

# Publish test data to SNS Topic (no filtering here, so we don't need MessageAttributes);
# this mimics different devices sending messages into our AWS ecosystem
for test in [hyde_park_test, woodlawn_test]:
    sns.publish(TopicArn=sns_topic_arn,
                Message=json.dumps(test))

# Should see transformations from both Lambda functions in DynamoDB

This approach becomes even more powerful as we begin to combine domain and functional decomposition — e.g. filtering by “dangerous” decibel numbers and sending potentially dangerous measurements to a “priority” queue that will notify law enforcement immediately, while processing the remaining measurements through a standard queue (or set of queues).
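For instance, a hypothetical "priority" subscription could use SNS's numeric filtering on a decibel message attribute (the attribute name, threshold, and queue below are made up for illustration, and the readings would need to be published with a 'db' message attribute of DataType 'Number'):

# Hypothetical 'priority' subscription: only messages whose 'db' message attribute
# is at or above a made-up threshold of 90 decibels reach the priority queue; an
# unfiltered subscription would continue to feed the standard queue(s).
priority_filter = {'db': [{'numeric': ['>=', 90]}]}

sns.subscribe(TopicArn=sns_topic_arn,
              Protocol='sqs',
              Endpoint=priority_queue_arn,   # ARN of a hypothetical priority queue
              Attributes={'RawMessageDelivery': 'true',
                          'FilterPolicy': json.dumps(priority_filter)})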

Note that we're only scratching the surface here of what we can do with event-driven processing in the AWS Cloud. For instance, we can also use other AWS services as event sources for Lambda functions (S3 and DynamoDB, to name a couple), so that when a new file is uploaded to S3 (or data is entered into a DynamoDB table), we immediately trigger a Lambda function to perform further processing. I'll leave it as an exercise for you to explore this functionality in greater detail on your own and to think about how you can automate and produce long-running, scalable serverless pipelines in this way.
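As a rough, hypothetical sketch of the S3 case (the bucket name is made up, and the existing '6M' function is reused purely for illustration), wiring a bucket up as a Lambda event source might look something like this:

# Hypothetical sketch: invoke the '6M' Lambda function whenever a new object is
# uploaded to a (made-up) S3 bucket. Assumes the bucket already exists.
s3 = boto3.client('s3')

# Give S3 permission to invoke the function
aws_lambda.add_permission(
    FunctionName='6M',
    StatementId='s3-invoke-demo',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::my-sensor-uploads-bucket'  # hypothetical bucket
)

# Ask S3 to send object-created events to the function
s3.put_bucket_notification_configuration(
    Bucket='my-sensor-uploads-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [
            {'LambdaFunctionArn': lambda_arn,
             'Events': ['s3:ObjectCreated:*']}
        ]
    }
)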

Kinesis Streams

What if we want to replay a series of events? AWS Kinesis streams might be a better option for ingestion in this case, as they allow us to replay streaming data in the order that it arrived and to have multiple consumers reading from the same stream (if you provision enough shards to support them; Kinesis does not provision read throughput as dynamically as SQS does, as mentioned in lecture). We can set up a Kinesis stream as a trigger for Lambda functions just as we did with SQS:
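One difference worth noting is the shape of the event that Lambda receives: unlike SQS, Kinesis delivers each record's payload base64-encoded under record['kinesis']['data']. The handler in 6M-3.zip isn't shown in these notes, but a hypothetical sketch might look like:

# Hypothetical sketch of the handler in deployment-packages/6M-3.zip (the real
# file is not shown). Kinesis delivers each record's payload base64-encoded.
import base64
import json
import boto3

table = boto3.resource('dynamodb').Table('6M_DB')

def lambda_handler(event, context):
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)
        table.put_item(Item={
            'id': data['location'] + data['sensor'] + data['timestamp'],
            'db': data['db']
        })
    return {'statusCode': 200}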

# Create Lambda Function
with open('deployment-packages/6M-3.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='6M_Streaming',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip)
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip
    # file contents
    response = aws_lambda.update_function_code(
        FunctionName='6M_Streaming',
        ZipFile=lambda_zip
    )

lambda_arn = response['FunctionArn']

try:
    response = kinesis.create_stream(StreamName='6M',
                                     ShardCount=1)

    # Is the stream active and ready to be written to/read from?
    waiter = kinesis.get_waiter('stream_exists')
    waiter.wait(StreamName='6M')
    print('Kinesis Stream created')
except kinesis.exceptions.ResourceInUseException:
    print('Kinesis Stream already exists')

kinesis_info = kinesis.describe_stream(StreamName='6M')
kinesis_arn = kinesis_info['StreamDescription']['StreamARN']

# Trigger Lambda Function when new messages enter Kinesis Stream
# LATEST = start reading just after the most recent record in the shard
try:
    response = aws_lambda.create_event_source_mapping(
        EventSourceArn=kinesis_arn,
        FunctionName='6M_Streaming',
        Enabled=True,
        BatchSize=1,
        StartingPosition='LATEST'
    )
except aws_lambda.exceptions.ResourceConflictException:
    es_id = aws_lambda.list_event_source_mappings(
        EventSourceArn=kinesis_arn,
        FunctionName='6M_Streaming'
    )['EventSourceMappings'][0]['UUID']

    response = aws_lambda.update_event_source_mapping(
        UUID=es_id,
        FunctionName='6M_Streaming',
        Enabled=True,
        BatchSize=1
    )

Kinesis Stream created

And then pass data into the stream just as before:

for test in [hyde_park_test, woodlawn_test]:
    kinesis.put_record(StreamName='6M',
                       Data=json.dumps(test),
                       PartitionKey='partitionkey')

# Go to console to confirm DB is updated from this different source

Then, even after processing the stream via our Lambda function, we can go back and replay the streaming events from the start (or from another starting point as well, if we so wish — up to 24 hours ago, by default):

# Iterator type TRIM_HORIZON = start reading at oldest record in the shard
# (demo that we can replay from same stream on different device)
shard_it = kinesis.get_shard_iterator(StreamName="6M",
                                      ShardId='shardId-000000000000',
                                      ShardIteratorType='TRIM_HORIZON'
                                      )["ShardIterator"]

out = kinesis.get_records(ShardIterator=shard_it)
out

{'Records': [{'SequenceNumber': '49640028028587114397338335237477344054568536786832719874',
   'ApproximateArrivalTimestamp': datetime.datetime(2023, 4, 20, 19, 23, 56, 649000, tzinfo=tzlocal()),
   'Data': b'{"location": "HydePark", "sensor": "01", "timestamp": "0909092022", "db": 85}',
   'PartitionKey': 'partitionkey'},
  {'SequenceNumber': '49640028028587114397338335237478552980388151484726902786',
   'ApproximateArrivalTimestamp': datetime.datetime(2023, 4, 20, 19, 23, 56, 858000, tzinfo=tzlocal()),
   'Data': b'{"location": "Woodlawn", "sensor": "01", "timestamp": "0101012022", "db": 70}',
   'PartitionKey': 'partitionkey'}],
 'NextShardIterator': 'AAAAAAAAAAH3iX1lQiujJ6WEI0qPQNaYeeh1ZMWgGeMg9sKKMAeGPuqp+qwV4lAAlOS6VoeEOsOQOxaspwp1UA3vMt5OZkuIKRO6sfvTTWK4r7XScQ13W5hc8CmfrZkt10+cJUDDQtV8PRi8eYFDllcynvSpBVAWYn9A7WGCa8Ex564/Va+UhnWfjB3+VciLj/r/Vo6LFOVmZns80bjBUnalXUm9ly44',
 'MillisBehindLatest': 1000,
 'ResponseMetadata': {'RequestId': 'c21f537f-79da-da56-9ea4-8aa2f935d6df',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c21f537f-79da-da56-9ea4-8aa2f935d6df',
   'x-amz-id-2': '/x+9wyp75y9pA0EI43cFjvB9dChSgCy+7stUHkbz247A6AEO1/Gl6O97hlJJirQl7CD3oxhpgtJj7B2/U2Z/iGkmOq6RSiVgvEuxrDHqcVM=',
   'date': 'Fri, 21 Apr 2023 00:23:57 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '824'},
  'RetryAttempts': 0}}
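To turn those replayed records back into Python dictionaries for further analysis, we just need to parse each record's Data payload (a small, hypothetical post-processing step on the out response from above):

# Parse each replayed record's JSON payload back into a dictionary
replayed = [json.loads(r['Data']) for r in out['Records']]
print(replayed)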

Pretty useful for streaming analysis purposes! Alright, this concludes our treatment of event-driven architectures for today.

Once you’re done, be sure to shut down all the resources that are running as a part of this demonstration to avoid incurring additional charges:

# Delete each pipeline component if it still exists:
for f in ['6M', '6M-2a', '6M-2b', '6M_Streaming']:
    try:
        aws_lambda.delete_function(FunctionName=f)
        print("Lambda Function Deleted")
    except aws_lambda.exceptions.ResourceNotFoundException:
        print("AWS Lambda Function Already Deleted")

event_source_uuids = [mapping['UUID']
                      for mapping in aws_lambda.list_event_source_mappings()['EventSourceMappings']]
for uuid in event_source_uuids:
    aws_lambda.delete_event_source_mapping(UUID=uuid)
print("Event source mappings deleted")

queue_urls = []
if 'QueueUrls' in sqs.list_queues():
    queue_urls = [url for url in sqs.list_queues()['QueueUrls']]
for queue_url in queue_urls:
    try:
        sqs.delete_queue(QueueUrl=queue_url)
        print("SQS Queue Deleted")
    except sqs.exceptions.QueueDoesNotExist:
        print("SQS Queue Already Deleted")

# Unsubscribe Queues from topics
subscription_arns = [s['SubscriptionArn']
                     for s in sns.list_subscriptions()['Subscriptions']
                     if '6M' in s['SubscriptionArn']]
for subscription_arn in subscription_arns:
    sns.unsubscribe(SubscriptionArn=subscription_arn)
print("Unsubscribed from SNS Topics")

# Delete Topics
topic_arns = [t['TopicArn']
              for t in sns.list_topics()['Topics']
              if '6M' in t['TopicArn']]
for topic_arn in topic_arns:
    sns.delete_topic(TopicArn=topic_arn)
print("SNS Topics Deleted")

# DynamoDB
try:
    dynamo_client.delete_table(TableName='6M_DB')
    print("DynamoDB Table Deleted")
except dynamo_client.exceptions.ResourceNotFoundException:
    print("DynamoDB Table Already Deleted")

# Kinesis Stream
try:
    response = kinesis.delete_stream(StreamName='6M')
    print("Kinesis Stream Successfully Deleted")
except kinesis.exceptions.ResourceNotFoundException:
    print("Kinesis Stream Already Deleted")

Lambda Function Deleted
Event source mappings deleted
SQS Queue Deleted
Unsubscribed from SNS Topics
SNS Topics Deleted
DynamoDB Table Deleted
Kinesis Stream Successfully Deleted