Spring Integration Java DSL

1. Introducción

En este tutorial, aprenderemos sobre Spring Integration Java DSL para crear integraciones de aplicaciones.

Tomaremos la integración de movimiento de archivos que construimos en Introducción a Spring Integration y usaremos DSL en su lugar.

2. Dependencias

Spring Integration Java DSL es parte de Spring Integration Core.

Entonces, podemos agregar esa dependencia:

 org.springframework.integration spring-integration-core 5.0.6.RELEASE 

Y para trabajar en nuestra aplicación de movimiento de archivos, también necesitaremos Spring Integration File:

 org.springframework.integration spring-integration-file 5.0.6.RELEASE 

3. Spring Integration Java DSL

Antes de Java DSL, los usuarios configuraban los componentes de Spring Integration en XML.

El DSL presenta algunos constructores fluidos a partir de los cuales podemos crear fácilmente un pipeline completo de Spring Integration exclusivamente en Java.

Entonces, digamos que queremos crear un canal que supere los datos que llegan a través de la tubería.

En el pasado, podríamos haber hecho:

Y ahora podemos hacer en su lugar:

@Bean public IntegrationFlow upcaseFlow() { return IntegrationFlows.from("input") .transform(String::toUpperCase) .get(); }

4. La aplicación para mover archivos

Para comenzar nuestra integración de movimiento de archivos, necesitaremos algunos bloques de construcción simples.

4.1. Flujo de integración

El primer bloque de construcción que necesitamos es un flujo de integración, que podemos obtener del constructor IntegrationFlows :

IntegrationFlows.from(...)

from puede tomar varios tipos, pero en este tutorial, veremos solo tres:

  • MessageSource s
  • MessageChannel s, y
  • Cadena s

Hablaremos de los tres en breve.

Después de haber llamado desde , ahora tenemos disponibles algunos métodos de personalización:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory()) // add more components .get();

En última instancia, IntegrationFlows siempre producirá una instancia de IntegrationFlow, que es el producto final de cualquier aplicación Spring Integration.

Este patrón de recibir información, realizar las transformaciones adecuadas y emitir los resultados es fundamental para todas las aplicaciones de Spring Integration .

4.2. Describir una fuente de entrada

Primero, para mover archivos, necesitaremos indicar a nuestro flujo de integración dónde debe buscarlos, y para eso, necesitamos un MessageSource:

@Bean public MessageSource sourceDirectory() { // .. create a message source }

En pocas palabras, un MessageSource es un lugar del que pueden provenir mensajes externos a la aplicación.

Más específicamente, necesitamos algo que pueda adaptar esa fuente externa a la representación de mensajes de Spring. Y dado que esta adaptación se centra en la entrada , a menudo se denominan adaptadores de canal de entrada.

La dependencia del archivo de integración de primavera nos da un adaptador de canal de entrada que es excelente para nuestro caso de uso: FileReadingMessageSource:

@Bean public MessageSource sourceDirectory() { FileReadingMessageSource messageSource = new FileReadingMessageSource(); messageSource.setDirectory(new File(INPUT_DIR)); return messageSource; }

Aquí, nuestro FileReadingMessageSource leerá un directorio proporcionado por INPUT_DIR y creará un MessageSource a partir de él.

Especifiquemos esto como nuestra fuente en un IntegrationFlows. De la invocación:

IntegrationFlows.from(sourceDirectory());

4.3. Configurar una fuente de entrada

Ahora, si estamos pensando en esto como una aplicación de larga duración, probablemente querremos poder notar los archivos a medida que ingresan , no solo mover los archivos que ya están allí al inicio.

Para facilitar esto, a partir también puede tomar adicionales configuradores como una mayor personalización de la fuente de entrada:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

En este caso, podemos hacer que nuestra fuente de entrada sea más resistente al decirle a Spring Integration que sondee esa fuente, nuestro sistema de archivos en este caso, cada 10 segundos.

Y, por supuesto, esto no se aplica solo a nuestra fuente de entrada de archivos, podríamos agregar este sondeo a cualquier MessageSource .

4.4. Filtrar mensajes de una fuente de entrada

Next, let's suppose we want our file-moving application to move specific files only, say image files having jpg extension.

For this, we can use GenericSelector:

@Bean public GenericSelector onlyJpgs() { return new GenericSelector() { @Override public boolean accept(File source) { return source.getName().endsWith(".jpg"); } }; }

So, let's update our integration flow again:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs());

Or, because this filter is so simple, we could have instead defined it using a lambda:

IntegrationFlows.from(sourceDirectory()) .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Handling Messages With Service Activators

Now that we have a filtered list of files, we need to write them to a new location.

Service Activators are what we turn to when we're thinking about outputs in Spring Integration.

Let's use the FileWritingMessageHandler service activator from spring-integration-file:

@Bean public MessageHandler targetDirectory() { FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR)); handler.setFileExistsMode(FileExistsMode.REPLACE); handler.setExpectReply(false); return handler; }

Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.

Again, let's update:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory());

And notice, by the way, the usage of setExpectReply. Because integration flows can bebidirectional, this invocation indicates that this particular pipe is one way.

4.6. Activating Our Integration Flow

When we have added all our components we need to register our IntegrationFlow as a bean to activate it:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000))) .filter(onlyJpgs()) .handle(targetDirectory()) .get(); }

The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.

As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.

And now, our application will start moving files from the source directory to target directory.

5. Additional Components

In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.

Let's look at a few other common Spring Integration components and see how we might use them.

5.1. Message Channels

As mentioned earlier, a Message Channel is another way to initialize a flow:

IntegrationFlows.from("anyChannel")

We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”

But, really it is more general-purpose than that.

Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.

Let's say, for example, that we want to prioritize the files as they get moved from one directory to the next:

@Bean public PriorityChannel alphabetically() { return new PriorityChannel(1000, (left, right) -> ((File)left.getPayload()).getName().compareTo( ((File)right.getPayload()).getName())); }

Then, we can insert an invocation to channel in between our flow:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("alphabetically") .handle(targetDirectory()) .get(); }

There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).

Also, channels can be powerful when combined with Bridges.

5.2. Bridge

When we want to combine two channels, we use a Bridge.

Let's imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:

@Bean public IntegrationFlow fileReader() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("holdingTank") .get(); }

Now, because we've simply written it to a channel, we can bridge from there to other flows.

Let's create a bridge that polls our holding tank for messages and writes them to a destination:

@Bean public IntegrationFlow fileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20))) .handle(targetDirectory()) .get(); }

Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:

@Bean public IntegrationFlow anotherFileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10))) .handle(anotherTargetDirectory()) .get(); }

As we can see, individual bridges can control the polling configuration for different handlers.

As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.

6. Conclusion

En este artículo, vimos varias formas de usar Spring Integration Java DSL para construir diferentes tuberías de integración.

Básicamente, pudimos recrear la aplicación de movimiento de archivos de un tutorial anterior, esta vez usando Java puro.

Además, echamos un vistazo a algunos otros componentes como canales y puentes.

El código fuente completo utilizado en este tutorial está disponible en Github.