When the Java 8 Streams API is not Enough
Java 8 was – as always – a release of compromises and backwards-compatibility. A release where the JSR-335 expert group might not have agreed upon scope or feasibility of certain features with some of the audience. See some concrete explanations by Brian Goetz about why …
- … “final” is not allowed in Java 8 default methods
- … “synchronized” is not allowed in Java 8 default methods
But today we’re going to focus on the Streams API’s “shortcomings”, or as Brian Goetz would probably put it: things out of scope given the design goals.
Parallel Streams?
Parallel computing is hard, and it used to be a pain. People didn’t exactly love the new (now old) Fork / Join API when it was first shipped with Java 7. Conversely, and clearly, the conciseness of calling Stream.parallel() is unbeatable.
But many people don’t actually need parallel computing (not to be confused with multi-threading!). In 95% of all cases, people would have probably preferred a more powerful Streams API, or perhaps a generally more powerful Collections API with lots of awesome methods on various Iterable subtypes.
Changing Iterable is dangerous, though. Even a no-brainer such as transforming an Iterable into a Stream via a potential Iterable.stream() method seems to risk opening Pandora’s box!
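Until such a method exists, the JDK’s StreamSupport class can bridge the gap. The following is a minimal sketch; the class name IterableStreams and the helper stream() are made up for illustration and are not part of the JDK or jOOλ:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IterableStreams {

    // Hypothetical helper (an assumption for this sketch, not a JDK method):
    // wraps any Iterable in a sequential Stream.
    static <T> Stream<T> stream(Iterable<T> iterable) {
        // The second argument "false" requests a sequential stream
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public static void main(String[] args) {
        Iterable<String> iterable = Arrays.asList("a", "b", "c");

        List<String> upper = stream(iterable)
            .map(String::toUpperCase)
            .collect(Collectors.toList());

        System.out.println(upper); // [A, B, C]
    }
}
```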
Sequential Streams!
So if the JDK doesn’t ship it, we create it ourselves!
Streams are quite awesome per se. They’re potentially infinite, and that’s a cool feature. Mostly – and especially with functional programming – the size of a collection doesn’t really matter that much, as we transform element by element using functions.
If we admit Streams to be purely sequential, then we could have any of these pretty cool methods as well (some of which would also be possible with parallel Streams):
- cycle(): a guaranteed way to make every stream infinite
- duplicate(): duplicate a stream into two equivalent streams
- foldLeft(): a sequential and non-associative alternative to reduce()
- foldRight(): a sequential and non-associative alternative to reduce()
- limitUntil(): limit the stream to those records before the first one to satisfy a predicate
- limitWhile(): limit the stream to those records before the first one not to satisfy a predicate
- maxBy(): reduce the stream to the maximum mapped value
- minBy(): reduce the stream to the minimum mapped value
- partition(): partition a stream into two streams, one satisfying a predicate and the other not satisfying the same predicate
- reverse(): produce a new stream in inverse order
- skipUntil(): skip records until a predicate is satisfied
- skipWhile(): skip records as long as a predicate is satisfied
- slice(): take a slice of the stream, i.e. combine skip() and limit()
- splitAt(): split a stream into two streams at a given position
- unzip(): split a stream of pairs into two streams
- zip(): merge two streams into a single stream of pairs
- zipWithIndex(): merge a stream with its corresponding stream of indexes into a single stream of pairs
jOOλ’s new Seq type does all that
All of the above is part of jOOλ. jOOλ (pronounced “jewel”, or “dju-lambda”, also written jOOL in URLs and such) is an ASL 2.0 licensed library that emerged from our own development needs when implementing jOOQ integration tests with Java 8. Java 8 is exceptionally well-suited for writing tests that reason about sets, tuples, records, and all things SQL.
But the Streams API feels just slightly insufficient, so we have wrapped the JDK’s Streams in our own Seq type (Seq for sequence / sequential Stream):
    // Wrap a stream in a sequence
    Seq<Integer> seq1 = seq(Stream.of(1, 2, 3));

    // Or create a sequence directly from values
    Seq<Integer> seq2 = Seq.of(1, 2, 3);
We’ve made Seq a new interface that extends the JDK Stream interface, so you can use Seq fully interoperably with other Java APIs, leaving the existing methods unchanged:
    public interface Seq<T> extends Stream<T> {

        /**
         * The underlying {@link Stream} implementation.
         */
        Stream<T> stream();

        // [...]
    }
Now, functional programming is only half the fun if you don’t have tuples. Unfortunately, Java doesn’t have built-in tuples and while it is easy to create a tuple library using generics, tuples are still second-class syntactic citizens when comparing Java to Scala, for instance, or C# and even VB.NET.
Nonetheless…
jOOλ also has tuples
We’ve run a code-generator to produce tuples of degree 1-8 (we might add more in the future, e.g. to match Scala’s and jOOQ’s “magical” degree 22).
And if a library has such tuples, the library also needs corresponding functions. The essence of these TupleN and FunctionN types is summarised as follows:
    public class Tuple3<T1, T2, T3>
    implements
        Tuple,
        Comparable<Tuple3<T1, T2, T3>>,
        Serializable,
        Cloneable {

        public final T1 v1;
        public final T2 v2;
        public final T3 v3;

        // [...]
    }
and
    @FunctionalInterface
    public interface Function3<T1, T2, T3, R> {

        default R apply(Tuple3<T1, T2, T3> args) {
            return apply(args.v1, args.v2, args.v3);
        }

        R apply(T1 v1, T2 v2, T3 v3);
    }
There are many more features in Tuple types, but let’s leave them out for today.
On a side note, I’ve recently had an interesting discussion with Gavin King (the creator of Hibernate) on reddit. From an ORM perspective, Java classes seem like a suitable implementation for SQL / relational tuples, and they are indeed. From an ORM perspective.
But classes and tuples are fundamentally different, which is a very subtle issue with most ORMs – e.g. as explained here by Vlad Mihalcea.
Besides, SQL’s notion of row value expressions (i.e. tuples) is quite different from what can be modelled with Java classes. This topic will be covered in a subsequent blog post.
Some jOOλ examples
With the aforementioned goals in mind, let’s see how the above API can be put to work by example:
zipping
    // (tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))
    Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));

    // ("1:a", "2:b", "3:c")
    Seq.of(1, 2, 3).zip(
        Seq.of("a", "b", "c"),
        (x, y) -> x + ":" + y
    );

    // (tuple("a", 0), tuple("b", 1), tuple("c", 2))
    Seq.of("a", "b", "c").zipWithIndex();

    // tuple((1, 2, 3), (a, b, c))
    Seq.unzip(Seq.of(
        tuple(1, "a"),
        tuple(2, "b"),
        tuple(3, "c")
    ));
This is already a case where tuples come in very handy. When we “zip” two streams into one, we want a wrapper value type that combines both values. Classically, people might’ve used Object[] for quick-and-dirty solutions, but an array doesn’t indicate attribute types or degree.
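To make the typing argument concrete, here is a rough sketch of zipping with nothing but the JDK. It is not jOOλ’s implementation: ZipSketch, its nested Pair class and both zip() helpers are hypothetical names, and unlike a lazy Seq this version eagerly collects into a List, stopping at the shorter input:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

final class ZipSketch {

    // Hypothetical stand-in for a degree-2 tuple: both attribute types
    // are visible to the compiler, unlike with Object[]
    static final class Pair<A, B> {
        final A v1;
        final B v2;

        Pair(A v1, B v2) {
            this.v1 = v1;
            this.v2 = v2;
        }

        @Override
        public String toString() {
            return "(" + v1 + ", " + v2 + ")";
        }
    }

    // Zip two sequential streams into typed pairs
    static <A, B> List<Pair<A, B>> zip(Stream<A> left, Stream<B> right) {
        return zip(left, right, (a, b) -> new Pair<>(a, b));
    }

    // Zip two sequential streams with a custom combining function.
    // Eager on purpose, to keep the sketch short.
    static <A, B, R> List<R> zip(
        Stream<A> left,
        Stream<B> right,
        BiFunction<? super A, ? super B, ? extends R> zipper
    ) {
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        List<R> result = new ArrayList<>();

        while (l.hasNext() && r.hasNext()) {
            result.add(zipper.apply(l.next(), r.next()));
        }

        return result;
    }

    public static void main(String[] args) {
        // [(1, a), (2, b), (3, c)]
        System.out.println(zip(Stream.of(1, 2, 3), Stream.of("a", "b", "c")));

        // [1:a, 2:b, 3:c]
        System.out.println(zip(
            Stream.of(1, 2, 3),
            Stream.of("a", "b", "c"),
            (x, y) -> x + ":" + y
        ));
    }
}
```

With Pair, reading v1 yields an Integer and v2 a String without casts, which is exactly what an Object[] cannot express.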
Unfortunately, the Java compiler cannot reason about the effective bound of the <T> type in Seq<T>. This is why we can only have a static unzip() method (instead of an instance one), whose signature looks like this:
    // This works
    static <T1, T2> Tuple2<Seq<T1>, Seq<T2>> unzip(Stream<Tuple2<T1, T2>> stream) { ... }

    // This doesn't work:
    interface Seq<T> extends Stream<T> {
        Tuple2<Seq<???>, Seq<???>> unzip();
    }
Skipping and limiting
    // (3, 4, 5)
    Seq.of(1, 2, 3, 4, 5).skipWhile(i -> i < 3);

    // (3, 4, 5)
    Seq.of(1, 2, 3, 4, 5).skipUntil(i -> i == 3);

    // (1, 2)
    Seq.of(1, 2, 3, 4, 5).limitWhile(i -> i < 3);

    // (1, 2)
    Seq.of(1, 2, 3, 4, 5).limitUntil(i -> i == 3);
Other functional libraries probably use different terms than skip (e.g. drop) and limit (e.g. take). It doesn’t really matter in the end. We opted for the terms that are already present in the existing Stream API: Stream.skip() and Stream.limit().
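For readers on Java 9 or later, the JDK itself has since gained Stream.dropWhile() and Stream.takeWhile(), which cover the same ground for ordered sequential streams. The sketch below maps the four operations onto them; the class SkipLimitSketch and its static helpers are hypothetical names for illustration, not jOOλ code:

```java
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Requires Java 9 or later (Stream.takeWhile / Stream.dropWhile)
final class SkipLimitSketch {

    // Skip elements as long as the predicate holds
    static <T> Stream<T> skipWhile(Stream<T> s, Predicate<? super T> p) {
        return s.dropWhile(p);
    }

    // Skip elements until the predicate holds for the first time
    static <T> Stream<T> skipUntil(Stream<T> s, Predicate<? super T> p) {
        return s.dropWhile(p.negate());
    }

    // Keep elements as long as the predicate holds
    static <T> Stream<T> limitWhile(Stream<T> s, Predicate<? super T> p) {
        return s.takeWhile(p);
    }

    // Keep elements until the predicate holds for the first time
    static <T> Stream<T> limitUntil(Stream<T> s, Predicate<? super T> p) {
        return s.takeWhile(p.negate());
    }

    public static void main(String[] args) {
        // [3, 4, 5]
        System.out.println(skipWhile(Stream.of(1, 2, 3, 4, 5), i -> i < 3)
            .collect(Collectors.toList()));

        // [3, 4, 5]
        System.out.println(skipUntil(Stream.of(1, 2, 3, 4, 5), i -> i == 3)
            .collect(Collectors.toList()));

        // [1, 2]
        System.out.println(limitWhile(Stream.of(1, 2, 3, 4, 5), i -> i < 3)
            .collect(Collectors.toList()));

        // [1, 2]
        System.out.println(limitUntil(Stream.of(1, 2, 3, 4, 5), i -> i == 3)
            .collect(Collectors.toList()));
    }
}
```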
Folding
    // "abc"
    Seq.of("a", "b", "c").foldLeft("", (u, t) -> u + t);

    // "cba"
    Seq.of("a", "b", "c").foldRight("", (t, u) -> u + t);
The Stream.reduce() operations are designed for parallelisation. This means that the functions passed to it must have these important attributes:
- associativity
- non-interference
- statelessness
But sometimes, you really want to “reduce” a stream with functions that do not have the above attributes, and consequently, you probably don’t care about the reduction being parallelisable. This is where “folding” comes in.
A nice explanation about the various differences between reducing and folding (in Scala) can be seen here.
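To illustrate with plain JDK code, the sketch below folds with subtraction, which is not associative and therefore unsuitable for Stream.reduce(). FoldSketch and its two helpers are hypothetical names, and this is only one possible way to implement folding, not necessarily how jOOλ does it. Note that foldRight() has to buffer the whole stream before it can start from the right:

```java
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FoldSketch {

    // Fold from the left: f(f(f(seed, t1), t2), t3)
    static <T, U> U foldLeft(Stream<T> stream, U seed, BiFunction<U, ? super T, U> f) {
        U result = seed;
        Iterator<T> it = stream.iterator();

        while (it.hasNext()) {
            result = f.apply(result, it.next());
        }

        return result;
    }

    // Fold from the right: f(t1, f(t2, f(t3, seed)))
    static <T, U> U foldRight(Stream<T> stream, U seed, BiFunction<? super T, U, U> f) {
        // The last element is needed first, so the stream is buffered
        List<T> list = stream.collect(Collectors.toList());
        U result = seed;
        ListIterator<T> it = list.listIterator(list.size());

        while (it.hasPrevious()) {
            result = f.apply(it.previous(), result);
        }

        return result;
    }

    public static void main(String[] args) {
        // ((10 - 1) - 2) - 3 = 4
        System.out.println(foldLeft(Stream.of(1, 2, 3), 10, (u, t) -> u - t));

        // 1 - (2 - (3 - 10)) = -8
        System.out.println(foldRight(Stream.of(1, 2, 3), 10, (t, u) -> t - u));
    }
}
```

The two results differ only because of the grouping, which is precisely what a parallel reduction is free to change.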
Splitting
    // tuple((1, 2, 3), (1, 2, 3))
    Seq.of(1, 2, 3).duplicate();

    // tuple((1, 3, 5), (2, 4, 6))
    Seq.of(1, 2, 3, 4, 5, 6).partition(i -> i % 2 != 0);

    // tuple((1, 2), (3, 4, 5))
    Seq.of(1, 2, 3, 4, 5).splitAt(2);
The above functions all have one thing in common: they operate on a single stream in order to produce two new streams that can be consumed independently.
Obviously, this means that internally, some memory must be consumed to keep buffers of partially consumed streams. E.g.
- duplication needs to keep track of all values that have been consumed in one stream, but not in the other
- partitioning needs to fast forward to the next value that satisfies (or doesn’t satisfy) the predicate, without losing all the dropped values
- splitting might need to fast forward to the split index
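The buffering needed for duplication can be sketched with two queues over a shared Iterator. This is an illustration under simplifying assumptions (sequential, single-threaded use only), with the hypothetical names DuplicateSketch and Copy; it is not taken from the jOOλ sources:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

final class DuplicateSketch {

    // Returns two iterators that each yield every element of the source.
    // Not thread-safe: meant for sequential consumption only.
    static <T> List<Iterator<T>> duplicate(Iterator<T> source) {
        // Each buffer holds values already pulled by the other copy
        // but not yet consumed by its owner
        LinkedList<T> buffer1 = new LinkedList<>();
        LinkedList<T> buffer2 = new LinkedList<>();

        class Copy implements Iterator<T> {
            final LinkedList<T> mine;
            final LinkedList<T> other;

            Copy(LinkedList<T> mine, LinkedList<T> other) {
                this.mine = mine;
                this.other = other;
            }

            @Override
            public boolean hasNext() {
                return !mine.isEmpty() || source.hasNext();
            }

            @Override
            public T next() {
                // Catch up on buffered values first
                if (!mine.isEmpty()) {
                    return mine.poll();
                }

                // Otherwise advance the source and remember the value
                // for the other copy
                T value = source.next();
                other.offer(value);
                return value;
            }
        }

        List<Iterator<T>> copies = new ArrayList<>();
        copies.add(new Copy(buffer1, buffer2));
        copies.add(new Copy(buffer2, buffer1));
        return copies;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> copies = duplicate(Arrays.asList(1, 2, 3).iterator());
        Iterator<Integer> a = copies.get(0);
        Iterator<Integer> b = copies.get(1);

        System.out.println(a.next()); // 1
        System.out.println(a.next()); // 2
        System.out.println(b.next()); // 1 (served from the buffer)
    }
}
```

The further one copy runs ahead of the other, the larger the lagging copy’s buffer grows, which is the memory cost mentioned above.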
For some real functional fun, let’s have a look at a possible splitAt() implementation:
    static <T> Tuple2<Seq<T>, Seq<T>> splitAt(Stream<T> stream, long position) {
        return seq(stream)
              .zipWithIndex()
              .partition(t -> t.v2 < position)
              .map((v1, v2) -> tuple(
                  v1.map(t -> t.v1),
                  v2.map(t -> t.v1)
              ));
    }
… or with comments:
    static <T> Tuple2<Seq<T>, Seq<T>> splitAt(Stream<T> stream, long position) {

        // Add jOOλ functionality to the stream
        // -> local Type: Seq<T>
        return seq(stream)

        // Keep track of stream positions
        // with each element in the stream
        // -> local Type: Seq<Tuple2<T, Long>>
              .zipWithIndex()

        // Split the streams at position
        // -> local Type: Tuple2<Seq<Tuple2<T, Long>>,
        //                       Seq<Tuple2<T, Long>>>
              .partition(t -> t.v2 < position)

        // Remove the indexes from zipWithIndex again
        // -> local Type: Tuple2<Seq<T>, Seq<T>>
              .map((v1, v2) -> tuple(
                  v1.map(t -> t.v1),
                  v2.map(t -> t.v1)
              ));
    }
Nice, isn’t it? A possible implementation for partition(), on the other hand, is a bit more complex. Here it is, implemented trivially with Iterator instead of the new Spliterator:
    static <T> Tuple2<Seq<T>, Seq<T>> partition(
        Stream<T> stream,
        Predicate<? super T> predicate
    ) {
        final Iterator<T> it = stream.iterator();
        final LinkedList<T> buffer1 = new LinkedList<>();
        final LinkedList<T> buffer2 = new LinkedList<>();

        class Partition implements Iterator<T> {

            final boolean b;

            Partition(boolean b) {
                this.b = b;
            }

            // Pull from the source until this partition's buffer has a
            // value, parking values for the other partition in its buffer
            void fetch() {
                while (buffer(b).isEmpty() && it.hasNext()) {
                    T next = it.next();
                    buffer(predicate.test(next)).offer(next);
                }
            }

            LinkedList<T> buffer(boolean test) {
                return test ? buffer1 : buffer2;
            }

            @Override
            public boolean hasNext() {
                fetch();
                return !buffer(b).isEmpty();
            }

            @Override
            public T next() {
                // Fetch here as well, so that next() also works
                // without a preceding hasNext() call
                fetch();
                return buffer(b).poll();
            }
        }

        return tuple(
            seq(new Partition(true)),
            seq(new Partition(false))
        );
    }
I’ll let you do the exercise and verify the above code.
Get and contribute to jOOλ, now!
All of the above is part of jOOλ, available for free from GitHub. There is already a partially Java-8-ready, full-blown library called functionaljava, which goes much further than jOOλ.
Yet, we believe that all that’s missing from Java 8’s Streams API is really just a couple of methods that are very useful for sequential streams.
In a previous post, we’ve shown how we can bring lambdas to String-based SQL using a simple wrapper for JDBC (of course, we still believe that you should use jOOQ instead).
Today, we’ve shown how we can write awesome functional and sequential Stream processing very easily, with jOOλ.
Stay tuned for even more jOOλ goodness in the near future (and pull requests are very welcome, of course!)
Reference: When the Java 8 Streams API is not Enough from our JCG partner Lukas Eder at the JAVA, SQL, AND JOOQ blog.