Enterprise Java

Integrating with RabbitMQ using Spring Cloud Stream

In my previous post I wrote about a very simple integration scenario between two systems – one generating a work unit and another processing that work unit and how Spring Integration makes such integration very easy.

WorkUnitsFlow

Here I will demonstrate how this integration scenario can be simplified even further using Spring Cloud Stream

I have the sample code available here – the right maven dependencies for Spring Cloud Stream is available in the pom.xml.

Producer

So again starting with the producer responsible for generating the work units. All that needs to be done code wise to send messages to RabbitMQ is to have a java configuration along these lines:

@Configuration
@EnableBinding(WorkUnitsSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {}

This looks deceptively simple but does a lot under the covers, from what I can understand and glean from the documentation these are what this configuration triggers:

1. Spring Integration message channels based on the classes that are bound to the @EnableBinding annotation are created. The WorkUnitsSource class above is the definition of a custom channel called “worksChannel” and looks like this:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface WorkUnitsSource {

    String CHANNEL_NAME = "worksChannel";

    @Output
    MessageChannel worksChannel();

}

2. Based on which “binder” implementation is available at runtime(say RabbitMQ, Kaffka, Redis, Gemfire), the channel in the previous step will be connected to the appropriate structures in the system – so for eg, I am want my “worksChannel” to in turn send messages to RabbitMQ, Spring Cloud Stream would take care of automatically creating a topic exchange in RabbitMQ

I wanted some further customizations in terms of how the data is sent to RabbitMQ – specifically I wanted my domain objects to be serialized to json before being sent across and I want to specify the name of the RabbitMQ exchange that the payload is sent to, this is controlled by certain configurations that can be attached to the channel the following way using a yaml file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          contentType: application/json
          group: testgroup

One final detail is a way for the rest of the application to interact with Spring Cloud Stream, this can be done directly in Spring Integration by defining a message gateway:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import works.service.domain.WorkUnit;

@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)
 void generate(WorkUnit workUnit);

}

That is essentially it, Spring Cloud Stream would now wire up the entire Spring integration flow, create the appropriate structures in RabbitMQ.

Consumer

Similar to the Producer, first I want to define the channel called “worksChannel” which would handle the incoming message from RabbitMQ:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface WorkUnitsSink {
    String CHANNEL_NAME = "worksChannel";

    @Input
    SubscribableChannel worksChannel();
}

and let Spring Cloud Stream create the channels and RabbitMQ bindings based on this definition:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(WorkUnitsSink.class)
public class IntegrationConfiguration {}

To process the messages, Spring Cloud Stream provides a listener which can be created the following way:

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    @StreamListener(WorkUnitsSink.CHANNEL_NAME)
    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}

And finally the configuration which connects this channel to the RabbitMQ infrastructure expressed in a yaml file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          group: testgroup

Now if the producer and any number of consumers were started up, the message sent via the producer would be sent to a Rabbit MQ topic exchange as a json, retrieved by the consumer, deserialized to an object and passed to the work processor.

A good amount of the boiler plate involved in creating the RabbitMQ infrastructure is now handled purely by convention by the Spring Cloud Stream libraries. Though Spring Cloud Stream attempts to provide a facade over the raw Spring Integration, it is useful to have a basic knowledge of Spring integration to use Spring Cloud Stream effectively.

The sample described here is available at my github repository

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Sheetal D
Sheetal D
7 years ago

Hi,

In your post you’ve mentioned, “I want my “worksChannel” to in turn send messages to RabbitMQ, Spring Cloud Stream would take care of automatically creating a topic exchange in RabbitMQ”, in my case I’m using Kafka and the messages are getting sent over the channel and I’m able to receive the messages as well, but I’m not seeing any messages on the Kafka topic. Can you guide a bit on that?

Back to top button