Java Concurrency: Futures and Parallelism

In the previous post, we talked about thread pools, why they exist, and how executors help us to use them.

However, we only used them to launch async tasks that finish at some point in the future. What if we want to wait for an async task to finish, or join multiple tasks and wait for all of them to finish?

In this post, we are going to see how futures help us with that, and we will also look at streams using the parallel method.

TRY IT YOURSELF: You can find the source code of this post here.

If you like this post and are interested in hearing more about my journey as a Software Engineer, you can follow me on Twitter and travel together.

Submitting Tasks to Create Futures

In the previous post, we used the ExecutorService to execute async tasks. Now we can use the ExecutorService to submit tasks and get back Futures to operate on those async tasks. The following is a basic example of a Future:

  @Test
  public void basicFuture()
          throws InterruptedException,
          ExecutionException, TimeoutException {
    ExecutorService executor = Executors
            .newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
      System.out.println(" Thread: " + Thread
              .currentThread().getName());
      return "Finished";
    });

    //Block until result is ready
    var result = future.get();
    //Block until result is ready or timeout
    var timedResult = future.get(1, TimeUnit.SECONDS);
    //Cancel the task (no effect here: it has already completed)
    future.cancel(true);
    //Check if was cancelled
    future.isCancelled();
    //Check if is done
    future.isDone();
  }

There we can see how we submit the task to the ExecutorService with executor.submit, which returns a Future. On the Future we can perform several operations:

  • Block until the result is ready (future.get()): this returns the result once it is ready, and the current thread is blocked until then.
  • Block until the result is ready or a timeout expires (future.get(1, TimeUnit.SECONDS)): same as above, but it throws a TimeoutException if the result is not ready in time.
  • Cancel the task (future.cancel(true)): cancels the task; if it is already running, passing true asks the Future to try to interrupt it, although interruption is not guaranteed.
  • Check if the task was cancelled (future.isCancelled())
  • Check if the task is done (future.isDone())
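To make the timeout and cancel operations a bit more concrete, here is a minimal, self-contained sketch. The class name FutureTimeoutSketch and the helper waitFor are hypothetical names made up for this illustration, and the sleep simply stands in for real work:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

  // Hypothetical helper: submits a task that sleeps for taskMillis and
  // waits at most timeoutMillis for its result.
  static String waitFor(long taskMillis, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(() -> {
      Thread.sleep(taskMillis); // stands in for real work
      return "Finished";
    });
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The result was not ready in time: try to interrupt the task.
      future.cancel(true);
      return future.isCancelled() ? "Cancelled" : "Timed out";
    } catch (InterruptedException e) {
      // The waiting thread itself was interrupted: restore the flag.
      Thread.currentThread().interrupt();
      return "Interrupted";
    } catch (ExecutionException e) {
      // The task threw an exception; the original is available as the cause.
      return "Failed: " + e.getCause();
    } finally {
      executor.shutdownNow(); // always release the executor's thread
    }
  }

  public static void main(String[] args) {
    System.out.println(waitFor(10, 2_000)); // task finishes well before the timeout
    System.out.println(waitFor(2_000, 50)); // timeout expires first, task is cancelled
  }
}
```

Note that the sketch shuts the executor down in a finally block; otherwise its worker thread can keep the JVM alive after the work is done.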

Using CompletableFuture

CompletableFuture is an implementation of Future that adds a lot of cool operations and functional composition. The following is an example:

  @Test
  public void completableFuture()
          throws InterruptedException,
          ExecutionException, TimeoutException {
    ExecutorService executor = Executors
            .newSingleThreadExecutor();
    
    CompletableFuture<String> completableFuture =
            CompletableFuture.completedFuture("Hello");
    
    CompletableFuture.supplyAsync(() -> {
      System.out.println(" Thread: " + Thread
              .currentThread().getName());
      return "Finished 1";
    }, executor);

  }

There, we can see how CompletableFuture.completedFuture creates a CompletableFuture that is already complete, and how supplyAsync creates one from a task, taking as its second argument the executor on which to run that task. CompletableFuture has many other interesting methods, such as CompletableFuture.allOf, which combines several futures into one that completes once all of them have completed.
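As a rough illustration of allOf, the following sketch starts one CompletableFuture per word and waits for all of them. The class AllOfSketch and the helper shoutAll are hypothetical names for this example only, and upper-casing a string stands in for real async work:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfSketch {

  // Hypothetical helper: upper-cases every word on the executor and
  // returns the results in the same order as the input.
  static List<String> shoutAll(List<String> words) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      List<CompletableFuture<String>> futures = words.stream()
          .map(word -> CompletableFuture.supplyAsync(() -> word.toUpperCase(), executor))
          .collect(Collectors.toList());

      // allOf returns a CompletableFuture<Void> that completes when every input has completed.
      CompletableFuture<Void> all =
          CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

      // allOf carries no values, so we read each result from the original futures.
      // At this point join() does not block, because all of them are already done.
      return all.thenApply(ignored -> futures.stream()
              .map(CompletableFuture::join)
              .collect(Collectors.toList()))
          .join();
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(shoutAll(List.of("futures", "are", "fun")));
  }
}
```

Because allOf itself yields no result, the usual pattern is the one above: keep the original futures around and collect their values once the combined future completes.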

Stream Parallel

The Stream API brings us new features for data processing, including running some of the work in parallel on top of the Fork/Join framework. However, we should be careful with this feature, as it won’t behave as we expect in every case.

The following is the first example I tried in order to speed up a stream pipeline: square each number and then reduce all the results with a sum operation:
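To see where the work actually runs, here is a small sketch that records which threads process the elements of a stream. The class ParallelThreadsSketch and the helper threadsUsed are hypothetical names for this illustration:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ParallelThreadsSketch {

  // Hypothetical helper: returns the names of all threads that processed an element.
  static Set<String> threadsUsed(boolean parallel) {
    Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
    IntStream stream = IntStream.range(0, 10_000);
    if (parallel) {
      stream = stream.parallel();
    }
    stream.forEach(i -> names.add(Thread.currentThread().getName()));
    return names;
  }

  public static void main(String[] args) {
    System.out.println("Sequential: " + threadsUsed(false));
    System.out.println("Parallel:   " + threadsUsed(true));
  }
}
```

The sequential run only ever reports the calling thread. The parallel run usually also reports worker threads from the common ForkJoinPool, although the exact set depends on the machine and on how the work happens to be split.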

  @Test
  public void noWrongParallel()
          throws InterruptedException {
    long nanosStart = System.nanoTime();

    // Use long arithmetic: the squares (and their sum) overflow an int
    OptionalLong total = LongStream.range(1, 1_000_000)
            .map(i -> i*i)
            .reduce(Long::sum);

    long nanosEnd = System.nanoTime();

    System.out.println("Result = " + total.getAsLong());
    System.out.println("Time ms= " + ((nanosEnd - nanosStart)/1_000_000));
  }

  @Test
  public void wrongParallel()
          throws InterruptedException {
    long nanosStart = System.nanoTime();

    // Use long arithmetic: the squares (and their sum) overflow an int
    OptionalLong total = LongStream.range(1, 1_000_000)
            .parallel()
            .map(i -> i*i)
            .reduce(Long::sum);

    long nanosEnd = System.nanoTime();

    System.out.println("Result = " + total.getAsLong());
    System.out.println("Time ms= " + ((nanosEnd - nanosStart)/1_000_000));
  }

When you run those two methods, the run times are pretty similar, and parallel doesn’t seem to help much.

Now, let’s try with the following use case: square each number, filter out the odd results, and count what remains.

  @Test
  public void noParallelGood()
          throws InterruptedException {
    long nanosStart = System.nanoTime();

    long total = IntStream.range(1, 1_000_000)
            .map(i -> i * i)
            .boxed()
            .filter(integer -> integer % 2 == 0)
            .count();

    long nanosEnd = System.nanoTime();

    System.out.println("Result = " + total);
    System.out.println("Time ms= " + ((nanosEnd - nanosStart)/1_000_000));
  }

  @Test
  public void parallelGood()
          throws InterruptedException {
    long nanosStart = System.nanoTime();

    long total = IntStream.range(1, 1_000_000)
            .parallel()
            .map(i -> i * i)
            .boxed()
            .filter(integer -> integer % 2 == 0)
            .count();

    long nanosEnd = System.nanoTime();

    System.out.println("Result = " + total);
    System.out.println("Time ms= " + ((nanosEnd - nanosStart)/1_000_000));
  }

Again, the performance of the two methods is pretty similar.

Of course, there are use cases for this, but my goal is to show you that parallel in streams is not a silver bullet: you need to study and measure your use cases before deciding to use it.
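On the measuring part: timing a single run with System.nanoTime is easily distorted by JIT warm-up, so it helps to repeat each variant several times. The following is only a crude sketch of that idea, not a rigorous benchmark (a dedicated harness such as JMH is the better tool for that). The class StreamTimingSketch and its helpers are hypothetical names for this illustration:

```java
import java.util.function.LongSupplier;
import java.util.stream.IntStream;

public class StreamTimingSketch {

  // Same shape as the second use case: square, keep the even results, count.
  static long countEvenSquares(boolean parallel) {
    IntStream range = IntStream.range(1, 1_000_000);
    if (parallel) {
      range = range.parallel();
    }
    return range.mapToLong(i -> (long) i * i) // long avoids int overflow
        .filter(n -> n % 2 == 0)
        .count();
  }

  // Hypothetical helper: runs the task `runs` times and returns the fastest time in nanoseconds.
  static long bestNanos(LongSupplier task, int runs) {
    long best = Long.MAX_VALUE;
    for (int r = 0; r < runs; r++) {
      long start = System.nanoTime();
      task.getAsLong();
      best = Math.min(best, System.nanoTime() - start);
    }
    return best;
  }

  public static void main(String[] args) {
    long sequential = bestNanos(() -> countEvenSquares(false), 20);
    long parallel = bestNanos(() -> countEvenSquares(true), 20);
    System.out.println("Sequential best ms = " + sequential / 1_000_000.0);
    System.out.println("Parallel best ms   = " + parallel / 1_000_000.0);
  }
}
```

Taking the best of several runs filters out most of the warm-up noise, but the numbers still depend on the machine, the number of cores, and whatever else is using the common pool at the time.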

Final Thought

Future and CompletableFuture are important tools for working with async tasks in Java. Parallel streams also have their use cases; however, we should think twice about how we use them.
