Using Step Functions to Orchestrate Massively Parallel Batch Jobs
You'll recall from our serverless demonstration a couple of weeks ago that we can upload Lambda functions programmatically:

import boto3
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt

# Access our class IAM role, which allows Lambda
# to interact with other AWS resources
aws_lambda = boto3.client('lambda')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')

# Open zipped directory
with open('hello_world.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='hello_world_programmatic',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=300
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip
    # file contents
    response = aws_lambda.update_function_code(
        FunctionName='hello_world_programmatic',
        ZipFile=lambda_zip
    )

# collect ARN for later use in Step Function state machine
lambda_arn = response['FunctionArn']

And that Lambda will scale out automatically to meet concurrent demand, parallelizing based on how many concurrent invocations it receives:

# 1. write function to invoke our function for us and pass in data:
def invoke_function(data):
    r = aws_lambda.invoke(FunctionName='hello_world_programmatic',
                          InvocationType='RequestResponse',
                          Payload=json.dumps(data))
    return json.loads(r['Payload'].read())

# 2. Demo that lambda function will scale out if called concurrently on different threads locally
test_data = {'key1': 1, 'key2': 2}
with ThreadPoolExecutor(max_workers=10) as executor:
    results = executor.map(invoke_function, [test_data for _ in range(10)])

# 3. In AWS Console: confirm that we had concurrent executions (takes a few seconds to update)
# Same results too:
[result for result in results]

[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
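
The contents of hello_world.zip aren't shown here, but a minimal handler consistent with the [3, 3, ...] results above (summing the two input values) might look like this sketch (not necessarily the exact code in the zip):

# Hypothetical contents of lambda_function.py in hello_world.zip:
# return the sum of the two input values (1 + 2 = 3, matching the output above)
def lambda_handler(event, context):
    return event['key1'] + event['key2']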

This capacity to scale based on concurrent demand makes Lambda functions great for event-driven workflows (which we talked about earlier in the week).

For batch-job types of tasks, though, we should ideally be able to scale out to as many available Lambda workers as possible (i.e. thousands of concurrent function invocations on different segments of a dataset — a serverless domain decomposition) and not be limited by our local resources.

This is where AWS Step Functions can be very useful — orchestrating large, embarrassingly parallel code execution across many Lambda workers with very little code (all we need to do is specify a model for how our Lambda function should be invoked!). Specifically, we'll be using the "Map" state to scatter input data to multiple Lambda workers at the same time and selecting the Express workflow Step Function option to run our short job synchronously.

(We'll demonstrate how to incorporate the Lambda function into a Step Function workflow via the graphical model in the console after launching the state machine via the programmatic approach below.)

sfn = boto3.client('stepfunctions')

def make_def(lambda_arn):
    definition = {
        "Comment": "My State Machine",
        "StartAt": "Map",
        "States": {
            "Map": {
                "Type": "Map",
                "End": True,
                "Iterator": {
                    "StartAt": "Lambda Invoke",
                    "States": {
                        "Lambda Invoke": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "OutputPath": "$.Payload",
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": lambda_arn
                            },
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException",
                                        "Lambda.TooManyRequestsException",
                                        "States.TaskFailed"
                                    ],
                                    "IntervalSeconds": 2,
                                    "MaxAttempts": 6,
                                    "BackoffRate": 2
                                }
                            ],
                            "End": True
                        }
                    }
                }
            }
        }
    }
    return definition

sf_def = make_def(lambda_arn)

try:
    response = sfn.create_state_machine(
        name='hello_world_sm',
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
        type='EXPRESS'
    )
except sfn.exceptions.StateMachineAlreadyExists:
    response = sfn.list_state_machines()
    state_machine_arn = [sm['stateMachineArn']
                         for sm in response['stateMachines']
                         if sm['name'] == 'hello_world_sm'][0]
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn']
    )

response = sfn.list_state_machines()
print(response)

{'stateMachines': [{'stateMachineArn': 'arn:aws:states:us-east-1:230488219088:stateMachine:hello_world_sm', 'name': 'hello_world_sm', 'type': 'EXPRESS', 'creationDate': datetime.datetime(2023, 4, 10, 22, 2, 39, 239000, tzinfo=tzlocal())}, {'stateMachineArn': 'arn:aws:states:us-east-1:230488219088:stateMachine:word_count_sm', 'name': 'word_count_sm', 'type': 'EXPRESS', 'creationDate': datetime.datetime(2023, 4, 10, 22, 24, 39, 669000, tzinfo=tzlocal())}], 'ResponseMetadata': {'RequestId': '5cf223a7-e6e5-40fe-83a4-01d62d0cad41', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '5cf223a7-e6e5-40fe-83a4-01d62d0cad41', 'date': 'Mon, 24 Apr 2023 18:24:23 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '339'}, 'RetryAttempts': 0}}

# Get arn for Step Function state machine
state_machine_arn = [sm['stateMachineArn']
                     for sm in response['stateMachines']
                     if sm['name'] == 'hello_world_sm'][0]

# generate test data to pass as input
# "Map" will automatically invoke a separate Lambda function
# to process each dictionary in the list (50 concurrently)
data = [{"key1": 1, "key2": 2} for _ in range(50)]

Once we have the identifier (ARN) for our Step Function state machine, we can pass in input data in JSON format. We have two options for execution — synchronous execution (e.g. our notebook will wait for a response from AWS before moving on to the next cell), and asynchronous execution (which we might want to use if our Lambda functions were writing results to a cloud database and we don’t need to wait for execution to finish before moving on with our code).

# Synchronous Execution
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='sync_test',
    input=json.dumps(data)
)

print(response['output'])

[3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3]

# Async; perhaps writing results to db and don't need to wait for execution to finish before moving on with code
response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    name='async_test',
    input=json.dumps(data)
)

print(response)  # no results returned for async option
# Can go into logs in CloudWatch and see execution results (Express SF workflow)
# Note that the Standard Step Function workflow allows us to audit results via Boto3

{'executionArn': 'arn:aws:states:us-east-1:230488219088:express:hello_world_sm:async_test:f9323c5e-92ab-4ac3-adf7-40a513fa9851', 'startDate': datetime.datetime(2023, 4, 24, 13, 24, 29, 105000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': '712db404-3138-4988-9440-4b0832e6bce2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '712db404-3138-4988-9440-4b0832e6bce2', 'date': 'Mon, 24 Apr 2023 18:24:29 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '156'}, 'RetryAttempts': 0}}
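
If we had instead created the state machine as a Standard workflow (type='STANDARD'), we could audit the execution directly from Boto3. A minimal sketch of polling for the result under that assumption:

# Sketch: polling a *Standard* workflow execution for its status/output via Boto3
# (assumes `response` came from start_execution on a Standard state machine;
# Express async executions are audited through CloudWatch Logs instead)
while True:
    status = sfn.describe_execution(executionArn=response['executionArn'])
    if status['status'] != 'RUNNING':
        break
    time.sleep(1)

print(status['status'])        # e.g. 'SUCCEEDED'
print(status.get('output'))    # JSON output of the state machine, if it succeeded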

Using AWS Lambda to Make HTTP Requests in Parallel
A common task for computational social scientists is making HTTP requests to access and process web data (i.e. to make API requests and scrape web pages). It can be quite limiting to make these requests serially, though: when we do, the amount of data we are able to collect is limited both by our internet bandwidth and by our machine's ability to process the data sequentially.

It would be much better to parallelize this workflow. Here, we’ll take a look at how we could do this in a serverless fashion using the same workflow as above (Mapping Data to Lambda Functions via Step Functions). Specifically, we will call the Google Books API in parallel on a list of ISBNs (one API call can be constructed like this), calculating the number of words used in the description for each book in our ISBN list (see workflow below). Word count is a simple metric (used here as a proof-of-concept), but it would also be possible to perform other Natural Language Processing routines using this same approach. Such a cloud workflow allows us to gather and process far greater amounts of data than would be otherwise possible on our local machines.
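
For instance, a single Google Books API request for one ISBN (an example ISBN here, chosen for illustration) returns a JSON document whose description field we can count words in:

# One Google Books API call for a single (example) ISBN; if present, the book's
# description sits under items[0].volumeInfo.description in the response JSON
r = requests.get('https://www.googleapis.com/books/v1/volumes?q=isbn:9780262033848')
data = r.json()
print(data['items'][0]['volumeInfo']['description'])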

First, we’ll need to load in a list of ISBNs (provided in this directory):

with open('isbn.txt') as file:
    isbn_list = [isbn.strip() for isbn in file]

Then, we can write a function that will take in a list of ISBNs and compute the number of words in the description of each corresponding book:

def get_desc_wc(isbn_list):
    '''
    Takes in a list of ISBNs and returns a list of description
    word counts corresponding to each ISBN (via the Google
    Books API).
    '''
    url = "https://www.googleapis.com/books/v1/volumes?q=isbn:"
    wc_list = []
    for isbn in isbn_list:
        r = requests.get(url + isbn)
        data = r.json()
        # Try to get description, but if there is none, set
        # word count to be 0 for that book
        try:
            description = data['items'][0]['volumeInfo']['description']
            wc_list.append(len(description.split()))
        except KeyError:
            wc_list.append(0)
    return wc_list

We can then call our function and it will sequentially request information and calculate the description word count for each one of our ISBNs in the list:

start = time.time()
wc_list = get_desc_wc(isbn_list)
time_elapsed = time.time() - start

print("Time elapsed (in seconds) - Serial: ", time_elapsed)

plt.hist(wc_list);

Time elapsed (in seconds) - Serial: 80.51322746276855

This is a bit slow (exactly how slow will be variable and heavily based on your internet connection!), though, and could benefit from parallelization. One way we can do this in a “serverless” fashion is by using AWS Step Functions to invoke many AWS Lambda functions to make these ISBN API calls (and calculate the description word count) in parallel.

In a personal AWS account, you can make up to 3000 concurrent Lambda invocations in an initial burst, scaling by an additional 500 instances each minute that your code runs (until your account’s concurrency limit is reached). Note, though, that we’re limited in our AWS Academy accounts in the number of concurrent Lambda invocations we can make. As a result, we won’t be able to see the same scalability as we would see in a personal account, but you can at least get the idea of how this pipeline can be constructed.
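
If you'd like to see the limit that applies to the account you're running under, you can query it via Boto3 (a quick sketch using the Lambda client we created above):

# Check this account's Lambda concurrency limit (will differ between
# personal accounts and AWS Academy accounts)
settings = aws_lambda.get_account_settings()
print(settings['AccountLimit']['ConcurrentExecutions'])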

To make this work, we’ll do the same thing we did above in our “Hello World” case — using Step Functions to invoke multiple Lambda functions in parallel and then return the output of the executions synchronously to our local machine.

For our Lambda function, we can write a lambda_handler that uses our API request function above like so (to be run on mini-batches of the overall isbn list):

import requests

def get_desc_wc(isbn_list):
    '''
    Takes in a list of ISBNs and returns a list of description
    word counts corresponding to each ISBN (via the Google
    Books API).
    '''
    url = "https://www.googleapis.com/books/v1/volumes?q=isbn:"

    wc_list = []
    for isbn in isbn_list:
        r = requests.get(url + isbn)
        data = r.json()
        # Try to get description, but if there is none, set
        # word count to be 0 for that book
        try:
            description = data['items'][0]['volumeInfo']['description']
            wc_list.append(len(description.split()))
        except KeyError:
            wc_list.append(0)
    return wc_list

def lambda_handler(event, context):
    wc = get_desc_wc(event['isbn'])
    # return word counts so they flow back through the state machine output
    return wc

The only tricky thing with running this particular code as a Lambda function is that we are importing the requests module in order to make HTTP requests. Such dependencies are not included by default in Lambda's Python runtimes.

In order to work with dependencies, we'll need to package our Lambda function with all of its dependencies (which you can do manually like so, or by adding a predefined "layer" defined by a third party via its ARN in the Lambda dashboard). Note as well that you can also work with Docker container images (AWS provides up-to-date base images for Lambda) to test and upload your function to AWS. The Docker approach is not necessary for any of the assignments/exam in the class, but you may need to explore this option if you use this approach in your own research (i.e. if you're working with an especially big model, or large software not already installed on Lambda, our normal Lambda function creation workflow will not work).
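
As a rough sketch of that manual packaging workflow (assuming your handler lives in a local lambda_function.py; file and directory names here are illustrative), you install the dependencies into a build directory alongside your handler and zip up the contents of that directory:

import os
import shutil
import subprocess
import zipfile

# Install dependencies into a build directory and copy in the handler code
build_dir = 'build'
os.makedirs(build_dir, exist_ok=True)
subprocess.run(['pip', 'install', 'requests', '-t', build_dir], check=True)
shutil.copy('lambda_function.py', build_dir)

# Zip the *contents* of the build directory (not the directory itself),
# so lambda_function.py and its dependencies sit at the root of the archive
with zipfile.ZipFile('word_count.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    for root, _, files in os.walk(build_dir):
        for name in files:
            path = os.path.join(root, name)
            zf.write(path, os.path.relpath(path, build_dir))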

Using the manual workflow linked above, we have included a zipped version of the Lambda function (with its dependencies) in this directory (word_count.zip) that you can upload and use (walk through the directory structure live in class). You'll need to do the same if you want to provide added functionality to your Lambda functions (e.g. BeautifulSoup for web scraping, numpy or pandas for analytical workflows, etc.). We'll also need to increase our Lambda timeout so that longer-running jobs have enough time to finish (the default is a 3-second timeout).

Let’s use boto3 this time to create our Lambda Function from our zipped word_count.zip (to demonstrate that we can do this programmatically as well as in the console as we did earlier):

# Open our zipped directory
with open('word_count.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='word_count',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=300
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip
    # file contents
    response = aws_lambda.update_function_code(
        FunctionName='word_count',
        ZipFile=lambda_zip
    )

lambda_arn = response['FunctionArn']

Once we have our function, we can incorporate it into a Step Function state machine, like we did in our Hello World example:

sf_def = make_def(lambda_arn)

try:
    response = sfn.create_state_machine(
        name='word_count_sm',
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
        type='EXPRESS'
    )
except sfn.exceptions.StateMachineAlreadyExists:
    response = sfn.list_state_machines()
    state_machine_arn = [sm['stateMachineArn']
                         for sm in response['stateMachines']
                         if sm['name'] == 'word_count_sm'][0]
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn']
    )

Once our Step Function is set, we’re ready to provide data that will be spread across our Lambda worker invocations. Remember, AWS Academy will throttle our Lambda invocations (well below the 3000 concurrent requests we can make in a personal account), so we subdivide our list of ISBNs into 50 equal batches:

n = 500 // 50  # subdivide list of ISBNs into 50 equal batches
isbn_batches = [{'isbn': isbn_list[i:i + n]} for i in range(0, len(isbn_list), n)]

# 50 lists of 10 ISBNs
print(len(isbn_batches), len(isbn_batches[0]['isbn']))

Now, let’s get our Step Function state machine arn and pass in our data:

# Get arn for Step Function state machine
response = sfn.list_state_machines()
state_machine_arn = [sm['stateMachineArn']
                     for sm in response['stateMachines']
                     if sm['name'] == 'word_count_sm'][0]

# Spread ISBN batches across Lambda workers
start = time.time()
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='isbn_500',
    input=json.dumps(isbn_batches)
)
time_elapsed = time.time() - start

print(time_elapsed)

4.21414041519165
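
Since each Lambda invocation returns its batch's word counts (assuming the handler returns wc, as sketched above), the Express workflow hands the results back as a JSON string in response['output'], which we can flatten and plot just like the serial results:

# Flatten the per-batch word counts returned by the Map state and plot them
wc_parallel = [wc for batch in json.loads(response['output']) for wc in batch]
plt.hist(wc_parallel);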

This is ~20x faster than the serial solution (meaning we can gather more data in the same amount of time, re: Gustafson’s Law!). If we ran this same code on larger sets of ISBNs in a personal account, we would expect our compute time to remain similar, so long as we stay beneath the 3000 concurrent Lambda invocation maximum.

So, we can quickly parallelize workflows in a serverless fashion using a combination of AWS Lambda and Step Functions. You can also create even more complicated workflows in Step Functions involving functional decomposition, multiple layers of Lambda function invocations, and more.
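
For instance, here is a sketch of a definition that chains two Lambda tasks (using hypothetical preprocess_arn and analyze_arn Lambda ARNs), so that one function's output feeds directly into the next:

# Sketch of a functionally decomposed, two-step state machine definition;
# preprocess_arn and analyze_arn are hypothetical Lambda function ARNs
chained_def = {
    "Comment": "Two chained Lambda tasks",
    "StartAt": "Preprocess",
    "States": {
        "Preprocess": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {"Payload.$": "$", "FunctionName": preprocess_arn},
            "Next": "Analyze"
        },
        "Analyze": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {"Payload.$": "$", "FunctionName": analyze_arn},
            "End": True
        }
    }
}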

One thing to note with the workflow above is that the payload size for inputs and outputs is not infinite (Step Functions will only accept inputs and outputs of 256 KB or less). So, if you are working with really big inputs and/or outputs, you will want to read input data from AWS cloud databases/distributed storage systems (e.g. by providing the keys to the data you're referencing in the input JSON) and write output data to a database/storage system within your Lambda function invocation as well.
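
For instance, a handler along these lines (with hypothetical bucket and key names) would read its chunk of input from S3 and write its results back to S3, so that only small references pass through the Step Function payload:

import json
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # Fetch this worker's chunk of data from S3 using the key passed in the input
    obj = s3.get_object(Bucket=event['bucket'], Key=event['input_key'])
    isbn_list = json.loads(obj['Body'].read())

    wc = get_desc_wc(isbn_list)  # same word-count routine as above

    # Write results back to S3 and return only a small reference to them
    output_key = 'results/' + event['input_key']
    s3.put_object(Bucket=event['bucket'], Key=output_key,
                  Body=json.dumps(wc).encode())
    return {'output_key': output_key}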