Cyclops-react Organises the Cambrian Explosion of Java 8 Libraries

What is Cyclops-react?

The arrival of Lambda expressions and default methods in Java 8 heralded the biggest structural changes to the Java language in a decade. Building on top of this were some new cool APIs, such as Stream, Optional, CompletableFuture – finally Java developers could code in a more functional style. While this was very welcome, for many the enhancements did not quite go far enough.

Stream, Optional, CompletableFuture all share the same abstract structure and obey the same rules. Yet the APIs don’t agree on common method names, never mind provide a common interface. For example Stream#map / Optional#map becomes CompletableFuture#thenApply. Also, the functionality added to Stream & Optional is missing from collections generally. Where is List#map ?
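The naming mismatch described above can be seen with nothing but the JDK. The following is a minimal sketch: the class name MapNamingDemo and its helper methods are made up for illustration, and the same function is applied to each container type in turn.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapNamingDemo {

    // The same transformation is reused with every container type below.
    static final Function<Integer, Integer> TIMES_TEN = i -> i * 10;

    static List<Integer> viaStream() {
        return Stream.of(1, 2, 3).map(TIMES_TEN).collect(Collectors.toList());
    }

    static Optional<Integer> viaOptional() {
        return Optional.of(1).map(TIMES_TEN);
    }

    static Integer viaFuture() {
        // Same operation, different name: thenApply instead of map.
        return CompletableFuture.completedFuture(1).thenApply(TIMES_TEN).join();
    }

    static List<Integer> viaList() {
        // List has no map method, so we go through a Stream and back.
        List<Integer> source = Arrays.asList(1, 2, 3);
        return source.stream().map(TIMES_TEN).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaStream());   // [10, 20, 30]
        System.out.println(viaOptional()); // Optional[10]
        System.out.println(viaFuture());   // 10
        System.out.println(viaList());     // [10, 20, 30]
    }
}
```

Three of the four cases need a different spelling for what is conceptually one operation, which is the gap the rest of this article is concerned with.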

The JDK Stream implementation performs well, is totally lazy and well designed for extension, but provides only a limited subset of potential operators (constrained, perhaps, by a focus on data parallelism). Into the void stepped libraries such as jOOλ with its sequential Stream extension (called Seq). Seq adds many additional Streaming operators. jOOλ generally adds many missing functional features such as Tuples.

A core goal of cyclops-react, as well as adding original features such as FutureStreams, is to provide a mechanism for joining up both the JDK APIs and the third-party functional libraries. A Cambrian explosion of cool libraries, such as Javaslang and Project Reactor, emerged after the launch of Java 8. cyclops-react joins them up in the first instance by extending the JDK, and by leveraging other libraries such as jOOλ, pCollections & Agrona. These libraries in turn also extend JDK interfaces where possible to add features such as persistent collections and wait-free many-producer, single-consumer queues.

Beyond reusing and extending JDK interfaces, our aims were to make it easy for developers to integrate with external libraries by making use of third-party standards such as the reactive-streams API, and by building our own abstractions where no set standard existed. The libraries we currently focus on integrating with are Google’s Guava, RxJava, Functional Java, Project Reactor and Javaslang. We’ve created abstractions for wrapping types like Stream, Optional & CompletableFuture, where no interface existed or was possible before. We chose these goals because we are using cyclops-react in production across a microservices architecture, and being able to leverage the right technology for a problem and have it integrate smoothly with the rest of our code base is critical.

cyclops-react is quite a large, feature-rich project, and in addition has a number of integration modules. In this article I’ll cover some of the available features, with the particular goal of showing how cyclops-react helps join up the dots across the JDK and into the brave new world of the pace-setting Java 8 open source community.

Extending the JDK

cyclops-react extends JDK APIs where possible. For example, ReactiveSeq, which adds functionality for handling errors, asynchronous processing and much more, extends both JDK Stream and jOOλ’s Seq. The cyclops-react collection extensions, rather than creating new collection implementations, implement and extend the appropriate JDK interfaces. cyclops-react’s LazyFutureStream in turn extends ReactiveSeq, and allows aggregate operations over Streams of Futures as if they were a simple Stream (this proves to be very useful for handling a large number of typical Java I/O operations asynchronously and performantly).

ListX extends List, but adds operators that execute eagerly:

ListX<Integer> tenTimes = ListX.of(1,2,3,4)
                               .map(i->i*10);
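To see why eager execution is worth calling out, it helps to compare it with the JDK Stream, which defers all work until a terminal operation runs. The sketch below uses only the JDK (it does not use ListX); the class and method names are invented for this illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyVersusEagerDemo {

    // Returns {calls before the terminal operation, calls after it}.
    static int[] countMapCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Building the pipeline does not run the mapping function.
        Stream<Integer> pipeline = Stream.of(1, 2, 3, 4).map(i -> {
            calls.incrementAndGet();
            return i * 10;
        });
        int before = calls.get();

        // Only the terminal operation pulls the elements through map.
        List<Integer> tenTimes = pipeline.collect(Collectors.toList());
        int after = calls.get();

        System.out.println(tenTimes); // [10, 20, 30, 40]
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = countMapCalls();
        System.out.println(counts[0] + " call(s) before collect, " + counts[1] + " after");
    }
}
```

With an eager collection type, by contrast, the mapped values exist as soon as map returns, so no separate collect step is needed.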

cyclops-react adds lots of operators for users to explore. We can, for example, apply functions across multiple collections at the same time.

The reactive-streams API acts as a natural bridge between producers (publishers) of data and consumers (subscribers). All cyclops-react data types implement the Publisher interface from reactive-streams, and Subscriber implementations that can convert to any cyclops-react type are also provided. This makes direct integration with other reactive-streams based libraries, such as Project Reactor, straightforward.

For example we can lazily populate a Reactor Flux from any cyclops publisher, such as SortedSetX, or populate a cyclops-react type from a Reactor type.

Flux<Integer> stream = Flux.from(
  SortedSetX.of(1,2,3,4,5,6,7,8));
//Flux[1,2,3,4,5,6,7,8]

ListX<String> list = ListX.fromPublisher(
  Flux.just("a","b","c"));
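The publisher/subscriber contract that makes this bridging possible can be sketched without any third-party library. The example below assumes Java 9 or later, whose java.util.concurrent.Flow interfaces mirror the reactive-streams ones. The helpers publisherOf and collect are hypothetical, written for this illustration only (they are not cyclops-react or Reactor APIs), and the publisher is deliberately synchronous and skips parts of the reactive-streams specification, such as signalling an error for non-positive requests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class PublisherBridgeDemo {

    // Hypothetical helper: exposes any Iterable as a Publisher that emits on request.
    static <T> Flow.Publisher<T> publisherOf(Iterable<T> source) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = source.iterator();
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n elements, then complete once the source is drained.
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical helper: a Subscriber that gathers everything into a List.
    // It returns a complete list only because publisherOf emits synchronously.
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(publisherOf(Arrays.asList("a", "b", "c")))); // [a, b, c]
    }
}
```

Because both sides only depend on the shared interfaces, any collection-like type can act as the source and any other as the sink, which is the property the cyclops-react and Reactor conversions rely on.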

Reactor Flux and Mono types can work directly with cyclops-react For comprehensions (each supported library also has their own set of native For comprehension classes in their integration module).

// import static com.aol.cyclops.control.For.*;
        
Publishers.each2(
  Flux.just(1,2,3), 
  i -> ReactiveSeq.range(i,5),Tuple::tuple).printOut();
        
/*
(1, 1)
(1, 2)
(1, 3)
(1, 4)
(2, 2)
(2, 3)
(2, 4)
(3, 3)
(3, 4)
*/

A For comprehension is a way of managing nested iteration over types with flatMap and map methods, by cascading calls to the appropriate methods. In cyclops-react, nested statements can access the elements of the previous statements, so For comprehensions can be a very useful way of managing the behavior of existing code. For example, suppose the existing methods findId and loadData may return null values and will throw NPEs if given a null parameter. A For comprehension can safely execute loadData only when an Optional with a value is returned from findId():

List<Data> data = 
For.optional(findId())
   .optional(this::loadData);
//loadData is only called if findId() returns a value

Similarly, a type such as Try could be used to handle exceptional results from either findId or loadData, Futures could be used to execute chained methods asynchronously, and so on.
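The cascading of flatMap and map calls that a For comprehension manages can be written out by hand with plain JDK types. The sketch below is only an illustration of that desugaring: the class name is invented, and findId and loadData are hypothetical stand-ins that return Optional rather than null.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ForComprehensionDesugared {

    // Nested iteration: the inner range depends on the outer element.
    static List<String> pairs() {
        return Stream.of(1, 2, 3)
                     .flatMap(i -> IntStream.range(i, 5)
                                            .mapToObj(j -> "(" + i + ", " + j + ")"))
                     .collect(Collectors.toList());
    }

    // Hypothetical stand-in for findId: only one name is known.
    static Optional<Integer> findId(String name) {
        return "known".equals(name) ? Optional.of(42) : Optional.empty();
    }

    // Hypothetical stand-in for loadData.
    static Optional<String> loadData(Integer id) {
        return Optional.of("data-" + id);
    }

    static Optional<String> load(String name) {
        // loadData only runs when findId produced a value.
        return findId(name).flatMap(ForComprehensionDesugared::loadData);
    }

    public static void main(String[] args) {
        pairs().forEach(System.out::println); // (1, 1) through (3, 4), nine pairs
        System.out.println(load("known"));    // Optional[data-42]
        System.out.println(load("missing"));  // Optional.empty
    }
}
```

With two levels the hand-written form is still readable; the comprehension syntax pays off as the nesting gets deeper and inner steps need values from several outer ones.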

Building cross-library abstractions

Java 8 introduced Monads to Java (Stream, Optional, CompletableFuture), but didn’t provide a common interface that would help reuse. In fact, the method names used in CompletableFuture differ significantly from those used in Optional & Stream for the same function: map became thenApply and flatMap became thenCompose. Across the Java 8 world monads are becoming an increasingly common pattern, but there is often no way to abstract across them.

In cyclops-react, rather than attempt to define an interface to represent monads, we built a set of wrapper interfaces and a number of custom adapters to adapt different instances from across the main functional-style libraries for Java 8 to those wrappers. The wrappers extend AnyM (short for Any Monad) and there are two sub-interfaces: AnyMValue, which represents any monadic type that resolves to a single value (like Optional or CompletableFuture), and AnyMSeq, which ultimately resolves to a sequence of values (like a Stream or List). The cyclops extension wrappers provide a mechanism to wrap the types from RxJava, Guava, Reactor, FunctionalJava and Javaslang.

//We can wrap any type from Reactor, RxJava,
//FunctionalJava, Javaslang, Guava
AnyMSeq<Integer> wrapped = 
  FJ.list(List.list(1,2,3,4,5));

//And manipulate it
AnyMSeq<Integer> timesTen = wrapped.map(i->i*10);
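The wrapper-plus-adapter idea can be illustrated in miniature with JDK types alone. The sketch below is not the AnyM API: SingleValue, of, orElse and timesTen are all hypothetical names chosen for this example, and only the single-value case (Optional and CompletableFuture) is covered.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class WrapperSketch {

    // Hypothetical wrapper interface for this sketch; NOT the cyclops-react AnyM API.
    interface SingleValue<T> {
        <R> SingleValue<R> map(Function<? super T, ? extends R> fn);
        T orElse(T fallback);
    }

    // Adapter for Optional: map delegates to Optional#map.
    static <T> SingleValue<T> of(Optional<T> optional) {
        return new SingleValue<T>() {
            @Override
            public <R> SingleValue<R> map(Function<? super T, ? extends R> fn) {
                return WrapperSketch.of(optional.<R>map(fn));
            }
            @Override
            public T orElse(T fallback) {
                return optional.orElse(fallback);
            }
        };
    }

    // Adapter for CompletableFuture: map delegates to thenApply.
    static <T> SingleValue<T> of(CompletableFuture<T> future) {
        return new SingleValue<T>() {
            @Override
            public <R> SingleValue<R> map(Function<? super T, ? extends R> fn) {
                return WrapperSketch.of(future.<R>thenApply(fn));
            }
            @Override
            public T orElse(T fallback) {
                // Blocks until the future completes; failures yield the fallback.
                return future.exceptionally(error -> fallback).join();
            }
        };
    }

    // Generic code written once against the wrapper, not against a concrete type.
    static SingleValue<Integer> timesTen(SingleValue<Integer> value) {
        return value.map(i -> i * 10);
    }

    public static void main(String[] args) {
        System.out.println(timesTen(of(Optional.of(4))).orElse(-1));                       // 40
        System.out.println(timesTen(of(CompletableFuture.completedFuture(4))).orElse(-1)); // 40
        System.out.println(timesTen(of(Optional.<Integer>empty())).orElse(-1));            // -1
    }
}
```

The design choice worth noting is that the wrapped types themselves are untouched: each adapter translates the common vocabulary into whatever method names its underlying type happens to use.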

cyclops-react provides a common set of interfaces that these wrappers (and other cyclops-react types) inherit from, allowing developers to write more generic reusable code. AnyM extends reactive-streams publishers, meaning you can make any Javaslang, Guava, FunctionalJava or RxJava type a reactive-streams publisher with cyclops-react.

AnyMSeq<Integer> wrapped = 
  Javaslang.traversable(List.of(1,2,3,4,5));

//The wrapped type is a reactive-streams publisher
Flux<Integer> fromJavaslang = Flux.from(wrapped);

wrapped.forEachWithError(
  System.out::println,
  System.err::println);

Furthermore the reactive functionality from cyclops-react is provided directly on the AnyM types. This means we can, for example, schedule data emission from a Javaslang or FunctionalJava Stream – or execute a reduce operation lazily, or asynchronously.

AnyMSeq<Integer> wrapped = 
  Javaslang.traversable(Stream.of(1,2,3,4,5));

CompletableFuture<Integer> asyncResult = 
  wrapped.futureOperations(Executors.newFixedThreadPool(1))
         .reduce(50, (acc, next) -> acc + next);
//CompletableFuture[65]

AnyMSeq<Integer> wrapped = 
  FJ.list(List.list(1,2,3,4,5));

Eval<Integer> lazyResult = 
  wrapped.map(i -> i * 10)
         .lazyOperations()
         .reduce(50, (acc,next) -> acc + next);
//Eval[200]

HotStream<Integer> emitting = wrapped.schedule(
  "0 * * * * ?", 
  Executors.newScheduledThreadPool(1));

emitting.connect()
        .debounce(1,TimeUnit.DAYS)
        .forEachWithError(
           this::logSuccess,
           this::logFailure);
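For comparison, the asynchronous and lazy reductions can be approximated with plain JDK types. This is a rough sketch rather than an equivalent of futureOperations or lazyOperations: the class and method names are invented, CompletableFuture.supplyAsync stands in for the asynchronous case, and a Supplier stands in for the lazy one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Stream;

class DeferredReduceDemo {

    // Runs the reduction on another thread and waits for the result.
    static int asyncSum() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture<Integer> result = CompletableFuture.supplyAsync(
                () -> Stream.of(1, 2, 3, 4, 5).reduce(50, (acc, next) -> acc + next),
                pool);
            return result.join(); // 50 + (1 + 2 + 3 + 4 + 5)
        } finally {
            pool.shutdown();
        }
    }

    // Nothing is computed until get() is called on the returned Supplier.
    static Supplier<Integer> lazySum() {
        return () -> Stream.of(1, 2, 3, 4, 5)
                           .map(i -> i * 10)
                           .reduce(50, (acc, next) -> acc + next);
    }

    public static void main(String[] args) {
        System.out.println(asyncSum());      // 65
        System.out.println(lazySum().get()); // 200
    }
}
```

Note that 50 is used as a starting value here, as in the snippets above; it is not a true identity for addition, so this form is only safe for sequential streams.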

There’s a lot to explore both in cyclops-react and in the new, broader Java 8 ecosystem. Hopefully you’ll have a fun adventure playing with, learning from and extending the Java 8 boundaries yourself!

John McClean

John is an Architect at AOL. He works in the ad tech and platforms group, where he leads the advertising demand side forecasting team, which builds and runs a system that processes billions of RTB, impression and viewability records in real time to generate price volume curves and other forecasts for advertising campaigns in milliseconds. John is also the lead developer for AOL open source projects cyclops-react and Microserver. Extracted from AOL’s forecasting system, these projects allow AOL to rapidly deploy new features that work at scale, by guiding Java developers along the path of functional, reactive microservices.