1. Introducción
Este tutorial es una guía de la funcionalidad y los casos de uso de la clase CompletableFuture que se introdujo como una mejora de la API de simultaneidad de Java 8.
2. Computación asincrónica en Java
Es difícil razonar sobre la computación asincrónica. Por lo general, queremos pensar en cualquier cálculo como una serie de pasos, pero en el caso del cálculo asincrónico, las acciones representadas como devoluciones de llamada tienden a estar dispersas en el código o profundamente anidadas entre sí . Las cosas empeoran aún más cuando necesitamos manejar errores que pueden ocurrir durante uno de los pasos.
La interfaz Future se agregó en Java 5 para servir como resultado de un cálculo asincrónico, pero no tenía ningún método para combinar estos cálculos o manejar posibles errores.
Java 8 introdujo la clase CompletableFuture . Junto con la interfaz Future , también implementó la interfaz CompletionStage . Esta interfaz define el contrato para un paso de cálculo asincrónico que podemos combinar con otros pasos.
CompletableFuture es al mismo tiempo un bloque de construcción y un marco, con aproximadamente 50 métodos diferentes para componer, combinar y ejecutar pasos de cálculo asincrónico y manejar errores .
Una API tan grande puede ser abrumadora, pero en su mayoría se encuentran en varios casos de uso claros y distintos.
3. Usando CompletableFuture como un futuro simple
En primer lugar, la clase CompletableFuture implementa la interfaz Future , por lo que podemos usarla como implementación Future , pero con una lógica de finalización adicional .
Por ejemplo, podemos crear una instancia de esta clase con un constructor sin argumentos para representar algún resultado futuro, entregarlo a los consumidores y completarlo en algún momento en el futuro usando el método completo . Los consumidores pueden usar el método get para bloquear el hilo actual hasta que se proporcione este resultado.
En el siguiente ejemplo, tenemos un método que crea una instancia CompletableFuture , luego genera algunos cálculos en otro hilo y devuelve el Future inmediatamente.
Cuando se realiza el cálculo, el método completa el Futuro proporcionando el resultado al método completo :
public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
Para derivar el cálculo, usamos la API del ejecutor . Este método de crear y completar un CompletableFuture se puede utilizar junto con cualquier mecanismo de simultaneidad o API, incluidos los hilos sin procesar.
Observe que el método calculateAsync devuelve una instancia futura .
Simplemente llamamos al método, recibimos la instancia Future y llamamos al método get cuando estemos listos para bloquear el resultado.
También observe que el método get arroja algunas excepciones verificadas, a saber, ExecutionException (encapsulando una excepción que ocurrió durante un cálculo) e InterruptedException (una excepción que significa que un hilo que ejecuta un método fue interrumpido):
Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Si ya conocemos el resultado de un cálculo , podemos usar el método static completeFuture con un argumento que represente un resultado de este cálculo. En consecuencia, el método get del Future nunca se bloqueará, devolviendo inmediatamente este resultado en su lugar:
Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Como escenario alternativo, es posible que deseemos cancelar la ejecución de un futuro .
4. CompletableFuture con lógica de cálculo encapsulada
El código anterior nos permite elegir cualquier mecanismo de ejecución concurrente, pero ¿qué pasa si queremos omitir este texto estándar y simplemente ejecutar algún código de forma asincrónica?
Los métodos estáticos runAsync y supplyAsync nos permiten crear una instancia CompletableFuture a partir de los tipos funcionales Runnable y Supplier correspondientemente.
Tanto Runnable como Supplier son interfaces funcionales que permiten pasar sus instancias como expresiones lambda gracias a la nueva característica de Java 8.
La interfaz Runnable es la misma interfaz antigua que se usa en los subprocesos y no permite devolver un valor.
La interfaz del proveedor es una interfaz funcional genérica con un único método que no tiene argumentos y devuelve un valor de un tipo parametrizado.
Esto nos permite proporcionar una instancia del Proveedor como una expresión lambda que realiza el cálculo y devuelve el resultado . Es tan simple como:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
5. Procesamiento de resultados de cálculos asincrónicos
La forma más genérica de procesar el resultado de un cálculo es alimentarlo a una función. El método thenApply hace exactamente eso; acepta una instancia de función , la usa para procesar el resultado y devuelve un futuro que contiene un valor devuelto por una función:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
Si no necesitamos devolver un valor en la cadena Future , podemos usar una instancia de la interfaz funcional del consumidor . Su método único toma un parámetro y devuelve vacío .
Hay un método para este caso de uso en CompletableFuture. El método thenAccept recibe un consumidor y le pasa el resultado del cálculo. Luego, la llamada final future.get () devuelve una instancia del tipo Void :
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
Por último, si no tenemos el valor de la computación, ni queremos volver algún valor al final de la cadena, entonces podemos pasar a un Ejecutable lambda a la thenRun método. En el siguiente ejemplo, simplemente imprimimos una línea en la consola después de llamar a future.get ():
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
6. Combinación de futuros
The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.
In the following example we use the thenCompose method to chain two Futures sequentially.
Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.
Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.
If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
7. Difference Between thenApply() and thenCompose()
In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.
7.1. thenApply()
We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.
So this method is useful when we want to transform the result of a CompletableFuture call:
CompletableFuture finalResult = compute().thenApply(s-> s + 1);
7.2. thenCompose()
The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():
CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);
So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().
Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().
8. Running Multiple Futures in Parallel
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.
9. Handling Errors
For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.
Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).
In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:
String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:
CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException
In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.
10. Async Methods
Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.
The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.
Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture API
Java 9 enhances the CompletableFuture API with the following changes:
- New factory methods added
- Support for delays and timeouts
- Improved support for subclassing
and new instance APIs:
- Executor defaultExecutor()
- CompletableFuture newIncompleteFuture()
- CompletableFuture copy()
- CompletionStage minimalCompletionStage()
- CompletableFuture completeAsync(Supplier supplier, Executor executor)
- CompletableFuture completeAsync(Supplier supplier)
- CompletableFuture orTimeout(long timeout, TimeUnit unit)
- CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)
We also now have a few static utility methods:
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)
- CompletionStage completedStage(U value)
- CompletionStage failedStage(Throwable ex)
- CompletableFuture failedFuture(Throwable ex)
Finally, to address timeout, Java 9 has introduced two more new functions:
- orTimeout()
- completeOnTimeout()
Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.
12. Conclusion
En este artículo, describimos los métodos y casos de uso típicos de la clase CompletableFuture .
El código fuente del artículo está disponible en GitHub.