Spring Batch usando el particionador

1. Información general

En nuestra introducción anterior a Spring Batch, presentamos el marco como una herramienta de procesamiento por lotes. También exploramos los detalles de configuración y la implementación para la ejecución de un trabajo de proceso único y de un solo subproceso.

Para implementar un trabajo con algún procesamiento paralelo, se proporciona una variedad de opciones. En un nivel superior, hay dos modos de procesamiento paralelo:

  1. Proceso único, multiproceso
  2. Multiproceso

En este artículo rápido, analizaremos la partición de Step , que se puede implementar tanto para trabajos de un solo proceso como de múltiples procesos.

2. Partición de un paso

Spring Batch con particionamiento nos brinda la posibilidad de dividir la ejecución de un Paso :

Resumen de particiones

La imagen de arriba muestra una implementación de un Trabajo con un Paso particionado .

Hay un paso llamado "Maestro", cuya ejecución se divide en algunos pasos "Esclavo". Estos esclavos pueden tomar el lugar de un maestro y el resultado seguirá siendo el mismo. Tanto el maestro como el esclavo son instancias de Step . Los esclavos pueden ser servicios remotos o simplemente ejecutar subprocesos localmente.

Si es necesario, podemos pasar datos del maestro al esclavo. Los metadatos (es decir, JobRepository ) aseguran que cada esclavo se ejecute solo una vez en una sola ejecución del trabajo.

Aquí está el diagrama de secuencia que muestra cómo funciona todo:

Paso de particionamiento

Como se muestra, PartitionStep está impulsando la ejecución. El PartitionHandler es responsable de dividir el trabajo de "Master" en los "Slaves". El paso más a la derecha es el esclavo.

3. El Maven POM

Las dependencias de Maven son las mismas que se mencionan en nuestro artículo anterior. Es decir, Spring Core, Spring Batch y la dependencia para la base de datos (en nuestro caso, SQLite ).

4. Configuración

En nuestro artículo introductorio, vimos un ejemplo de conversión de algunos datos financieros de un archivo CSV a XML. Extendamos el mismo ejemplo.

Aquí, convertiremos la información financiera de 5 archivos CSV a los archivos XML correspondientes, utilizando una implementación de subprocesos múltiples.

Podemos lograr esto usando una sola partición de Trabajo y Paso . Tendremos cinco subprocesos, uno para cada uno de los archivos CSV.

Primero que nada, creemos un trabajo:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Como podemos ver, este trabajo comienza con PartitioningStep . Este es nuestro paso maestro que se dividirá en varios pasos esclavos:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Aquí, crearemos PartitioningStep usando StepBuilderFactory . Para eso, necesitamos dar la información sobre los SlaveSteps y el Partitioner .

El Particionador es una interfaz que proporciona la posibilidad de definir un conjunto de valores de entrada para cada uno de los esclavos. En otras palabras, la lógica para dividir las tareas en subprocesos respectivos va aquí.

Creemos una implementación, llamada CustomMultiResourcePartitioner , donde colocaremos los nombres de los archivos de entrada y salida en ExecutionContext para pasar a cada paso esclavo:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

También crearemos el bean para esta clase, donde le daremos el directorio fuente para los archivos de entrada:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Definiremos el paso esclavo, como cualquier otro paso con el lector y el escritor. El lector y el escritor serán los mismos que vimos en nuestro ejemplo introductorio, excepto que recibirán el parámetro de nombre de archivo del StepExecutionContext.

Tenga en cuenta que estos beans deben tener un alcance de paso para que puedan recibir los parámetros stepExecutionContext en cada paso. Si no tienen alcance de paso, sus beans se crearán inicialmente y no aceptarán los nombres de archivo en el nivel de paso:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Al mencionar al lector y al escritor en el paso esclavo, podemos pasar los argumentos como nulos, porque estos nombres de archivo no se utilizarán, ya que recibirán los nombres de archivo de stepExecutionContext :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. Conclusión

En este tutorial, discutimos cómo implementar un trabajo con procesamiento paralelo usando Spring Batch.

Como siempre, la implementación completa de este ejemplo está disponible en GitHub.