1. Introducción
Spring Cloud Data Flow es un modelo operativo y de programación nativo de la nube para microservicios de datos componibles.
Con Spring Cloud Data Flow , los desarrolladores pueden crear y orquestar canalizaciones de datos para casos de uso comunes, como ingesta de datos, análisis en tiempo real e importación / exportación de datos.
Estas canalizaciones de datos vienen en dos tipos, canalizaciones de datos de transmisión y por lotes.
En el primer caso, se consume o produce una cantidad ilimitada de datos a través del middleware de mensajería. Mientras que en el segundo caso, la tarea de corta duración procesa un conjunto finito de datos y luego termina.
Este artículo se centrará en el procesamiento de transmisión.
2. Descripción de la arquitectura
Los componentes clave de este tipo de arquitectura son las aplicaciones , el servidor de flujo de datos y el tiempo de ejecución de destino.
Además de estos componentes clave, normalmente también tenemos un Data Flow Shell y un agente de mensajes dentro de la arquitectura.
Veamos todos estos componentes con más detalle.
2.1. Aplicaciones
Normalmente, una canalización de datos de transmisión incluye eventos de consumo de sistemas externos, procesamiento de datos y persistencia políglota. Estas fases se denominan comúnmente Fuente , Procesador y Receptor en la terminología de Spring Cloud :
- Fuente: es la aplicación que consume eventos.
- Procesador: consume datos de la fuente , realiza algún procesamiento en ellos y emite los datos procesados a la siguiente aplicación en la tubería
- Receptor: consume de una fuente o procesador y escribe los datos en la capa de persistencia deseada
Estas aplicaciones se pueden empaquetar de dos formas:
- Spring Boot uber-jar que está alojado en un repositorio de maven, archivo, http o cualquier otra implementación de recursos de Spring (este método se utilizará en este artículo)
- Estibador
Muchas aplicaciones de fuentes, procesadores y receptores para casos de uso comunes (por ejemplo, jdbc, hdfs, http, enrutador) ya están provistas y listas para usar por el equipo de Spring Cloud Data Flow .
2.2. Tiempo de ejecución
Además, se necesita un tiempo de ejecución para que se ejecuten estas aplicaciones. Los tiempos de ejecución admitidos son:
- Fundición en la nube
- HILO Apache
- Kubernetes
- Apache Mesos
- Servidor local para desarrollo (que se utilizará en este artículo)
2.3. Servidor de flujo de datos
El componente responsable de implementar aplicaciones en un tiempo de ejecución es el servidor de flujo de datos . Se proporciona un jar ejecutable de Data Flow Server para cada uno de los tiempos de ejecución de destino.
El servidor de flujo de datos es responsable de interpretar:
- Un flujo de DSL que describe el flujo lógico de datos a través de múltiples aplicaciones.
- Un manifiesto de implementación que describe la asignación de aplicaciones al tiempo de ejecución.
2.4. Shell de flujo de datos
Data Flow Shell es un cliente del Data Flow Server. El shell nos permite ejecutar el comando DSL necesario para interactuar con el servidor.
Como ejemplo, el DSL para describir el flujo de datos desde una fuente http a un receptor jdbc se escribiría como “http | jdbc ”. Estos nombres en el DSL se registran con el servidor de flujo de datos y se asignan a los artefactos de la aplicación que se pueden alojar en los repositorios de Maven o Docker.
Spring también ofrece una interfaz gráfica, llamada Flo , para crear y monitorear canales de transmisión de datos. Sin embargo, su uso está fuera de la discusión de este artículo.
2.5. Agente de mensajes
Como hemos visto en el ejemplo de la sección anterior, hemos utilizado el símbolo de tubería en la definición del flujo de datos. El símbolo de la tubería representa la comunicación entre las dos aplicaciones a través del middleware de mensajería.
Esto significa que necesitamos un agente de mensajes en funcionamiento en el entorno de destino.
Los dos intermediarios de middleware de mensajería compatibles son:
- Apache Kafka
- RabbitMQ
Entonces, ahora que tenemos una descripción general de los componentes arquitectónicos , es hora de construir nuestra primera canalización de procesamiento de flujo.
3. Instale un agente de mensajes
Como hemos visto, las aplicaciones en proceso necesitan un middleware de mensajería para comunicarse. Para el propósito de este artículo, usaremos RabbitMQ .
Para conocer todos los detalles de la instalación, puede seguir las instrucciones en el sitio oficial.
4. El servidor de flujo de datos local
Para acelerar el proceso de generación de nuestras aplicaciones, usaremos Spring Initializr; con su ayuda, podemos obtener nuestras aplicaciones Spring Boot en unos minutos.
Después de acceder al sitio web, simplemente elija un grupo y un nombre de artefacto .
Una vez hecho esto, haga clic en el botón Generar proyecto para iniciar la descarga del artefacto Maven.

Una vez completada la descarga, descomprima el proyecto e impórtelo como un proyecto Maven en su IDE de elección.
Agreguemos una dependencia de Maven al proyecto. Como necesitaremos las bibliotecas del servidor local de Dataflow , agreguemos la dependencia spring-cloud-starter-dataflow-server-local:
org.springframework.cloud spring-cloud-starter-dataflow-server-local
Ahora necesitamos anotar la clase principal de Spring Boot con la anotación @EnableDataFlowServer :
@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }
Eso es todo. Nuestro servidor de flujo de datos local está listo para ser ejecutado:
mvn spring-boot:run
La aplicación se iniciará en el puerto 9393.
5. Shell de flujo de datos
Nuevamente, vaya a Spring Initializr y elija un nombre de grupo y artefacto .
Una vez que hayamos descargado e importado el proyecto, agreguemos una dependencia spring-cloud-dataflow-shell:
org.springframework.cloud spring-cloud-dataflow-shell
Ahora necesitamos agregar la anotación @EnableDataFlowShell a la clase principal de Spring Boot :
@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }
Ahora podemos ejecutar el shell:
mvn spring-boot:run
After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.
6. The Source Application
Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:
org.springframework.cloud spring-cloud-starter-stream-rabbit
We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:
@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }
Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).
In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.
The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:
@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); }
Our data source is ready.
7. The Processor Application
Next- we'll create an application and add a Stream Rabbit dependency.
We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:
@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }
Next, we need to define a method to process the data that coming from the source application.
To define a transformer, we need to annotate this method with @Transformer annotation:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }
It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.
8. The Sink Application
The last application to create is the Sink application.
Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.
Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:
@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }
Now we need a method to intercept the messages coming from the processor application.
To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:
@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }
The method simply prints the timestamp transformed in a formatted date to a log file.
9. Register a Stream App
The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.
We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.
When providing a URI with the maven scheme, the format should conform to the following:
maven://:[:[:]]:
To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:
app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT
10. Create and Deploy the Stream
To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:
stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'
This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.
Then to deploy the stream execute the following shell command:
stream deploy --name time-to-log
The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.
If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:
2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source
11. Reviewing the Result
In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.
The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21
12. Conclusion
In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.
Además, vimos el papel de las aplicaciones Source , Processor y Sink dentro de la transmisión y cómo conectar y vincular este módulo dentro de un Data Flow Server mediante el uso de Data Flow Shell .
El código de ejemplo se puede encontrar en el proyecto GitHub.