Cyclops-react Organises the Cambrian Explosion of Java 8 Libraries
What is Cyclops-react?
The arrival of lambda expressions and default methods in Java 8 heralded the biggest structural changes to the Java language in a decade. Building on top of this were some cool new APIs, such as Stream, Optional and CompletableFuture. Finally, Java developers could code in a more functional style. While this was very welcome, for many the enhancements did not quite go far enough.
Stream, Optional and CompletableFuture all share the same abstract structure and obey the same rules. Yet the APIs don't agree on common method names, never mind provide a common interface. For example, Stream#map / Optional#map becomes CompletableFuture#thenApply. Also, the functionality added to Stream & Optional is missing from collections generally. Where is List#map?
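The mismatch is easy to see with nothing but the JDK. The sketch below applies the same function to each of the types mentioned above; the class name NamingMismatch and its helper methods are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NamingMismatch {

    static final Function<Integer, Integer> TIMES_TEN = i -> i * 10;

    // Stream: the operation is called map
    static List<Integer> viaStream() {
        return Stream.of(1, 2, 3).map(TIMES_TEN).collect(Collectors.toList());
    }

    // Optional: also map
    static Optional<Integer> viaOptional() {
        return Optional.of(1).map(TIMES_TEN);
    }

    // CompletableFuture: the same operation is called thenApply
    static CompletableFuture<Integer> viaFuture() {
        return CompletableFuture.completedFuture(1).thenApply(TIMES_TEN);
    }

    // List: no map at all, so we detour through a Stream and collect again
    static List<Integer> viaList(List<Integer> input) {
        return input.stream().map(TIMES_TEN).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaStream());
        System.out.println(viaOptional());
        System.out.println(viaFuture().join());
        System.out.println(viaList(Arrays.asList(1, 2, 3)));
    }
}
```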
The JDK Stream implementation performs well, is totally lazy and well designed for extension, but provides only a limited subset of potential operators (constrained, perhaps, by a focus on data parallelism). Into the void stepped libraries such as jOOλ with its sequential Stream extension (called Seq). Seq adds many additional streaming operators. jOOλ also adds many missing functional features, such as Tuples.
A core goal of cyclops-react, as well as adding original features such as FutureStreams, is to provide a mechanism for joining up both the JDK APIs and the third-party functional libraries. A Cambrian explosion of cool libraries, such as Javaslang & Project Reactor, emerged after the launch of Java 8. cyclops-react joins them up in the first instance by extending the JDK, and by leveraging other libraries such as jOOλ, pCollections & Agrona. These libraries in turn also extend JDK interfaces where possible to add features such as persistent collections and wait-free many-producer, single-consumer queues.
Beyond reusing and extending JDK interfaces, our aims were to make it easy for developers to integrate with external libraries by making use of third-party standards such as the reactive-streams API, and by building our own abstractions where no set standard existed. The libraries we currently focus on integrating with are Google's Guava, RxJava, Functional Java, Project Reactor and Javaslang. We've created abstractions for wrapping types like Stream, Optional & CompletableFuture, where no interface existed or was possible before. We chose these goals because we are using cyclops-react in production across a microservices architecture, and being able to leverage the right technology for a problem and have it integrate smoothly with the rest of our code base is critical.
cyclops-react is quite a large, feature-rich project, and in addition has a number of integration modules. In this article I'll cover some of the available features, with a particular goal of showing how cyclops-react helps join up the dots across the JDK and into the brave new world of the pace-setting Java 8 open source community.
Extending the JDK
cyclops-react extends JDK APIs where possible. For example, ReactiveSeq, which adds functionality for handling errors, asynchronous processing and much more, extends both JDK Stream and jOOλ's Seq. cyclops-react Collection extensions, rather than creating new collection implementations, implement and extend the appropriate JDK interfaces. cyclops-react's LazyFutureStream in turn extends ReactiveSeq, and allows aggregate operations over Streams of Futures as if they were simple Streams (this proves to be very useful for handling a large number of typical Java I/O operations asynchronously and performantly).
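To make the "Stream of Futures" idea concrete, here is a minimal plain-JDK sketch of what has to be written by hand without such an abstraction. It does not use LazyFutureStream; fetchLength is a hypothetical stand-in for a slow I/O call, and the class name is invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FuturesAsStream {

    // Hypothetical stand-in for a slow I/O call: resolves a key to its length asynchronously.
    static CompletableFuture<Integer> fetchLength(String key) {
        return CompletableFuture.supplyAsync(() -> key.length());
    }

    static List<Integer> doubledLengths(List<String> keys) {
        // Start every call first so that they can run concurrently...
        List<CompletableFuture<Integer>> inFlight = keys.stream()
                .map(FuturesAsStream::fetchLength)
                .map(future -> future.thenApply(len -> len * 2))
                .collect(Collectors.toList());

        // ...then block once at the end, gathering results in encounter order.
        return inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubledLengths(Arrays.asList("a", "bb", "ccc"))); // [2, 4, 6]
    }
}
```

Note the two-phase shape: collecting the futures before joining them is what keeps the calls concurrent. Joining inside the first pipeline would serialise them.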
ListX extends List, but adds operators that execute eagerly:
    ListX<Integer> tenTimes = ListX.of(1,2,3,4)
                                   .map(i->i*10);
cyclops-react adds lots of operators for users to explore. We can, for example, apply functions across multiple collections at the same time.
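As a rough illustration of the idea in plain JDK terms (this is not the cyclops-react operator itself; the class and the combine helper are hypothetical names), applying a function across two collections amounts to a nested flatMap and map:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

class CombineLists {

    // Hypothetical helper: applies fn to every pairing of an element
    // from 'left' with an element from 'right'.
    static <A, B, R> List<R> combine(List<A> left, List<B> right, BiFunction<A, B, R> fn) {
        return left.stream()
                .flatMap(a -> right.stream().map(b -> fn.apply(a, b)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sums = combine(Arrays.asList(1, 2), Arrays.asList(10, 20), Integer::sum);
        System.out.println(sums); // [11, 21, 12, 22]
    }
}
```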
The reactive-streams API acts as a natural bridge between producers (publishers) of data and consumers (subscribers). All cyclops-react data types implement the Publisher interface from reactive-streams, and Subscriber implementations that can convert to any cyclops-react type are also provided. This makes direct integration with other reactive-streams based libraries, such as Project Reactor, straightforward.
For example, we can lazily populate a Reactor Flux from any cyclops publisher, such as SortedSetX, or populate a cyclops-react type from a Reactor type.
    Flux<Integer> stream = Flux.from(
                               SortedSetX.of(1,2,3,4,5,6,7,8));
    //Flux[1,2,3,4,5,6,7,8]

    ListX<String> list = ListX.fromPublisher(
                               Flux.just("a","b","c"));
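For readers who have not met the publisher/subscriber contract before, the sketch below shows its shape using java.util.concurrent.Flow, the copy of the reactive-streams interfaces that ships with Java 9 and later. It is only an illustration under simplifying assumptions: publisherOf and collect are hypothetical helpers, the publisher is synchronous and single-threaded, and none of this is cyclops-react or Reactor code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class FlowBridgeSketch {

    // Hypothetical helper: exposes any Iterable as a synchronous Flow.Publisher.
    static <T> Flow.Publisher<T> publisherOf(Iterable<T> source) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> items = source.iterator();
            private long pending;      // demand signalled but not yet served
            private boolean emitting;  // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                if (emitting) return; // the loop already running will serve the new demand
                emitting = true;
                while (!done && pending > 0 && items.hasNext()) {
                    pending--;
                    subscriber.onNext(items.next());
                }
                emitting = false;
                if (!done && !items.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical helper: a subscriber that pulls one element at a time into a List.
    // Only safe with synchronous publishers such as the one above.
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(T item) { received.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(publisherOf(Arrays.asList(1, 2, 3)))); // [1, 2, 3]
    }
}
```

Because both sides only talk through the shared interfaces, any publisher can feed any subscriber, which is the property that lets libraries interoperate.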
Reactor Flux and Mono types can work directly with cyclops-react For comprehensions (each supported library also has its own set of native For comprehension classes in its integration module).
    // import static com.aol.cyclops.control.For.*;
    Publishers.each2(
            Flux.just(1,2,3),
            i -> ReactiveSeq.range(i,5),
            Tuple::tuple).printOut();
    /*
    (1, 1)
    (1, 2)
    (1, 3)
    (1, 4)
    (2, 2)
    (2, 3)
    (2, 4)
    (3, 3)
    (3, 4)
    */
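The same pairs can be produced with the JDK alone by writing the nested flatMap and map calls by hand, which is roughly the cascade a For comprehension manages for you. This is a plain-JDK sketch for comparison; the class name is invented and no cyclops-react types are involved.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class NestedIteration {

    // Outer values 1..3; for each i, inner values i..4 (range's upper bound is exclusive).
    static List<String> pairs() {
        return IntStream.rangeClosed(1, 3).boxed()
                .flatMap(i -> IntStream.range(i, 5)
                        .mapToObj(j -> "(" + i + ", " + j + ")"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        pairs().forEach(System.out::println);
    }
}
```

Note how the inner lambda can see i from the outer one. With two levels this is readable enough; each extra level adds another layer of nesting.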
A For comprehension is a way of managing nested iteration over types with flatMap and map methods, by cascading calls to the appropriate methods. In cyclops-react, nested statements can access the elements of the previous statements, so For comprehensions can be a very useful way of managing the behavior of existing code. For example, suppose the existing methods findId and loadData may return null values, and will throw NPEs if provided with a null parameter. We can make use of a For comprehension that will safely execute loadData only when an Optional with a value is returned from findId():
    List<Data> data = For.optional(findId())
                         .optional(this::loadData);
    //loadData is only called if findId() returns a value
Similarly, a type such as Try could be used to handle exceptional results from either findId or loadData; Futures can be used to execute chained methods asynchronously, and so on.
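For comparison, the null-safe chaining described above can be sketched with JDK Optional alone. The findId and loadData bodies below are hypothetical stand-ins invented for the example (the real methods are not shown in this article), and safeLoad is an illustrative name.

```java
import java.util.Objects;
import java.util.Optional;

class NullSafeChain {

    // Hypothetical legacy method: returns null when the user is unknown.
    static String findId(String user) {
        return "alice".equals(user) ? "id-1" : null;
    }

    // Hypothetical legacy method: throws NPE on a null id, may itself return null.
    static String loadData(String id) {
        return Objects.requireNonNull(id).equals("id-1") ? "data for id-1" : null;
    }

    // loadData is only invoked when findId produced a value.
    static Optional<String> safeLoad(String user) {
        return Optional.ofNullable(findId(user))
                .flatMap(id -> Optional.ofNullable(loadData(id)));
    }

    public static void main(String[] args) {
        System.out.println(safeLoad("alice")); // Optional[data for id-1]
        System.out.println(safeLoad("bob"));   // Optional.empty
    }
}
```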
Building cross-library abstractions
Java 8 introduced Monads to Java (Stream, Optional, CompletableFuture), but didn't provide a common interface that would help reuse. In fact, the method names used in CompletableFuture differ significantly from those used in Optional & Stream for the same function: map became thenApply, and flatMap became thenCompose. Across the Java 8 world monads are becoming an increasingly common pattern, but there is often no way to abstract across them. In cyclops-react, rather than attempt to define an interface to represent monads, we built a set of wrapper interfaces and a number of custom adapters to adapt different instances from across the main functional-style libraries for Java 8 to those wrappers. The wrappers extend AnyM (short for Any Monad) and there are two sub-interfaces: AnyMValue, which represents any monadic type that resolves to a single value (like Optional or CompletableFuture), and AnyMSeq, which ultimately resolves to a sequence of values (like a Stream or List). The cyclops extension wrappers provide a mechanism to wrap the types from RxJava, Guava, Reactor, FunctionalJava and Javaslang.
    //We can wrap any type from Reactor, RxJava,
    //FunctionalJava, Javaslang, Guava
    AnyMSeq<Integer> wrapped =
        Fj.list(List.list(1,2,3,4,5));

    //And manipulate it
    AnyMSeq<Integer> timesTen = wrapped.map(i->i*10);
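The wrapper-plus-adapter approach can be sketched in a few lines of plain Java. The Wrapped interface and the two adapters below are hypothetical and far simpler than AnyM; they only show how code written once against a wrapper can run over types that share no common interface.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class WrapperSketch {

    // Hypothetical common interface; the real AnyM is much richer than this.
    interface Wrapped<T> {
        <R> Wrapped<R> map(Function<? super T, ? extends R> fn);
        Object unwrap();
    }

    // Adapter for Optional: delegates to Optional#map.
    static <T> Wrapped<T> ofOptional(Optional<T> optional) {
        return new Wrapped<T>() {
            @Override
            public <R> Wrapped<R> map(Function<? super T, ? extends R> fn) {
                Optional<R> mapped = optional.map(fn);
                return ofOptional(mapped);
            }
            @Override
            public Object unwrap() { return optional; }
        };
    }

    // Adapter for CompletableFuture: delegates to thenApply, hiding the different name.
    static <T> Wrapped<T> ofFuture(CompletableFuture<T> future) {
        return new Wrapped<T>() {
            @Override
            public <R> Wrapped<R> map(Function<? super T, ? extends R> fn) {
                CompletableFuture<R> mapped = future.thenApply(fn);
                return ofFuture(mapped);
            }
            @Override
            public Object unwrap() { return future; }
        };
    }

    // Written once against the wrapper, usable with either underlying type.
    static Wrapped<Integer> timesTen(Wrapped<Integer> wrapped) {
        return wrapped.map(i -> i * 10);
    }

    public static void main(String[] args) {
        System.out.println(timesTen(ofOptional(Optional.of(2))).unwrap());
        System.out.println(timesTen(ofFuture(CompletableFuture.completedFuture(2))).unwrap());
    }
}
```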
cyclops-react provides a common set of interfaces that these wrappers (and other cyclops-react types) inherit from, allowing developers to write more generic, reusable code. AnyM extends reactive-streams publishers, meaning you can make any Javaslang, Guava, FunctionalJava or RxJava type a reactive-streams publisher with cyclops-react.
    AnyMSeq<Integer> wrapped =
        Javaslang.traversable(List.of(1,2,3,4,5));

    //The wrapped type is a reactive-streams publisher
    Flux<Integer> fromJavaslang = Flux.from(wrapped);

    wrapped.forEachWithError(
        System.out::println,
        System.err::println);
Furthermore, the reactive functionality from cyclops-react is provided directly on the AnyM types. This means we can, for example, schedule data emission from a Javaslang or FunctionalJava Stream, or execute a reduce operation lazily or asynchronously.
    AnyMSeq<Integer> wrapped =
        Javaslang.traversable(Stream.of(1,2,3,4,5));

    CompletableFuture<Integer> asyncResult =
        wrapped.futureOperations(Executors.newFixedThreadPool(1))
               .reduce(50, (acc, next) -> acc + next);
    //CompletableFuture[65]

    wrapped = Fj.list(List.list(1,2,3,4,5));

    Eval<Integer> lazyResult =
        wrapped.map(i -> i * 10)
               .lazyOperations()
               .reduce(50, (acc,next) -> acc + next);
    //Eval[200]

    HotStream<Integer> emitting = wrapped.schedule(
        "0 * * * * ?",
        Executors.newScheduledThreadPool(1));

    emitting.connect()
            .debounce(1,TimeUnit.DAYS)
            .forEachWithError(
                this::logSuccess,
                this::logFailure);
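To show what "asynchronously" and "lazily" mean for a reduce, here is a JDK-only sketch of the same fold (seed 50, summing 1 to 5). It deliberately avoids the cyclops-react API: the class and method names are hypothetical, CompletableFuture plays the role of the asynchronous result, and a plain Supplier stands in for a lazy value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class AsyncAndLazyReduce {

    // The fold itself: start from 50 and add every element.
    static int sumFrom50(List<Integer> values) {
        return values.stream().reduce(50, Integer::sum);
    }

    // Asynchronous: the fold runs on the supplied executor, the caller gets a future.
    static CompletableFuture<Integer> async(List<Integer> values, Executor executor) {
        return CompletableFuture.supplyAsync(() -> sumFrom50(values), executor);
    }

    // Lazy: nothing is computed until get() is called on the Supplier.
    static Supplier<Integer> lazy(List<Integer> values) {
        return () -> sumFrom50(values);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            System.out.println(async(values, pool).join()); // 65
            System.out.println(lazy(values).get());         // 65
        } finally {
            pool.shutdown();
        }
    }
}
```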
There's a lot to explore, both in cyclops-react and in the new, broader Java 8 ecosystem. Hopefully you'll have a fun adventure playing with, learning from and extending the Java 8 boundaries yourself!
Reference: Cyclops-react Organises the Cambrian Explosion of Java 8 Libraries from our JCG partner Lukas Eder at the JAVA, SQL, AND JOOQ blog.