Introducción a Spring Cloud Stream

1. Información general

Spring Cloud Stream es un marco construido sobre Spring Boot y Spring Integration que ayuda a crear microservicios impulsados ​​por eventos o mensajes .

En este artículo, presentaremos conceptos y construcciones de Spring Cloud Stream con algunos ejemplos simples.

2. Dependencias de Maven

Para comenzar, necesitaremos agregar Spring Cloud Starter Stream con la dependencia del corredor RabbitMQ Maven como messaging-middleware a nuestro pom.xml :

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

Y agregaremos la dependencia del módulo de Maven Central para habilitar también el soporte JUnit:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Conceptos principales

La arquitectura de microservicios sigue el principio de “terminales inteligentes y tuberías tontas”. La comunicación entre los puntos finales está impulsada por partes de middleware de mensajería como RabbitMQ o Apache Kafka. Los servicios se comunican mediante la publicación de eventos de dominio a través de estos puntos finales o canales .

Repasemos los conceptos que componen el marco Spring Cloud Stream, junto con los paradigmas esenciales que debemos conocer para crear servicios basados ​​en mensajes.

3.1. Construye

Veamos un servicio simple en Spring Cloud Stream que escucha el enlace de entrada y envía una respuesta al enlace de salida :

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

La anotación @EnableBinding configura la aplicación para vincular los canales INPUT y OUTPUT definidos dentro de la interfaz Procesador . Ambos canales son enlaces que se pueden configurar para utilizar un middleware o un binder de mensajería concreto.

Echemos un vistazo a la definición de todos estos conceptos:

  • Enlaces : una colección de interfaces que identifican los canales de entrada y salida de forma declarativa
  • Binder : implementación de middleware de mensajería como Kafka o RabbitMQ
  • Canal : representa el conducto de comunicación entre el middleware de mensajería y la aplicación.
  • StreamListeners : métodos de manejo de mensajes en beans que se invocarán automáticamente en un mensaje del canal después de que MessageConverter realice la serialización / deserialización entre eventos específicos de middleware y tipos de objetos de dominio / POJO
  • Mes salvia Esquemas - utilizado para la serialización y deserialización de mensajes, estos esquemas se pueden leer de forma estática desde una ubicación o cargados dinámicamente, el apoyo a la evolución de los tipos de objetos de dominio

3.2. Patrones de comunicación

Los mensajes designados a los destinos se entregan mediante el patrón de mensajería Publicar-Suscribir . Los editores clasifican los mensajes en temas, cada uno identificado por un nombre. Los suscriptores expresan interés en uno o más temas. El middleware filtra los mensajes, entregando los de los temas interesantes a los suscriptores.

Ahora, los suscriptores podrían agruparse. Un grupo de consumidores es un conjunto de suscriptores o consumidores, identificados por una identificación de grupo , dentro del cual los mensajes de un tema o la partición de un tema se entregan de manera equilibrada.

4. Modelo de programación

Esta sección describe los conceptos básicos de la creación de aplicaciones Spring Cloud Stream.

4.1. Pruebas funcionales

El soporte de prueba es una implementación de binder que permite interactuar con los canales e inspeccionar mensajes.

Enviemos un mensaje al servicio enrichLogMessage anterior y verifiquemos si la respuesta contiene el texto "[1]:" al principio del mensaje:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests { @Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText() { pipe.input() .send(MessageBuilder.withPayload(new LogMessage("This is my message")) .build()); Object payload = messageCollector.forChannel(pipe.output()) .poll() .getPayload(); assertEquals("[1]: This is my message", payload.toString()); } }

4.2. Canales personalizados

En el ejemplo anterior, usamos la interfaz del procesador proporcionada por Spring Cloud, que tiene solo un canal de entrada y un canal de salida.

Si necesitamos algo diferente, como un canal de entrada y dos de salida, podemos crear un procesador personalizado:

public interface MyProcessor { String INPUT = "myInput"; @Input SubscribableChannel myInput(); @Output("myOutput") MessageChannel anOutput(); @Output MessageChannel anotherOutput(); }

Spring nos proporcionará la implementación adecuada de esta interfaz. Los nombres de los canales se pueden configurar mediante anotaciones como en @Output ("myOutput") .

De lo contrario, Spring usará los nombres de los métodos como nombres de canales. Por lo tanto, tenemos tres canales llamados myInput , myOutput y anotherOutput .

Ahora, imaginemos que queremos enrutar los mensajes a una salida si el valor es menor que 10 y en otra salida el valor es mayor o igual a 10:

@Autowired private MyProcessor processor; @StreamListener(MyProcessor.INPUT) public void routeValues(Integer val) { if (val < 10) { processor.anOutput().send(message(val)); } else { processor.anotherOutput().send(message(val)); } } private static final  Message message(T val) { return MessageBuilder.withPayload(val).build(); }

4.3. Despacho condicional

Usando la anotación @StreamListener , también podemos filtrar los mensajes que esperamos en el consumidor usando cualquier condición que definamos con expresiones SpEL.

Como ejemplo, podríamos usar el envío condicional como otro enfoque para enrutar mensajes a diferentes salidas:

@Autowired private MyProcessor processor; @StreamListener( target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput(Integer val) { processor.anotherOutput().send(message(val)); }

La única limitación de este enfoque es que estos métodos no deben devolver un valor.

5. Configuración

Let's set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host:  port: 5672 username:  password:  virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { //... @Bean public MessageConverter providesTextPlainMessageConverter() { return new TextPlainMessageConverter(); } //... }
public class TextPlainMessageConverter extends AbstractMessageConverter { public TextPlainMessageConverter() { super(new MimeType("text", "plain")); } @Override protected boolean supports(Class clazz) { return (LogMessage.class == clazz); } @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object payload = message.getPayload(); String text = payload instanceof String ? (String) payload : new String((byte[]) payload); return new LogMessage(text); } }

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings..group property to specify a group name:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we've deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at //:/health.

7. Conclusion

En este tutorial, presentamos los conceptos principales de Spring Cloud Stream y mostramos cómo usarlo a través de algunos ejemplos simples sobre RabbitMQ. Puede encontrar más información sobre Spring Cloud Stream aquí.

El código fuente de este artículo se puede encontrar en GitHub.