📜  实现 LinkedTransferQueue API 的Java程序(1)

📅  最后修改于: 2023-12-03 14:53:36.828000             🧑  作者: Mango

实现 LinkedTransferQueue API 的 Java 程序

LinkedTransferQueue 是 Java 并发编程中的一种队列结构,线程可以通过 put() 和 take() 方法在队列中阻塞和唤醒。这个队列的特点是可以在两个线程之间传递元素,即如果队列中没有等待的元素,put() 方法会阻塞,直到另一个线程 take() 这个元素;如果队列中有等待的元素,take() 方法不会阻塞,直接将等待的元素返回。

下面我们来实现一下 LinkedTransferQueue 的 API。

数据结构设计

LinkedTransferQueue 是基于链表实现的队列结构,因此我们需要设计一个 Node 类,用于存储队列中的元素。Node 类中包含了一个泛型类型的数据元素 item 和两个 Node 类型的指针 prev 和 next,用于指向前一个和后一个节点。

public class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
}

我们还需要设计一个 TransferStack 类和一个 TransferQueue 类,用于存储节点和管理节点的添加和删除操作。

TransferStack 类

TransferStack 类是一个栈结构,用于存储节点。我们需要设计一个 push() 方法和一个 popAll() 方法,用于将节点添加到栈中和弹出栈中所有节点。

public class TransferStack<E> {
    volatile Node<E> head;

    boolean push(Node<E> node) {
        Node<E> h = head;
        if (h == null || node.item != null || !h.casNext(null, node)) {
            return false;
        }
        head = node;
        return true;
    }

    void popAll(Node<E> node) {
        node.item = null;
        Node<E> h;
        while ((h = head) != null && h != node) {
            if (head.casNext(h, null)) {
                do {
                    Node<E> nxt = h.next;
                    h.next = null;
                    h = nxt;
                } while (h != null);
                break;
            }
        }
    }
}
TransferQueue 类

TransferQueue 类是一个队列结构,用于存储节点。我们需要设计一个 add() 方法和一个 remove() 方法,用于将节点添加到队列中和从队列中移除节点。

public class TransferQueue<E> {
    volatile int size;
    transient volatile Node<E> head;
    transient volatile Node<E> tail;
    private TransferStack<E> stack;

    TransferQueue() {
        stack = new TransferStack<E>();
    }

    boolean casTail(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    void unsplice(Node<E> pred, Node<E> node) {
        node.forgetContents();
        if (pred == null)
            casHead(node, null);
        else {
            while (pred.next == node) {
                if (casNext(pred, node, node.next)) {
                    if (node == tail) {
                        casTail(node, pred);
                    }
                    node.forgetContents();
                    break;
                }
            }
        }
    }

    boolean offer(E item) {
        if (stack != null && item != null) {
            Node<E> node = new Node<E>(item);
            bool xfered = false;
            while (true) {
                Node<E> t = tail;
                if (t == null) {
                    if (casHead(null, node)) {
                        size = 1;
                        return true;
                    }
                } else if (stack != null && t.next == null) {
                    if (t.casNext(null, node)) {
                        size = 1;
                        return true;
                    }
                } else if (!xfered) {
                    xfered = tryTransfer(node, false, 0L);
                } else if (item == null) {
                    return false;
                } else {
                    if (node.isMatched()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    E poll(long nanos) {
        Node<E> p = null;
        boolean xfered = false;
        while (true) {
            while ((p = head) != null) {
                if (head.casLock()) {
                    if (p == head) {
                        Node<E> h = head;
                        Node<E> nxt = h.next;
                        if (nxt == null) {
                            casTail(h, null);
                        }
                        head = nxt;
                        h.forgetContents();
                        return h.itemOrNull();
                    }
                    head.unlock();
                }
            }
            if (xfered || (xfered = tryTransfer(null, false, nanos))) {
                Thread.interrupted();
                xfered = false;
            } else {
                return null;
            }
        }
    }

    boolean tryTransfer(E e, boolean timed, long nanos) {
        if (stack == null) {
            stack = new TransferStack<E>();
        }
        Node<E> s = null;
        int spins = -1;
        while (true) {
            while (s == null) {
                s = stack.pop();
                if (s != null) {
                    synchronized (s) {
                        if (s.item != null) {
                            s = null;
                        } else {
                            break;
                        }
                    }
                } else if (spins >= 0) {
                    if (ThreadLocalRandom.current().nextInt(SPINS) == 0) {
                        Thread.yield();
                    }
                } else if (timed && nanos <= 0) {
                    return false;
                } else {
                    spins = SPINS;
                }
            }
            TransferQueue<E>.QNode m = null;
            boolean half = false;
            while (true) {
                Node<E> t = tail;
                if (t == null) {
                    if (casHead(null, new QNode(e, mode)))
                        return true;
                } else if (s.isData) {
                    if (s.transfer(e, t, null) != null) {
                        unsplice(s, t);
                        return true;
                    }
                } else if (m == null) {
                    m = new QNode(e, mode);
                } else if (!half) {
                    half = !(casTail(t, m)));
                } else if (m.pred == t) {
                    Node<E> next = m.next;
                    if (next == null) {
                        casTail(t, m);
                        return true;
                    } else {
                        m.helpDelete(t, next);
                    }
                } else {
                    Node<E> next = t.next;
                    if (m.pred != next) {
                        for (Node<E> q = t; q != null && q != s; q = q.pred) {
                            if (q.isMatched()) {
                                m.match = q;
                                break;
                            }
                        }
                        if (m.match == null) {
                            half = false;
                            continue;
                        }
                    }
                    if (m.tryMatch(t, next)) {
                        unsplice(t, next);
                        s.forgetContents();
                        return true;
                    } else {
                        m.unmatch();
                        half = false;
                    }
                }
            }
        }
    }
}
API 设计

我们需要实现 LinkedTransferQueue 中的 put() 和 take() 这两个方法。在这里用生产者-消费者模式来说明。

put() 方法

当队列中没有等待的元素时,put() 方法会阻塞,直到另一个线程 take() 这个元素。

public void put(E e) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    if (e == null) {
        throw new NullPointerException();
    }
    if (queue.tryTransfer(e, true, 0L)) {
        return;
    }
    Thread.yield();
    queue.qnode(enq(e)).await();
}
take() 方法

当队列中有等待的元素时,take() 方法不会阻塞,直接将等待的元素返回。

public E take() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    E e;
    if ((e = (E) queue.poll(0L)) != null) {
        return e;
    }
    Thread.yield();
    final TransferQueue.QNode node = queue.enq(null);
    while ((e = (E) queue.poll(0L)) == null) {
        node.await();
    }
    if (node.unlink() != null) {
        queue.finishTransfer();
    }
    return e;
}
测试程序

我们通过一个简单的测试程序来验证我们的 LinkedTransferQueue API 是否正确实现。

import java.util.concurrent.LinkedTransferQueue;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
        Thread producer = new Thread(new Producer(queue));
        Thread consumer1 = new Thread(new Consumer(queue));
        Thread consumer2 = new Thread(new Consumer(queue));
        producer.start();
        consumer1.start();
        consumer2.start();
    }
}

class Producer implements Runnable {
    private LinkedTransferQueue<Integer> queue;

    public Producer(LinkedTransferQueue<Integer> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                queue.put(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private LinkedTransferQueue<Integer> queue;

    public Consumer(LinkedTransferQueue<Integer> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            System.out.println(Thread.currentThread().getId() + " get " + queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试结果如下:

13 get 0
12 get 1
14 get 2
12 get 3
13 get 4
总结

LinkedTransferQueue 是 Java 并发编程中一个非常重要的队列结构,通过本文可以了解到 LinkedTransferQueue 的底层实现原理和 API 设计方法。开发人员可以根据本文所述方法编写自己的 LinkedTransferQueue 程序,实现多线程编程时更加方便。