
RxJava + Java8 + Java EE 7 + Arquillian = Bliss


Microservices are an architectural style in which each service is implemented as an independent system. Each service can have its own persistence system (although this is not mandatory), deployment, language, …

Because a system is composed of more than one service, each service communicates with the others, typically over a lightweight protocol such as HTTP and following a RESTful approach. You can read more about microservices here: http://martinfowler.com/articles/microservices.html

Let’s see a really simple example. Suppose we have a bookshop where users can browse a catalog, and when they find a book they want to know more about, they click on its ISBN and a new screen opens with detailed information about the book and the comments written by readers.

This system may be composed of two services:

  • One service to get the book details. They could be retrieved from any legacy system, such as an RDBMS.
  • One service to get all the comments written about a book. In this case the information could be stored in a document-based database.

The problem here is that for each user request we need to open two connections, one for each service. Of course we need a way to do those jobs in parallel to improve performance. And here lies one problem: how can we deal with these asynchronous requests? The first idea is to use the Future class. For two services that may be fine, but if you require four or five services the code becomes more and more complex, for example when you need to get data from one service and use it in other services, or adapt the result of one service to be the input of another one. So there is a cost in managing threads and synchronization.
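To make the Future-based idea concrete, here is a minimal sketch that uses only the JDK. The methods fetchBookInfo and fetchComments are hypothetical stand-ins for the two remote calls, and a plain ExecutorService is assumed instead of a container-managed one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: the two fetch methods are hypothetical stand-ins for the remote services.
class FutureSketch {

    // Hypothetical stand-in for the book details service.
    static String fetchBookInfo(String isbn) {
        return "A Game Of Thrones (" + isbn + ")";
    }

    // Hypothetical stand-in for the comments service.
    static List<String> fetchComments(String isbn) {
        return Arrays.asList("Good Book", "Awesome");
    }

    // Runs both calls in parallel, then blocks until both results are available.
    static String bookWithComments(String isbn) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> book = pool.submit(() -> fetchBookInfo(isbn));
            Future<List<String>> comments = pool.submit(() -> fetchComments(isbn));
            // Each get() blocks the calling thread; every extra service adds another one.
            return book.get() + " " + comments.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(bookWithComments("1111"));
    }
}
```

With two services this stays readable, but the blocking get() calls and the exception handling already take more lines than the actual logic.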

It would be great to have a clean and easy way to deal with this problem, and this is exactly what RxJava provides. RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

With RxJava, instead of pulling data from a structure, data is pushed to subscribers, which listen for events and act accordingly. You can find more information at https://github.com/Netflix/RxJava.
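The push model can be illustrated without any library. The sketch below is not the RxJava API: PushSource is a hypothetical class written only for this illustration, and it pushes each item to the registered callbacks instead of letting callers pull from a collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: PushSource is a hypothetical type, not part of the RxJava API.
class PushSketch {

    // A tiny push-based source: subscribers register a callback and items are pushed to them.
    static class PushSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(T item) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Subscribes one listener, pushes two items and returns what the listener recorded.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        PushSource<String> source = new PushSource<>();
        source.subscribe(item -> received.add("got " + item));
        source.emit("Good Book");
        source.emit("Awesome");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The subscriber never asks for data; it only reacts when the source emits something, which is the idea an Observable and its Subscriber build on.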

So what we are going to implement is the example described above, using RxJava, Java EE 7, Java 8 and Arquillian for testing.

This post assumes you know how to write REST services using the Java EE specification.

So let’s start with two services:

@Singleton
@Path("bookinfo")
public class BookInfoService {

    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {

        return Json.createObjectBuilder()
            .add("author", "George R.R. Martin")
            .add("isbn", "1111")
            .add("title", "A Game Of Thrones").build();
    }

}

@Singleton
@Path("comments")
public class CommentsService {

    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public JsonArray bookComments(@PathParam("isbn") String isbn) {

        return Json.createArrayBuilder().add("Good Book").add("Awesome").build();

    }

}

@ApplicationPath("rest")
public class ApplicationResource extends Application {
}

And finally it is time to create a third service, a facade that receives the request from the client, sends a request to both services in parallel and finally zips both responses. Zipping is the process of combining items emitted by several sources through a specified function, so that the combined result can be sent back to the client (not to be confused with compression!).

@Singleton
@Path("book")
public class BookService {

    private static final String BOOKSERVICE = "http://localhost:8080/bookservice";
    private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";

    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;

    Client bookServiceClient;
    WebTarget bookServiceTarget;

    Client commentServiceClient;
    WebTarget commentServiceTarget;

    @PostConstruct
    void initializeRestClients() {

        bookServiceClient = ClientBuilder.newClient();
        bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");

        commentServiceClient = ClientBuilder.newClient();
        commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");

    }

    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //RxJava code shown below
    }
}

Basically we create a new service. In this case the URLs of both services we are going to connect to are hardcoded. This is done for academic purposes; in production-like code you would inject them from a producer class, a properties file or whatever mechanism you use for this purpose. Then we create a javax.ws.rs.client.WebTarget for consuming each RESTful web service.

After that we need to implement the bookAndComment method using the RxJava API.

The main class used in RxJava is rx.Observable. As its name suggests, this class is an observable, and it is responsible for firing events that push objects to subscribers. By default events are synchronous, and it is the developer’s responsibility to make them asynchronous.

So we need one asynchronous observable instance for each service:

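public Observable<JsonObject> getBookInfo(final String isbn) {
    return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {

        Runnable r = () -> {
            try {
                // Call the bookinfo service and push the result to the subscriber
                subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));
                subscriber.onCompleted();
            } catch (RuntimeException e) {
                // Report failures so the subscriber is not left waiting forever
                subscriber.onError(e);
            }
        };

        // Run the call in a container-managed thread
        executor.execute(r);

    });
}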

Basically we create an Observable that executes the specified function when a Subscriber subscribes to it. The function is written as a lambda expression to avoid creating nested inner classes. In this case we return a JsonObject as the result of calling the bookinfo service. The result is passed to the onNext method so subscribers can receive it. Because we want to execute this logic asynchronously, the code is wrapped inside a Runnable block.

It is also required to call the onCompleted method once all the logic is done.

Notice that to make the observable asynchronous, apart from creating a Runnable we use an Executor to run the logic in a separate thread. One of the great additions in Java EE 7 is a managed way to create threads inside a container. In this case we use the ManagedExecutorService provided by the container to run the task asynchronously in a thread other than the current one.

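public Observable<JsonArray> getComments(final String isbn) {
    return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {

        Runnable r = () -> {
            try {
                // Call the comments service and push the result to the subscriber
                subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));
                subscriber.onCompleted();
            } catch (RuntimeException e) {
                // Report failures so the subscriber is not left waiting forever
                subscriber.onError(e);
            }
        };

        // Run the call in a container-managed thread
        executor.execute(r);

    });
}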

This is similar to the previous method, but instead of getting the book info we get an array of comments.

Then we need to create an observable in charge of zipping both responses once both of them are available. This is done with the zip method of the Observable class, which receives two Observables and applies a function to combine their results. In this case the function is a lambda expression that creates a new JSON object containing both responses.

@GET
@Path("{isbn}")
@Produces(MediaType.APPLICATION_JSON)
public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //Calling previous defined functions
    Observable<JsonObject> bookInfo = getBookInfo(isbn);
    Observable<JsonArray> comments = getComments(isbn);

    Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->
                    Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()
                  )
                  .subscribe(new Subscriber<JsonObject>() {
                        @Override
                        public void onCompleted() {
                        }
                
                        @Override
                        public void onError(Throwable e) {
                            asyncResponse.resume(e);
                        }

                        @Override
                        public void onNext(JsonObject jsonObject) {
                            asyncResponse.resume(jsonObject);
                        }
                    });
}

Let’s take a look at the previous service. We are using one of the new additions in Java EE 7: JAX-RS 2.0 asynchronous REST endpoints, enabled with the @Suspended annotation. Basically, we free the request-processing thread and generate the response when it becomes available, using the resume method.

And finally a test. We are using WildFly 8.1 as the Java EE 7 server, together with Arquillian. Because each service may be deployed on a different server, we are going to deploy each service in a different WAR but inside the same server.

So in this case we are going to deploy three WAR files, which is really easy to do with Arquillian.

@RunWith(Arquillian.class)
public class BookTest {

    @Deployment(testable = false, name = "bookservice")
    public static WebArchive createDeploymentBookInfoService() {
        return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);
    }

    @Deployment(testable = false, name = "bookcomments")
    public static WebArchive createDeploymentCommentsService() {
        return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);
    }

    @Deployment(testable = false, name = "book")
    public static WebArchive createDeploymentBookService() {
        WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class)
                .addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));
        return webArchive;
    }

    @ArquillianResource
    URL base;

    @Test
    @OperateOnDeployment("book")
    public void should_return_book() throws MalformedURLException {

        Client client = ClientBuilder.newClient();
        JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);

        //assertions
    }
}

In this case the client requests all the information about a book. On the server side, the zip method waits until the book and the comments have been retrieved in parallel, then combines both responses into a single object that is sent back to the client.
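The "wait for both, then combine" behaviour can be tried out with the JDK alone. The following sketch is only an analogy for the single-value case, written with CompletableFuture.thenCombine rather than RxJava, and the strings it produces are made-up sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: a JDK analogy of zipping two single-value sources, not RxJava code.
class ZipSketch {

    static String combine(String isbn) {
        // Both suppliers run asynchronously on the common pool; the values are sample data.
        CompletableFuture<String> book =
                CompletableFuture.supplyAsync(() -> "book " + isbn);
        CompletableFuture<List<String>> comments =
                CompletableFuture.supplyAsync(() -> Arrays.asList("Good Book", "Awesome"));

        // The combining function runs only once both results are available.
        return book
                .thenCombine(comments, (b, c) -> b + " with " + c.size() + " comments")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combine("1111"));
    }
}
```

Here join() blocks only to keep the demo short; in the service above the combined value is handed to asyncResponse.resume instead of being waited for.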

This is a very simple example of RxJava. In fact we have only seen how to use the zip method, but RxJava provides many more useful methods, such as take(), map(), merge(), … (https://github.com/Netflix/RxJava/wiki/Alphabetical-List-of-Observable-Operators)

Moreover, in this example we have only connected to two services and retrieved information in parallel, so you may wonder why not use the Future class. It is totally fine to use Future and callbacks in this example, but in real life your logic probably won’t be as easy as zipping two services. Maybe you will have more services, or maybe you will need to get information from one service and then open a new connection for each result. As you can see, you may start with two Future instances but end up with a bunch of Future.get() calls, timeouts, … It is in these situations where RxJava really simplifies the development of the application.

Furthermore, we have seen how to use some of the new additions in Java EE 7, such as developing an asynchronous RESTful service with JAX-RS.

In this post we have learnt how to deal with the interconnection between services and how to make them scalable and less resource-consuming. But we have not talked about what happens when one of these services fails. What happens to the callers? Do we have a way to manage it? Is there a way to avoid spending resources when one of the services is not available? We will touch on this in the next post, which talks about fault tolerance.
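The growth in Future.get() calls is easy to see in a small JDK-only sketch. The methods fetchIsbns and fetchTitle are hypothetical stand-ins for one service whose results drive further calls to a second service, and the one-second timeout is an arbitrary value chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: both fetch methods are hypothetical stand-ins for remote services.
class DependentCallsSketch {

    static List<String> fetchIsbns() {
        return Arrays.asList("1111", "2222");
    }

    static String fetchTitle(String isbn) {
        return "title-" + isbn;
    }

    static List<String> titles() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // First get(): the dependent calls cannot start until this one returns.
            List<String> isbns = pool.submit(() -> fetchIsbns()).get(1, TimeUnit.SECONDS);

            // One new request per result of the first call.
            List<Future<String>> pending = new ArrayList<>();
            for (String isbn : isbns) {
                pending.add(pool.submit(() -> fetchTitle(isbn)));
            }

            // One more blocking get(), with its own timeout, per pending request.
            List<String> result = new ArrayList<>();
            for (Future<String> future : pending) {
                result.add(future.get(1, TimeUnit.SECONDS));
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(titles());
    }
}
```

Even with only two levels of calls, most of the method is bookkeeping around blocking, timeouts and exceptions rather than the logic itself.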

We keep learning,

Alex.


Good morning, good morning! Good morning in the early hours! Let’s shake off the laziness and jump right out of bed. (Bon Dia! – Dàmaris Gelabert)

Valery
10 years ago

Nice article!
But I would partly disagree with your conclusion at the end.
Though using plain Futures is indeed painful, there is also a CompletableFuture in Java 8 that may replace RxJava Observables in many cases.
