📅  Last modified: 2020-10-28 01:39:02             🧑  Author: Mango
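In this article, we will learn how to implement multiprocessing in Python. We will also discuss some of its more advanced concepts.
Multiprocessing is the ability of a system to run more than one process in parallel. Put simply, multiprocessing uses two or more CPUs within a single computer system, which allows tasks to be divided among several processes.
The processing units share main memory and peripherals to process programs simultaneously. A multiprocessing application is broken into smaller parts that run independently, and the operating system assigns each process to a processor.
Python provides a built-in package called multiprocessing that supports spawning processes. Before working with multiprocessing, we need to understand the Process object.
Multiprocessing is essential for performing several tasks within a computer system. Consider a computer with a single processor, and suppose we assign several processes to that system at the same time.
It would then have to interrupt the previous task and switch to another one to keep all the processes moving. This is like a chef working alone in a kitchen, who has to do every task needed to cook a meal: cutting, cleaning, cooking, kneading dough, baking, and so on.
Multiprocessing therefore lets multiple tasks run at the same time without interruption, and it also makes it easier to keep track of all the tasks. This is why the concept of multiprocessing arose.
In multiprocessing, the work is split into several tasks, each of which can run on its own processor.
The multiprocessing module in Python can run multiple tasks within a single system. It offers a user-friendly and intuitive API for working with multiple processes.
Let's look at a simple multiprocessing example.
Example -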
from multiprocessing import Process
def disp():
    print('Hello !! Welcome to Python Tutorial')
if __name__ == '__main__':
    p = Process(target=disp)
    p.start()
    p.join()
Output:
Hello !! Welcome to Python Tutorial
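Explanation:
In the code above, we import the Process class and create a Process object whose target is the disp() function. We then start the process with the start() method and wait for it to finish with the join() method. We can also pass arguments to the target function using the args keyword.
Let's look at the following multiprocessing example that passes arguments.
Example - 2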
# Python multiprocessing example
# importing the multiprocessing module
import multiprocessing
def cube(n):
    # This function will print the cube of the given number
    print("The Cube is: {}".format(n * n * n))
def square(n):
    # This function will print the square of the given number
    print("The Square is: {}".format(n * n))
if __name__ == "__main__":
    # creating two processes
    process1 = multiprocessing.Process(target=square, args=(5,))
    process2 = multiprocessing.Process(target=cube, args=(5,))
    # Here we start process 1
    process1.start()
    # Here we start process 2
    process2.start()
    # The join() method is used to wait for process 1 to complete
    process1.join()
    # It is used to wait for process 2 to complete
    process2.join()
    # Print once both processes are completed
    print("Both processes are finished")
Output (the order of the first two lines may vary between runs):
The Cube is: 125
The Square is: 25
Both processes are finished
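Explanation -
In the example above, we create two functions: cube() computes the cube of the given number, and square() computes its square.
Next, we create Process objects with two arguments. The first, target, is the function to execute; the second, args, is a tuple of the arguments to pass to that function.
process1 = multiprocessing.Process(target=square, args=(5,))
process2 = multiprocessing.Process(target=cube, args=(5,))
We use the start() method to start each process.
process1.start()
process2.start()
The join() calls make the main program wait for process 1 to complete and then for process 2. The final print statement runs only after both processes have finished.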
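To make the effect of join() more concrete, the following minimal sketch inspects a Process object after it has been joined. The function names report and run_and_inspect and the process name "demo-worker" are hypothetical, chosen only for this illustration; is_alive() and exitcode are standard attributes of Process.

```python
import multiprocessing
import os


def report(label):
    # Runs in the child process; os.getpid() is the child's own PID.
    print(f"{label} running in PID {os.getpid()}")


def run_and_inspect():
    """Start one child, wait for it, and return (is_alive, exitcode)."""
    p = multiprocessing.Process(
        target=report, args=("worker",), name="demo-worker"
    )
    p.start()
    p.join()  # blocks until the child process has exited
    # After join() the child is no longer alive, and exitcode is 0
    # when the target function returned normally.
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    alive, code = run_and_inspect()
    print("alive after join:", alive)
    print("exit code:", code)
```

Checking exitcode after join() is a simple way to detect a child that ended with an error, since a non-zero value means the process did not finish cleanly.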
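The Python multiprocessing module provides many classes for building parallel programs. We will discuss its main classes: Process, Queue, and Lock. We have already covered the Process class in the previous examples; now we turn to the Queue and Lock classes.
Let's look at a simple example that gets the number of CPUs in the current system.
Example -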
import multiprocessing
print("The number of CPU currently working in system : ", multiprocessing.cpu_count())
Output:
The number of CPU currently working in system :  32
The number of CPUs reported depends on your machine. On ours, the count is 32.
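Note that cpu_count() reports the CPUs in the system, which is not necessarily the number the current process is allowed to use. The sketch below is one possible way to estimate the usable count; the helper name usable_cpus is hypothetical, and os.sched_getaffinity() is only available on some platforms (such as Linux), so the code falls back to cpu_count() elsewhere.

```python
import multiprocessing
import os


def usable_cpus():
    """Best-effort count of the CPUs this process may run on."""
    if hasattr(os, "sched_getaffinity"):
        # Set of CPU ids the current process (pid 0 = self) is bound to.
        return len(os.sched_getaffinity(0))
    # Fallback: total number of CPUs in the system.
    return multiprocessing.cpu_count()


if __name__ == "__main__":
    print("CPUs in the system:", multiprocessing.cpu_count())
    print("CPUs usable by this process:", usable_cpus())
```

A value like this is a reasonable starting point when deciding how many worker processes to create.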
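A queue is a fundamental data structure. The multiprocessing Queue behaves just like the queue data structure, which is based on the "first in, first out" (FIFO) concept. A queue generally stores Python objects and plays a vital role in sharing data between processes.
A queue is passed as an argument to a process's target function so that the process can consume the data. The queue provides the put() function to insert data and the get() function to retrieve it. Let's look at the following example.
Example -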
# Importing Queue Class
from multiprocessing import Queue
fruits = ['Apple', 'Orange', 'Guava', 'Papaya', 'Banana']
count = 1
# creating a queue object
queue = Queue()
print('pushing items to the queue:')
for fr in fruits:
    print('item no: ', count, ' ', fr)
    queue.put(fr)
    count += 1
print('\npopping items from the queue:')
count = 0
# get() blocks until an item arrives, so read exactly as many items as were
# put; empty() can briefly report True right after put() returns
for _ in range(len(fruits)):
    print('item no: ', count, ' ', queue.get())
    count += 1
Output:
pushing items to the queue:
item no:  1   Apple
item no:  2   Orange
item no:  3   Guava
item no:  4   Papaya
item no:  5   Banana

popping items from the queue:
item no:  0   Apple
item no:  1   Orange
item no:  2   Guava
item no:  3   Papaya
item no:  4   Banana
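Explanation -
In the code above, we import the Queue class and initialize a list named fruits. Next, we set count to 1; the count variable numbers the elements as they are processed. We then create the queue object by calling Queue() and use it for all queue operations. In the for loop, we insert the elements into the queue one by one with put(), incrementing count on each iteration. Finally, we retrieve the elements with get(), which returns them in the same order in which they were inserted.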
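The example above uses the queue within a single process. As a minimal sketch of passing a queue to another process, the code below has a child process put items on the queue while the parent reads them. The names producer, collect, and SENTINEL are hypothetical; using None as an end-of-stream marker is an assumption of this sketch, not something the Queue class requires.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-stream marker used by this sketch


def producer(q, items):
    # Runs in the child process: send every item, then the marker.
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def collect(items):
    """Send items through a child process and return them in arrival order."""
    q = Queue()
    p = Process(target=producer, args=(q, items))
    p.start()
    received = []
    while True:
        item = q.get()  # blocks until the child has sent something
        if item is SENTINEL:
            break
        received.append(item)
    p.join()  # join only after the queue has been drained
    return received


if __name__ == "__main__":
    print(collect(["Apple", "Orange", "Guava"]))
```

The parent drains the queue before calling join(), because a child that still has buffered queue data may not exit until that data has been read.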
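The multiprocessing Lock class is used to acquire a lock so that other processes cannot execute the same protected code until the lock is released. The Lock class performs two main tasks: acquiring the lock with the acquire() function and releasing it with the release() function.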
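As an illustration of acquire() and release(), here is a minimal sketch in which several processes increment one shared counter. The names add_many and count_with_lock are hypothetical. The counter is created with Value("i", 0, lock=False), which gives a shared integer without built-in locking, so the explicit Lock is what keeps the increments from interfering with each other.

```python
from multiprocessing import Lock, Process, Value


def add_many(counter, lock, n):
    for _ in range(n):
        # "with lock" calls lock.acquire() on entry and lock.release()
        # on exit, even if the body raises an exception.
        with lock:
            counter.value += 1


def count_with_lock(workers=4, n=1000):
    """Increment a shared counter from several processes under a lock."""
    counter = Value("i", 0, lock=False)  # shared int without its own lock
    lock = Lock()
    procs = [
        Process(target=add_many, args=(counter, lock, n))
        for _ in range(workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(count_with_lock())
```

Without the lock, the read-modify-write in counter.value += 1 could interleave between processes and some increments could be lost.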
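Suppose we have several tasks. We create two queues: the first holds the tasks, and the second stores the log of completed tasks. The next step is to instantiate the processes that will complete the tasks. The Queue class is already synchronized, so we do not need the Lock class to guard access to it.
The following example combines the Process and Queue classes. Let's take a look.
Example -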
from multiprocessing import Process, Queue, current_process
import time
import queue
def jobTodo(tasks_to_perform, complete_tasks):
    while True:
        try:
            # Try to fetch a task from the queue.
            # get_nowait() raises the queue.Empty exception
            # if the queue is empty.
            task = tasks_to_perform.get_nowait()
        except queue.Empty:
            break
        else:
            # if no exception has been raised, the else block will execute
            # and record the task completion
            print(task)
            complete_tasks.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True
def main():
    total_task = 8
    total_number_of_processes = 3
    tasks_to_perform = Queue()
    complete_tasks = Queue()
    number_of_processes = []
    for i in range(total_task):
        tasks_to_perform.put("Task no " + str(i))
    # creating and starting the worker processes
    for w in range(total_number_of_processes):
        p = Process(target=jobTodo, args=(tasks_to_perform, complete_tasks))
        number_of_processes.append(p)
        p.start()
    # waiting for the worker processes to finish
    for p in number_of_processes:
        p.join()
    # print the output
    while not complete_tasks.empty():
        print(complete_tasks.get())
    return True
if __name__ == '__main__':
    main()
Output (the ordering and the process names vary between runs):
Task no 2
Task no 5
Task no 0
Task no 3
Task no 6
Task no 1
Task no 4
Task no 7
Task no 0 is done by Process-1
Task no 1 is done by Process-3
Task no 2 is done by Process-2
Task no 3 is done by Process-1
Task no 4 is done by Process-3
Task no 5 is done by Process-2
Task no 6 is done by Process-1
Task no 7 is done by Process-3
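A Python multiprocessing Pool is used to execute a function in parallel across multiple input values. It distributes the input data across processes (data parallelism). Consider the following Pool example.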
Example -
from multiprocessing import Pool
import time
w = (["V", 5], ["X", 2], ["Y", 1], ["Z", 3])
def work_log(data_for_work):
    print("Process name is %s waiting time is %s seconds" % (data_for_work[0], data_for_work[1]))
    time.sleep(int(data_for_work[1]))
    print("Process %s Executed." % data_for_work[0])
def handler():
    # two worker processes; the with block cleans up the pool afterwards
    with Pool(2) as p:
        p.map(work_log, w)
if __name__ == '__main__':
    handler()
Output (a typical run; with two workers the tasks overlap, and the first two lines may appear in either order):
Process name is V waiting time is 5 seconds
Process name is X waiting time is 2 seconds
Process X Executed.
Process name is Y waiting time is 1 seconds
Process Y Executed.
Process name is Z waiting time is 3 seconds
Process V Executed.
Process Z Executed.
Let's look at another Pool example.
Example - 2
from multiprocessing import Pool
def fun(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(fun, [1, 2, 3]))
Output:
[1, 4, 9]
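Pool offers more than map(). As a minimal sketch, the code below uses starmap() for a function that takes several arguments and apply_async() to schedule a single call. The names power and run_pool are hypothetical; starmap(), apply_async(), and AsyncResult.get() are standard Pool features.

```python
from multiprocessing import Pool


def power(base, exponent):
    return base ** exponent


def run_pool():
    """Return the results of a starmap() call and one apply_async() call."""
    with Pool(2) as pool:
        # starmap() unpacks each tuple into positional arguments,
        # so this computes power(2, 3), power(3, 2), power(4, 2).
        powers = pool.starmap(power, [(2, 3), (3, 2), (4, 2)])
        # apply_async() schedules one call and returns immediately with
        # an AsyncResult; get() waits for the value.
        async_result = pool.apply_async(power, (5, 2))
        single = async_result.get(timeout=10)
    return powers, single


if __name__ == "__main__":
    print(run_pool())
```

Like map(), starmap() returns its results in the order of the inputs, regardless of which worker finished first.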
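A proxy is an object that refers to a shared object living (presumably) in a different process. The shared object is called the referent of the proxy. Multiple proxy objects may have the same referent. A proxy object has methods that invoke the corresponding methods of its referent. Below is an example of a proxy object.
Example -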
from multiprocessing import Manager
if __name__ == '__main__':
    manager = Manager()
    l = manager.list([i*i for i in range(10)])
    print(l)
    print(repr(l))
    print(l[4])
    print(l[2:5])
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<ListProxy object, typeid 'list' at 0x...>
16
[4, 9, 16]
Proxy objects are picklable, so they can be passed between processes. They can also be used to control the level of synchronization.
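Because proxies can be passed between processes, several workers can update the same referent. The following minimal sketch hands a manager.dict() proxy to a few child processes; the names record_square and squares_via_proxy are hypothetical and exist only for this illustration.

```python
from multiprocessing import Manager, Process


def record_square(shared, n):
    # The assignment is forwarded to the real dict, which lives in
    # the manager's server process.
    shared[n] = n * n


def squares_via_proxy(numbers):
    """Let one child process per number write its square into a shared dict."""
    with Manager() as manager:
        shared = manager.dict()
        procs = [
            Process(target=record_square, args=(shared, n)) for n in numbers
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy into a plain dict before the manager shuts down.
        return shared.copy()


if __name__ == "__main__":
    print(squares_via_proxy([1, 2, 3]))
```

The result is copied inside the with block because the proxy stops working once the manager process has been shut down.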
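So far we have covered the basic concepts of multiprocessing in Python. Multiprocessing is a broad topic in its own right and is essential for performing several tasks within a single system. The table below lists some basic functions and methods commonly used when implementing multiprocessing.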
Method | Description |
---|---|
Pipe() | Returns a pair of connection objects representing the two ends of a pipe. |
run() | Represents the process's activity; start() arranges for it to be invoked in the child process. |
start() | Starts the process. |
join([timeout]) | Blocks the calling process until the process whose join() method is called terminates. The timeout argument is optional. |
is_alive() | Returns True if the process is alive, False otherwise. |
terminate() | Terminates the process. On Unix this sends the SIGTERM signal; on Windows, TerminateProcess() is used. |
kill() | Same as terminate(), but uses the SIGKILL signal on Unix. |
close() | Closes the Process object and releases all resources associated with it. |
qsize() | Returns the approximate size of the queue. |
empty() | Returns True if the queue is empty. As with qsize(), the result is not reliable while other processes are using the queue. |
full() | Returns True if the queue is full. |
get_nowait() | Equivalent to get(False). |
get() | Removes and returns an element from the queue. |
put() | Inserts an element into the queue. |
cpu_count() | Returns the number of CPUs in the system. |
current_process() | Returns the Process object corresponding to the current process. |
parent_process() | Returns the Process object corresponding to the parent of the current process. |
task_done() | Indicates that a formerly enqueued task is complete (available on JoinableQueue). |
join_thread() | Joins the queue's background thread. |
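
The table lists Pipe(), which none of the earlier examples use. As a minimal sketch, the code below sends a message to a child process over a pipe and reads the reply. The names child and echo_upper are hypothetical; Pipe(), send(), recv(), and close() are standard parts of the multiprocessing API.

```python
from multiprocessing import Pipe, Process


def child(conn):
    # Receive one message from the parent, answer it, then close our end.
    message = conn.recv()
    conn.send(message.upper())
    conn.close()


def echo_upper(text):
    """Send text to a child process and return its upper-cased reply."""
    # Pipe() returns two connection objects, one for each end of the pipe.
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send(text)
    reply = parent_conn.recv()  # blocks until the child has answered
    p.join()
    return reply


if __name__ == "__main__":
    print(echo_upper("hello"))
```

A pipe connects exactly two endpoints, which makes it a natural fit for one-to-one communication, while a Queue is the more convenient choice when several producers or consumers are involved.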