📜  Java.util.concurrent 包

📅  最后修改于: 2022-05-13 01:54:48.182000             🧑  作者: Mango

Java.util.concurrent 包

Java Concurrency包涵盖了Java平台上的并发、多线程并行。并发是并行运行多个或多个程序或应用程序的能力。 Java并发的支柱是线程 (轻量级进程,它有自己的文件和堆栈,可以从同一进程中的其他线程访问共享数据)。可以通过异步或并行执行耗时任务来提高程序的吞吐量和交互性

Java 5向Java平台⇾ Java.util.concurrent 包添加了一个新包。这个包有一套 有助于在Java中开发并发应用程序(多线程)的类和接口。在这个包之前,需要自己制作他们需要的实用程序类。

此软件包的主要组件/实用程序:

我们将讨论这个包中一些最有用的实用程序。

1. 执行者

Executor 是一组接口,代表一个对象,其实现执行任务。任务应该在新线程上运行还是在当前线程上运行取决于实现。因此,我们可以使用此接口将任务执行流程与实际任务执行机制解耦。 Executor 不要求任务执行是异步的。最简单的是可执行接口。

public interface Executor {
    void execute( Runnable command );
}

要创建一个执行程序实例,我们需要创建一个调用程序。

public class Invoker implements Executor {
   @Override
   public void execute(Runnable r) {
       r.run();
   }
}

现在,为了执行任务,我们可以使用这个调用程序。

public void execute() {
   Executor exe = new Invoker();
   exe.execute( () -> {
       // task to be performed
   });
}

如果 executor 不能接受要执行的任务,它将抛出RejectedExecutionException

2. ExecutorService

ExecutorService 是一个接口,只强制底层实现实现execute()方法。它扩展了 Executor 接口并添加了一系列执行返回值的线程的方法。关闭线程池的方法以及为任务执行结果实现的能力。

我们需要创建 Runnable 目标以使用 ExecutorService。

public class Task implements Runnable {
   @Override
   public void run() {

       // task details
   }
}

现在,我们可以创建此类的对象/实例并分配任务。我们需要在创建实例时指定线程池大小。

// 20 is the thread pool size
ExecutorService exec = Executors.newFixedThreadPool(20);

对于单线程 ExecutorService 实例的创建,我们可以使用newSingleThreadExecuter(ThreadFactory threadfactory)来创建实例。执行器创建完成后,我们就可以提交任务了。

public void execute() {
   executor.submit(new Task());
}

此外,我们可以创建 Runnable 实例来提交任务。

executor.submit(() -> {
   new Task();
});

两种开箱即用的终止方法是:

  • shutdown():等待所有提交的任务执行完成。
  • shutdownNow():它立即终止所有正在执行/挂起的任务。

还有一种方法是awaitTermination() ,它强制阻塞直到所有任务在关闭事件触发或执行超时发生后完成执行,或者执行线程本身被中断。

try {
   exec.awaitTermination( 50l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
   e.printStackTrace();
}

3. ScheduledExecutorService

它类似于 ExecutorService。不同的是,这个接口可以周期性地执行任务。两者都可运行 Callable函数用于定义任务。

public void execute() {
   ScheduledExecutorService execServ
     = Executors.newSingleThreadScheduledExecutor();

   Future future = executorService.schedule(() -> {
       // ..
       return "Hello world";
   }, 1, TimeUnit.SECONDS);

   ScheduledFuture scheduledFuture = execServ.schedule(() -> {
       // ..
   }, 1, TimeUnit.SECONDS);

   executorService.shutdown();
}

ScheduledExecutorService也可以定义一个任务 经过一些固定的延迟。

executorService.scheduleAtFixedRate(() -> {
   // ..
}, 1, 20, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
   // ..
}, 1, 20, TimeUnit.SECONDS);

这里,

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):此方法创建并执行一个周期性动作,该动作在初始延迟后首先调用,随后在给定的时间段内调用,直到服务实例关闭。
  • scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit):此方法创建并执行一个周期性动作,该动作在提供的初始延迟后首先被调用,并在执行者终止和调用之间以给定的延迟重复执行下一个。

4. 未来

它表示异步操作的结果。其中的方法检查异步操作是否完成,获取完成的结果等。cancel(boolean isInterruptRunning) API 取消操作并释放正在执行的线程。当 isInterruptRunning 值为 true 时,执行任务的线程将立即终止。否则,所有进行中的任务都会完成。

代码片段创建了Future 的一个实例。

public void invoke() {
   ExecutorService executorService = Executors.newFixedThreadPool(20);

   Future future = executorService.submit(() -> {
       // ...
       Thread.sleep(10000l);
       return "Hello";
   });
}

检查future 的结果是否准备好并在计算完成后获取数据的代码。

if (future.isDone() && !future.isCancelled()) {
   try {
       str = future.get();
   } catch (InterruptedException | ExecutionException e) {
       e.printStackTrace();
   }
}

给定操作的超时规范。如果花费的时间超过这个时间,则抛出TimeoutException

try {
   future.get(20, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
   e.printStackTrace();
} 

5.倒计时锁存器

它是一个实用程序类,它会阻塞一组线程,直到某些操作完成。 CountDownLatch 用计数器(整数类型)初始化。该计数器随着依赖线程的执行完成而递减。但是一旦计数器减少到零,其他线程就会被释放。

6. CyclicBarrier

CyclicBarrier 与 CountDownLatch 几乎相同,只是我们可以重用它。它允许多个线程在调用最终任务之前使用await()相互等待,并且此功能不在 CountDownLatch 中。

我们需要创建一个 Runnable Task 实例来启动屏障条件。

public class Task implements Runnable {

   private CyclicBarrier barrier;

   public Task(CyclicBarrier barrier) {
       this.barrier = barrier;
   }

   @Override
   public void run() {
       try {
           LOG.info(Thread.currentThread().getName() +
             " is waiting");
           barrier.await();
           LOG.info(Thread.currentThread().getName() +
             " is released");
       } catch (InterruptedException | BrokenBarrierException e) {
           e.printStackTrace();
       }
   }

}

现在,调用几个线程来竞争屏障条件:

public void start() {

   CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
       // ..
       LOG.info("All previous tasks completed");
   });

   Thread t11 = new Thread(new Task(cyclicBarrier), "T11");
   Thread t12 = new Thread(new Task(cyclicBarrier), "T12");
   Thread t13 = new Thread(new Task(cyclicBarrier), "T13");

   if (!cyclicBarrier.isBroken()) {
       t11.start();
       t12.start();
       t13.start();
   }
}

在上面的代码中, isBroken()方法检查在执行期间是否有任何线程被中断。

7. 信号量

它用于阻止对逻辑或物理资源的某些部分进行线程级访问。信号量包含一组许可。无论线程在何处尝试进入临界区的代码部分,信号量都会给出许可是否可用的许可,表示临界区是否可用。如果许可不可用,则线程无法进入临界区。

它基本上是一个名为 counter 的变量,它维护从临界区进入和离开线程的计数。当执行线程释放临界区时,计数器增加。

下面的代码用于实现信号量:

static Semaphore semaphore = new Semaphore(20);

public void execute() throws InterruptedException {

   LOG.info("Available : " + semaphore.availablePermits());
   LOG.info("No. of threads waiting to acquire: " +
     semaphore.getQueueLength());

   if (semaphore.tryAcquire()) {
       try {
           // 
       }
       finally {
           semaphore.release();
       }
   }

}

信号量可用于实现类似互斥体的数据结构。

8. 线程工厂

它充当线程池,按需创建新线程。 ThreadFactory 可以定义为:

public class GFGThreadFactory implements ThreadFactory {
   private int threadId;
   private String name;

   public GFGThreadFactory(String name) {
       threadId = 1;
       this.name = name;
   }

   @Override
   public Thread newThread(Runnable r) {
       Thread t = new Thread(r, name + "-Thread_" + threadId);
       LOG.info("created new thread with id : " + threadId +
           " and name : " + t.getName());
       threadId++;
       return t;
   }
}

9.阻塞队列

BlockingQueue接口通过在 BlockingQueue 已满或为空时引入阻塞来支持流量控制(除了队列之外)。试图将一个元素排入已满队列的线程将被阻塞,直到某个其他线程通过将一个或多个元素出队或完全清除队列在队列中腾出空间。类似地,它会阻塞一个试图从空队列中删除的线程,直到其他一些线程插入一个项目。 BlockingQueue 不接受空值。如果我们尝试将空项入队,则会抛出NullPointerException

10.延迟队列

DelayQueue 是一种特殊的优先级队列,它根据元素的延迟时间对元素进行排序。这意味着只能从时间已过期的队列中取出那些元素。 DelayQueue 头包含过期时间最短的元素。如果没有延迟到期,则没有头部并且 poll 将返回 null。 DelayQueue 只接受属于 Delayed 类型类的那些元素。 DelayQueue 实现了 getDelay() 方法来返回剩余的延迟时间。

11. 锁

它是用于阻止其他线程访问特定代码段的实用程序。 Lock 和 Synchronized 块之间的区别在于,我们在单独的方法中具有 Lock API 的lock() 和 unlock()操作,而 Synchronized 块完全包含在方法中。

12. 移相器

它比 CountDownLatch 和 CyclicBarrier 更灵活。 Phaser 用作可重用屏障,在继续执行之前,动态线程数需要在该屏障上等待。通过为每个程序阶段重用 Phaser 实例,可以协调多个执行阶段。