More advanced stuff with JMS and AWS SQS
As you might know, the SQS in AWS SQS stands for ‘Simple Queue Service’. While playing around with it I recently found one of the reasons why it may be called ‘simple’. In two previous posts (here and here) I showed how to use SQS as a JMS queue provider in combination with the Spring Framework. With this basic setup in place I decided to take it a step further and started to experiment with the request-response pattern in combination with JMS (making use of the JMS header ‘JMSReplyTo’ and temporary queues). This rather classic article nicely explains how it works and why it works that way.
To show how it should work I first show the setup that I used with Apache ActiveMQ. Let me start with the bean that picks a message from a queue, performs an action on the content and sends the reply back to the destination named in the JMSReplyTo header. Since I make use of Spring this sounds harder than it really is. First the Java code:
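The core idea of that pattern can be sketched without any broker at all. The following is a minimal in-memory simulation, not JMS code: plain BlockingQueue instances stand in for the request queue and for the temporary reply queue, and the Request class, the method names and the five-second timeout are assumptions made up for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the JMS request-response pattern. No broker is involved;
// the BlockingQueues only play the role of JMS queues.
class ReplyToSketch {

    // Hypothetical message type: a text body plus the queue the reply should go to
    // (the role that the JMSReplyTo header plays in JMS).
    static final class Request {
        final String text;
        final BlockingQueue<String> replyTo;

        Request(String text, BlockingQueue<String> replyTo) {
            this.text = text;
            this.replyTo = replyTo;
        }
    }

    // The shared request queue that the "server" consumes from.
    static final BlockingQueue<Request> REQUEST_QUEUE = new LinkedBlockingQueue<>();

    // Starts a daemon thread that answers every request with the length of its text.
    static void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = REQUEST_QUEUE.take();
                    request.replyTo.put(String.valueOf(request.text.length()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Client side: create a private reply queue (the "temporary queue"), send the
    // request with a reference to it, and wait up to five seconds for the reply.
    static String sendAndReceive(String text) {
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        try {
            REQUEST_QUEUE.put(new Request(text, temporaryQueue));
            return temporaryQueue.poll(5, TimeUnit.SECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        startServer();
        System.out.println(sendAndReceive("hello")); // prints 5
    }
}
```

Because every call creates its own reply queue, a reply can never end up with the wrong caller, which is the appeal of temporary queues in the real pattern.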
package net.pascalalma.aws.sqs.requestresponse;

import org.springframework.stereotype.Service;

@Service
public class MyMessageService implements ResponsiveTextMessageDelegate {

    public String onMessage(String txt) {
        return String.valueOf(txt.length());
    }
}
That is quite a simple class I would say. It implements the ResponsiveTextMessageDelegate (the details of this interface are described here) and simply returns the length of the content of the incoming message. All the other things that need to be done are taken care of by the Spring Framework. The Spring config for this service looks like:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
    <context:annotation-config/>

    <!-- ActiveMQ config -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

    <bean id="requestQueueName" class="java.lang.String">
        <constructor-arg value="DefaultDemoQueue"/>
    </bean>

    <bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" />

    <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="myMessageService"/>
        <property name="defaultListenerMethod" value="onMessage"/>
        <property name="messageConverter" ref="messageConverter" />
    </bean>

    <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destinationName" ref="requestQueueName"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>
This is mostly the same as the configuration described in my previous post. The only difference is that I now make use of a converter, the SimpleMessageConverter, which takes care of converting the incoming TextMessage to a String and the returned String back to a TextMessage. If we did not define this converter we would receive the following error:
java.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage
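To make that error a bit more tangible, here is a simplified, broker-free sketch of what goes wrong when a listener method is looked up by the type of the object it is handed. This is not Spring's actual implementation; FakeTextMessage, Delegate, deliver and convert are hypothetical names invented for the illustration, with FakeTextMessage standing in for a provider class such as ActiveMQTextMessage.

```java
import java.lang.reflect.Method;

class ConverterSketch {

    // Hypothetical stand-in for a provider-specific message class.
    static final class FakeTextMessage {
        final String text;

        FakeTextMessage(String text) {
            this.text = text;
        }
    }

    // Same shape as the delegate in the article: it only knows about Strings.
    public static final class Delegate {
        public String onMessage(String txt) {
            return String.valueOf(txt.length());
        }
    }

    // Looks up onMessage by the runtime type of the argument and invokes it.
    // Returns the reply, or the simple name of the reflection exception on failure.
    static String deliver(Object argument) {
        Delegate delegate = new Delegate();
        try {
            Method method = Delegate.class.getMethod("onMessage", argument.getClass());
            return (String) method.invoke(delegate, argument);
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    // The "converter" step: unwrap the payload before the method lookup happens.
    static Object convert(FakeTextMessage message) {
        return message.text;
    }

    public static void main(String[] args) {
        FakeTextMessage message = new FakeTextMessage("hello");
        System.out.println(deliver(message));          // prints NoSuchMethodException
        System.out.println(deliver(convert(message))); // prints 5
    }
}
```

Without the conversion step the lookup is for onMessage(FakeTextMessage), which the delegate does not have; with it, the lookup is for onMessage(String) and succeeds.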
What we need next is a Service client bean that can ‘talk’ to our service. This might look like this in Java:
package net.pascalalma.aws.sqs.requestresponse;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.*;

@Component
public class MyMessageServiceClient {

    final static Logger logger = Logger.getLogger(MyMessageServiceClient.class);

    @Resource
    private JmsTemplate jmsTemplate;

    @Autowired
    private String requestQueueName;

    public String process(final String txt) {
        // Send the request to the queue the server is consuming from and wait for the reply
        Message response = jmsTemplate.sendAndReceive(requestQueueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage();
                message.setText(txt);
                return message;
            }
        });

        String result = null;
        try {
            result = ((TextMessage) response).getText();
        } catch (JMSException e) {
            logger.error(e);
        }
        return result;
    }
}
What we see is that we make use of the jmsTemplate’s sendAndReceive to send a message created in the MessageCreator callback and wait for the response message. The corresponding Spring config for this class is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
    <context:annotation-config/>

    <!-- ActiveMQ config -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>
    <!-- End ActiveMQ specific -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

    <bean id="requestQueueName" class="java.lang.String">
        <constructor-arg value="DefaultDemoQueue"/>
    </bean>

    <bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/>
</beans>
What is left now is some ‘container’ to see these beans in action, for which I created a main class for the ‘server’ part:
package net.pascalalma.aws.sqs.requestresponse;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageServiceMain {

    public static void main(String[] args) {
        // Build the application context from the server's Spring config
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context.xml"});
    }
}
Running this class in your IDE or terminal just reads the Spring config and instantiates the service beans. The client’s main class has a little more code:
package net.pascalalma.aws.sqs.requestresponse;

import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Random;

public class MessageServiceClientMain {

    final static Logger logger = Logger.getLogger(MessageServiceClientMain.class);

    public static void main(String[] args) {
        // Build the application context from the client's Spring config
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context-client.xml"});

        // Get the MyMessageServiceClient bean
        MyMessageServiceClient messageServiceClient = (MyMessageServiceClient) ctx.getBean("myMessageServiceClient");

        // Send ever shorter suffixes of a random hex string to the service.
        // Long.toHexString drops leading zeros, so loop over the actual length.
        String random = createRandomString();
        for (int i = 0; i < random.length(); i++) {
            String key = random.substring(i);
            logger.info("Sending to service: " + key);
            logger.info("Sending to service with length: " + key.length());
            String result = messageServiceClient.process(key);
            logger.info("Received from service: " + result);
            logger.info("======================================================");
        }
    }

    private static String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }
}
Running this class generates messages, sends them to the service and prints the result received from the service, like this:
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
So far so good. Now let’s use AWS SQS instead of a local ActiveMQ instance. This is easy to accomplish by simply modifying the configuration of the JmsFactory in both our Spring configurations:
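A small aside on the test data used by the client: Long.toHexString drops leading zeros, so the random string is not guaranteed to be 16 characters long. The sketch below, with the hypothetical helper names plainHex and paddedHex, shows the difference and one possible way to get a fixed-width key; it is an illustration, not part of the original project.

```java
import java.util.Random;

class HexKeySketch {

    // Same approach as createRandomString in the client: leading zeros are dropped,
    // so the result can be shorter than 16 characters.
    static String plainHex(long value) {
        return Long.toHexString(value);
    }

    // Fixed-width variant: always 16 hex digits, left-padded with zeros.
    static String paddedHex(long value) {
        return String.format("%016x", value);
    }

    public static void main(String[] args) {
        String key = paddedHex(new Random().nextLong());
        // Iterating up to key.length() keeps substring(i) in range for any input.
        for (int i = 0; i < key.length(); i++) {
            System.out.println(key.substring(i));
        }
    }
}
```

For example, plainHex(255L) yields "ff" while paddedHex(255L) yields "00000000000000ff".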
...
<bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>

<bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    <property name="regionName" value="eu-west-1"/>
    <property name="numberOfMessagesToPrefetch" value="5"/>
    <property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
</bean>

<bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
      factory-bean="connectionFactoryBuilder"
      factory-method="build"/>
...
Now if we start the ‘server’ app and the ‘client’ app we get the following output:
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986)
    at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922)
    at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29)
    at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: javax.jms.JMSException: Unsupported Method
    at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744)
    at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983)
    ... 8 more
As you can see we get a stacktrace telling us that the JMS method ‘createTemporaryQueue’ isn’t supported by Amazon’s SQS JMS client (com.amazon.sqs.javamessaging)! So much for the JMS support. I guess that is why they call it the Simple Queue Service, since only some of the possible JMS methods are implemented ;-). I searched for some more info about this but without any luck. However, I did run into this framework: Nevado JMS. It claims to be a JMS driver for AWS SQS/SNS, so I decided to give that a try. First I added the following dependency to my project’s pom:
<dependency>
    <groupId>org.skyscreamer</groupId>
    <artifactId>nevado-jms</artifactId>
    <version>1.3.1</version>
</dependency>
And then modified the JmsFactory in both my Spring configs again, this time to:
...
<bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" />

<bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
    <property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
    <property name="awsAccessKey" value="${aws.accessKey}" />
    <property name="awsSecretKey" value="${aws.secretKey}" />
</bean>
...
Now when I ran the main classes I got the expected result back:
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12
2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12
2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
So this shows that the more advanced stuff is still possible with the so-called ‘Simple’ services, although it takes some help from the community here and there :-)
Reference: More advanced stuff with JMS and AWS SQS from our JCG partner Pascal Alma at The Pragmatic Integrator blog.