Integrating with RabbitMQ using Spring Cloud Stream
In my previous post I wrote about a very simple integration scenario between two systems – one generating a work unit and another processing that work unit – and how Spring Integration makes such an integration very easy.
Here I will demonstrate how this integration scenario can be simplified even further using Spring Cloud Stream.
I have the sample code available here – the right Maven dependencies for Spring Cloud Stream are available in the pom.xml.
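For context, pulling in the RabbitMQ binder typically amounts to a dependency along these lines – a sketch, assuming the version is managed by the Spring Cloud BOM; the sample's pom.xml is the authoritative source:

<!-- Spring Cloud Stream with the RabbitMQ binder; version assumed to be managed by the Spring Cloud BOM -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>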
Producer
So again, starting with the producer responsible for generating the work units: all that needs to be done code-wise to send messages to RabbitMQ is to have a Java configuration along these lines:
@Configuration
@EnableBinding(WorkUnitsSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {}
This looks deceptively simple but does a lot under the covers. From what I can understand and glean from the documentation, this configuration triggers the following:
1. Spring Integration message channels are created based on the classes bound via the @EnableBinding annotation. The WorkUnitsSource class above is the definition of a custom channel called "worksChannel" and looks like this:
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface WorkUnitsSource {
    String CHANNEL_NAME = "worksChannel";

    @Output
    MessageChannel worksChannel();
}
2. Based on which "binder" implementation is available at runtime (say RabbitMQ, Kafka, Redis, Gemfire), the channel from the previous step is connected to the appropriate structures in that system – so, for example, since I want my "worksChannel" to in turn send messages to RabbitMQ, Spring Cloud Stream takes care of automatically creating a topic exchange in RabbitMQ. The binder is selected purely by what is on the classpath, as sketched below.
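To illustrate the binder abstraction: switching the underlying middleware is, in principle, just a matter of swapping the binder dependency on the classpath. For instance, a Kafka-based setup would use the Kafka starter instead of the RabbitMQ one – a sketch, not taken from the sample project:

<!-- Alternative binder: the same channel code, backed by Kafka instead of RabbitMQ -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>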
I wanted some further customizations in terms of how the data is sent to RabbitMQ – specifically, I wanted my domain objects to be serialized to JSON before being sent across, and I wanted to specify the name of the RabbitMQ exchange that the payload is sent to. This is controlled by configuration attached to the channel, the following way using a yaml file:
spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          contentType: application/json
          group: testgroup
One final detail is a way for the rest of the application to interact with Spring Cloud Stream; this can be done directly in Spring Integration by defining a message gateway:
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import works.service.domain.WorkUnit;

@MessagingGateway
public interface WorkUnitGateway {
    @Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)
    void generate(WorkUnit workUnit);
}
That is essentially it – Spring Cloud Stream would now wire up the entire Spring Integration flow and create the appropriate structures in RabbitMQ.
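To make the producer side concrete, here is a minimal sketch of how the rest of the application could drive the gateway. The WorkUnitGenerator class, the scheduling, and the setter-based construction of WorkUnit are illustrative assumptions, not part of the sample; it also assumes @EnableScheduling is configured somewhere.

import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import works.service.domain.WorkUnit;

// Hypothetical driver: periodically generates a work unit and hands it to the gateway,
// which publishes it on the "worksChannel" binding.
@Service
public class WorkUnitGenerator {

    private final WorkUnitGateway workUnitGateway;

    @Autowired
    public WorkUnitGenerator(WorkUnitGateway workUnitGateway) {
        this.workUnitGateway = workUnitGateway;
    }

    @Scheduled(fixedRate = 5000)
    public void generate() {
        // Assumes WorkUnit exposes setId/setDefinition setters; the sample may construct it differently.
        WorkUnit workUnit = new WorkUnit();
        workUnit.setId(UUID.randomUUID().toString());
        workUnit.setDefinition("sample work definition");
        workUnitGateway.generate(workUnit);
    }
}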
Consumer
Similar to the producer, first I want to define the channel called "worksChannel" which would handle the incoming messages from RabbitMQ:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface WorkUnitsSink {
    String CHANNEL_NAME = "worksChannel";

    @Input
    SubscribableChannel worksChannel();
}
and let Spring Cloud Stream create the channels and RabbitMQ bindings based on this definition:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(WorkUnitsSink.class)
public class IntegrationConfiguration {}
To process the messages, Spring Cloud Stream provides a listener which can be created the following way:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

import works.service.domain.WorkUnit;

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    @StreamListener(WorkUnitsSink.CHANNEL_NAME)
    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}
And finally, the configuration which connects this channel to the RabbitMQ infrastructure, expressed in a yaml file:
spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          group: testgroup
Now, if the producer and any number of consumers are started up, a message sent via the producer is published to a RabbitMQ topic exchange as JSON, retrieved by a consumer, deserialized back to an object, and passed to the work processor.
A good amount of the boilerplate involved in creating the RabbitMQ infrastructure is now handled purely by convention by the Spring Cloud Stream libraries. Though Spring Cloud Stream attempts to provide a facade over raw Spring Integration, it is useful to have a basic knowledge of Spring Integration to use Spring Cloud Stream effectively.
The sample described here is available at my GitHub repository.
Reference: Integrating with RabbitMQ using Spring Cloud Stream from our JCG partner Biju Kunjummen at the all and sundry blog.
Hi,
In your post you’ve mentioned, “I want my “worksChannel” to in turn send messages to RabbitMQ, Spring Cloud Stream would take care of automatically creating a topic exchange in RabbitMQ”, in my case I’m using Kafka and the messages are getting sent over the channel and I’m able to receive the messages as well, but I’m not seeing any messages on the Kafka topic. Can you guide a bit on that?