📜  Python中的多处理 | Set 2(进程之间的通信)

📅  最后修改于: 2022-05-13 01:55:35.306000             🧑  作者: Mango

Python中的多处理 | Set 2(进程之间的通信)

Python中的多处理 |设置 1
这些文章讨论了在Python中使用多处理模块时进程之间的数据共享和消息传递的概念。
在多处理中,任何新创建的进程都将执行以下操作:

  • 独立运行
  • 有自己的记忆空间。

考虑下面的程序来理解这个概念:

import multiprocessing
  
# empty list with global scope
result = []
  
def square_list(mylist):
    """
    function to square a given list
    """
    global result
    # append squares of mylist to global list result
    for num in mylist:
        result.append(num * num)
    # print global list result
    print("Result(in process p1): {}".format(result))
  
if __name__ == "__main__":
    # input list
    mylist = [1,2,3,4]
  
    # creating new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist,))
    # starting process
    p1.start()
    # wait until process is finished
    p1.join()
  
    # print global result list
    print("Result(in main program): {}".format(result))
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

在上面的例子中,我们尝试在两个地方打印全局列表结果的内容:

  • square_list函数中。由于此函数由进程p1调用,因此结果列表仅在进程p1的内存空间中更改。
  • 在主程序中处理p1完成后。由于主程序由不同的进程运行,它的内存空间仍然包含空的结果列表。

下图阐明了这个概念:

在进程之间共享数据

  1. 共享内存:多处理模块提供ArrayValue对象在进程之间共享数据。
    • 数组:共享内存分配的 ctypes 数组。
    • 值:共享内存分配的 ctypes 对象。

    下面给出了一个简单的示例,展示了使用ArrayValue在进程之间共享数据。

    import multiprocessing
      
    def square_list(mylist, result, square_sum):
        """
        function to square a given list
        """
        # append squares of mylist to result array
        for idx, num in enumerate(mylist):
            result[idx] = num * num
      
        # square_sum value
        square_sum.value = sum(result)
      
        # print result Array
        print("Result(in process p1): {}".format(result[:]))
      
        # print square_sum Value
        print("Sum of squares(in process p1): {}".format(square_sum.value))
      
    if __name__ == "__main__":
        # input list
        mylist = [1,2,3,4]
      
        # creating Array of int data type with space for 4 integers
        result = multiprocessing.Array('i', 4)
      
        # creating Value of int data type
        square_sum = multiprocessing.Value('i')
      
        # creating new process
        p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
      
        # starting process
        p1.start()
      
        # wait until the process is finished
        p1.join()
      
        # print result array
        print("Result(in main program): {}".format(result[:]))
      
        # print square_sum Value
        print("Sum of squares(in main program): {}".format(square_sum.value))
    
    Result(in process p1): [1, 4, 9, 16]
    Sum of squares(in process p1): 30
    Result(in main program): [1, 4, 9, 16]
    Sum of squares(in main program): 30
    

    让我们试着逐行理解上面的代码:

    • 首先,我们创建一个这样的 Array结果
      result = multiprocessing.Array('i', 4)
      
      • 第一个参数是数据类型。 “i”代表整数,而“d”代表浮点数据类型。
      • 第二个参数是数组的大小。在这里,我们创建了一个包含 4 个元素的数组。

      同样,我们像这样创建一个 Value square_sum

      square_sum = multiprocessing.Value('i')
      

      在这里,我们只需要指定数据类型。该值可以被赋予一个初始值(比如 10),如下所示:

      square_sum = multiprocessing.Value('i', 10)
      
    • 其次,我们在创建Process对象时将resultsquare_sum作为参数传递。
      p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
      
    • 通过指定数组元素的索引,结果数组元素被赋予一个值。
      for idx, num in enumerate(mylist):
            result[idx] = num * num
      

      square_sum使用其value属性赋予一个值:

      square_sum.value = sum(result)
      
    • 为了打印结果数组元素,我们使用result[:]打印完整的数组。
      print("Result(in process p1): {}".format(result[:]))
      

      square_sum的值简单地打印为:

      print("Sum of squares(in process p1): {}".format(square_sum.value))
      

    下图描述了进程如何共享ArrayValue对象:

  2. 服务器进程:每当Python程序启动时,也会启动一个服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它派生一个新进程。
    服务器进程可以保存Python对象并允许其他进程使用代理来操作它们。
    multiprocessing模块提供了一个管理器类来控制服务器进程。因此,管理器提供了一种方法来创建可以在不同流程之间共享的数据。

    考虑下面给出的示例:

    import multiprocessing
      
    def print_records(records):
        """
        function to print record(tuples) in records(list)
        """
        for record in records:
            print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
      
    def insert_record(record, records):
        """
        function to add a new record to records(list)
        """
        records.append(record)
        print("New record added!\n")
      
    if __name__ == '__main__':
        with multiprocessing.Manager() as manager:
            # creating a list in server process memory
            records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
            # new record to be inserted in records
            new_record = ('Jeff', 8)
      
            # creating new processes
            p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
            p2 = multiprocessing.Process(target=print_records, args=(records,))
      
            # running process p1 to insert new record
            p1.start()
            p1.join()
      
            # running process p2 to print records
            p2.start()
            p2.join()
    
    New record added!
    
    Name: Sam
    Score: 10
    
    Name: Adam
    Score: 9
    
    Name: Kevin
    Score: 9
    
    Name: Jeff
    Score: 8
    

    让我们试着理解上面的代码:

    • 首先,我们使用以下方法创建一个管理器对象:
      with multiprocessing.Manager() as manager:
      

      with语句块下的所有行都在manager对象的范围内。

    • 然后,我们使用以下命令在服务器进程内存中创建一个列表记录
      records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
      

      同样,您可以创建字典作为manager.dict方法。

    • 最后,我们创建进程p1 (在记录列表中插入新记录)和p2 (打印记录)并运行它们,同时将记录作为参数之一传递。

    服务器进程的概念如下图所示:

进程之间的通信

多个流程的有效使用通常需要它们之间进行一些沟通,以便可以划分工作并汇总结果。
multiprocessing支持两种类型的进程之间的通信通道:

  • 队列
  • 管道
  1. 队列:使用多处理在进程之间进行通信的一种简单方法是使用队列来回传递消息。任何Python对象都可以通过队列。
    注意: multiprocessing.Queue类是queue.Queue的近似克隆。
    考虑下面给出的示例程序:
    import multiprocessing
      
    def square_list(mylist, q):
        """
        function to square a given list
        """
        # append squares of mylist to queue
        for num in mylist:
            q.put(num * num)
      
    def print_queue(q):
        """
        function to print queue elements
        """
        print("Queue elements:")
        while not q.empty():
            print(q.get())
        print("Queue is now empty!")
      
    if __name__ == "__main__":
        # input list
        mylist = [1,2,3,4]
      
        # creating multiprocessing Queue
        q = multiprocessing.Queue()
      
        # creating new processes
        p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
        p2 = multiprocessing.Process(target=print_queue, args=(q,))
      
        # running process p1 to square list
        p1.start()
        p1.join()
      
        # running process p2 to get queue elements
        p2.start()
        p2.join()
    
    Queue elements:
    1
    4
    9
    16
    Queue is now empty!
    

    让我们试着一步一步理解上面的代码:

    • 首先,我们使用以下方法创建一个多处理队列
      q = multiprocessing.Queue()
      
    • 然后我们通过进程p1将空队列q传递给square_list函数。使用put方法将元素插入队列。
      q.put(num * num)
      
    • 为了打印队列元素,我们使用get方法直到队列不为空。
      while not q.empty():
            print(q.get())
      

    下面给出了一个描述队列操作的简单图表:

  2. 管道:管道只能有两个端点。因此,当只需要双向通信时,它优于队列。

    multiprocessing模块提供Pipe()函数,该函数返回一对由管道连接的连接对象。 Pipe()返回的两个连接对象代表管道的两端。每个连接对象都有send()recv()方法(等等)。
    考虑下面给出的程序:

    import multiprocessing
      
    def sender(conn, msgs):
        """
        function to send messages to other end of pipe
        """
        for msg in msgs:
            conn.send(msg)
            print("Sent the message: {}".format(msg))
        conn.close()
      
    def receiver(conn):
        """
        function to print the messages received from other
        end of pipe
        """
        while 1:
            msg = conn.recv()
            if msg == "END":
                break
            print("Received the message: {}".format(msg))
      
    if __name__ == "__main__":
        # messages to be sent
        msgs = ["hello", "hey", "hru?", "END"]
      
        # creating a pipe
        parent_conn, child_conn = multiprocessing.Pipe()
      
        # creating new processes
        p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
        p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
      
        # running processes
        p1.start()
        p2.start()
      
        # wait until processes finish
        p1.join()
        p2.join()
    
    Sent the message: hello
    Sent the message: hey
    Sent the message: hru?
    Received the message: hello
    Sent the message: END
    Received the message: hey
    Received the message: hru?
    

    让我们试着理解上面的代码:

    • 简单地使用以下方法创建了一个管道:
      parent_conn, child_conn = multiprocessing.Pipe()
      

      该函数返回管道两端的两个连接对象。

    • 使用send方法将消息从管道的一端发送到另一端。
      conn.send(msg)
      
    • 要在管道的一端接收任何消息,我们使用recv方法。
      msg = conn.recv()
      
    • 在上面的程序中,我们将消息列表从一端发送到另一端。在另一端,我们阅读消息,直到收到“END”消息。

    考虑下面给出的图表,它显示了 b/w 管道和进程的关系:

注意:如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道不同端的进程不存在损坏风险。另请注意,队列在进程之间进行适当的同步,但代价是增加了复杂性。因此,队列被认为是线程和进程安全的!

下一个:

  • Python中进程的同步和池化