Java Executor Framework 中的 CustomThreadPoolExecutor
Executors 管理线程执行。在 executor 的最上层,hierarchy 是 Executor 接口,用于发起一个线程。 ExecutorService 扩展了 Executor 并提供了管理执行的方法。 ExecutorService 共有三种实现:ThreadPoolExecutor、ScheduledThreadPoolExecutor 和 ForkJoinPool。 Java.util.concurrent 还定义了 Executors 实用程序类,其中包括一些静态方法,可以简化各种执行程序的创建。与执行程序相关的是 Future 和 Callable 接口。 Future 包含一个线程在执行后返回的值。因此,当线程终止时,它的值将在“将来”定义。 Callable 定义了一个返回值的线程。在本文中,我们将学习Java的Custom ThreadPoolExecutor。
首先,让我们讨论这里积极使用的两个概念,即线程池和阻塞队列。
- ThreadPool 是一个容器,其中包含一些线程。这些线程被赋予一些任务。当一个线程完成它的任务时,下一个任务就会交给它。在多线程环境中工作时,为每个新任务创建新的单独线程是不切实际的,因为创建新线程是操作系统的开销。
- 阻塞队列是当您尝试从队列中出队并且队列为空时阻塞的队列,如果您尝试将项目入队到 t 并且队列已满。阻塞队列中的所有操作都是线程安全的。
此外,要实施的重要具体方法如下:
方法一: execute()
此方法包含在 Executor 接口中。此函数在将来的某个时间执行给定的任务。它不返回任何内容,因此此方法的返回类型为 void。
方法二: myNewFixedThreadPool()
这是 Executors 类的工厂方法。它用于在线程池中创建固定数量的线程。
- 参数: int 线程数
- 返回类型: ExecutorService
程序:
- 创建一个接口,我们将在其中创建一个执行方法。此方法将执行赋予它的任务。
- 在上面生成的代码中,我们实现了一个可运行的接口。我们正在以 1000 毫秒的延迟打印线程的当前名称。这些是我们将要执行的任务。
- MyExecutor 类提供了一个静态方法 myNewFixedThreadPool,我们将在其中传递我们想要创建的线程数。这个方法告诉线程池线程池中有多少线程。这些线程将执行任务直到所有任务完成。
- 这是自定义线程池类。这个类是整个机制的核心。它使用了两个重要的概念 LinkedBlockingQueue 和 Execution 类。执行类将进一步解释。此类从 myNewFixedThreadPool 方法接收线程计数。我们提交的所有任务都存储在队列中。所有线程将从队列中获取任务。我们使用 MyExecuorService 的 execute 方法提交任务。
- 执行类执行非常重要的任务,即在线程池中添加创建我们想要的线程数。这个类是我们定义如何从 LinkedBlockingQueue 获取任务的地方。
- 最后,在这个类中,我们将所有部分收集在一起,我们的自定义线程池已准备就绪。
实现:这里我们将一些线程作为 3 传递。任务数为 20,并使用 execute 方法执行它们。
Java
// Java Program to illustrate Concept of
// CustomThreadPoolExecutor Executor Framework
// Importing LinkedBlockingQueue class from java.util
// package
import java.util.concurrent.LinkedBlockingQueue;
// Interface
// Custom interface for which contains execute method
interface MyExecutorService {
// Method
void execute(Runnable r);
}
// Class 1
// Helper class
class MyExecutors {
// Member variables of this class
int capacity;
// Passing the number of threads that
// will be in the thread pool
static MyExecutorService
myNewFixedThreadPool(int capacity)
{
return new MyThreadPool(capacity);
}
}
// Class 2
// Helper class extending to MyExecutorService interface
class MyThreadPool implements MyExecutorService {
// Member variables of this class
static int capacity;
static int currentCapacity;
// Creating object of LinkedBlockingQueue class
// Declaring object of type Runnable
static LinkedBlockingQueue
linkedTaskBlockingQueue;
// Member variables of this class
Execution e;
// Method 1
public MyThreadPool(int capacity)
{
// Member variables of this class
// this keyword refers to current instance itself
this.capacity = capacity;
currentCapacity = 0;
// Creating a linked blocking queue which will block
// if its empty
// and it will perform thread safe operation.
linkedTaskBlockingQueue
= new LinkedBlockingQueue();
// Creating the object of execution class
e = new Execution();
}
// Method 2
// @Override
public void execute(Runnable r)
{
// Declaring and adding tasks to
// blocking queue using add() method
linkedTaskBlockingQueue.add(r);
// executeMyMethod() method of Execution class
// which will execute the tasks
e.executeMyMethod();
}
}
// Class 3
// Helper class extending Runnable inteface
class Execution implements Runnable {
// Method 1 of this class
void executeMyMethod()
{
// At start the current capacity will be 0
// The another capacity is the number of threads we
// want to create so we will increase the current
// capacity count after creating each thread it
// means that we will create the threads if the
// current capacity is less than capacity passed by
// us i.e number of threads we want to create.
// In this case 3 threads will get created
if (MyThreadPool.currentCapacity
< MyThreadPool.capacity) {
MyThreadPool.currentCapacity++;
// Creating object of Thread class
Thread t = new Thread(new Execution());
// Starting the thread
t.start();
}
}
// Method 2 of this class
// @Override
public void run()
{
// Till it is true
while (true) {
// Here we are fetching the tasks from the
// linkedblocking queue
// which we have submitted using execute method
// and executing them
if (MyThreadPool.linkedTaskBlockingQueue.size()
!= 0) {
MyThreadPool.linkedTaskBlockingQueue.poll()
.run();
}
}
}
}
// Class 4
// Helper class
// Here we are creating a simple task
// which is printing current thread name
class Mytask implements Runnable {
// Method 1 of this class
// @Override
public void run()
{
// Try block to check for exceptions
try {
// Making thread to pause fo a second
// using sleep() method
Thread.sleep(1000);
}
// Catch block to check for exceptions
catch (InterruptedException e) {
// Print the exception scaling ith line number
// using printStackrace() method
e.printStackTrace();
}
// Print and display the current thread using
// currentThread() method by getting thread name
// using getName() method
System.out.println(
"Current Thread :-> "
+ Thread.currentThread().getName());
}
}
// Class 5
// Main Class
public class ExecutorServiceCustom {
// Main driver method
public static void main(String[] args)
{
// Getting the object of MyExcutorService by using
// the factory method myNewFixedThreadPool
// Passing number of threads as 3
MyExecutorService service
= MyExecutors.myNewFixedThreadPool(3);
for (int i = 0; i < 20; i++) {
// Creating 20 tasks and passing them to execute
service.execute(new Mytask());
}
Runnable runnableTask = null;
}
}
输出:
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Current Thread :-> Thread-2
Current Thread :-> Thread-0
Current Thread :-> Thread-1
Note: In the above output, we have printed the thread name as defined in the runnable 20 times as we have submitted 20 tasks which is visually described through a video below