1. Introducción
Este artículo es un vistazo a los grupos de subprocesos en Java, comenzando con las diferentes implementaciones en la biblioteca estándar de Java y luego mirando la biblioteca Guava de Google.
2. El grupo de hilos
En Java, los subprocesos se asignan a subprocesos a nivel del sistema que son los recursos del sistema operativo. Si crea subprocesos de forma incontrolable, puede quedarse sin estos recursos rápidamente.
El cambio de contexto entre subprocesos también lo realiza el sistema operativo, para emular el paralelismo. Una visión simplista es que, cuantos más subprocesos genere, menos tiempo dedica cada subproceso a hacer el trabajo real.
El patrón Thread Pool ayuda a ahorrar recursos en una aplicación multiproceso y también a contener el paralelismo en ciertos límites predefinidos.
Cuando usa un grupo de subprocesos, escribe su código concurrente en forma de tareas paralelas y las envía para su ejecución a una instancia de un grupo de subprocesos . Esta instancia controla varios subprocesos reutilizados para ejecutar estas tareas.
El patrón le permite controlar la cantidad de subprocesos que está creando la aplicación , su ciclo de vida, así como programar la ejecución de tareas y mantener las tareas entrantes en una cola.
3. Grupos de subprocesos en Java
3.1. Ejecutores , Ejecutor y EjecutorServicio
La clase auxiliar Ejecutores contiene varios métodos para la creación de instancias de grupos de subprocesos preconfiguradas para usted. Esas clases son un buen lugar para comenzar; utilícelo si no necesita aplicar ningún ajuste fino personalizado.
Las interfaces Executor y ExecutorService se utilizan para trabajar con diferentes implementaciones de grupos de subprocesos en Java. Por lo general, debe mantener su código desacoplado de la implementación real del grupo de subprocesos y usar estas interfaces en toda su aplicación.
La interfaz del ejecutor tiene un único método de ejecución para enviar instancias ejecutables para su ejecución.
Aquí hay un ejemplo rápido de cómo puede usar la API de Ejecutores para adquirir una instancia de Ejecutor respaldada por un grupo de subprocesos único y una cola ilimitada para ejecutar tareas de forma secuencial. Aquí, ejecutamos una sola tarea que simplemente imprime " Hello World " en la pantalla. La tarea se envía como lambda (una característica de Java 8) que se infiere que es ejecutable .
Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));
La interfaz ExecutorService contiene una gran cantidad de métodos para controlar el progreso de las tareas y gestionar la terminación del servicio . Con esta interfaz, puede enviar las tareas para su ejecución y también controlar su ejecución mediante la instancia Future devuelta .
En el siguiente ejemplo , creamos un ExecutorService , presentará una tarea y luego usar el vuelto Futuro 's get método que esperar hasta que la tarea presentada está terminado y se devuelve el valor:
ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();
Por supuesto, en un escenario de la vida real, por lo general, no desea llamar a future.get () de inmediato, sino aplazar la llamada hasta que realmente necesite el valor del cálculo.
El método de envío está sobrecargado para tomar Runnable o Callable, que son interfaces funcionales y se pueden pasar como lambdas (comenzando con Java 8).
El método único de Runnable no genera una excepción y no devuelve un valor. La interfaz invocable puede ser más conveniente, ya que nos permite lanzar una excepción y devolver un valor.
Finalmente, para permitir que el compilador infiera el tipo invocable , simplemente devuelva un valor de lambda.
Para obtener más ejemplos del uso de la interfaz ExecutorService y futuros, consulte "Una guía para Java ExecutorService".
3.2. ThreadPoolExecutor
El ThreadPoolExecutor es una implementación de grupo de subprocesos extensible con una gran cantidad de parámetros y ganchos para ajuste fino.
Los principales parámetros de configuración que discutiremos aquí son: corePoolSize , maximumPoolSize y keepAliveTime .
El grupo consta de una cantidad fija de subprocesos centrales que se mantienen dentro todo el tiempo y algunos subprocesos excesivos que pueden generarse y luego terminarse cuando ya no se necesitan. El parámetro corePoolSize es el número de subprocesos centrales que se instanciarán y mantendrán en el grupo. Cuando ingresa una nueva tarea, si todos los subprocesos centrales están ocupados y la cola interna está llena, entonces se permite que el grupo crezca hasta el tamaño máximo de la piscina .
El parámetro keepAliveTime es el intervalo de tiempo durante el cual los subprocesos excesivos (instanciados en exceso de corePoolSize ) pueden existir en el estado inactivo. De forma predeterminada, ThreadPoolExecutor solo considera los subprocesos no centrales para su eliminación. Para aplicar la misma política de eliminación a los subprocesos centrales, podemos usar el método allowCoreThreadTimeOut (true) .
Estos parámetros cubren una amplia gama de casos de uso, pero las configuraciones más típicas están predefinidas en los métodos estáticos de Executors .
Por ejemplo , el método newFixedThreadPool crea un ThreadPoolExecutor con valores iguales de los parámetros corePoolSize y maximumPoolSize y un keepAliveTime de cero . Esto significa que el número de subprocesos en este grupo de subprocesos es siempre el mismo:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());
En el ejemplo anterior, creamos una instancia de ThreadPoolExecutor con un número de subprocesos fijo de 2. Esto significa que si el número de tareas que se ejecutan simultáneamente es menor o igual a dos en todo momento, entonces se ejecutan de inmediato. De lo contrario, algunas de estas tareas pueden colocarse en una cola para esperar su turno .
Creamos tres tareas invocables que imitan el trabajo pesado durmiendo durante 1000 milisegundos. Las dos primeras tareas se ejecutarán a la vez y la tercera tendrá que esperar en la cola. Podemos verificarlo llamando a los métodos getPoolSize () y getQueue (). Size () inmediatamente después de enviar las tareas.
Se puede crear otro ThreadPoolExecutor preconfigurado con el método Executors.newCachedThreadPool () . Este método no recibe varios hilos en absoluto. El corePoolSize es en realidad establece en 0, y la maximumPoolSize se establece en Integer.MAX_VALUE para esta instancia. El keepAliveTime es de 60 segundos para este.
Estos valores de parámetros significan que el grupo de subprocesos en caché puede crecer sin límites para adaptarse a cualquier número de tareas enviadas . Pero cuando los hilos ya no sean necesarios, se eliminarán después de 60 segundos de inactividad. Un caso de uso típico es cuando tiene muchas tareas de corta duración en su aplicación.
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());
El tamaño de la cola en el ejemplo anterior siempre será cero porque internamente se usa una instancia de SynchronousQueue . En SynchronousQueue , los pares de operaciones de inserción y eliminación siempre ocurren simultáneamente, por lo que la cola nunca contiene nada.
The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.
Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:
AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });
Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.
3.3. ScheduledThreadPoolExecutor
The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:
- schedule method allows to execute a task once after a specified delay;
- scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
- scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.
The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);
The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.
CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);
3.4. ForkJoinPool
ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.
In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”
Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:
static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }
Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:
- streams the children set,
- maps over this stream, creating a new CountingTask for each element,
- executes each subtask by forking it,
- collects the results by calling the join method on each forked task,
- sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }
The code to run the calculation on an actual tree is very simple:
TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));
4. Thread Pool's Implementation in Guava
Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.
4.1. Adding Guava as a Maven Dependency
Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:
com.google.guava guava 19.0
4.2. Direct Executor and Direct Executor Service
Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.
Gladly, Guava provides predefined instances for us.
Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:
Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());
The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.
You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.
4.3. Exiting Executor Services
Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.
To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.
These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.
In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });
4.4. Listening Decorators
Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.
Rara vez querrá usar el método ListenableFuture.addListener () directamente, pero es esencial para la mayoría de los métodos auxiliares en la clase de utilidad Futures . Por ejemplo, con el método Futures.allAsList () puede combinar varias instancias de ListenableFuture en un solo ListenableFuture que se completa con la finalización exitosa de todos los futuros combinados:
ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);
5. Conclusión
En este artículo, hemos discutido el patrón Thread Pool y sus implementaciones en la biblioteca estándar de Java y en la biblioteca Guava de Google.
El código fuente del artículo está disponible en GitHub.