Guía de los arroyos de Akka

1. Información general

En este artículo, veremos la biblioteca akka-streams que se construye sobre el marco del actor Akka, que se adhiere al manifiesto de flujos reactivos. La API de Akka Streams nos permite componer fácilmente flujos de transformación de datos a partir de pasos independientes.

Además, todo el procesamiento se realiza de forma reactiva, sin bloqueo y asincrónica.

2. Dependencias de Maven

Para comenzar, necesitamos agregar las bibliotecas akka-stream y akka-stream-testkit en nuestro pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. API de Akka Streams

Para trabajar con Akka Streams, debemos conocer los conceptos básicos de la API:

  • Fuente - el punto de entrada al procesamiento en la biblioteca akka-stream - podemos crear una instancia de esta clase a partir de múltiples fuentes; por ejemplo, podemos usar elmétodo single () si queremos crear una fuente a partir de una sola cadena , o podemos crear una fuente a partir de un Iterable de elementos
  • Flow , el bloque de construcción de procesamiento principal , cadainstancia de Flow tiene un valor de entrada y un valor de salida
  • Materializador: podemos usar uno si queremos que nuestro flujo tenga algunos efectos secundarios como registrar o guardar resultados ; más comúnmente, pasaremos elalias NotUsed como materializador para indicar que nuestro flujo no debería tener efectos secundarios
  • Operación de sumidero : cuando estamos construyendo un flujo, no se ejecuta hasta que registramos una operación de sumidero en él; es una operación de terminal que activa todos los cálculos en todo el flujo.

4. Creación de flujos en Akka Streams

Vamos a empezar por la construcción de un ejemplo sencillo, en donde vamos a mostrar cómo crear y combinar múltiples Flujo s - para procesar una corriente de números enteros y calcular la ventana de media móvil de pares de número entero de la corriente.

Analizaremos una cadena de enteros delimitada por punto y coma como entrada para crear nuestra fuente akka-stream para el ejemplo.

4.1. Usar un flujo para analizar la entrada

Primero, creemos una clase DataImporter que tomará una instancia del ActorSystem que usaremos más adelante para crear nuestro Flow :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

A continuación, creemos un método parseLine que generará una Lista de enteros a partir de nuestra Cadena de entrada delimitada . Tenga en cuenta que estamos usando Java Stream API aquí solo para analizar:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Nuestro flujo inicial aplicará parseLine a nuestra entrada para crear un flujo con el tipo de entrada String y el tipo de salida Integer :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Cuando llamamos al método parseLine () , el compilador sabe que el argumento de esa función lambda será un String , igual que el tipo de entrada de nuestro Flow .

Tenga en cuenta que estamos usando el mapConcat () método - equivalente a la de Java 8 flatMap () método - porque queremos aplanar la Lista de entero devuelto por parseLine () en un flujo de enteros de manera que las etapas subsiguientes en nuestro procesamiento no es necesario para ocuparse de la Lista .

4.2. Usar un flujo para realizar cálculos

En este punto, tenemos nuestro flujo de enteros analizados. Ahora, necesitamos implementar una lógica que agrupe todos los elementos de entrada en pares y calcule un promedio de esos pares .

Ahora, crearemos un flujo de enteros y los agruparemos usando el método agrupado () .

A continuación, queremos calcular un promedio.

Dado que no estamos interesados ​​en el orden en el que se procesarán esos promedios, podemos tener promedios calculados en paralelo usando múltiples subprocesos usando el método mapAsyncUnordered () , pasando el número de subprocesos como un argumento a este método.

La acción que se pasará como lambda al flujo debe devolver un CompletableFuture porque esa acción se calculará de forma asincrónica en el hilo independiente:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Estamos calculando promedios en ocho subprocesos paralelos. Tenga en cuenta que estamos utilizando Java 8 Stream API para calcular un promedio.

4.3. Composición de varios flujos en un solo flujo

El flujo de API es una abstracción fluidez que nos permite componer múltiples Flujo de casos para lograr nuestro objetivo procesamiento final . Podemos tener flujos granulares donde uno, por ejemplo, está analizando JSON, otro está haciendo alguna transformación y otro está recopilando algunas estadísticas.

Tal granularidad nos ayudará a crear un código más comprobable porque podemos probar cada paso de procesamiento de forma independiente.

Creamos dos flujos arriba que pueden funcionar independientemente uno del otro. Ahora, queremos componerlos juntos.

Primero, queremos analizar nuestra cadena de entrada y, a continuación, queremos calcular un promedio en una secuencia de elementos.

Podemos componer nuestros flujos usando el método via () :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Creamos un flujo que tiene el tipo de entrada String y otros dos flujos después. El flujo parseContent () toma una entrada de cadena y devuelve un entero como salida. El flujo computeAverage () toma ese Integer y calcula un Double de retorno promedio como el tipo de salida.

5. Añadiendo fregadero al flujo

Como mencionamos, hasta este punto todo el Flow aún no se ha ejecutado porque es perezoso. Para iniciar la ejecución del flujo necesitamos definir un sumidero . La operación Sink puede, por ejemplo, guardar datos en una base de datos o enviar resultados a algún servicio web externo.

Supongamos que tenemos una clase AverageRepository con el siguiente método save () que escribe resultados en nuestra base de datos:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Ahora, queremos crear una operación de sumidero que utilice este método para guardar los resultados de nuestro procesamiento de flujo . Para crear nuestro sumidero, primero necesitamos crear un flujo que tome un resultado de nuestro procesamiento como tipo de entrada . A continuación, queremos guardar todos nuestros resultados en la base de datos.

Nuevamente, no nos importa el orden de los elementos, por lo que podemos realizar las operaciones save () en paralelo utilizando el método mapAsyncUnordered () .

Para crear un Sink desde el Flow , necesitamos llamar a toMat () con Sink.ignore () como primer argumento y Keep.right () como el segundo porque queremos devolver un estado del procesamiento:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Definición de una fuente para el flujo

Lo último que debemos hacer es crear una fuente a partir de la cadena de entrada . Podemos aplicar un flujo calculateAverage () a esta fuente usando el método via () .

Luego, para agregar el Sink al procesamiento, necesitamos llamar al método runWith () y pasar el Sink storeAverage () que acabamos de crear:

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.