Concorrenza Java - Framework Fork-Join
Il framework fork-join consente di interrompere una determinata attività su più worker e quindi attendere il risultato per combinarli. Sfrutta in larga misura la capacità della macchina multiprocessore. Di seguito sono riportati i concetti e gli oggetti principali utilizzati nel framework fork-join.
Forchetta
Fork è un processo in cui un'attività si divide in sotto-attività più piccole e indipendenti che possono essere eseguite contemporaneamente.
Sintassi
Sum left  = new Sum(array, low, mid);
left.fork();
Qui Sum è una sottoclasse di RecursiveTask e left.fork () suddivide l'attività in sotto-attività.
Aderire
Join è un processo in cui un'attività unisce tutti i risultati delle sottoattività una volta terminata l'esecuzione delle sottoattività, altrimenti continua ad attendere.
Sintassi
left.join();
Qui a sinistra c'è un oggetto della classe Sum.
ForkJoinPool
è uno speciale pool di thread progettato per funzionare con la suddivisione delle attività fork-and-join.
Sintassi
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Ecco un nuovo ForkJoinPool con un livello di parallelismo di 4 CPU.
RecursiveAction
RecursiveAction rappresenta un'attività che non restituisce alcun valore.
Sintassi
class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}
RecursiveTask
RecursiveTask rappresenta un'attività che restituisce un valore.
Sintassi
class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}
Esempio
Il seguente programma TestThread mostra l'utilizzo del framework Fork-Join in un ambiente basato su thread.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      
      int[] numbers = new int[1000]; 
      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }
      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  
   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;
      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }
      protected Long compute() {
         
         if(high - low <= 10) {
            long sum = 0;
            
            for(int i = low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {	    	
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}
Questo produrrà il seguente risultato.
Produzione
32
499500