Playing with Java 8 – Lambdas and Concurrency

Java 8 was released a while back with a ton of features and changes. All of us Java zealots have been waiting for this for ages, ever since they originally announced all the great features that were going to be in Java 7 and ended up being pulled.

I have only recently had the time to give it a real look. I updated my home projects to 8, and I have to say I am generally quite happy with what we got. The java.time API that “mimics” JodaTime is a big improvement, the java.util.stream package is going to be useful, and lambdas are going to change our coding style, which might take a bit of getting used to. With those changes, the quote “With great power comes great responsibility” rings true. I sense there may be some interesting times in our future, as it is quite easy to write some hard-to-decipher code. As an example, debugging the code I wrote below would be “fun”…
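Before getting to that, here is a tiny warm-up that is not taken from the repo. It is only a sketch of the style shift: the same sort written with an anonymous class and with a stream, plus a one-line java.time calculation. The class name Java8Taste and its methods are made up for illustration.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; none of these names come from the original project.
class Java8Taste {

    // Pre-Java 8 style: an anonymous inner class just to compare two strings by length.
    static List<String> sortOldStyle(final List<String> words) {
        final List<String> copy = new ArrayList<>(words);
        Collections.sort(copy, new Comparator<String>() {
            @Override
            public int compare(final String a, final String b) {
                return Integer.compare(a.length(), b.length());
            }
        });
        return copy;
    }

    // Java 8 style: the same ordering with a stream and a method reference.
    static List<String> sortNewStyle(final List<String> words) {
        return words.stream()
                .sorted(Comparator.comparingInt(String::length))
                .collect(Collectors.toList());
    }

    // java.time: immutable date types with built-in arithmetic.
    static long daysBetween(final LocalDate from, final LocalDate to) {
        return ChronoUnit.DAYS.between(from, to);
    }

    public static void main(final String[] args) {
        final List<String> words = Arrays.asList("lambda", "io", "stream");
        System.out.println(sortOldStyle(words)); // prints [io, lambda, stream]
        System.out.println(sortNewStyle(words)); // prints [io, lambda, stream]
        System.out.println(daysBetween(LocalDate.of(2014, 3, 18), LocalDate.of(2014, 4, 26))); // prints 39
    }
}
```

The lambda version is shorter, but once several of these are chained and nested, a stack trace gets harder to read, which is the "fun" part.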

The example file is in my GitHub blog repo.

What this example does is simple: run a couple of threads, do some work concurrently, then wait for them all to complete. I figured that while I am playing with Java 8, I might as well go for it fully…
Here’s what I came up with:

package net.briandupreez.blog.java8.futures;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;


/**
 * Generified future running and completion
 *
 * @param <T> the result type
 * @param <S> the task input
 */
public class WaitingFuturesRunner<T, S> {
    private static final Log logger = LogFactory.getLog(WaitingFuturesRunner.class);
    private final Collection<Task<T, S>> tasks;
    private final long timeOut;
    private final TimeUnit timeUnit;
    private final ExecutorService executor;

    /**
     * Constructor, used to initialise with the required tasks
     *
     * @param tasks    the list of tasks to execute
     * @param timeOut  max length of time to wait
     * @param timeUnit unit of the timeout
     */
    public WaitingFuturesRunner(final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) {
        this.tasks = tasks;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.executor = Executors.newFixedThreadPool(tasks.size());
    }

    /**
     * Go!
     *
     * @param taskInput          The input to the task
     * @param consolidatedResult a container of all the completed results
     */
    public void go(final S taskInput, final ConsolidatedResult<T> consolidatedResult) {
        final CountDownLatch latch = new CountDownLatch(tasks.size());
        final List<CompletableFuture<T>> theFutures = tasks.stream()
                .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor))
                .collect(Collectors.<CompletableFuture<T>>toList());

        final CompletableFuture<List<T>> allDone = collectTasks(theFutures);
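        try {
            // await() returns false if the timeout elapsed before every task counted down
            if (!latch.await(timeOut, timeUnit)) {
                throw new RuntimeException("Timed out waiting for tasks to complete");
            }
            logger.debug("complete... adding results");
            allDone.get().forEach(consolidatedResult::addResult);
        } catch (final InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            logger.error("Thread Error", e);
            throw new RuntimeException("Thread Error, could not complete processing", e);
        }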
    }

    private <E> CompletableFuture<List<E>> collectTasks(final List<CompletableFuture<E>> futures) {
        final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream()
                        .map(CompletableFuture<E>::join)
                        .collect(Collectors.<E>toList())
        );
    }

    private T processTask(final Task<T, S> task, final S searchTerm, final CountDownLatch latch) {
        logger.debug("Starting: " + task);
        T searchResults = null;
        try {
            searchResults = task.process(searchTerm, latch);
        } catch (final Exception e) {
            logger.error("Task failed: " + task, e);
        }
        return searchResults;
    }

}
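The heart of the class is the collectTasks trick: allOf only tells you that everything finished, so the results have to be read back with join afterwards. Stripped of the latch and the logging, the pattern can be sketched on its own as follows. This is a standalone illustration, not code from the repo: AllOfSketch and runAll are hypothetical names, and here the timeout is enforced with get(timeout, unit) instead of a CountDownLatch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical standalone sketch; not part of the original repo.
class AllOfSketch {

    // Runs every supplier on its own pool thread and waits at most timeOut for all results.
    static <T> List<T> runAll(final List<Supplier<T>> jobs, final long timeOut, final TimeUnit unit) {
        // newFixedThreadPool rejects a size of 0, so guard against an empty job list.
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        try {
            final List<CompletableFuture<T>> futures = jobs.stream()
                    .map(job -> CompletableFuture.supplyAsync(job, pool))
                    .collect(Collectors.toList());

            // allOf yields a CompletableFuture<Void>; the values are collected with join().
            final CompletableFuture<List<T>> all = CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));

            // get(timeout, unit) throws TimeoutException instead of blocking forever.
            return all.get(timeOut, unit);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (final ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Tasks failed or timed out", e);
        } finally {
            pool.shutdownNow(); // release the worker threads either way
        }
    }

    public static void main(final String[] args) {
        final List<Supplier<String>> jobs = Arrays.asList(() -> "a", () -> "b", () -> "c");
        System.out.println(runAll(jobs, 1, TimeUnit.SECONDS)); // prints [a, b, c]
    }
}
```

Results come back in the order of the futures list, not in completion order, because join is called on each future in turn after all of them are done.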

Test:

package net.briandupreez.blog.java8.futures;

import net.briandupreez.blog.java8.futures.example.StringInputTask;
import net.briandupreez.blog.java8.futures.example.StringResults;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Test
 * Created by brian on 4/26/14.
 */
public class CompletableFuturesRunnerTest {

    @BeforeClass
    public static void init() {
        BasicConfigurator.configure();
    }

    /**
     * 5 tasks of 3000 ms each, run concurrently, should take no more than 3100 ms in total
     * @throws Exception error
     */
    @Test(timeout = 3100)
    public void testGo() throws Exception {
        final List<Task<String, String>> taskList = setupTasks();

        final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4, TimeUnit.SECONDS);
        final StringResults consolidatedResults = new StringResults();

        completableFuturesRunner.go("Something To Process", consolidatedResults);

        Assert.assertEquals(5, consolidatedResults.getResults().size());
        for (final String s : consolidatedResults.getResults()) {
            Assert.assertTrue(s.contains("complete"));
            Assert.assertTrue(s.contains("Something To Process"));
        }


    }

    private List<Task<String, String>> setupTasks() {
        final List<Task<String, String>> taskList = new ArrayList<>();
        final StringInputTask stringInputTask = new StringInputTask("Task 1");
        final StringInputTask stringInputTask2 = new StringInputTask("Task 2");
        final StringInputTask stringInputTask3 = new StringInputTask("Task 3");
        final StringInputTask stringInputTask4 = new StringInputTask("Task 4");
        final StringInputTask stringInputTask5 = new StringInputTask("Task 5");
        taskList.add(stringInputTask);
        taskList.add(stringInputTask2);
        taskList.add(stringInputTask3);
        taskList.add(stringInputTask4);
        taskList.add(stringInputTask5);
        return taskList;
    }
}
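The runner and the test lean on Task, ConsolidatedResult, StringInputTask and StringResults, which live in the repo and are not listed in this post. Going only by how they are called above and by the log output, they might look roughly like this. Every signature here is a guess at their shape, and the sleep is shortened to 100 ms (the output suggests the real task takes about 3000 ms).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo entry point, only here so the sketch runs on its own.
class SupportingTypesDemo {
    public static void main(final String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final StringResults results = new StringResults();
        results.addResult(new StringInputTask("Task 1").process("Something To Process", latch));
        System.out.println(results.getResults() + ", latch count = " + latch.getCount());
    }
}

// Guessed shape: one unit of work that turns an input S into a result T.
interface Task<T, S> {
    T process(S input, CountDownLatch latch) throws Exception;
}

// Guessed shape: a sink that gathers every task's result.
interface ConsolidatedResult<T> {
    void addResult(T result);
}

// Guessed shape: collects String results in a plain list.
class StringResults implements ConsolidatedResult<String> {
    private final List<String> results = new ArrayList<>();

    @Override
    public void addResult(final String result) {
        results.add(result);
    }

    public List<String> getResults() {
        return results;
    }
}

// Guessed shape: pretends to work for a while, then reports completion.
class StringInputTask implements Task<String, String> {
    private final String taskName;

    StringInputTask(final String taskName) {
        this.taskName = taskName;
    }

    @Override
    public String process(final String input, final CountDownLatch latch) throws InterruptedException {
        try {
            Thread.sleep(100); // stand-in for real work
            return taskName + " complete for: " + input;
        } finally {
            latch.countDown(); // count down even on failure so the runner is not left waiting
        }
    }

    @Override
    public String toString() {
        return "StringInputTask{taskName='" + taskName + "'}";
    }
}
```

Counting down in a finally block matters with this design: if a task threw before reaching countDown, the runner would sit in latch.await until the timeout expired.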

Output:

0 [pool-1-thread-1] Starting: StringInputTask{taskName='Task 1'}
0 [pool-1-thread-5] Starting: StringInputTask{taskName='Task 5'}
0 [pool-1-thread-2] Starting: StringInputTask{taskName='Task 2'}
2 [pool-1-thread-4] Starting: StringInputTask{taskName='Task 4'}
2 [pool-1-thread-3] Starting: StringInputTask{taskName='Task 3'}
3003 [pool-1-thread-5] Done: Task 5
3004 [pool-1-thread-3] Done: Task 3
3003 [pool-1-thread-1] Done: Task 1
3003 [pool-1-thread-4] Done: Task 4
3003 [pool-1-thread-2] Done: Task 2
3007 [Thread-0] WaitingFuturesRunner  - complete... adding results

Some of the useful articles / links I found and read while doing this:

Oracle: Lambda Tutorial

IBM: Java 8 Concurrency

Tomasz Nurkiewicz : Definitive Guide to CompletableFuture

4 Comments
Toshi Mochizuki
9 years ago

Thanks

Alex
8 years ago

Good stuff.

chenjiannew
6 years ago

private final Collection<Task> tasks;
Task ?

FasterXu
6 years ago

why do you use the package name “future” ,but not “feature”
