Guía de java.util.concurrent.BlockingQueue

1. Información general

En este artículo, veremos una de las construcciones java.util.concurrent más útiles para resolver el problema productor-consumidor concurrente. Veremos una API de la interfaz BlockingQueue y cómo los métodos de esa interfaz facilitan la escritura de programas concurrentes.

Más adelante en el artículo, mostraremos un ejemplo de un programa simple que tiene múltiples hilos productores y múltiples hilos consumidores.

2. Bloqueo de tipos de cola

Podemos distinguir dos tipos de BlockingQueue :

  • cola ilimitada: puede crecer casi indefinidamente
  • cola limitada - con capacidad máxima definida

2.1. Cola ilimitada

Crear colas ilimitadas es simple:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

La capacidad de blockQueue se establecerá en Integer.MAX_VALUE. Todas las operaciones que agregan un elemento a la cola ilimitada nunca se bloquearán, por lo que podría crecer a un tamaño muy grande.

Lo más importante al diseñar un programa productor-consumidor utilizando BlockingQueue ilimitado es que los consumidores deberían poder consumir mensajes tan rápido como los productores agregan mensajes a la cola. De lo contrario, la memoria podría llenarse y obtendríamos una excepción OutOfMemory .

2.2. Cola limitada

El segundo tipo de colas es la cola limitada. Podemos crear tales colas pasando la capacidad como argumento a un constructor:

BlockingQueue blockingQueue = new LinkedBlockingDeque(10);

Aquí tenemos un blockQueue que tiene una capacidad igual a 10. Significa que cuando un productor intenta agregar un elemento a una cola ya llena, dependiendo del método que se usó para agregarlo ( oferta () , agregar () o poner () ), se bloqueará hasta que haya espacio disponible para insertar el objeto. De lo contrario, las operaciones fallarán.

El uso de una cola limitada es una buena forma de diseñar programas simultáneos porque cuando insertamos un elemento en una cola ya llena, las operaciones deben esperar hasta que los consumidores se pongan al día y hagan algo de espacio disponible en la cola. Nos da estrangulamiento sin ningún esfuerzo de nuestra parte.

3. API de BlockingQueue

Hay dos tipos de métodos en la interfaz BlockingQueue : métodos responsables de agregar elementos a una cola y métodos que recuperan esos elementos. Cada método de esos dos grupos se comporta de manera diferente en caso de que la cola esté llena / vacía.

3.1. Agregar elementos

  • add (): devuelve verdadero si la inserción fue exitosa; de lo contrario, arroja una IllegalStateException
  • put (): inserta el elemento especificado en una cola, esperando un espacio libre si es necesario
  • oferta (): devuelve verdadero si la inserción fue exitosa, de lo contrario falso
  • oferta (E e, tiempo de espera prolongado, unidad de unidad de tiempo): intenta insertar un elemento en una cola y espera un espacio disponible dentro de un tiempo de espera especificado

3.2. Recuperando elementos

  • take () : espera un elemento principal de una cola y lo elimina. Si la cola está vacía, se bloquea y espera a que un elemento esté disponible
  • poll (tiempo de espera prolongado, unidad TimeUnit): recupera y elimina el encabezado de la cola, esperando hasta el tiempo de espera especificado si es necesario para que un elemento esté disponible. Devuelve nulo después de un tiempo de espera

Estos métodos son los componentes básicos más importantes de la interfaz BlockingQueue al crear programas de productor-consumidor.

4. Ejemplo productor-consumidor multiproceso

Creemos un programa que consta de dos partes: un productor y un consumidor.

El productor producirá un número aleatorio de 0 a 100 y pondrá ese número en una BlockingQueue . Tendremos 4 hilos de productor y usaremos el método put () para bloquear hasta que haya espacio disponible en la cola.

Lo importante a recordar es que debemos evitar que nuestros hilos de consumo esperen a que un elemento aparezca en una cola de forma indefinida.

Una buena técnica para señalar del productor al consumidor que no hay más mensajes que procesar es enviar un mensaje especial llamado píldora venenosa. Necesitamos enviar tantas píldoras venenosas como consumidores tengamos. Luego, cuando un consumidor tome ese mensaje especial de píldora venenosa de una cola, finalizará la ejecución con gracia.

Veamos una clase de productor:

public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }

Nuestro constructor de productores toma como argumento el BlockingQueue que se usa para coordinar el procesamiento entre el productor y el consumidor. Vemos que el método generateNumbers () pondrá 100 elementos en una cola. También se necesita un mensaje de píldora venenosa, para saber qué tipo de mensaje se debe poner en una cola cuando finalice la ejecución. Ese mensaje debe colocarse en una cola con los tiempos de poisonPillPerProducer .

Cada consumidor tomará un elemento de BlockingQueue usando el método take () para que se bloquee hasta que haya un elemento en una cola. Después de tomar un entero de una cola, comprueba si el mensaje es una píldora venenosa, si es así, la ejecución de un hilo ha finalizado. De lo contrario, imprimirá el resultado en la salida estándar junto con el nombre del hilo actual.

Esto nos dará una idea del funcionamiento interno de nuestros consumidores:

public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Lo importante a tener en cuenta es el uso de una cola. Al igual que en el constructor del productor, se pasa una cola como argumento. Podemos hacerlo porque BlockingQueue se puede compartir entre subprocesos sin ninguna sincronización explícita.

Ahora que tenemos nuestro productor y consumidor, podemos comenzar nuestro programa. Necesitamos definir la capacidad de la cola y la configuramos en 100 elementos.

Queremos tener 4 hilos productores y un número de hilos consumidores será igual al número de procesadores disponibles:

int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); 

BlockingQueue se crea utilizando una construcción con capacidad. Estamos creando 4 productores y N consumidores. Especificamos nuestro mensaje de píldora venenosa para que sea un Integer.MAX_VALUE porque nuestro productor nunca enviará dicho valor en condiciones normales de trabajo. Lo más importante a notar aquí es que BlockingQueue se usa para coordinar el trabajo entre ellos.

Cuando ejecutamos el programa, 4 subprocesos de productores colocarán enteros aleatorios en una BlockingQueue y los consumidores tomarán esos elementos de la cola. Cada hilo imprimirá en la salida estándar el nombre del hilo junto con un resultado.

5. Conclusión

Este artículo muestra un uso práctico de BlockingQueue y explica los métodos que se utilizan para agregar y recuperar elementos de él. Además, hemos mostrado cómo crear un programa productor-consumidor multiproceso utilizando BlockingQueue para coordinar el trabajo entre productores y consumidores.

La implementación de todos estos ejemplos y fragmentos de código se puede encontrar en el proyecto GitHub; este es un proyecto basado en Maven, por lo que debería ser fácil de importar y ejecutar tal como está.