Java Concurrency - Fork-Join-Framework
Das Fork-Join-Framework ermöglicht es, eine bestimmte Aufgabe für mehrere Worker zu unterbrechen und dann auf das Ergebnis zu warten, um sie zu kombinieren. Es nutzt die Kapazität der Multiprozessor-Maschine in hohem Maße. Im Folgenden sind die Kernkonzepte und -objekte aufgeführt, die im Fork-Join-Framework verwendet werden.
Gabel
Fork ist ein Prozess, bei dem sich eine Aufgabe in kleinere und unabhängige Unteraufgaben aufteilt, die gleichzeitig ausgeführt werden können.
Syntax
Sum left = new Sum(array, low, mid);
left.fork();
Hier ist Sum eine Unterklasse von RecursiveTask und left.fork () fasst die Aufgabe in Unteraufgaben zusammen.
Beitreten
Join ist ein Prozess, bei dem eine Aufgabe alle Ergebnisse von Unteraufgaben zusammenfügt, sobald die Ausführung der Unteraufgaben abgeschlossen ist. Andernfalls wartet sie weiter.
Syntax
left.join();
Hier links ist ein Objekt der Summenklasse.
ForkJoinPool
Es handelt sich um einen speziellen Thread-Pool, der für die Aufteilung von Fork-and-Join-Aufgaben entwickelt wurde.
Syntax
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Hier ein neuer ForkJoinPool mit einer Parallelitätsstufe von 4 CPUs.
Rekursive Aktion
RecursiveAction stellt eine Aufgabe dar, die keinen Wert zurückgibt.
Syntax
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
RecursiveTask
RecursiveTask stellt eine Aufgabe dar, die einen Wert zurückgibt.
Syntax
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
Beispiel
Das folgende TestThread-Programm zeigt die Verwendung des Fork-Join-Frameworks in einer threadbasierten Umgebung.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i = low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
Dies führt zu folgendem Ergebnis.
Ausgabe
32
499500