Procesamiento por lotes de Java EE 7

1. Introducción

Imagina que tuviéramos que completar manualmente tareas como procesar nóminas, calcular intereses y generar facturas. ¡Se volvería bastante aburrido, propenso a errores y una lista interminable de tareas manuales!

En este tutorial, echaremos un vistazo al procesamiento por lotes de Java (JSR 352), una parte de la plataforma Jakarta EE, y una gran especificación para automatizar tareas como estas. Ofrece a los desarrolladores de aplicaciones un modelo para desarrollar sistemas sólidos de procesamiento por lotes para que puedan centrarse en la lógica empresarial.

2. Dependencias de Maven

Dado que JSR 352 es solo una especificación, necesitaremos incluir su API y su implementación, como jberet :

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

También agregaremos una base de datos en memoria para que podamos ver algunos escenarios más realistas.

3. Conceptos clave

JSR 352 introduce algunos conceptos, que podemos ver de esta manera:

Primero definamos cada pieza:

  • Comenzando por la izquierda, tenemos JobOperator . Que gestiona todos los aspectos del trabajo de procesamiento, tales como iniciar, detener y reiniciar
  • A continuación, tenemos el trabajo . Un trabajo es una colección lógica de pasos; encapsula todo un proceso por lotes
  • Un trabajo contendrá entre 1 y n pasos . Cada paso es una unidad de trabajo secuencial e independiente. Un paso se compone de leer la entrada, procesar esa entrada y escribir la salida.
  • Y por último, pero no menos importante, tenemos el JobRepository que almacena la información de ejecución de los trabajos. Ayuda a realizar un seguimiento de los trabajos, su estado y los resultados de finalización.

Los pasos tienen un poco más de detalle que esto, así que echemos un vistazo a eso a continuación. Primero, veremos los pasos de Chunk y luego los de Batchlet .

4. Crear un fragmento

Como se dijo anteriormente, un trozo es una especie de paso . A menudo usamos un fragmento para expresar una operación que se realiza una y otra vez, por ejemplo, sobre un conjunto de elementos. Es una especie de operaciones intermedias de Java Streams.

Al describir un fragmento, necesitamos expresar de dónde sacar los elementos, cómo procesarlos y dónde enviarlos después.

4.1. Artículos de lectura

Para leer elementos, necesitaremos implementar ItemReader.

En este caso, crearemos un lector que simplemente emitirá los números del 1 al 10:

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

Ahora, solo estamos leyendo del estado interno de la clase aquí. Pero, por supuesto, readItem podría extraer de una base de datos , del sistema de archivos o de alguna otra fuente externa.

Tenga en cuenta que estamos guardando parte de este estado interno usando JobContext # setTransientUserData () que será útil más adelante.

Además, tenga en cuenta el parámetro de punto de control . Volveremos a retomar eso también.

4.2. Procesamiento de artículos

Por supuesto, la razón por la que estamos fragmentando es porque queremos realizar algún tipo de operación en nuestros artículos.

Cada vez que devolvemos un valor nulo de un procesador de artículos, eliminamos ese artículo del lote.

Entonces, digamos aquí que queremos mantener solo los números pares. Podemos usar un ItemProcessor que rechaza los impares devolviendo null :

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItem se llamará una vez por cada elemento que emita nuestro ItemReader .

4.3. Artículos de escritura

Finalmente, el trabajo invocará ItemWriter para que podamos escribir nuestros elementos transformados:

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

¿Cuánto duran los artículos ? En un momento, definiremos el tamaño de un fragmento, que determinará el tamaño de la lista que se envía a writeItems .

4.4. Definición de una parte de un trabajo

Ahora reunimos todo esto en un archivo XML usando JSL o Job Specification Language. Tenga en cuenta que enumeraremos nuestro lector, procesador, fragmento y también un tamaño de fragmento:

El tamaño del fragmento es la frecuencia con la que el progreso del fragmento se compromete con el repositorio de trabajos , lo cual es importante para garantizar la finalización en caso de que una parte del sistema falle.

Necesitaremos colocar este archivo en META-INF / batch-jobs para. jar y en WEB-INF / classes / META-INF / batch-jobs para archivos .war .

Le dimos a nuestro trabajo la identificación "simpleChunk", así que intentémoslo en una prueba unitaria.

Ahora, los trabajos se ejecutan de forma asincrónica, lo que los hace difíciles de probar. En la muestra, asegúrese de revisar nuestro BatchTestHelper que sondea y espera hasta que se completa el trabajo:

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

Entonces eso es lo que son los trozos. Ahora, echemos un vistazo a los lotes.

5. Creación de un lote

No todo encaja perfectamente en un modelo iterativo. Por ejemplo, es posible que tengamos una tarea que simplemente necesitemos invocar una vez, ejecutarla hasta su finalización y devolver un estado de salida.

El contrato de un lote es bastante simple:

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

Como es el JSL:

Y podemos probarlo usando el mismo enfoque que antes:

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Entonces, hemos analizado un par de formas diferentes de implementar pasos.

Ahora veamos los mecanismos para marcar y garantizar el progreso.

6. Punto de control personalizado

Es probable que las fallas ocurran en medio de un trabajo. ¿Deberíamos empezar de nuevo, o podemos empezar de algún modo donde lo dejamos?

Como sugiere el nombre, los puntos de control nos ayudan a establecer periódicamente un marcador en caso de falla.

De forma predeterminada, el final del procesamiento de fragmentos es un punto de control natural .

Sin embargo, podemos personalizarlo con nuestro propio CheckpointAlgorithm :

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

¿Recuerda el recuento que colocamos en datos transitorios antes? Aquí, podemos sacarlo con JobContext # getTransientUserDatapara indicar que queremos comprometernos en cada quinto número procesado.

Sin esto, se produciría una confirmación al final de cada fragmento, o en nuestro caso, cada tercer número.

Y luego, lo emparejamos con la directiva de algoritmo de pago en nuestro XML debajo de nuestro fragmento :

Probemos el código, notando nuevamente que algunos de los pasos estándar están ocultos en BatchTestHelper :

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Por lo tanto, podríamos estar esperando un recuento de confirmaciones de 2 ya que tenemos diez elementos y configuramos las confirmaciones para que sean cada quinto elemento. Pero, el marco hace una confirmación de lectura final más al final para garantizar que todo se haya procesado, que es lo que nos lleva a 3.

A continuación, veamos cómo manejar los errores.

7. Manejo de excepciones

De forma predeterminada, el operador del trabajo marcará nuestro trabajo como FALLIDO en caso de una excepción.

Cambiemos nuestro lector de artículos para asegurarnos de que falle:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

Y luego prueba:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • Parece haber una falta de lectores y escritores que puedan procesar otros formatos como JSON
  • No hay soporte para genéricos
  • La partición solo admite un solo paso
  • La API no ofrece nada para admitir la programación (aunque J2EE tiene un módulo de programación separado)
  • Debido a su naturaleza asincrónica, las pruebas pueden ser un desafío
  • La API es bastante detallada

13. Conclusión

En este artículo, analizamos JSR 352 y aprendimos sobre fragmentos, lotes, divisiones, flujos y mucho más. Sin embargo, apenas hemos arañado la superficie.

Como siempre, el código de demostración se puede encontrar en GitHub.