Fork/Join 框架为Java数据并行提供了高性能的细粒度任务执行框架。它的并行计算引擎被许多更高级别的框架使用。 fork/join 框架支持一种并行编程风格,通过“分而治之”来解决问题,如下图所示:
- 将任务拆分为子任务。
- 并行解决子任务
- 子任务可以在不同的内核上并行运行。
- 子任务也可以在单个内核的不同线程中并发运行。
- 等待他们完成
- join() 等待子任务完成
- 合并结果。
- 任务使用调用 join() 将子任务结果合并在一起。
如果一个任务没有返回结果,那么它只是等待它的子任务完成。
下面是一个Java程序来演示 Fork/Join Framework 的工作:
Java
// Java program to demonstrate the working of Fork/Join
// Framework
// Importing required libraries
import java.io.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
// Class 1
// helper class
class SearchTask extends RecursiveTask {
// Global variables
int array[];
int start, end;
int searchElement;
// Constructor for initialising globals
public SearchTask(int array[], int start, int end,
int searchElement)
{
// This keyword regers to current object itself
this.array = array;
this.start = start;
this.end = end;
this.searchElement = searchElement;
}
// Method
// @Override
protected Integer compute()
{
// Returns the count computed by processSearch
return processSearch();
}
// Method
// To count the the count of searched element
private Integer processSearch()
{
// Initially count os set to zero
int count = 0;
// iterating using for loop
for (int i = start; i <= end; i++) {
// if element is present in array
if (array[i] == searchElement) {
// Increment the count
count++;
}
}
// Returning the count of searched element
return count;
}
}
// Class 2
// Main class
public class GFG {
// main driver method
public static void main(String args[])
{
// Custom inpu array elements
int array[] = { 1, 2, 6, 3, 4, 5, 6,
7, 8, 9, 10, 11, 12, 6 };
// Custom element to be searched in array
int searchElement = 6;
// initializing starting and ending indices
int start = 0;
int end = array.length - 1;
// Creating object of ForkJoinPool class
ForkJoinPool pool = ForkJoinPool.commonPool();
// Now creating object of aboce class
SearchTask task = new SearchTask(array, start, end,
searchElement);
int result = pool.invoke(task);
// Print and display the searched elememnt
// If found do display out the number of times it is
// found
System.out.println(searchElement + " found "
+ result + " times ");
}
}
Java
// Java program to demonstrate the working of
// ExecutorService
// Importing required libraries
import java.io.*;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// Class 1
// helper class extending Runnable interface
class Service implements Runnable {
// member variable of this class
int i;
// Constructor of this class
public Service(int i)
{
// Initializing the counter variable
this.i = i;
}
// Method
// @Override
public void run()
{
// Printing the counter
System.out.println(i + " ");
// Try block to check for exceptions
try {
// Making thread to sleep for 1 second
// using the sleep() method
Thread.sleep(1000);
}
// Catch block to handle the exceptions
catch (InterruptedException e) {
// Print the line number and the corresponding
// exception occured
e.printStackTrace();
}
}
}
// Class 2
// Main class
// ExecutorUtility
public class GFG {
// Main driver method
public static void main(String[] args)
{
// Creating an object of ExecutorService class to
// create fixed size thread pool
ExecutorService es
= Executors.newFixedThreadPool(5);
// Print the time difference before completion
System.out.println(new Date());
for (int i = 0; i < 25; i++) {
// Executes the given command at some time in
// the future
es.execute(new Service(i));
}
// Executor is shut down so that
// its task can be considered complete
es.shutdown();
// Print the time difference after completion
System.out.println(new Date());
}
}
输出
6 found 3 times
现在深入探讨Java ExecutorService 接口扩展了 Executor,因此我们获得了由 Executor 定义的唯一的 execute(Runnable) 方法。与Java Executor 相比, Java ExecutorService 中有更多的方法。 ExecutorService 接口中的一些方法可用于提交一个或多个任务并返回称为未来的东西(本质上是对在后台并发或异步运行的计算结果的代理)。
ExecutorService 以如下方式工作:
- 提交 1+ 个任务并返回这些任务的期货。
- 管理任务和执行器服务本身的生命周期,例如,中断池中的工作线程。
- ExecutorService 实例可以处于三种状态之一
- 运行:通过工厂方法创建之后。
- 关机:被正常或突然关机后。
- 终止:毕竟任务已经完成。
执行:
例子
Java
// Java program to demonstrate the working of
// ExecutorService
// Importing required libraries
import java.io.*;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// Class 1
// helper class extending Runnable interface
class Service implements Runnable {
// member variable of this class
int i;
// Constructor of this class
public Service(int i)
{
// Initializing the counter variable
this.i = i;
}
// Method
// @Override
public void run()
{
// Printing the counter
System.out.println(i + " ");
// Try block to check for exceptions
try {
// Making thread to sleep for 1 second
// using the sleep() method
Thread.sleep(1000);
}
// Catch block to handle the exceptions
catch (InterruptedException e) {
// Print the line number and the corresponding
// exception occured
e.printStackTrace();
}
}
}
// Class 2
// Main class
// ExecutorUtility
public class GFG {
// Main driver method
public static void main(String[] args)
{
// Creating an object of ExecutorService class to
// create fixed size thread pool
ExecutorService es
= Executors.newFixedThreadPool(5);
// Print the time difference before completion
System.out.println(new Date());
for (int i = 0; i < 25; i++) {
// Executes the given command at some time in
// the future
es.execute(new Service(i));
}
// Executor is shut down so that
// its task can be considered complete
es.shutdown();
// Print the time difference after completion
System.out.println(new Date());
}
}
输出:
现在最后让我们总结一下 Fork/Join Framework 和 ExecutorService 的区别如下:
Fork/Join Framework | ExecutorService |
---|---|
The Fork/Join framework in Java 7 is an implementation of the Divide and Conquer algorithm, in which a central ForkJoinPool executes branching ForkJoinTasks. | ExecutorService is an Executor that provides methods to manage the progress-tracking and termination of asynchronous tasks. |
Fork/Join Framework makes use of Work Stealing Algorithm. In the Fork/Join framework, when a task is waiting for the completion of the sub-tasks it has created using the join operation, the worker thread that is executing that task looks for another task that has not been executed yet and steals them to start their execution. | Unlike Fork/Join Framework, when a task is waiting for the completion of the sub-tasks it has created using the join operation, the worker thread that is executing that waiting task doesn’t look for another task. |
Fork-join is wonderful for recursive problems, where a task involves running subtasks and then processing their results. | If you try to solve a recursive problem like this using ExecutorService, you end up with threads tied up waiting for other threads to deliver results to them. |
Fork Join is an implementation of ExecuterService. The main difference is that this implementation creates a DEQUE worker pool. | Executor service creates asked number of thread, and apply a blocking queue to store all the remaining waiting task. |