1. Introducción
En este artículo, brindaremos una guía para la clase CountDownLatch y demostraremos cómo se puede usar en algunos ejemplos prácticos.
Esencialmente, al usar CountDownLatch podemos hacer que un hilo se bloquee hasta que otros hilos hayan completado una tarea determinada.
2. Uso en programación concurrente
En pocas palabras, un CountDownLatch tiene un campo de contador , que puede disminuir según lo necesitemos. Luego podemos usarlo para bloquear un hilo de llamada hasta que se cuente hasta cero.
Si estuviéramos haciendo algún procesamiento paralelo, podríamos crear una instancia de CountDownLatch con el mismo valor para el contador que varios subprocesos en los que queremos trabajar. Luego, podríamos simplemente llamar a countdown () después de que finalice cada subproceso, garantizando que un subproceso dependiente que llame a await () se bloqueará hasta que finalicen los subprocesos de trabajo.
3. Esperando a que se complete un grupo de subprocesos
Probemos este patrón creando un Worker y usando un campo CountDownLatch para señalar cuando se ha completado:
public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }
Luego, creemos una prueba para demostrar que podemos obtener un CountDownLatch para esperar a que se completen las instancias de Worker :
@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }
Naturalmente, "Latch liberado" siempre será la última salida, ya que depende de la liberación de CountDownLatch .
Tenga en cuenta que si no llamamos a await () , no podríamos garantizar el orden de ejecución de los subprocesos, por lo que la prueba fallaría aleatoriamente.
4. Un grupo de subprocesos esperando comenzar
Si tomamos el ejemplo anterior, pero esta vez comenzaron miles de subprocesos en lugar de cinco, es probable que muchos de los anteriores hayan terminado de procesarse antes de que hayamos llamado start () en los posteriores. Esto podría dificultar la reproducción de un problema de concurrencia, ya que no podríamos hacer que todos nuestros subprocesos se ejecutaran en paralelo.
Para evitar esto, hagamos que CountdownLatch funcione de manera diferente que en el ejemplo anterior. En lugar de bloquear un subproceso principal hasta que algunos subprocesos secundarios hayan terminado, podemos bloquear cada subproceso secundario hasta que todos los demás hayan comenzado.
Modifiquemos nuestro método run () para que se bloquee antes de procesar:
public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }
Ahora, modifiquemos nuestra prueba para que se bloquee hasta que todos los Trabajadores hayan comenzado, desbloquee a los Trabajadores y luego bloquee hasta que los Trabajadores hayan terminado:
@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }
Este patrón es realmente útil para tratar de reproducir errores de concurrencia, ya que se puede usar para forzar a miles de subprocesos a intentar realizar alguna lógica en paralelo.
5. Terminación de un CountdownLatch temprana
A veces, podemos encontrarnos con una situación en la que los Trabajadores terminan por error antes de contar el CountDownLatch. Esto podría resultar en que nunca llegue a cero y espere () nunca termine:
@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }
Modifiquemos nuestra prueba anterior para usar BrokenWorker, a fin de mostrar cómo await () se bloqueará para siempre:
@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }
Claramente, este no es el comportamiento que queremos; sería mucho mejor que la aplicación continuara que bloquear infinitamente.
Para solucionar esto, agreguemos un argumento de tiempo de espera a nuestra llamada a await ().
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();
Como podemos ver, la prueba eventualmente expirará y await () devolverá falso .
6. Conclusión
En esta guía rápida, hemos demostrado cómo podemos usar CountDownLatch para bloquear un hilo hasta que otros hilos hayan terminado algún procesamiento.
También hemos mostrado cómo se puede usar para ayudar a depurar problemas de concurrencia asegurándonos de que los subprocesos se ejecuten en paralelo.
La implementación de estos ejemplos se puede encontrar en GitHub; este es un proyecto basado en Maven, por lo que debería ser fácil de ejecutar tal como está.