📜  Java – ForkJoinPool 与 ExecutorService(1)

📅  最后修改于: 2023-12-03 15:01:33.083000             🧑  作者: Mango

Java – ForkJoinPool 与 ExecutorService

在 Java 中,线程池是一种非常重要的机制,他可以帮助我们优化程序并发处理能力。Java 的并发包提供了许多线程池实现,其中最常用的是 ExecutorService。最近几个版本的 Java 中,加入了新的线程池实现 ForkJoinPool,下面我们来介绍一下这两种线程池的使用方法及其区别。

ExecutorService

ExecutorService 是 Java 标准库中的线程池框架,使用方便简单。它提供了一系列的方法来提交任务并管理任务的执行。最常用的方法是 submitinvokeAll,这两个方法都可以提交一个任务(RunnableCallable),并返回一个 Future 对象,用于获取任务的执行结果。

以下是一个使用 ExecutorService 的示例代码:

import java.util.concurrent.*;

public class ExecutorServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<Integer> future = executorService.submit(() -> {
            Thread.sleep(1000);
            return 42;
        });
        System.out.println(future.get());
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个固定线程数为 10 的 ExecutorService,并提交了一个任务,该任务会睡眠 1 秒钟,然后返回 42。使用 future.get() 方法可以获取任务的执行结果,最后需要调用 executorService.shutdown() 来关闭线程池。

ForkJoinPool

ForkJoinPool 是一种特殊的线程池,它的名字就表明了它的特点:它会将一个大任务分成若干个小任务,然后提交到线程池中执行。当小任务执行完毕后,它会将结果合并起来,最终得到大任务的结果。这个过程称为分治算法,它在很多的并行算法中广泛应用。

使用 ForkJoinPool 需要继承 RecursiveTaskRecursiveAction,这两个类都实现了 ForkJoinTask 接口,可以提交到 ForkJoinPool 中执行。其中,RecursiveTask 可以返回一个结果,RecursiveAction 不需要返回。

以下是一个使用 ForkJoinPool 的示例代码:

import java.util.Arrays;
import java.util.concurrent.*;

public class ForkJoinDemo extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final int[] array;
    private final int start, end;

    public ForkJoinDemo(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            return Arrays.stream(array, start, end).sum();
        } else {
            int mid = (start + end) >>> 1;
            ForkJoinDemo left = new ForkJoinDemo(array, start, mid);
            ForkJoinDemo right = new ForkJoinDemo(array, mid, end);
            left.fork();
            int rightResult = right.compute();
            int leftResult = left.join();
            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        int[] array = new int[1000000];
        Arrays.fill(array, 1);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        int sum = forkJoinPool.invoke(new ForkJoinDemo(array, 0, array.length));
        System.out.println(sum);
        forkJoinPool.shutdown();
    }
}

在这个示例中,我们需要计算一个整数数组中元素的总和。我们将这个任务拆分成很多个小任务,每个小任务计算一个子数组的和。当一个子数组的长度小于等于 1000 时,我们直接使用 Arrays.stream 方法计算该子数组的和;当一个子数组的长度大于 1000 时,我们将其拆分为两个子任务,提交到线程池中执行。最终,通过使用 forkJoinPool.invoke() 方法执行任务,我们可以得到整个数组的总和。

区别

ExecutorService 中,所有的任务都要由外部程序负责提交到线程池中执行。而在 ForkJoinPool 中,任务之间存在递归调用的关系,子任务的执行由线程池自动管理。这使得 ForkJoinPool 在处理分治算法等任务时更加便捷。

此外,ForkJoinPool 还有一个特点,就是每个线程都维护一个工作队列,用于保存尚未处理的子任务。当一个任务完成时,它会从其他线程的工作队列中偷取一个子任务进行处理。这种工作窃取算法可以大大减少线程的竞争,提高线程池的执行效率。

总结

ExecutorServiceForkJoinPool 分别适用于不同的场景。如果你需要提交一些独立的任务,而这些任务之间没有任何关系,那么 ExecutorService 是一个不错的选择;如果你需要处理一些分治算法等依赖关系比较强的任务,那么 ForkJoinPool 可以提供更好的帮助。