Semáforos en Java

1. Información general

En este tutorial rápido, exploraremos los conceptos básicos de semáforos y mutex en Java.

2. Semáforo

Comenzaremos con java.util.concurrent.Semaphore. Podemos usar semáforos para limitar la cantidad de subprocesos concurrentes que acceden a un recurso específico.

En el siguiente ejemplo, implementaremos una cola de inicio de sesión simple para limitar la cantidad de usuarios en el sistema:

class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }

Observe cómo usamos los siguientes métodos:

  • tryAcquire () : devuelve verdadero si un permiso está disponible inmediatamente y lo adquiere; de ​​lo contrario, devuelve falso, pero adquiere () adquiere un permiso y bloquea hasta que haya uno disponible
  • release () - liberar un permiso
  • availablePermits () - devuelve el número de permisos actuales disponibles

Para probar nuestra cola de inicio de sesión, primero intentaremos alcanzar el límite y verificaremos si el próximo intento de inicio de sesión será bloqueado:

@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }

A continuación, veremos si hay espacios disponibles después de cerrar la sesión:

@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }

3. Semáforo temporizado

A continuación, analizaremos Apache Commons TimedSemaphore. TimedSemaphore permite una serie de permisos como un semáforo simple, pero en un período de tiempo determinado, después de este período, el tiempo se reinicia y todos los permisos se liberan.

Podemos usar TimedSemaphore para construir una cola de retardo simple de la siguiente manera:

class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }

Cuando usamos una cola de retardo con un segundo como período de tiempo y después de usar todas las ranuras en un segundo, ninguna debería estar disponible:

public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }

Pero después de dormir por un tiempo, el semáforo debería reiniciarse y liberar los permisos :

@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }

4. Semáforo frente a mutex

Mutex actúa de manera similar a un semáforo binario, podemos usarlo para implementar la exclusión mutua.

En el siguiente ejemplo, usaremos un semáforo binario simple para construir un contador:

class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }

Cuando muchos subprocesos intentan acceder al contador a la vez, simplemente se bloquearán en una cola :

@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }

Cuando esperemos, todos los hilos accederán al contador y no quedarán hilos en la cola:

@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }

5. Conclusión

En este artículo, exploramos los conceptos básicos de los semáforos en Java.

Como siempre, el código fuente completo está disponible en GitHub.