Descripción general de java.util.concurrent

1. Información general

El paquete java.util.concurrent proporciona herramientas para crear aplicaciones simultáneas.

En este artículo, haremos una descripción general de todo el paquete.

2. Componentes principales

El java.util.concurrent contiene demasiadas características para discutir en un solo artículo. En este artículo, nos centraremos principalmente en algunas de las utilidades más útiles de este paquete como:

  • Ejecutor
  • ExecutorService
  • ScheduledExecutorService
  • Futuro
  • CountDownLatch
  • CyclicBarrier
  • Semáforo
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Cerraduras
  • Phaser

También puede encontrar muchos artículos dedicados a clases individuales aquí.

2.1. Ejecutor

Executor es una interfaz que representa un objeto que ejecuta las tareas proporcionadas.

Depende de la implementación particular (desde donde se inicia la invocación) si la tarea debe ejecutarse en un hilo nuevo o actual. Por lo tanto, utilizando esta interfaz, podemos desacoplar el flujo de ejecución de la tarea del mecanismo de ejecución de la tarea real.

Un punto a tener en cuenta aquí es que Executor no requiere estrictamente que la ejecución de la tarea sea asincrónica. En el caso más simple, un ejecutor puede invocar la tarea enviada instantáneamente en el hilo de invocación.

Necesitamos crear un invocador para crear la instancia ejecutora:

public class Invoker implements Executor { @Override public void execute(Runnable r) { r.run(); } }

Ahora, podemos usar este invocador para ejecutar la tarea.

public void execute() { Executor executor = new Invoker(); executor.execute( () -> { // task to be performed }); }

El punto a tener en cuenta aquí es que si el ejecutor no puede aceptar la tarea para su ejecución, lanzará RejectedExecutionException .

2.2. ExecutorService

ExecutorService es una solución completa para el procesamiento asincrónico. Administra una cola en memoria y programa las tareas enviadas según la disponibilidad de subprocesos.

Para usar ExecutorService, necesitamos crear una clase Runnable .

public class Task implements Runnable { @Override public void run() { // task details } }

Ahora podemos crear la instancia ExecutorService y asignar esta tarea. En el momento de la creación, necesitamos especificar el tamaño del grupo de subprocesos.

ExecutorService executor = Executors.newFixedThreadPool(10);

Si queremos crear una instancia de ExecutorService de un solo subproceso , podemos usar newSingleThreadExecutor (ThreadFactory threadFactory) para crear la instancia.

Una vez que se crea el ejecutor, podemos usarlo para enviar la tarea.

public void execute() { executor.submit(new Task()); }

También podemos crear la instancia Runnable mientras enviamos la tarea.

executor.submit(() -> { new Task(); });

También viene con dos métodos de terminación de ejecución listos para usar. El primero es shutdown () ; espera hasta que todas las tareas enviadas terminen de ejecutarse. El otro método es shutdownNow () whic h termina inmediatamente todas las tareas pendientes / ejecutoras.

También hay otro método awaitTermination (tiempo de espera prolongado, unidad TimeUnit) que se bloquea de manera forzada hasta que todas las tareas hayan completado la ejecución después de que se haya activado un evento de apagado o se haya agotado el tiempo de espera de ejecución, o se haya interrumpido el subproceso de ejecución en sí

try { executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); } catch (InterruptedException e) { e.printStackTrace(); }

2.3. ScheduledExecutorService

ScheduledExecutorService es una interfaz similar a ExecutorService, pero puede realizar tareas periódicamente.

Los métodos de Executor y ExecutorService se programan en el lugar sin introducir ningún retraso artificial. Cero o cualquier valor negativo significa que la solicitud debe ejecutarse instantáneamente.

Podemos utilizar tanto la interfaz ejecutable como la invocable para definir la tarea.

public void execute() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Future future = executorService.schedule(() -> { // ... return "Hello world"; }, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule(() -> { // ... }, 1, TimeUnit.SECONDS); executorService.shutdown(); }

ScheduledExecutorService también puede programar la tarea después de un retraso fijo determinado :

executorService.scheduleAtFixedRate(() -> { // ... }, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(() -> { // ... }, 1, 10, TimeUnit.SECONDS);

Aquí, el método scheduleAtFixedRate (comando Runnable, long initialDelay, long period, TimeUnit unit) crea y ejecuta una acción periódica que se invoca en primer lugar después de la demora inicial proporcionada y, posteriormente, con el período dado hasta que se apaga la instancia del servicio.

The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

2.4. Future

Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.

What's more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

Otherwise, in-progress tasks will be allowed to complete.

We can use below code snippet to create a future instance:

public void invoke() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { // ... Thread.sleep(10000l); return "Hello world"; }); }

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) { try { str = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try { future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

2.5. CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.

You can learn more about CountDownLatch here.

2.6. CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable { private CyclicBarrier barrier; public Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { LOG.info(Thread.currentThread().getName() + " is waiting"); barrier.await(); LOG.info(Thread.currentThread().getName() + " is released"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }

Now we can invoke some threads to race for the barrier condition:

public void start() { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // ... LOG.info("All previous tasks are completed"); }); Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

2.7. Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10); public void execute() throws InterruptedException { LOG.info("Available permit : " + semaphore.availablePermits()); LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength()); if (semaphore.tryAcquire()) { try { // ... } finally { semaphore.release(); } } }

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.

2.8. ThreadFactory

As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

We can define a ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory { private int threadId; private String name; public BaeldungThreadFactory(String name) { threadId = 1; this.name = name; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, name + "-Thread_" + threadId); LOG.info("created new thread with id : " + threadId + " and name : " + t.getName()); threadId++; return t; } }

We can use this newThread(Runnable r) method to create a new thread at runtime:

BaeldungThreadFactory factory = new BaeldungThreadFactory( "BaeldungThreadFactory"); for (int i = 0; i < 10; i++) { Thread t = factory.newThread(new Task()); t.start(); }

2.9. BlockingQueue

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

2.10. DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it's expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

2.11. Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that's executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

2.12. Phaser

Phaser es una solución más flexible que CyclicBarrier y CountDownLatch - se utiliza para actuar como una barrera reutilizable en el que el número dinámico de hilos tiene que esperar antes de la ejecución de continuar. Podemos coordinar múltiples fases de ejecución, reutilizando una instancia de Phaser para cada fase del programa.

Más información y un ejemplo práctico sobre esto está disponible aquí.

3. Conclusión

En este artículo de descripción general de alto nivel, nos hemos centrado en las diferentes utilidades disponibles del paquete java.util.concurrent .

Como siempre, el código fuente completo está disponible en GitHub.