Serverless: Building a mini producer/consumer data pipeline with AWS SNS
I wanted to use Serverless to build a little data pipeline that runs once a day, calls an API, and loads that data into a database.
It’s mostly used to pull in recent data from that API, but I also wanted to be able to invoke it manually and specify a date range.
I created the following pair of Lambda functions, a producer and a consumer, which communicate with each other via an SNS topic.
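To illustrate the producer/consumer pattern without an AWS account, here's a rough in-memory sketch. `InMemoryTopic` is a hypothetical stand-in for an SNS topic, not the AWS API. It wraps each published message in a trimmed-down version of the `Records`/`Sns`/`Message` envelope that a Lambda subscribed to SNS receives, and hands it to every subscriber:

```python
import json
from typing import Callable, Dict, List


class InMemoryTopic:
    """Hypothetical in-memory stand-in for an SNS topic (not the AWS API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: List[Callable[[Dict], None]] = []

    def subscribe(self, handler: Callable[[Dict], None]) -> None:
        self._subscribers.append(handler)

    def publish(self, message: str) -> None:
        # Trimmed-down version of the event shape a Lambda gets from SNS;
        # the real event carries extra fields (TopicArn, MessageId, ...).
        event = {"Records": [{"Sns": {"Message": message}}]}
        for handler in self._subscribers:
            handler(event)


received: List[Dict] = []


def consumer(event: Dict) -> None:
    # Unlike a real Lambda handler, this sketch takes no context argument.
    for record in event["Records"]:
        received.append(json.loads(record["Sns"]["Message"]))


topic = InMemoryTopic("marks-blog-topic")
topic.subscribe(consumer)
topic.publish(json.dumps({"startDate": "2017-09-29", "endDate": "2017-09-30", "tags": ["neo4j"]}))
print(received)
```

The producer only needs to know about the topic, not about who consumes it, which is what makes it easy to trigger the consumer from other sources as well.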
The code
serverless.yml
```yaml
service: marks-blog

frameworkVersion: ">=1.2.0 <2.0.0"

provider:
  name: aws
  runtime: python3.6
  timeout: 180
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - "sns:Publish"
      Resource:
        - ${self:custom.BlogTopic}

custom:
  BlogTopic:
    Fn::Join:
      - ":"
      - - arn
        - aws
        - sns
        - Ref: AWS::Region
        - Ref: AWS::AccountId
        - marks-blog-topic

functions:
  message-consumer:
    name: MessageConsumer
    handler: handler.consumer
    events:
      - sns:
          topicName: marks-blog-topic
          displayName: Topic to process events

  message-producer:
    name: MessageProducer
    handler: handler.producer
    events:
      - schedule: rate(1 day)
```
handler.py
```python
import boto3
import json
import datetime
from datetime import timezone


def producer(event, context):
    sns = boto3.client('sns')

    # The function ARN looks like arn:aws:lambda:<region>:<account-id>:function:<name>,
    # so its region and account ID can be reused to build the topic's ARN.
    context_parts = context.invoked_function_arn.split(':')
    topic_name = "marks-blog-topic"
    topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}".format(
        region=context_parts[3], account_id=context_parts[4], topic=topic_name)

    # Ask for the last day's worth of data: yesterday to today, in UTC.
    now = datetime.datetime.now(timezone.utc)
    start_date = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = now.strftime("%Y-%m-%d")

    params = {"startDate": start_date, "endDate": end_date, "tags": ["neo4j"]}

    sns.publish(TopicArn=topic_arn, Message=json.dumps(params))


def consumer(event, context):
    # The event wraps each SNS notification in a "Records" list.
    for record in event["Records"]:
        # The SNS message body is a JSON string, so decode it first.
        message = json.loads(record["Sns"]["Message"])

        start_date = message["startDate"]
        end_date = message["endDate"]
        tags = message["tags"]

        print("start_date: " + start_date)
        print("end_date: " + end_date)
        print("tags: " + str(tags))
```
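The producer builds the topic ARN by splitting `context.invoked_function_arn`. Pulling that logic into a pure function makes it easy to check without deploying. Here's a sketch; `topic_arn_from_function_arn` is a hypothetical helper name, and the account ID in the example is made up. Unlike the original, it also reuses the partition field instead of hard-coding `aws`:

```python
def topic_arn_from_function_arn(function_arn: str, topic_name: str) -> str:
    # Hypothetical helper. A Lambda function ARN has the form
    # arn:<partition>:lambda:<region>:<account-id>:function:<name>[:<qualifier>]
    parts = function_arn.split(":")
    partition, region, account_id = parts[1], parts[3], parts[4]
    return "arn:{}:sns:{}:{}:{}".format(partition, region, account_id, topic_name)


# Made-up account ID and region, for illustration only.
arn = topic_arn_from_function_arn(
    "arn:aws:lambda:eu-west-1:123456789012:function:MessageProducer",
    "marks-blog-topic",
)
print(arn)
```

Because the region and account ID sit at fixed positions, an optional version or alias qualifier at the end of the function ARN doesn't affect the result.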
Trying it out
We can simulate a message being received locally by executing the following command:
```
$ serverless invoke local \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\" }"}}]}'

start_date: 2017-09-25
end_date: 2017-09-29
tags: ['neo4j']
null
```
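The escaped JSON in that `--data` argument is easy to get wrong by hand. The SNS message body is itself a JSON string, so it is serialised once and then embedded in the outer event, which is serialised again. A small sketch that generates the payload instead of hand-escaping it:

```python
import json

# The message body the consumer expects (same keys as in handler.py).
params = {"tags": ["neo4j"], "startDate": "2017-09-25", "endDate": "2017-09-29"}

# Serialise the body once, embed it as the SNS "Message" string,
# then serialise the whole event again for the --data argument.
event = {"Records": [{"Sns": {"Message": json.dumps(params)}}]}
payload = json.dumps(event)

print(payload)
```

The printed string can be pasted inside the single quotes of `--data '...'`, provided the message contains no single quotes of its own.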
That seems to work fine. Once the service has been deployed (serverless deploy), what happens if we invoke the message-producer on AWS?
```
$ serverless invoke --function message-producer

null
```
Did the consumer receive the message?
```
$ serverless logs --function message-consumer

START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f	Duration: 0.46 ms	Billed Duration: 100 ms	Memory Size: 1024 MB	Max Memory Used: 32 MB
```
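The dates in that log come from the producer's window, which runs from one day before the current UTC time up to today. Here's a sketch of that calculation as a function that accepts `now`, so it can be checked with a fixed timestamp. `default_date_range` is a hypothetical name, and the timestamp in the example is made up:

```python
import datetime
from datetime import timezone
from typing import Optional, Tuple


def default_date_range(now: Optional[datetime.datetime] = None) -> Tuple[str, str]:
    # Hypothetical helper mirroring the producer: yesterday to today, in UTC.
    if now is None:
        now = datetime.datetime.now(timezone.utc)
    start = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    end = now.strftime("%Y-%m-%d")
    return start, end


# Made-up timestamp on the day of the producer run shown above.
print(default_date_range(datetime.datetime(2017, 9, 30, 9, 0, tzinfo=timezone.utc)))
# ('2017-09-29', '2017-09-30')
```

Because the window is computed in UTC, a run late in the evening in a time zone west of UTC can already report the next calendar day.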
Looks like it! We can also invoke the consumer directly on AWS:
```
$ serverless invoke \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\" }"}}]}'

null
```
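Invoking the consumer directly like this skips SNS altogether. An alternative for the manual, custom-date-range case is to publish the message to the topic, so it takes the same path as the scheduled run. Here's a minimal sketch, assuming `sns_client` is a `boto3.client("sns")` and you know the topic ARN. `publish_date_range` and `FakeSNS` are hypothetical names, and the ARN below is made up. The fake client lets the example run without AWS credentials:

```python
import json
from typing import Any, List, Optional


def publish_date_range(sns_client: Any, topic_arn: str, start_date: str,
                       end_date: str, tags: Optional[List[str]] = None) -> dict:
    # Hypothetical helper: build the same message shape the producer publishes.
    message = {"startDate": start_date, "endDate": end_date,
               "tags": tags if tags is not None else ["neo4j"]}
    # With a real boto3 SNS client, publish() takes TopicArn and Message.
    return sns_client.publish(TopicArn=topic_arn, Message=json.dumps(message))


class FakeSNS:
    """Hypothetical stand-in client that records calls instead of calling AWS."""

    def __init__(self) -> None:
        self.calls = []

    def publish(self, **kwargs) -> dict:
        self.calls.append(kwargs)
        return {"MessageId": "fake-id"}


fake = FakeSNS()
publish_date_range(fake, "arn:aws:sns:eu-west-1:123456789012:marks-blog-topic",
                   "2017-09-25", "2017-09-26")
print(fake.calls[0])
```

Against AWS, you would pass `boto3.client("sns")` and your real topic ARN instead of the fake client.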
If we check the consumer’s logs now, we’ll see both messages:
```
$ serverless logs --function message-consumer

START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f	Duration: 0.46 ms	Billed Duration: 100 ms	Memory Size: 1024 MB	Max Memory Used: 32 MB

START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATEST
start_date: 2017-09-25
end_date: 2017-09-26
tags: ['neo4j']
END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed
REPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed	Duration: 16.46 ms	Billed Duration: 100 ms	Memory Size: 1024 MB	Max Memory Used: 32 MB
```
Success!
Published on Java Code Geeks with permission by Mark Needham, partner at our JCG program. See the original article here: Serverless: Building a mini producer/consumer data pipeline with AWS SNS. Opinions expressed by Java Code Geeks contributors are their own.