Java Concurrency - Khuôn khổ Fork-Join

Khuôn khổ tham gia rẽ nhánh cho phép phá vỡ một nhiệm vụ nhất định trên một số công nhân và sau đó chờ kết quả để kết hợp chúng. Nó tận dụng công suất của máy đa bộ xử lý ở mức độ lớn. Sau đây là các khái niệm và đối tượng cốt lõi được sử dụng trong khuôn khổ fork-join.

Cái nĩa

Fork là một quá trình trong đó một nhiệm vụ tự phân tách thành các nhiệm vụ con nhỏ hơn và độc lập có thể được thực hiện đồng thời.

Cú pháp

Sum left  = new Sum(array, low, mid);
left.fork();

Ở đây Sum là một lớp con của RecursiveTask và left.fork () tách nhiệm vụ thành các nhiệm vụ con.

Tham gia

Tham gia là một quá trình trong đó một nhiệm vụ kết hợp tất cả các kết quả của các nhiệm vụ con khi các nhiệm vụ con đã hoàn thành việc thực thi, nếu không nó sẽ tiếp tục chờ đợi.

Cú pháp

left.join();

Ở đây bên trái là một đối tượng của lớp Sum.

ForkJoinPool

nó là một nhóm chủ đề đặc biệt được thiết kế để hoạt động với phân tách nhiệm vụ fork-và-join.

Cú pháp

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Đây là ForkJoinPool mới với cấp độ song song của 4 CPU.

RecursiveAction

RecursiveAction đại diện cho một tác vụ không trả về bất kỳ giá trị nào.

Cú pháp

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

RecursiveTask

RecursiveTask biểu diễn một tác vụ trả về một giá trị.

Cú pháp

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}

Thí dụ

Chương trình TestThread sau đây cho thấy cách sử dụng khuôn khổ Fork-Join trong môi trường dựa trên luồng.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      
      int[] numbers = new int[1000]; 

      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  

   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      protected Long compute() {
         
         if(high - low <= 10) {
            long sum = 0;
            
            for(int i = low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {	    	
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

Điều này sẽ tạo ra kết quả sau.

Đầu ra

32
499500