flatMap() vs. concatMap() vs. concatMapEager() – RxJava FAQ

There are three seemingly similar operators in RxJava 2.x: flatMap(), concatMap() and concatMapEager(). All of them accept the same argument: a function from the original stream’s individual item to a (sub-)stream of arbitrary type. In other words, if you have a Flowable<T>, you provide a function from T to Flowable<R> for an arbitrary type R. After applying any of these operators you end up with a Flowable<R>. So how are they different?
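
The shape of that argument may be easier to see in code. The sketch below is only an analogy built on the JDK’s java.util.stream.Stream, whose flatMap() takes a function of the same form (from T to a sub-stream of R). It is not RxJava code, and the FlattenShape class and its method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlattenShape {

    // The mapper: one upstream item (T = String) becomes a sub-stream (R = Character).
    static Stream<Character> letters(String word) {
        return word.chars().mapToObj(c -> (char) c);
    }

    // Stream<String> in, flattened Stream<Character> out, collected into a list.
    static List<Character> flatten(Stream<String> words) {
        return words.flatMap(FlattenShape::letters).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flatten(Stream.of("Rx", "Java"))); // [R, x, J, a, v, a]
    }
}
```

A JDK Stream is synchronous, so this mirrors only the types involved, not the threading behavior that distinguishes the three RxJava operators.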

Sample project

First let’s build a sample application. We will use Retrofit2, an HTTP client wrapper that has built-in plugins for RxJava2. Our task is to leverage the GeoNames API in order to find the population of any city in the world. The interface looks as follows:

public interface GeoNames {
 
    Flowable<Long> populationOf(String city);
 
}

The implementation of this interface is auto-generated by Retrofit; the glue code is shown in the appendix at the end of this article. For the time being just assume we have a function that takes a String with a city name and asynchronously returns a one-element stream with the population of that city. Also assume we have a fixed stream of cities we want to look up:

Flowable<String> cities = Flowable.just(
    "Warsaw", "Paris", "London", "Madrid"
);

Our goal is to fetch the population of each city.

concatMap(): process upstream sequentially

The sample application with concatMap() looks as follows:

cities
        .concatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

Before we see the outcome, let’s study what concatMap() is doing underneath. For each upstream event (city) it invokes a function that replaces that event with a (sub)stream. In our case it’s a one-element stream of Long (Flowable<Long>). So with every operator we are comparing, we end up with a stream of streams of Long (Flowable<Flowable<Long>>). The real difference arises when we analyze what the operator does in order to flatten such a nested stream.

concatMap() will first subscribe to the very first substream (Flowable<Long> representing the population of Warsaw). By subscribing we actually mean making the physical HTTP call. Only when the first substream completes (emits a single Long in our case and signals completion) will concatMap() continue. Continuing means subscribing to the second substream and waiting for it to complete. The resulting stream completes when the very last substream completes. This leads to the following stream: 1702139, 2138551, 7556900 and 3255944. These happen to be the populations of Warsaw, Paris, London and Madrid, respectively. The order of output is entirely predictable. However, it’s also entirely sequential. No concurrency happens at all: we make the second HTTP call only after the first one has completed. The added complexity of RxJava doesn’t pay off at all:

23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944

As you can see, no multithreading occurs: requests are sequential, each waiting for the previous one. Technically they need not all run on the same thread, but they never overlap, so they take no advantage of concurrency. The big plus is the guaranteed order of resulting events, which is not that obvious once we jump into flatMap().
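
The same one-at-a-time behavior can be imitated without RxJava. The following is a minimal plain-JDK sketch based on CompletableFuture, not RxJava code. The fakePopulationOf() helper, its artificial delay and the counters are assumptions introduced purely for illustration, with the numbers copied from the log above. Each lookup starts only after the previous one has finished, so at most one simulated request is ever in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class SequentialLookup {

    // Hypothetical stand-in for the HTTP call; values copied from the log above.
    static final Map<String, Long> POPULATIONS = Map.of(
            "Warsaw", 1702139L, "Paris", 2138551L,
            "London", 7556900L, "Madrid", 3255944L);

    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    static CompletableFuture<Long> fakePopulationOf(String city) {
        return CompletableFuture.supplyAsync(() -> {
            // Track how many simulated requests overlap.
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            try {
                Thread.sleep(20); // pretend network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return POPULATIONS.get(city);
        });
    }

    // concatMap-like: the next lookup starts only after the previous one completed.
    static List<Long> lookupSequentially(List<String> cities) {
        List<Long> populations = new ArrayList<>();
        for (String city : cities) {
            populations.add(fakePopulationOf(city).join());
        }
        return populations;
    }

    public static void main(String[] args) {
        System.out.println(lookupSequentially(List.of("Warsaw", "Paris", "London", "Madrid")));
        System.out.println("max requests in flight: " + maxInFlight.get());
    }
}
```

The output order always matches the input order, and the in-flight counter never exceeds one, which is exactly the trade-off described above: predictable, but no overlap.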

flatMap(): processing results on-the-fly, out-of-order

flatMap() code is almost exactly the same:

cities
        .flatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

And just like before we start with a stream of streams of Long (Flowable<Flowable<Long>>). However rather than subscribing to each substream one after another, flatMap() operator eagerly subscribes to all substreams at once. This means we see multiple HTTP requests being initiated at the same time in different threads:

00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551

When any of the underlying substreams emits a value, it is immediately passed downstream to the subscriber. This means we can now process events on-the-fly, as they are produced. Notice that the resulting stream is out-of-order. The first event we received is 7556900, which happens to be the population of London, third in the initial stream. Contrary to concatMap(), flatMap() can’t preserve order and thus emits values in “random” order. Well, not really random; we simply receive values as soon as they are available. In this particular execution the HTTP response for London came first, but there is absolutely no guarantee of that. This leads to an interesting problem. We have a stream of various population values and the initial stream of cities. However, the output stream can be an arbitrary permutation of events and we have no idea which population corresponds to which city. We will address this problem in a subsequent article.
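
To make the out-of-order behavior concrete, here is a plain-JDK sketch using CompletableFuture as a stand-in for the substreams. It is an analogy, not RxJava code. The UnorderedLookup class and emitAsCompleted() are hypothetical names, and completing a future by hand simulates an HTTP response arriving. The sketch also shows one possible way to keep track of which population belongs to which city: capture the city next to the value at the moment the value is produced. This is not necessarily the approach the follow-up article takes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class UnorderedLookup {

    // flatMap-like: react to every pending lookup as soon as it completes.
    // The city is captured with its population, so the pairing survives reordering.
    static List<String> emitAsCompleted(Map<String, CompletableFuture<Long>> pending) {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        pending.forEach((city, lookup) ->
                lookup.thenAccept(population -> emitted.add(city + "=" + population)));
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical in-flight requests, all "started" at once.
        Map<String, CompletableFuture<Long>> pending = new LinkedHashMap<>();
        for (String city : List.of("Warsaw", "Paris", "London", "Madrid")) {
            pending.put(city, new CompletableFuture<>());
        }
        List<String> emitted = emitAsCompleted(pending);

        // Responses arrive in the same order as in the log above.
        pending.get("London").complete(7556900L);
        pending.get("Warsaw").complete(1702139L);
        pending.get("Madrid").complete(3255944L);
        pending.get("Paris").complete(2138551L);

        System.out.println(emitted);
        // [London=7556900, Warsaw=1702139, Madrid=3255944, Paris=2138551]
    }
}
```

The emitted order follows the order of completion rather than the order of the cities, yet every value still carries its city.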

concatMapEager(): concurrent, in-order, but somewhat expensive

concatMapEager() seems to bring the best of both worlds: concurrency and guaranteed order of output events:

cities
        .concatMapEager(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

After learning what concatMap() and flatMap() are doing, understanding concatMapEager() is fairly simple. Given a stream of streams, concatMapEager() eagerly (duh!) subscribes to all substreams at the same time, concurrently. However, this operator makes sure that results from the first substream are propagated first, even if it’s not the first one to complete. An example will quickly reveal what this means:

00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944

We initiate four HTTP requests instantly. From the log output we clearly see that the population of London was returned first. However, the subscriber did not receive it because the population of Warsaw had not arrived yet. By coincidence Warsaw completed second, so at this point the population of Warsaw can be passed downstream to the subscriber. Unfortunately the population of London must wait even longer because first we need the population of Paris. Once Paris (immediately followed by Madrid) completes, all remaining results are passed downstream.

Notice how the population of London, even though available, must lie dormant until Warsaw and Paris complete. So is concatMapEager() the best possible operator for concurrency? Not quite. Imagine we have a list of a thousand cities and for each one we fetch a single 1MB picture. With concatMap() we download pictures sequentially, i.e. slowly. With flatMap() pictures are downloaded concurrently and processed as they arrive, as soon as possible. Now what about concatMapEager()? In the worst-case scenario we can end up with concatMapEager() buffering 999 pictures because the picture from the very first city happens to be the slowest. Even though we already have 99.9% of the results, we cannot process them because we enforce strict ordering.
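
The waiting and buffering described in this section can be reproduced with a small plain-JDK sketch. As before, this is a CompletableFuture analogy and not RxJava code. EagerOrderedLookup and emitInOrder() are hypothetical names, and completing a future by hand simulates a response arriving. All lookups are in flight from the start, but a result is released only once every earlier city has been released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class EagerOrderedLookup {

    // concatMapEager-like: every lookup is already pending, but a result is emitted
    // only after all earlier cities (in map iteration order) have been emitted.
    static List<String> emitInOrder(Map<String, CompletableFuture<Long>> pending) {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (Map.Entry<String, CompletableFuture<Long>> entry : pending.entrySet()) {
            String city = entry.getKey();
            CompletableFuture<Long> lookup = entry.getValue();
            // Chain this city's emission behind the previous city's emission.
            previous = previous.thenCompose(ignored ->
                    lookup.thenAccept(population -> emitted.add(city + "=" + population)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // An insertion-ordered map keeps the cities in their original order.
        Map<String, CompletableFuture<Long>> pending = new LinkedHashMap<>();
        for (String city : List.of("Warsaw", "Paris", "London", "Madrid")) {
            pending.put(city, new CompletableFuture<>());
        }
        List<String> emitted = emitInOrder(pending);

        pending.get("London").complete(7556900L);
        System.out.println(emitted); // [] (London is buffered, Warsaw is still missing)

        pending.get("Warsaw").complete(1702139L);
        System.out.println(emitted); // [Warsaw=1702139]

        pending.get("Paris").complete(2138551L);
        System.out.println(emitted); // [Warsaw=1702139, Paris=2138551, London=7556900]

        pending.get("Madrid").complete(3255944L);
        System.out.println(emitted); // [Warsaw=1702139, Paris=2138551, London=7556900, Madrid=3255944]
    }
}
```

London’s value sits in memory from the first step until Paris arrives. With a thousand large responses and a slow first one, that buffered set is what drives the memory cost.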

Which operator to use?

flatMap() should be your first weapon of choice. It allows efficient concurrency with streaming behavior. But be prepared to receive results out-of-order. concatMap() works well only when the provided transformation is so fast that sequential processing is not a problem. concatMapEager() is very convenient, but watch out for memory consumption. Also, in the worst-case scenario you may end up sitting idle, waiting for very few responses.

Appendix: configuring Retrofit2 client

The GeoNames service interface that we used throughout this article in fact looks like this:

public interface GeoNames {
 
    @GET("/searchJSON")
    Single<SearchResult> search(
            @Query("q") String query,
            @Query("maxRows") int maxRows,
            @Query("style") String style,
            @Query("username") String username
    );
 
    default Flowable<Long> populationOf(String city) {
        return search(city, 1, "LONG", "s3cret")
                .map(SearchResult::getGeonames)
                .map(g -> g.get(0))
                .map(Geoname::getPopulation)
                .toFlowable();
    }
 
}

The implementation of the non-default method is auto-generated by Retrofit2. Notice that populationOf() returns a one-element Flowable<Long> for simplicity’s sake. However, to fully embrace the nature of this API, other implementations would be more reasonable in the real world. First of all, the SearchResult class returns an ordered list of results (getters/setters omitted):

class SearchResult {
    private List<Geoname> geonames = new ArrayList<>();
}
 
class Geoname {
    private double lat;
    private double lng;
    private Integer geonameId;
    private Long population;
    private String countryCode;
    private String name;
}

After all, there are many Warsaws and Londons in the world. We silently assume the list will contain at least one element and that the first one is the right match. A more appropriate implementation should either return all hits or, even better, return a Maybe<Long> to reflect that there may be no matches:

default Maybe<Long> populationOf(String city) {
    return search(city, 1, "LONG", "nurkiewicz")
            .flattenAsFlowable(SearchResult::getGeonames)
            .map(Geoname::getPopulation)
            .firstElement();
}
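
The difference between the two variants shows up when the search returns no hits. The sketch below illustrates it with the JDK only, using Optional as a rough analogy for Maybe. The FirstMatch class and its trimmed-down Geoname are hypothetical stand-ins for the classes above, not part of the original project.

```java
import java.util.List;
import java.util.Optional;

final class FirstMatch {

    // Minimal hypothetical stand-in for the Geoname class shown above.
    static final class Geoname {
        private final Long population;

        Geoname(Long population) {
            this.population = population;
        }

        Long getPopulation() {
            return population;
        }
    }

    // Mirrors g.get(0): throws IndexOutOfBoundsException when there are no hits.
    static Long firstOrFail(List<Geoname> geonames) {
        return geonames.get(0).getPopulation();
    }

    // Mirrors the Maybe<Long> idea: "no match" is an ordinary, empty result.
    static Optional<Long> firstOrEmpty(List<Geoname> geonames) {
        return geonames.stream().findFirst().map(Geoname::getPopulation);
    }

    public static void main(String[] args) {
        List<Geoname> hits = List.of(new Geoname(1702139L));
        List<Geoname> noHits = List.of();

        System.out.println(firstOrEmpty(hits));   // Optional[1702139]
        System.out.println(firstOrEmpty(noHits)); // Optional.empty
        try {
            firstOrFail(noHits);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("no hits: get(0) failed");
        }
    }
}
```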

The glue code looks as follows. First Jackson’s setup in order to parse response from the API:

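import com.fasterxml.jackson.databind.ObjectMapper;

import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;

private ObjectMapper objectMapper() {
    // Ignore JSON fields that have no counterpart in our model classes.
    return new ObjectMapper()
            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}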

Disabling FAIL_ON_UNKNOWN_PROPERTIES is often what you desire. Otherwise you have to map all fields from the JSON response, and your code will break when the API producer introduces new, otherwise backward-compatible fields. Then we set up OkHttpClient, used underneath by Retrofit:

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
 
private OkHttpClient client() {
    HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
    interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
    return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}

Sometimes you can skip the configuration of the OkHttp client, but we added a logging interceptor. By default OkHttp logs using java.util.logging, so in order to use a decent logging framework we must install a bridge at the very beginning:

import org.slf4j.bridge.SLF4JBridgeHandler;
 
static {
    SLF4JBridgeHandler.removeHandlersForRootLogger();
    SLF4JBridgeHandler.install();
}

And finally Retrofit itself:

import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;
 
GeoNames createClient() {
    return new Retrofit.Builder()
            .client(client())
            .baseUrl("http://api.geonames.org")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .addConverterFactory(JacksonConverterFactory.create(objectMapper()))
            .build()
            .create(GeoNames.class);
}

Calling createClient() will yield a dynamic implementation of the GeoNames interface. We used the following dependencies:

compile 'io.reactivex.rxjava2:rxjava:2.0.6'
 
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'
 
compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'

Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.