Concurrence Java - Framework Fork-Join

Le framework fork-join permet de casser une certaine tâche sur plusieurs ouvriers puis d'attendre le résultat pour les combiner. Il exploite dans une large mesure la capacité des machines multiprocesseurs. Voici les concepts et objets de base utilisés dans le framework fork-join.

Fourchette

Fork est un processus dans lequel une tâche se divise en sous-tâches plus petites et indépendantes qui peuvent être exécutées simultanément.

Syntaxe

Sum left  = new Sum(array, low, mid);
left.fork();

Ici, Sum est une sous-classe de RecursiveTask et left.fork () transforme la tâche en sous-tâches.

Joindre

Join est un processus dans lequel une tâche joint tous les résultats des sous-tâches une fois que les sous-tâches ont fini de s'exécuter, sinon elle continue d'attendre.

Syntaxe

left.join();

Voici à gauche un objet de la classe Sum.

ForkJoinPool

il s'agit d'un pool de threads spécial conçu pour fonctionner avec la division des tâches de type fork-and-join.

Syntaxe

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Voici un nouveau ForkJoinPool avec un niveau de parallélisme de 4 processeurs.

RecursiveAction

RecursiveAction représente une tâche qui ne renvoie aucune valeur.

Syntaxe

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

Tâche récursive

RecursiveTask représente une tâche qui renvoie une valeur.

Syntaxe

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}

Exemple

Le programme TestThread suivant montre l'utilisation du framework Fork-Join dans un environnement basé sur les threads.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      
      int[] numbers = new int[1000]; 

      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  

   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      protected Long compute() {
         
         if(high - low <= 10) {
            long sum = 0;
            
            for(int i = low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {	    	
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

Cela produira le résultat suivant.

Production

32
499500