Java Concurrency - Khuôn khổ Fork-Join
Khuôn khổ tham gia rẽ nhánh cho phép phá vỡ một nhiệm vụ nhất định trên một số công nhân và sau đó chờ kết quả để kết hợp chúng. Nó tận dụng công suất của máy đa bộ xử lý ở mức độ lớn. Sau đây là các khái niệm và đối tượng cốt lõi được sử dụng trong khuôn khổ fork-join.
Cái nĩa
Fork là một quá trình trong đó một nhiệm vụ tự phân tách thành các nhiệm vụ con nhỏ hơn và độc lập có thể được thực hiện đồng thời.
Cú pháp
Sum left = new Sum(array, low, mid);
left.fork();
Ở đây Sum là một lớp con của RecursiveTask và left.fork () tách nhiệm vụ thành các nhiệm vụ con.
Tham gia
Tham gia là một quá trình trong đó một nhiệm vụ kết hợp tất cả các kết quả của các nhiệm vụ con khi các nhiệm vụ con đã hoàn thành việc thực thi, nếu không nó sẽ tiếp tục chờ đợi.
Cú pháp
left.join();
Ở đây bên trái là một đối tượng của lớp Sum.
ForkJoinPool
nó là một nhóm chủ đề đặc biệt được thiết kế để hoạt động với phân tách nhiệm vụ fork-và-join.
Cú pháp
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Đây là ForkJoinPool mới với cấp độ song song của 4 CPU.
RecursiveAction
RecursiveAction đại diện cho một tác vụ không trả về bất kỳ giá trị nào.
Cú pháp
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
RecursiveTask
RecursiveTask biểu diễn một tác vụ trả về một giá trị.
Cú pháp
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
Thí dụ
Chương trình TestThread sau đây cho thấy cách sử dụng khuôn khổ Fork-Join trong môi trường dựa trên luồng.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i = low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
Điều này sẽ tạo ra kết quả sau.
Đầu ra
32
499500