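Java Program to Implement the SynchronousQueue API
SynchronousQueue is a special blocking queue with no internal capacity. It lets threads exchange data or information with one another in a thread-safe way.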
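To make "no internal capacity" concrete, the following minimal sketch probes a queue while no consumer is waiting. The class name ZeroCapacityDemo and the method probe() are hypothetical names chosen for illustration. The calls offer(), size(), peek() and remainingCapacity() are standard BlockingQueue methods that this article does not otherwise discuss; offer() is used here only because it is the non-blocking form of an insert.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's program.
public class ZeroCapacityDemo {

    // Probes an idle SynchronousQueue, i.e. one with no thread waiting in take().
    static String probe() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // offer() inserts only if a consumer is already waiting to receive,
        // so on an idle queue the element is rejected rather than stored.
        boolean accepted = queue.offer("data");

        return "offer=" + accepted
            + " size=" + queue.size()
            + " peek=" + queue.peek()
            + " remainingCapacity=" + queue.remainingCapacity();
    }

    public static void main(String[] args) {
        // prints: offer=false size=0 peek=null remainingCapacity=0
        System.out.println(probe());
    }
}
```

Because nothing is ever stored, an element only changes hands when a producer and a consumer meet at the queue at the same time.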
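SynchronousQueue is used mainly through two operations, put() and take().
Both are blocking methods. A thread that calls put() to add data to the queue stays blocked until another thread calls take() to receive that data, and a thread that calls take() stays blocked until another thread calls put().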
1. put()
Java
try
{
    synchronousQueue.put("data or information goes here");
}
catch (InterruptedException iex)
{
    iex.printStackTrace();
}
2. take()
Java
try
{
    // the variable type must match the queue's element type
    String info = synchronousQueue.take();
}
catch (InterruptedException iex)
{
    iex.printStackTrace();
}
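The blocking behaviour of put() can be observed by timing it. The sketch below is an illustration under assumptions, not part of the original program: HandoffDemo and measurePutBlockMillis() are hypothetical names, and the consumer is deliberately delayed so that the producer has to wait.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's program.
public class HandoffDemo {

    // Returns roughly how long put() stayed blocked, in milliseconds,
    // when the consumer waits delayMillis before calling take().
    static long measurePutBlockMillis(long delayMillis) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // the consumer arrives late
                queue.take();              // this releases the blocked producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long start = System.nanoTime();
        queue.put("data"); // blocks until the consumer calls take()
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        consumer.join();
        return elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("put() blocked for about "
            + measurePutBlockMillis(300) + " ms");
    }
}
```

The measured time should be close to the consumer's delay, since put() cannot return before take() is called. The exact figure depends on thread scheduling.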
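SynchronousQueue has two constructors, which correspond to two access policies:
1. SynchronousQueue(): if multiple threads are waiting, the order in which they are granted access is unspecified. This is called the non-fair policy.
2. SynchronousQueue(boolean fair): if fair is true and multiple threads are waiting, they are granted access in FIFO (first-in, first-out) order.
Implementation: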
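The fair policy can be sketched as follows. FairPolicyDemo and drainFairQueue() are hypothetical names, and the 100 ms pause between producer starts is an assumption: it is a crude way to make the producers begin waiting in a known order, not a guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's program.
public class FairPolicyDemo {

    // Starts the given number of producers one after another, each blocked
    // in put(), then takes their values and returns them in arrival order.
    static List<Integer> drainFairQueue(int producers) throws InterruptedException {
        // true selects the fair policy: waiting threads are served FIFO.
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);

        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= producers; i++) {
            final int value = i;
            Thread producer = new Thread(() -> {
                try {
                    queue.put(value); // blocks until a take() receives it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            threads.add(producer);
            // Assumption: 100 ms is enough for this producer to start
            // waiting before the next one is created.
            Thread.sleep(100);
        }

        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < producers; i++) {
            received.add(queue.take());
        }
        for (Thread producer : threads) {
            producer.join();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainFairQueue(3));
    }
}
```

If the stagger holds, the fair queue should hand over the values in the order the producers started waiting, that is 1, 2, 3. With the no-argument constructor the order is unspecified, so the same experiment gives no such assurance.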
Java
// Java program to implement the SynchronousQueue API.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQAPI<E> {
    public SynchronousQueue<E> synchronousQ;

    // Create a SynchronousQueue with the default (non-fair) policy.
    public SynchronousQAPI()
    {
        synchronousQ = new SynchronousQueue<>();
    }

    // Create a SynchronousQueue with the given fairness policy.
    public SynchronousQAPI(boolean fair)
    {
        synchronousQ = new SynchronousQueue<>(fair);
    }

    // As discussed in the API overview above, this wrapper
    // exposes the two core operations, put() and take().

    // put(): hands the element over, blocking until another
    // thread calls take() to receive it.
    public void put(E e) throws InterruptedException
    {
        synchronousQ.put(e);
    }

    // take(): receives an element, blocking until another
    // thread calls put() to hand one over.
    public E take() throws InterruptedException
    {
        return synchronousQ.take();
    }

    // Implementation of the Put thread (producer).
    static class Put implements Runnable {
        private final BlockingQueue<Integer> queue;

        public Put(BlockingQueue<Integer> q)
        {
            this.queue = q;
        }

        @Override
        public void run()
        {
            try {
                // put the data; blocks until the Take thread receives it
                queue.put(1);
                System.out.println(
                    "1 added to synchronous queue.");
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Implementation of the Take thread (consumer).
    static class Take implements Runnable {
        private final BlockingQueue<Integer> queue;

        public Take(BlockingQueue<Integer> q)
        {
            this.queue = q;
        }

        @Override
        public void run()
        {
            try {
                // take the element handed over by the Put thread
                Integer value = queue.take();
                System.out.println(
                    value + " removed from synchronous queue.");
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args)
        throws InterruptedException
    {
        SynchronousQAPI<Integer> synchronousQueue
            = new SynchronousQAPI<>();

        // Both threads share the same underlying queue.
        new Thread(new Put(synchronousQueue.synchronousQ))
            .start();
        new Thread(new Take(synchronousQueue.synchronousQ))
            .start();
    }
}
Output
1 added to synchronous queue.
1 removed from synchronous queue.