Introducción a Apache Storm

1. Información general

Este tutorial será una introducción a Apache Storm, un sistema de cálculo distribuido en tiempo real.

Nos centraremos y cubriremos:

  • ¿Qué es exactamente Apache Storm y qué problemas resuelve?
  • Su arquitectura y
  • Cómo usarlo en un proyecto

2. ¿Qué es Apache Storm?

Apache Storm es un sistema distribuido de código abierto y gratuito para cálculos en tiempo real.

Proporciona tolerancia a fallos, escalabilidad y garantiza el procesamiento de datos, y es especialmente bueno para procesar flujos de datos ilimitados.

Algunos buenos casos de uso de Storm pueden ser procesar operaciones de tarjetas de crédito para la detección de fraudes o procesar datos de hogares inteligentes para detectar sensores defectuosos.

Storm permite la integración con varias bases de datos y sistemas de colas disponibles en el mercado.

3. Dependencia de Maven

Antes de usar Apache Storm, debemos incluir la dependencia del núcleo de la tormenta en nuestro proyecto:

 org.apache.storm storm-core 1.2.2 provided 

Solo debemos usar el alcance proporcionado si tenemos la intención de ejecutar nuestra aplicación en el clúster de Storm.

Para ejecutar la aplicación localmente, podemos usar un llamado modo local que simulará el clúster Storm en un proceso local, en tal caso debemos eliminar el proporcionado.

4. Modelo de datos

El modelo de datos de Apache Storm consta de dos elementos: tuplas y flujos.

4.1. Tupla

Una tupla es una lista ordenada de campos con nombre con tipos dinámicos. Esto significa que no necesitamos declarar explícitamente los tipos de campos.

Storm necesita saber cómo serializar todos los valores que se utilizan en una tupla. De forma predeterminada, ya puede serializar tipos primitivos, cadenas y matrices de bytes .

Y dado que Storm usa la serialización de Kryo, necesitamos registrar el serializador usando Config para usar los tipos personalizados. Podemos hacer esto de dos formas:

Primero, podemos registrar la clase para serializar usando su nombre completo:

Config config = new Config(); config.registerSerialization(User.class);

En tal caso, Kryo serializará la clase usando FieldSerializer. De forma predeterminada, esto serializará todos los campos no transitorios de la clase, tanto privados como públicos.

O en su lugar, podemos proporcionar tanto la clase para serializar como el serializador que queremos que Storm use para esa clase:

Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);

Para crear el serializador personalizado, necesitamos extender la clase genérica Serializer que tiene dos métodos de escritura y lectura.

4.2. Corriente

Un Stream es la abstracción central del ecosistema Storm. La corriente es una secuencia sin límites de tuplas.

Storms permite procesar múltiples flujos en paralelo.

Cada flujo tiene una identificación que se proporciona y se asigna durante la declaración.

5. Topología

La lógica de la aplicación Storm en tiempo real está empaquetada en la topología. La topología consta de picos y pernos .

5.1. Canalón

Los picos son las fuentes de los arroyos. Emiten tuplas a la topología.

Las tuplas se pueden leer desde varios sistemas externos como Kafka, Kestrel o ActiveMQ.

Los picos pueden ser confiables o no confiables . Fiable significa que el canalón puede responder que la tupla que no ha sido procesada por Storm. No confiable significa que el pico no responde, ya que usará un mecanismo de disparar y olvidar para emitir las tuplas.

Para crear el pico personalizado, necesitamos implementar la interfaz IRichSpout o extender cualquier clase que ya implemente la interfaz, por ejemplo, una clase BaseRichSpout abstracta .

Creemos un pico poco confiable :

public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }

Nuestro RandomIntSpout personalizado generará un entero aleatorio y una marca de tiempo cada segundo.

5.2. Tornillo

Los pernos procesan tuplas en la secuencia. Pueden realizar diversas operaciones como filtrado, agregaciones o funciones personalizadas.

Algunas operaciones requieren varios pasos y, por tanto, necesitaremos utilizar varios tornillos en tales casos.

Para crear el Bolt personalizado , necesitamos implementar IRichBolt o para operaciones más simples, la interfaz IBasicBolt .

También hay varias clases de ayuda disponibles para implementar Bolt. En este caso, usaremos BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }

Este PrintingBolt personalizado simplemente imprimirá todas las tuplas en la consola.

6. Creación de una topología simple

Juntemos estas ideas en una topología simple. Nuestra topología tendrá un pico y tres tornillos.

6.1. Número aleatorio

Al principio, crearemos un pico poco confiable. Generará números enteros aleatorios del rango (0,100) cada segundo:

public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.2. Perno de filtrado

A continuación, crearemos un perno que filtrará todos los elementos con operación igual a 0:

public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.3. Perno de agregación

A continuación, creemos un Bolt más complicado que agregará todas las operaciones positivas de cada día.

Para este propósito, usaremos una clase específica creada especialmente para implementar pernos que operan en ventanas en lugar de operar en tuplas individuales: BaseWindowedBolt .

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows are used to group elements from a given time period using timestamps. Time windows may have a different number of elements.

Count windows are used to create windows with a defined size. In such a case, all windows will have the same size and the window will not be emitted if there are fewer elements than the defined size.

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { List tuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }

Note that, in this case, getting the first element of the list directly is safe. That's because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we'll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them and write them to the file:

public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }

Note that we don't need to declare the output as this will be the last bolt in our topology

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt");  String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGroup allows us to state that data for filteringBolt will be coming from randomNumberSpout.

For each Bolt, we need to add shuffleGroup which defines the source of elements for this bolt. The source of elements may be a Spout or another Bolt. And if we set the same source for more than one bolt, the source will emit all elements to each of them.

In this case, our topology will use the LocalCluster to run the job locally.

7. Conclusion

En este tutorial, presentamos Apache Storm, un sistema de cálculo distribuido en tiempo real. Creamos un pico, algunos tornillos y los juntamos en una topología completa.

Y, como siempre, todas las muestras de código se pueden encontrar en GitHub.