📅  最后修改于: 2023-12-03 14:53:36.828000             🧑  作者: Mango
LinkedTransferQueue 是 Java 并发编程中的一种队列结构,线程可以通过 put() 和 take() 方法在队列中阻塞和唤醒。这个队列的特点是可以在两个线程之间传递元素,即如果队列中没有等待的元素,put() 方法会阻塞,直到另一个线程 take() 这个元素;如果队列中有等待的元素,take() 方法不会阻塞,直接将等待的元素返回。
下面我们来实现一下 LinkedTransferQueue 的 API。
LinkedTransferQueue 是基于链表实现的队列结构,因此我们需要设计一个 Node 类,用于存储队列中的元素。Node 类中包含了一个泛型类型的数据元素 item 和两个 Node 类型的指针 prev 和 next,用于指向前一个和后一个节点。
public class Node<E> {
E item;
Node<E> prev;
Node<E> next;
}
我们还需要设计一个 TransferStack 类和一个 TransferQueue 类,用于存储节点和管理节点的添加和删除操作。
TransferStack 类是一个栈结构,用于存储节点。我们需要设计一个 push() 方法和一个 popAll() 方法,用于将节点添加到栈中和弹出栈中所有节点。
public class TransferStack<E> {
volatile Node<E> head;
boolean push(Node<E> node) {
Node<E> h = head;
if (h == null || node.item != null || !h.casNext(null, node)) {
return false;
}
head = node;
return true;
}
void popAll(Node<E> node) {
node.item = null;
Node<E> h;
while ((h = head) != null && h != node) {
if (head.casNext(h, null)) {
do {
Node<E> nxt = h.next;
h.next = null;
h = nxt;
} while (h != null);
break;
}
}
}
}
TransferQueue 类是一个队列结构,用于存储节点。我们需要设计一个 add() 方法和一个 remove() 方法,用于将节点添加到队列中和从队列中移除节点。
public class TransferQueue<E> {
volatile int size;
transient volatile Node<E> head;
transient volatile Node<E> tail;
private TransferStack<E> stack;
TransferQueue() {
stack = new TransferStack<E>();
}
boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
void unsplice(Node<E> pred, Node<E> node) {
node.forgetContents();
if (pred == null)
casHead(node, null);
else {
while (pred.next == node) {
if (casNext(pred, node, node.next)) {
if (node == tail) {
casTail(node, pred);
}
node.forgetContents();
break;
}
}
}
}
boolean offer(E item) {
if (stack != null && item != null) {
Node<E> node = new Node<E>(item);
bool xfered = false;
while (true) {
Node<E> t = tail;
if (t == null) {
if (casHead(null, node)) {
size = 1;
return true;
}
} else if (stack != null && t.next == null) {
if (t.casNext(null, node)) {
size = 1;
return true;
}
} else if (!xfered) {
xfered = tryTransfer(node, false, 0L);
} else if (item == null) {
return false;
} else {
if (node.isMatched()) {
return true;
}
}
}
}
return false;
}
E poll(long nanos) {
Node<E> p = null;
boolean xfered = false;
while (true) {
while ((p = head) != null) {
if (head.casLock()) {
if (p == head) {
Node<E> h = head;
Node<E> nxt = h.next;
if (nxt == null) {
casTail(h, null);
}
head = nxt;
h.forgetContents();
return h.itemOrNull();
}
head.unlock();
}
}
if (xfered || (xfered = tryTransfer(null, false, nanos))) {
Thread.interrupted();
xfered = false;
} else {
return null;
}
}
}
boolean tryTransfer(E e, boolean timed, long nanos) {
if (stack == null) {
stack = new TransferStack<E>();
}
Node<E> s = null;
int spins = -1;
while (true) {
while (s == null) {
s = stack.pop();
if (s != null) {
synchronized (s) {
if (s.item != null) {
s = null;
} else {
break;
}
}
} else if (spins >= 0) {
if (ThreadLocalRandom.current().nextInt(SPINS) == 0) {
Thread.yield();
}
} else if (timed && nanos <= 0) {
return false;
} else {
spins = SPINS;
}
}
TransferQueue<E>.QNode m = null;
boolean half = false;
while (true) {
Node<E> t = tail;
if (t == null) {
if (casHead(null, new QNode(e, mode)))
return true;
} else if (s.isData) {
if (s.transfer(e, t, null) != null) {
unsplice(s, t);
return true;
}
} else if (m == null) {
m = new QNode(e, mode);
} else if (!half) {
half = !(casTail(t, m)));
} else if (m.pred == t) {
Node<E> next = m.next;
if (next == null) {
casTail(t, m);
return true;
} else {
m.helpDelete(t, next);
}
} else {
Node<E> next = t.next;
if (m.pred != next) {
for (Node<E> q = t; q != null && q != s; q = q.pred) {
if (q.isMatched()) {
m.match = q;
break;
}
}
if (m.match == null) {
half = false;
continue;
}
}
if (m.tryMatch(t, next)) {
unsplice(t, next);
s.forgetContents();
return true;
} else {
m.unmatch();
half = false;
}
}
}
}
}
}
我们需要实现 LinkedTransferQueue 中的 put() 和 take() 这两个方法。在这里用生产者-消费者模式来说明。
当队列中没有等待的元素时,put() 方法会阻塞,直到另一个线程 take() 这个元素。
public void put(E e) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (e == null) {
throw new NullPointerException();
}
if (queue.tryTransfer(e, true, 0L)) {
return;
}
Thread.yield();
queue.qnode(enq(e)).await();
}
当队列中有等待的元素时,take() 方法不会阻塞,直接将等待的元素返回。
public E take() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
E e;
if ((e = (E) queue.poll(0L)) != null) {
return e;
}
Thread.yield();
final TransferQueue.QNode node = queue.enq(null);
while ((e = (E) queue.poll(0L)) == null) {
node.await();
}
if (node.unlink() != null) {
queue.finishTransfer();
}
return e;
}
我们通过一个简单的测试程序来验证我们的 LinkedTransferQueue API 是否正确实现。
import java.util.concurrent.LinkedTransferQueue;
public class Main {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
Thread producer = new Thread(new Producer(queue));
Thread consumer1 = new Thread(new Consumer(queue));
Thread consumer2 = new Thread(new Consumer(queue));
producer.start();
consumer1.start();
consumer2.start();
}
}
class Producer implements Runnable {
private LinkedTransferQueue<Integer> queue;
public Producer(LinkedTransferQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
try {
for (int i = 0; i < 5; i++) {
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private LinkedTransferQueue<Integer> queue;
public Consumer(LinkedTransferQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getId() + " get " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果如下:
13 get 0
12 get 1
14 get 2
12 get 3
13 get 4
LinkedTransferQueue 是 Java 并发编程中一个非常重要的队列结构,通过本文可以了解到 LinkedTransferQueue 的底层实现原理和 API 设计方法。开发人员可以根据本文所述方法编写自己的 LinkedTransferQueue 程序,实现多线程编程时更加方便。