In the previous post, we talked about thread pools, why they exist, and how executors help us use them.
However, we only used them to run async tasks that complete at some point in the future. What if we want to wait for an async task to finish, or run multiple tasks and wait for all of them to finish?
In this post, we are going to see how futures help us with that, and how to run streams in parallel with the parallel() method.
TRY IT YOURSELF: You can find the source code of this post here.
Java Concurrency Series
- Part 1: Concurrency and Parallelism
- Part 2: Thread Pools and Executors
- Part 3: Futures and Parallelism (You are here)
If you like this post and are interested in hearing more about my journey as a Software Engineer, you can follow me on Twitter and travel together.
Submitting Tasks to Create Futures
In the previous post, we used the ExecutorService to execute async tasks. Now we can use the ExecutorService to submit tasks and get back Futures, which let us work with the results of those async tasks. The following is a basic example of a Future:
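```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test; // assuming JUnit 5; use org.junit.Test for JUnit 4

@Test
public void basicFuture() throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // submit() hands the task to the executor and returns a Future for its result
    Future<String> future = executor.submit(() -> {
        System.out.println(" Thread: " + Thread.currentThread().getName());
        return "Finished";
    });

    // Block until the result is ready
    String result = future.get();

    // Block until the result is ready or the timeout expires
    String resultWithTimeout = future.get(1, TimeUnit.SECONDS);

    // Cancel the task (true = interrupt the thread if the task is still running).
    // Here the task has already completed, so this has no effect and returns false.
    future.cancel(true);

    // Check if the task was cancelled (false here, for the reason above)
    future.isCancelled();

    // Check if the task is done (true here)
    future.isDone();

    executor.shutdown();
}
```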
Here we submit the task to the ExecutorService with submit(), which returns a Future. With the Future we can:
- Block until the result is ready (future.get()): this returns the result once it is ready, and the current thread blocks until then.
- Block until the result is ready or a timeout expires (future.get(1, TimeUnit.SECONDS)): same as above, but it throws a TimeoutException if the result is not ready within the timeout.
- Cancel the task (future.cancel(true)): if the task is already running, passing true asks the Future to interrupt the thread running it. This is not guaranteed to stop the task, because the task itself has to respond to the interruption. Cancelling a task that has already completed has no effect.
- Check if the task was cancelled (future.isCancelled())
- Check if the task is done (future.isDone()): this is true once the task has completed normally, thrown an exception, or been cancelled.
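In the example above the task finishes almost immediately, so the timeout and cancellation paths never actually trigger. The following sketch forces them: a deliberately slow task (the 5-second sleep and the 100 ms timeout are arbitrary values chosen for the demo, and the class name is hypothetical) times out, and is then cancelled with interruption. Thread.sleep responds to interruption, so in this case cancel(true) really stops the task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {
    // Returns a short description of what happened, so the behavior is easy to check
    static String run() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // A deliberately slow task (5 seconds is an arbitrary choice for the demo)
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(5_000);
                return "Finished";
            });
            try {
                // Wait at most 100 ms; the task is still sleeping, so this times out
                slow.get(100, TimeUnit.MILLISECONDS);
                return "unexpected: got a result";
            } catch (TimeoutException e) {
                // The task has not completed, so cancel(true) succeeds and interrupts the sleeping thread
                boolean cancelled = slow.cancel(true);
                // A cancelled future also counts as done
                return "timeout, cancelled=" + cancelled
                        + ", isCancelled=" + slow.isCancelled()
                        + ", isDone=" + slow.isDone();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints "timeout, cancelled=true, isCancelled=true, isDone=true"
        System.out.println(run());
    }
}
```

Calling get() on the cancelled future afterwards would throw a CancellationException rather than return a value.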
Using CompletableFuture
CompletableFuture is an implementation of Future (it also implements CompletionStage) with a lot of useful operations for functional composition. The following is an example:
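```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test; // assuming JUnit 5; use org.junit.Test for JUnit 4

@Test
public void completableFuture() throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // A CompletableFuture that is already completed with a value
    CompletableFuture<String> completed = CompletableFuture.completedFuture("Hello");

    // A CompletableFuture whose task runs asynchronously on the given executor
    CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
        System.out.println(" Thread: " + Thread.currentThread().getName());
        return "Finished 1";
    }, executor);

    // Both results are available through the regular Future API
    System.out.println(completed.get() + " / " + async.get());
    executor.shutdown();
}
```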
Here, CompletableFuture.completedFuture creates a CompletableFuture that is already complete, and CompletableFuture.supplyAsync creates one whose task runs on the executor we pass in. CompletableFuture has many other interesting methods, like CompletableFuture.allOf, which combines several futures into one that completes when all of them have completed.
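As a sketch of how allOf answers the "join multiple tasks and wait for all of them" question from the introduction, the example below starts one async task per input on a fixed pool (the pool size of 4, the squaring task, and the class name are all illustrative assumptions), waits for all of them with allOf, and then collects the results with thenApply.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfDemo {
    static List<Integer> squaresInParallel(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // One async task per input, each running on our executor
            List<CompletableFuture<Integer>> futures = inputs.stream()
                    .map(n -> CompletableFuture.supplyAsync(() -> n * n, executor))
                    .collect(Collectors.toList());

            // allOf returns a CompletableFuture<Void> that completes when every future has completed
            CompletableFuture<Void> all =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

            // Once all are done, join() on each individual future returns without blocking
            return all.thenApply(ignored -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [1, 4, 9, 16]
        System.out.println(squaresInParallel(List.of(1, 2, 3, 4)));
    }
}
```

allOf itself carries no values (its type is CompletableFuture<Void>), which is why the results are read back from the individual futures; the list keeps the original order regardless of which task finishes first.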
Parallel Streams
The Stream API can also run work in parallel, using the Fork/Join framework under the hood (by default, parallel streams run on the common ForkJoinPool). However, we should be careful with this feature, as it won’t always work as we expect.
The following is the first example I tried to speed up with parallel(): squaring each number in a range and then reducing them all with a sum:
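To see the Fork/Join machinery at work, here is a small sketch that records which threads a parallel stream actually runs on (the class and method names are hypothetical). On a typical JDK the set contains the calling thread plus worker threads named ForkJoinPool.commonPool-worker-N, but the exact set depends on the machine and on the run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelThreadsDemo {
    // Runs a trivial parallel stream and collects the names of the threads that processed elements
    static Set<String> threadsUsed() {
        // A concurrent set, because several threads add to it at the same time
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Number of worker threads in the common pool (by default, roughly the core count minus one)
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("Threads used: " + threadsUsed());
    }
}
```

Because all parallel streams share this common pool by default, a slow or blocking operation in one parallel stream can also slow down other parallel streams in the same JVM.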
```java
import java.util.OptionalLong;
import java.util.stream.LongStream;
import org.junit.jupiter.api.Test; // assuming JUnit 5; use org.junit.Test for JUnit 4

@Test
public void noWrongParallel() throws InterruptedException {
    long nanosStart = System.nanoTime();
    // LongStream instead of IntStream: i * i and the running sum overflow an int in this range
    OptionalLong total = LongStream.range(1, 1_000_000)
            .map(i -> i * i)
            .reduce(Long::sum);
    long nanosEnd = System.nanoTime();
    System.out.println("Result = " + total.getAsLong());
    System.out.println("Time ms= " + ((nanosEnd - nanosStart) / 1_000_000));
}

@Test
public void wrongParallel() throws InterruptedException {
    long nanosStart = System.nanoTime();
    // Same pipeline, but split across the common ForkJoinPool
    OptionalLong total = LongStream.range(1, 1_000_000)
            .parallel()
            .map(i -> i * i)
            .reduce(Long::sum);
    long nanosEnd = System.nanoTime();
    System.out.println("Result = " + total.getAsLong());
    System.out.println("Time ms= " + ((nanosEnd - nanosStart) / 1_000_000));
}
```
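When you run those two methods, the run times are pretty similar, so parallel() doesn’t seem to help much here. One likely reason is that the work per element is tiny (one multiplication and one addition), so splitting the range across threads and merging the partial sums costs about as much as it saves. Also, a single run timed with System.nanoTime() includes JIT warm-up, which can hide the difference either way.
Now, let’s try another use case: squaring each number, keeping only the even results, and counting them.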
```java
@Test
public void noParallelGood() throws InterruptedException {
    long nanosStart = System.nanoTime();
    long total = IntStream.range(1, 1_000_000)
            .map(i -> i * i)
            .boxed()
            .filter(integer -> integer % 2 == 0)
            .count();
    long nanosEnd = System.nanoTime();
    System.out.println("Result = " + total);
    System.out.println("Time ms= " + ((nanosEnd - nanosStart) / 1_000_000));
}

@Test
public void parallelGood() throws InterruptedException {
    long nanosStart = System.nanoTime();
    long total = IntStream.range(1, 1_000_000)
            .parallel()
            .map(i -> i * i)
            .boxed()
            .filter(integer -> integer % 2 == 0)
            .count();
    long nanosEnd = System.nanoTime();
    System.out.println("Result = " + total);
    System.out.println("Time ms= " + ((nanosEnd - nanosStart) / 1_000_000));
}
```
Again, the two methods take roughly the same time.
Of course, there are use cases where parallel streams pay off, but my goal is to show you that they are not a silver bullet: you need to study and measure your use case before choosing them.
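Both versions of this pipeline also pay for .boxed(), which wraps every square in an Integer before filtering. As a sketch (this variant is not one of the original tests, and the class name is hypothetical), the same count can be computed on primitive ints, avoiding those allocations; whether that changes the timings on your machine is something to measure.

```java
import java.util.stream.IntStream;

public class UnboxedCount {
    // Same pipeline as above, but without .boxed(): the filter works on primitive ints
    static long countEvenSquares(boolean parallel) {
        IntStream numbers = IntStream.range(1, 1_000_000);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers
                .map(i -> i * i)          // can overflow int, but wrapping modulo 2^32 keeps the parity
                .filter(i -> i % 2 == 0)  // keep only the even squares
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countEvenSquares(false)); // prints 499999
        System.out.println(countEvenSquares(true));  // prints 499999
    }
}
```

A square is even exactly when the number itself is even, so the count is the number of even values between 1 and 999,999.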
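One way to make such measurements a bit fairer than a single timed run is sketched below: run the workload a few times untimed so the JIT can warm up, then report the median of several timed runs. The helper names, the 5 warm-up and 11 timed iterations, and the volatile "blackhole" field are assumptions chosen for illustration; for serious measurements a dedicated harness such as JMH is the usual tool.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

public class CompareTimings {
    // Results are stored here to make it harder for the JIT to discard the measured work
    static volatile long blackhole;

    // Runs the task untimed (warm-up), then returns the median of the timed runs in nanoseconds
    static long medianNanos(LongSupplier task, int warmups, int runs) {
        for (int i = 0; i < warmups; i++) {
            blackhole = task.getAsLong();
        }
        long[] times = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            blackhole = task.getAsLong();
            times[i] = System.nanoTime() - start;
        }
        Arrays.sort(times);
        return times[runs / 2];
    }

    // The workload from the first example: the sum of squares, sequential or parallel
    static long sumOfSquares(boolean parallel) {
        LongStream numbers = LongStream.range(1, 1_000_000);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.map(i -> i * i).sum();
    }

    public static void main(String[] args) {
        long sequential = medianNanos(() -> sumOfSquares(false), 5, 11);
        long parallel = medianNanos(() -> sumOfSquares(true), 5, 11);
        System.out.println("sequential median us = " + sequential / 1_000);
        System.out.println("parallel median us   = " + parallel / 1_000);
    }
}
```

Even with warm-up, the numbers depend on the core count, the JVM version, and whatever else the machine is doing, so compare them on the hardware you actually deploy to.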
Final Thought
Future and CompletableFuture are important tools in Java for working with async tasks. Parallel streams also have their use cases, but we should think twice before using them.