1. Información general
El marco de bifurcación / unión se presentó en Java 7. Proporciona herramientas para ayudar a acelerar el procesamiento paralelo al intentar utilizar todos los núcleos de procesador disponibles, lo que se logra mediante un enfoque de dividir y conquistar .
En la práctica, esto significa que el marco primero se "bifurca" , dividiendo recursivamente la tarea en subtareas independientes más pequeñas hasta que sean lo suficientemente simples como para ejecutarse de forma asincrónica.
Después de eso, comienza la parte de "unión" , en la que los resultados de todas las subtareas se unen recursivamente en un solo resultado, o en el caso de una tarea que devuelve vacío, el programa simplemente espera hasta que se ejecutan todas las subtareas.
Para proporcionar una ejecución paralela efectiva, el marco de bifurcación / unión utiliza un grupo de subprocesos llamado ForkJoinPool , que administra subprocesos de trabajo de tipo ForkJoinWorkerThread .
2. ForkJoinPool
El ForkJoinPool es el corazón del marco. Es una implementación del ExecutorService que administra los subprocesos de trabajo y nos proporciona herramientas para obtener información sobre el estado y el rendimiento del grupo de subprocesos.
Los subprocesos de trabajo pueden ejecutar solo una tarea a la vez, pero ForkJoinPool no crea un subproceso separado para cada subtarea. En cambio, cada hilo del grupo tiene su propia cola de dos extremos (o deque, mazo pronunciado ) que almacena tareas.
Esta arquitectura es vital para equilibrar la carga de trabajo del hilo con la ayuda del algoritmo de robo de trabajo.
2.1. Algoritmo de robo de trabajo
En pocas palabras, los hilos libres intentan "robar" el trabajo de los deques de hilos ocupados.
De forma predeterminada, un hilo de trabajo obtiene tareas del jefe de su propio deque. Cuando está vacío, el subproceso toma una tarea de la cola de la cola de otro subproceso ocupado o de la cola de entrada global, ya que aquí es donde es probable que se ubiquen las piezas de trabajo más grandes.
Este enfoque minimiza la posibilidad de que los subprocesos compitan por tareas. También reduce la cantidad de veces que el subproceso tendrá que buscar trabajo, ya que primero funciona en las partes más grandes de trabajo disponibles.
2.2. Creación de instancias de ForkJoinPool
En Java 8, la forma más conveniente de obtener acceso a la instancia de ForkJoinPool es usar su método estático commonPool (). Como sugiere su nombre, esto proporcionará una referencia al grupo común, que es un grupo de subprocesos predeterminado para cada ForkJoinTask .
Según la documentación de Oracle, el uso del fondo común predefinido reduce el consumo de recursos, ya que esto desalienta la creación de un grupo de subprocesos por tarea.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
El mismo comportamiento se puede lograr en Java 7 creando un ForkJoinPool y asignándolo a un campo estático público de una clase de utilidad:
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
Ahora se puede acceder fácilmente:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
Con los constructores de ForkJoinPool , es posible crear un grupo de subprocesos personalizado con un nivel específico de paralelismo, fábrica de subprocesos y manejador de excepciones. En el ejemplo anterior, el grupo tiene un nivel de paralelismo de 2. Esto significa que el grupo utilizará 2 núcleos de procesador.
3. ForkJoinTask
ForkJoinTask es el tipo base para las tareas ejecutadas dentro de ForkJoinPool. En la práctica, debería ampliarse una de sus dos subclases: RecursiveAction para tareas nulas y RecursiveTask para tareas que devuelven un valor.Ambos tienen un método abstracto compute () en el que se define la lógica de la tarea.
3.1. RecursiveAction: un ejemplo
En el siguiente ejemplo, la unidad de trabajo que se procesará está representada por una cadena denominada carga de trabajo . Para fines de demostración, la tarea no tiene sentido: simplemente escribe en mayúsculas su entrada y la registra.
Para demostrar el comportamiento de bifurcación del marco, el ejemplo divide la tarea si la carga de trabajo .length () es mayor que un umbral especificadoutilizando el método createSubtask () .
La cadena se divide de forma recursiva en subcadenas, creando instancias CustomRecursiveTask que se basan en estas subcadenas.
Como resultado, el método devuelve una lista.
La lista se envía a ForkJoinPool usando el método invokeAll () :
public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }
Este patrón se puede utilizar para desarrollar sus propias clases de RecursiveAction . Para hacer esto, cree un objeto que represente la cantidad total de trabajo, elija un umbral adecuado, defina un método para dividir el trabajo y defina un método para realizar el trabajo.
3.2. RecursiveTask
Para las tareas que devuelven un valor, la lógica aquí es similar, excepto que el resultado de cada subtarea se une en un solo resultado:
public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a a * 10) .sum(); } }
En este ejemplo, el trabajo está representado por una matriz almacenada en el campo arr de la clase CustomRecursiveTask . El método createSubtasks () divide de forma recursiva la tarea en piezas de trabajo más pequeñas hasta que cada pieza es más pequeña que el umbral . Luego, el método invokeAll () envía las subtareas al fondo común y devuelve una lista de Future .
Para activar la ejecución, se llama al método join () para cada subtarea.
En este ejemplo, esto se logra utilizando la API Stream de Java 8 ; el método sum () se utiliza como una representación de la combinación de subresultados en el resultado final.
4. Envío de tareas a ForkJoinPool
Para enviar tareas al grupo de subprocesos, se pueden utilizar pocos enfoques.
El método submit () o execute () (sus casos de uso son los mismos):
forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();
El método invoke () bifurca la tarea y espera el resultado, y no necesita ninguna unión manual:
int result = forkJoinPool.invoke(customRecursiveTask);
El método invokeAll () es la forma más conveniente de enviar una secuencia de ForkJoinTasks a ForkJoinPool. Toma las tareas como parámetros (dos tareas, var args o una colección), luego las bifurcaciones devuelven una colección de objetos Future en el orden en que fueron producidos.
Alternatively, you can use separate fork() and join() methods. The fork() method submits a task to a pool, but it doesn't trigger its execution. The join() method must be used for this purpose. In the case of RecursiveAction, the join() returns nothing but null; for RecursiveTask, it returns the result of the task's execution:
customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();
In our RecursiveTask example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.
Para evitar confusiones, generalmente es una buena idea usar el método invokeAll () para enviar más de una tarea a ForkJoinPool.
5. Conclusiones
El uso del marco fork / join puede acelerar el procesamiento de tareas grandes, pero para lograr este resultado, se deben seguir algunas pautas:
- Utilice la menor cantidad posible de grupos de subprocesos ; en la mayoría de los casos, la mejor decisión es utilizar un grupo de subprocesos por aplicación o sistema
- Utilice el grupo de subprocesos comunes predeterminado, si no se necesita un ajuste específico
- Utilice un umbral razonable para dividir ForkJoinTask en subtareas
- Evite cualquier bloqueo en su ForkJoinTasks
Los ejemplos utilizados en este artículo están disponibles en el repositorio de GitHub vinculado.