📜  Python|线程间通信 |第 2 组

📅  最后修改于: 2022-05-13 01:54:54.975000             🧑  作者: Mango

Python|线程间通信 |第 2 组

先决条件:线程间通信 |第一组

如果线程需要立即知道消费者线程何时处理了特定数据项,则应将发送的数据与允许生产者监视其进度的事件对象配对,如下面的代码所示 -

代码#1:
from queue import Queue
from threading import Thread, Event
  
# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and 
        # hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()
      
    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data, evt = in_q.get()
            # Process the data
            ...
            # Indicate completion
            evt.set()

基于简单队列编写线程程序通常是保持理智的好方法。如果一切都分解为简单的线程安全队列,则不需要在程序中乱扔锁和其他低级同步。此外,与队列通信通常会导致设计在以后扩展到其他类型的基于消息的通信模式。例如,可以将程序拆分为多个进程,甚至是一个分布式系统,而无需更改其大部分底层排队架构。
线程队列的一个注意事项是,将项目放入队列不会复制项目。因此,通信实际上涉及在线程之间传递对象引用。

如果共享状态是一个问题,那么只传递不可变的数据结构(例如,整数、字符串或元组)或制作排队项目的深层副本可能是有意义的,如下面的代码所示:

代码#2:

from queue import Queue
from threading import Thread
import copy
  
# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))
          
# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
  • 队列对象提供了一些额外的特性,这些特性在某些情况下可能会被证明是有用的。
  • 如果创建具有可选大小的队列,例如 Queue(N),它会限制在put()阻塞生产者之前可以排队的项目数。
  • 如果生产者和消费者之间的速度不匹配,则向队列添加上限可能是有意义的。
  • 例如,如果生产者以比消费快得多的速度生成物品。
  • 另一方面,在队列满时进行阻塞也会在整个程序中产生意想不到的级联效应,可能导致它死锁或运行不佳。
  • 一般来说,通信线程之间的“流控制”问题比看起来要困难得多。
  • 如果曾经试图通过摆弄队列大小来解决问题,这可能表明设计脆弱或其他一些固有的扩展问题。

代码 #3: get()put()方法支持非阻塞和超时。

import queue
q = queue.Queue()
  
try:
    data = q.get(block = False)
except queue.Empty:
    ...
try:
    q.put(item, block = False)
except queue.Full:
    ...
      
try:
    data = q.get(timeout = 5.0)
except queue.Empty:
    ...

这两个选项都可以用来避免在特定排队操作上无限期阻塞的问题。例如,非阻塞put()可以与固定大小的队列一起使用,以在队列已满时实现不同类型的处理代码。

代码#4:发出日志消息并丢弃

def producer(q):
    ...
    try:
        q.put(item, block = False)
    except queue.Full:
        log.warning('queued item % r discarded !', item)

如果试图让消费者线程定期放弃诸如q.get()之类的操作,以便它们可以检查诸如终止标志之类的东西,则超时很有用。

代码 #5:使用超时

_running = True
  
def consumer(q):
    while _running:
        try:
            item = q.get(timeout = 5.0)
            # Process item
            ...
        except queue.Empty:
            pass

最后,还有实用方法q.qsize()q.full()q.empty()可以告诉队列的当前大小和状态。但是,请注意,所有这些在多线程环境中都是不可靠的。例如,对q.empty()的调用可能会告诉队列是空的,但是在调用之后已经过去的时间里,另一个线程可能已经将一个项目添加到队列中。坦率地说,最好编写代码不要依赖这些函数。