
Serverless: Building a mini producer/consumer data pipeline with AWS SNS

I wanted to create a little data pipeline with Serverless. Its main job would be to run once a day, call an API, and load the returned data into a database.

It’s mostly used to pull in recent data from that API, but I also wanted to be able to invoke it manually and specify a date range.

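I created the following pair of lambdas: a producer that publishes a message containing a date range to an SNS topic, and a consumer that is subscribed to that topic and reads each message it receives.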

The code

serverless.yml

service: marks-blog
 
frameworkVersion: ">=1.2.0 <2.0.0"
 
provider:
  name: aws
  runtime: python3.6
  timeout: 180
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - "sns:Publish"
      Resource:
        - ${self:custom.BlogTopic}
 
custom:
  BlogTopic:
    Fn::Join:
      - ":"
      - - arn
        - aws
        - sns
        - Ref: AWS::Region
        - Ref: AWS::AccountId
        - marks-blog-topic
 
functions:
  message-consumer:
    name: MessageConsumer
    handler: handler.consumer
    events:
      - sns:
          topicName: marks-blog-topic
          displayName: Topic to process events
  message-producer:
    name: MessageProducer
    handler: handler.producer
    events:
      - schedule: rate(1 day)
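
The custom BlogTopic value joins "arn", "aws", "sns", the region, the account id and the topic name with ":", and handler.py rebuilds the same string at runtime from the producer's own function ARN. The sketch below pulls that logic into a small pure function so the two are easy to compare and to test without AWS. topic_arn_from_function_arn is a hypothetical helper, it assumes the standard "aws" partition (as both the original config and handler do), and the account id in the example is a made-up placeholder.

```python
def topic_arn_from_function_arn(function_arn, topic_name):
    """Build an SNS topic ARN in the same region and account as a Lambda function.

    A Lambda function ARN looks like
    arn:aws:lambda:<region>:<account-id>:function:<name>[:<alias-or-version>],
    so the fields at index 3 and 4 are the region and the account id.
    Assumes the standard "aws" partition, like serverless.yml and handler.py.
    """
    parts = function_arn.split(":")
    region, account_id = parts[3], parts[4]
    # Same colon-joined shape as the Fn::Join in serverless.yml.
    return ":".join(["arn", "aws", "sns", region, account_id, topic_name])


if __name__ == "__main__":
    # 123456789012 is a placeholder account id, not a real one.
    fn_arn = "arn:aws:lambda:us-east-1:123456789012:function:MessageProducer"
    print(topic_arn_from_function_arn(fn_arn, "marks-blog-topic"))
    # prints arn:aws:sns:us-east-1:123456789012:marks-blog-topic
```

Keeping the topic name in one place (here a parameter) also makes it harder for the config and the handler to drift apart.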

handler.py

import boto3
import json
import datetime
from datetime import timezone


def producer(event, context):
    sns = boto3.client('sns')

    # A Lambda ARN looks like arn:aws:lambda:<region>:<account-id>:function:<name>,
    # so reuse its region and account id to build the topic's ARN.
    context_parts = context.invoked_function_arn.split(':')
    topic_name = "marks-blog-topic"
    topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}".format(
        region=context_parts[3], account_id=context_parts[4], topic=topic_name)

    # Request everything from yesterday up to today (UTC).
    now = datetime.datetime.now(timezone.utc)
    start_date = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = now.strftime("%Y-%m-%d")

    params = {"startDate": start_date, "endDate": end_date, "tags": ["neo4j"]}

    # SNS messages are strings, so send the parameters as JSON.
    sns.publish(TopicArn=topic_arn, Message=json.dumps(params))


def consumer(event, context):
    # The SNS notification arrives wrapped in a "Records" list.
    for record in event["Records"]:
        # The published payload is a JSON string in Sns.Message.
        message = json.loads(record["Sns"]["Message"])

        start_date = message["startDate"]
        end_date = message["endDate"]
        tags = message["tags"]

        # For now, just log the parameters that were received.
        print("start_date: " + start_date)
        print("end_date: " + end_date)
        print("tags: " + str(tags))
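
As written, the producer ignores its event argument and always requests yesterday to today. If you wanted to hand a date range to the producer instead, one possible variation is to let optional startDate and endDate keys in the invoke event override those defaults. This is a minimal sketch, not the author's code: build_params is a hypothetical helper, and its fallback mirrors the original producer's UTC date calculation.

```python
import datetime
from datetime import timezone


def build_params(event, now=None, tags=("neo4j",)):
    """Return the message the producer would publish.

    Hypothetical variation: "startDate"/"endDate" in the invoke event win;
    otherwise fall back to yesterday..today in UTC, like the original producer.
    """
    if now is None:
        now = datetime.datetime.now(timezone.utc)
    event = event or {}  # tolerate an empty or missing event
    default_start = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    default_end = now.strftime("%Y-%m-%d")
    return {
        "startDate": event.get("startDate", default_start),
        "endDate": event.get("endDate", default_end),
        "tags": list(tags),
    }
```

Inside producer, `params = build_params(event)` would replace the hard-coded dictionary. A scheduled run, whose event carries no startDate or endDate keys, would keep the current behaviour, while a manual invoke could pass something like `--data '{"startDate": "2017-09-25", "endDate": "2017-09-26"}'`.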

Trying it out

We can simulate a message being received locally by executing the following command:

$ serverless invoke local \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\"  }"}}]}'
  
start_date: 2017-09-25
end_date: 2017-09-29
tags: ['neo4j']
null
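
The --data payload is hard to read because the SNS message is itself a JSON string nested inside the JSON event. A small sketch that builds the same test event with json.dumps applied twice shows the structure and avoids hand-escaping the inner quotes. sns_test_event is a hypothetical helper, and it only includes the fields the consumer reads; real SNS notifications carry more fields, such as TopicArn and MessageId.

```python
import json


def sns_test_event(start_date, end_date, tags):
    """Build a minimal event shaped like the one the consumer expects.

    Only Records[*].Sns.Message is filled in, because that is all the
    consumer reads. The message is serialised to a string first, which is
    why the command-line version needs escaped quotes.
    """
    message = {"tags": tags, "startDate": start_date, "endDate": end_date}
    return {"Records": [{"Sns": {"Message": json.dumps(message)}}]}


if __name__ == "__main__":
    # Print the event as JSON, ready to paste into --data '...'.
    print(json.dumps(sns_test_event("2017-09-25", "2017-09-29", ["neo4j"])))
```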

That seems to work. What happens if we invoke the message-producer on AWS?

$ serverless invoke --function message-producer
  
null

Did the consumer receive the message?

$ serverless logs --function message-consumer
  
START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f  Duration: 0.46 ms   Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB

Looks like it! We can also invoke the consumer directly on AWS:

$ serverless invoke \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\"  }"}}]}'
  
null
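
Invoking the consumer directly like this skips SNS entirely. If you want a manually chosen date range to travel the same route as the scheduled one, you could publish it to the topic yourself. This is a minimal sketch under assumptions: publish_date_range is a hypothetical helper, sns_client is assumed to be a boto3 SNS client (or any object with a compatible publish method), and you already know the topic ARN.

```python
import json


def publish_date_range(sns_client, topic_arn, start_date, end_date, tags=("neo4j",)):
    """Publish a date-range message to the topic so it reaches the consumer via SNS.

    sns_client is assumed to be boto3.client("sns") or something with a
    compatible publish(TopicArn=..., Message=...) method.
    """
    params = {"startDate": start_date, "endDate": end_date, "tags": list(tags)}
    return sns_client.publish(TopicArn=topic_arn, Message=json.dumps(params))


# Example (needs AWS credentials and the real topic ARN):
#   import boto3
#   publish_date_range(boto3.client("sns"),
#                      "arn:aws:sns:<region>:<account-id>:marks-blog-topic",
#                      "2017-09-25", "2017-09-26")
```

Run from your own machine, this uses your own AWS credentials rather than the lambda's role, so those credentials need permission to publish to the topic.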

And now if we check the consumer’s logs, we’ll see both messages:

$ serverless logs --function message-consumer
  
START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f  Duration: 0.46 ms   Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB 
  
START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATEST
start_date: 2017-09-25
end_date: 2017-09-26
tags: ['neo4j']
END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed
REPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed  Duration: 16.46 ms  Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB

Success!

Published on Java Code Geeks with permission by Mark Needham, partner at our JCG program. See the original article here: Serverless: Building a mini producer/consumer data pipeline with AWS SNS

Opinions expressed by Java Code Geeks contributors are their own.
