Java Concurrency - Fork-Join-Framework

Das Fork-Join-Framework ermöglicht es, eine bestimmte Aufgabe für mehrere Worker zu unterbrechen und dann auf das Ergebnis zu warten, um sie zu kombinieren. Es nutzt die Kapazität der Multiprozessor-Maschine in hohem Maße. Im Folgenden sind die Kernkonzepte und -objekte aufgeführt, die im Fork-Join-Framework verwendet werden.

Gabel

Fork ist ein Prozess, bei dem sich eine Aufgabe in kleinere und unabhängige Unteraufgaben aufteilt, die gleichzeitig ausgeführt werden können.

Syntax

Sum left  = new Sum(array, low, mid);
left.fork();

Hier ist Sum eine Unterklasse von RecursiveTask und left.fork () fasst die Aufgabe in Unteraufgaben zusammen.

Beitreten

Join ist ein Prozess, bei dem eine Aufgabe alle Ergebnisse von Unteraufgaben zusammenfügt, sobald die Ausführung der Unteraufgaben abgeschlossen ist. Andernfalls wartet sie weiter.

Syntax

left.join();

Hier links ist ein Objekt der Summenklasse.

ForkJoinPool

Es handelt sich um einen speziellen Thread-Pool, der für die Aufteilung von Fork-and-Join-Aufgaben entwickelt wurde.

Syntax

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Hier ein neuer ForkJoinPool mit einer Parallelitätsstufe von 4 CPUs.

Rekursive Aktion

RecursiveAction stellt eine Aufgabe dar, die keinen Wert zurückgibt.

Syntax

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

RecursiveTask

RecursiveTask stellt eine Aufgabe dar, die einen Wert zurückgibt.

Syntax

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}

Beispiel

Das folgende TestThread-Programm zeigt die Verwendung des Fork-Join-Frameworks in einer threadbasierten Umgebung.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      
      int[] numbers = new int[1000]; 

      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  

   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      protected Long compute() {
         
         if(high - low <= 10) {
            long sum = 0;
            
            for(int i = low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {	    	
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

Dies führt zu folgendem Ergebnis.

Ausgabe

32
499500