CyclicBarrier en Java

1. Introducción

CyclicBarriers son construcciones de sincronización que se introdujeron con Java 5 como parte del paquete java.util.concurrent .

En este artículo, exploraremos esta implementación en un escenario de simultaneidad.

2. Concurrencia de Java: sincronizadores

El paquete java.util.concurrent contiene varias clases que ayudan a administrar un conjunto de subprocesos que colaboran entre sí. Algunos de estos incluyen:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Intercambiador
  • Semáforo
  • SynchronousQueue

Estas clases ofrecen funcionalidades listas para usar para patrones de interacción comunes entre subprocesos.

Si tenemos un conjunto de subprocesos que se comunican entre sí y se asemejan a uno de los patrones comunes, podemos simplemente reutilizar las clases de biblioteca apropiadas (también llamadas Sincronizadores ) en lugar de intentar crear un esquema personalizado usando un conjunto de bloqueos y condiciones. objetos y la palabra clave sincronizada .

Centrémonos en CyclicBarrier en el futuro.

3. CyclicBarrier

Un CyclicBarrier es un sincronizador que permite que un conjunto de hilos esperen unos a otros para alcanzar un punto de ejecución común, también llamado barrera .

CyclicBarriers se utilizan en programas en los que tenemos un número fijo de subprocesos que deben esperar a que los demás alcancen un punto común antes de continuar con la ejecución.

La barrera se llama cíclica porque se puede reutilizar después de que se liberan los hilos en espera.

4. Uso

El constructor de CyclicBarrier es simple. Se necesita un solo entero que denota la cantidad de subprocesos que necesitan llamar al método await () en la instancia de barrera para significar que se alcanzó el punto de ejecución común:

public CyclicBarrier(int parties)

Los hilos que necesitan sincronizar su ejecución también se denominan partes y llamar al método await () es la forma en que podemos registrar que un hilo determinado ha alcanzado el punto de barrera.

Esta llamada es síncrona y el hilo que llama a este método suspende la ejecución hasta que un número específico de hilos haya llamado al mismo método en la barrera. Esta situación en la que el número requerido de subprocesos ha llamado await () , se llama disparar la barrera .

Opcionalmente, podemos pasar el segundo argumento al constructor, que es una instancia Runnable . Esto tiene una lógica que sería ejecutada por el último hilo que tropiece con la barrera:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Implementación

Para ver CyclicBarrier en acción, consideremos el siguiente escenario:

Existe una operación que realiza un número fijo de subprocesos y almacena los resultados correspondientes en una lista. Cuando todos los subprocesos terminan de realizar su acción, uno de ellos (generalmente el último que rompe la barrera) comienza a procesar los datos que fueron recuperados por cada uno de ellos.

Implementemos la clase principal donde ocurre toda la acción:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Esta clase es bastante sencilla: NUM_WORKERS es el número de subprocesos que se ejecutarán y NUM_PARTIAL_RESULTS es el número de resultados que producirá cada uno de los subprocesos de trabajo.

Finalmente, tenemos los resultados parciales que son una lista que almacenará los resultados de cada uno de estos subprocesos de trabajo. Tenga en cuenta que esta lista es una SynchronizedList porque varios subprocesos escribirán en ella al mismo tiempo, y el método add () no es seguro para subprocesos en una ArrayList simple .

Ahora implementemos la lógica de cada uno de los hilos de trabajo:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Ahora implementaremos la lógica que se ejecuta cuando la barrera se ha disparado.

Para simplificar las cosas, agreguemos todos los números en la lista de resultados parciales:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

El paso final sería construir CyclicBarrier y comenzar con un método main () :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

En el código anterior, inicializamos la barrera cíclica con 5 subprocesos que producen cada uno 3 enteros como parte de su cálculo y almacenan el mismo en la lista resultante.

Una vez que se dispara la barrera, el último hilo que tropezó con la barrera ejecuta la lógica especificada en AggregatorThread, es decir, sumar todos los números producidos por los hilos.

6. Resultados

Aquí está el resultado de una ejecución del programa anterior: cada ejecución puede crear resultados diferentes ya que los subprocesos se pueden generar en un orden diferente:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Como muestra la salida anterior, Thread 4 es el que dispara la barrera y también ejecuta la lógica de agregación final. Tampoco es necesario que los subprocesos se ejecuten realmente en el orden en que se inician, como muestra el ejemplo anterior.

7. Conclusión

En este artículo, vimos qué es una CyclicBarrier y en qué tipo de situaciones es útil.

También implementamos un escenario en el que necesitábamos un número fijo de subprocesos para alcanzar un punto de ejecución fijo, antes de continuar con otra lógica del programa.

Como siempre, el código del tutorial se puede encontrar en GitHub.