Flujos reactivos de Java 9

1. Información general

En este artículo, veremos las secuencias reactivas de Java 9. En pocas palabras, podremos usar la clase Flow , que incluye los bloques de construcción principales para construir la lógica de procesamiento de flujo reactivo.

Reactive Streams es un estándar para el procesamiento de flujo asíncrono con contrapresión sin bloqueo. Esta especificación se define en el Reactive Manifesto, y hay varias implementaciones de la misma, por ejemplo, RxJava o Akka-Streams.

2. Descripción general de la API reactiva

Para construir un flujo , podemos usar tres abstracciones principales y componerlas en lógica de procesamiento asincrónico.

Cada flujo necesita procesar los eventos que le publica una instancia de publicador ; el editor tiene un método: subscribe ().

Si alguno de los suscriptores desea recibir eventos publicados por él, debe suscribirse al editor en cuestión.

El receptor de mensajes debe implementar la interfaz de suscriptor . Por lo general, este es el final de cada procesamiento de flujo porque la instancia del mismo no envía mensajes más.

Podemos pensar en el suscriptor como un fregadero. Esto tiene cuatro métodos que deben anularse : onSubscribe (), onNext (), onError () y onComplete (). Los veremos en la siguiente sección.

Si queremos transformar el mensaje entrante y pasarlo al siguiente suscriptor, necesitamos implementar la interfaz del procesador . Esto actúa como suscriptor porque recibe mensajes y como publicador porque procesa esos mensajes y los envía para su posterior procesamiento.

3. Publicación y consumo de mensajes

Digamos que queremos crear un flujo simple , en el que tenemos un publicador que publica mensajes y un suscriptor simple que consume mensajes a medida que llegan, uno a la vez.

Creemos una clase EndSubscriber . Necesitamos implementar la interfaz de suscriptor . A continuación, anularemos los métodos necesarios.

Se llama al método onSubscribe () antes de que comience el procesamiento. La instancia de la suscripción se pasa como argumento. Es una clase que se utiliza para controlar el flujo de mensajes entre el suscriptor y el publicador:

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

También inicializamos una Lista vacía de elementos consumidos que se utilizarán en las pruebas.

Ahora, necesitamos implementar los métodos restantes de la interfaz del suscriptor . El método principal aquí es onNext (): se llama cada vez que el editor publica un nuevo mensaje:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Tenga en cuenta que cuando iniciamos la suscripción en el método onSubscribe () y cuando procesamos un mensaje, debemos llamar al método request () en la Suscripción para indicar que el Suscriptor actual está listo para consumir más mensajes.

Por último, necesitamos implementar onError () , que se llama siempre que se lanzará alguna excepción en el procesamiento, así como onComplete (), que se llama cuando el editor está cerrado:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Escribamos una prueba para el flujo de procesamiento . Usaremos la clase SubmissionPublisher , una construcción de java.util.concurrent , que implementa la interfaz de Publisher .

Vamos a enviar N elementos al editor , que recibirá nuestro suscriptor final:

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Tenga en cuenta que estamos llamando al método close () en la instancia de EndSubscriber. Se invocará onComplete () de devolución de llamada por debajo en cada suscriptor de lo dado Editorial.

La ejecución de ese programa producirá el siguiente resultado:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Transformación de mensajes

Digamos que queremos construir una lógica similar entre un publicador y un suscriptor , pero también aplicar alguna transformación.

Crearemos la clase TransformProcessor que implementa Processor y extiende SubmissionPublisher, ya que será P ublisher y S ubscriber.

Pasaremos una función que transformará las entradas en salidas:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Escribamos ahora una prueba rápida con un flujo de procesamiento en el que el publicador está publicando elementos de cadena .

Nuestro TransformProcessor analizará la cadena como entero , lo que significa que debe realizarse una conversión aquí:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Tenga en cuenta que llamar al método close () en el Publisher base hará que se invoque el método onComplete () en TransformProcessor .

Tenga en cuenta que todos los editores de la cadena de procesamiento deben cerrarse de esta manera.

5. Control de la demanda de mensajes mediante la suscripción

Digamos que queremos consumir solo el primer elemento de la Suscripción, aplicar algo de lógica y finalizar el procesamiento. Podemos usar el método request () para lograr esto.

Modifiquemos nuestro EndSubscriber para consumir solo N número de mensajes. Pasaremos ese número como el argumento del constructor howMuchMessagesConsume :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Podemos solicitar elementos todo el tiempo que queramos.

Escribamos una prueba en la que solo queremos consumir un elemento de la Suscripción dada :

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Aunque el editor está publicando seis elementos, nuestro EndSubscriber consumirá solo un elemento porque indica la demanda de procesamiento solo de ese.

Al utilizar el método request () en la suscripción, podemos implementar un mecanismo de contrapresión más sofisticado para controlar la velocidad del consumo de mensajes.

6. Conclusión

En este artículo, echamos un vistazo a las secuencias reactivas de Java 9.

Vimos cómo crear un flujo de procesamiento que consta de un publicador y un suscriptor. Creamos un flujo de procesamiento más complejo con la transformación de elementos usando Procesadores .

Finalmente, usamos la Suscripción para controlar la demanda de elementos por parte del Suscriptor.

La implementación de todos estos ejemplos y fragmentos de código se puede encontrar en el proyecto GitHub; este es un proyecto de Maven, por lo que debería ser fácil de importar y ejecutar tal como está.