📜  Python|线程间通信 |第一组

📅  最后修改于: 2022-05-13 01:55:10.944000             🧑  作者: Mango

Python|线程间通信 |第一组

给定程序中的多个线程,并且希望在它们之间安全地通信或交换数据。

也许从一个线程向另一个线程发送数据最安全的方法是使用队列库中的队列。为此,请创建一个由线程共享的 Queue 实例。然后线程使用put()get()操作从队列中添加或删除项目,如下面的代码所示。

代码#1:

from queue import Queue
from threading import Thread
  
# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(data)
          
# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
          
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target = consumer, args =(q, ))
t2 = Thread(target = producer, args =(q, ))
t1.start()
t2.start()

队列实例已经具有所有必需的锁定,因此可以根据需要由尽可能多的线程安全地共享它们。使用队列时,协调生产者和消费者的关闭可能有些棘手。

这个问题的一个常见解决方案是依赖一个特殊的哨兵值,当它被放入队列时,会导致消费者终止,如下面的代码所示:

代码#2:

from queue import Queue
from threading import Thread
  
# Object that signals shutdown
_sentinel = object()
  
# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)
  
    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)
  
  
# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
          
        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break
        ...

上面代码的一个微妙特征是,消费者在收到特殊的哨兵值后,会立即将其放回队列中。这会将哨兵传播到可能正在侦听同一队列的其他消费者线程,从而将它们一个接一个地关闭。

虽然队列是最常见的线程通信机制,但只要添加所需的锁定和同步,就可以构建自己的数据结构。最常见的方法是用条件变量包装数据结构。

代码 #3:构建线程安全的优先级队列

import heapq
import threading
  
class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
          
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()
              
    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

线程与队列的通信是一种单向且非确定性的过程。一般来说,没有办法知道接收线程何时实际接收到消息并对其进行处理。但是,队列对象确实提供了一些基本的完成功能,如下面示例中的task_done()join()方法所示 -

代码#4:

from queue import Queue
from threading import Thread
  
# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)
          
# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
        # Indicate completion
        in_q.task_done()
          
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target = consumer, args =(q, ))
t2 = Thread(target = producer, args =(q, ))
t1.start()
t2.start()
  
# Wait for all produced items to be consumed
q.join()