这篇文章将会讲解在 Python 中使用
多进程模块时在进程之间共享数据和消息传递的概念。
在多处理中,任何新创建的进程都将执行以下操作:
可以考虑使用下面的程序来理解此概念:
import multiprocessing
# 具有全局作用域的空列表
result = []
def square_list(mylist):
"""函数对给定列表进行平方运算"""
global result
# 将mylist的方块附加到全局列表结果
for num in mylist:
result.append(num * num)
# 打印全局列表结果
print("Result(in process p1): {}".format(result))
if __name__ == "__main__":
# 输入列表
mylist = [1, 2, 3, 4]
# 创建新进程
p1 = multiprocessing.Process(target=square_list, args=(mylist,))
# 开启进程
p1.start()
# 等待进程完成
p1.join()
# 打印全局结果列表
print("Result(in main program): {}".format(result))
以下是输出结果
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []
在上面的示例中,我们尝试在两个位置打印全局列表结果的内容:
- 在square_list函数中。由于此函数由进程 p1 调用,因此结果列表仅在进程 p1 的内存空间中更改。
- 在主程序中完成过程 p1 后。由于主程序由不同的进程运行,因此其内存空间仍包含空的结果列表。
下图显示了这个概念:
在进程之间共享数据
共享内存
共享内存: 多进程模块提供数组和值对象以在进程之间共享数据。
- 数组: 从共享内存中分配的 ctypes 数组。
- 值: 从共享内存中分配的 ctypes 对象。
下面给出的是一个简单的示例,显示了使用 Array 和 Value 在进程之间共享数据。
import multiprocessing
def square_list(mylist, result, square_sum):
"""函数对给定列表进行平方运算"""
# 将mylist的方块附加到结果数组
for idx, num in enumerate(mylist):
result[idx] = num * num
# 平方和值
square_sum.value = sum(result)
# 打印结果数组
print("Result(in process p1): {}".format(result[:]))
# 打印square_sum值
print("Sum of squares(in process p1): {}".format(square_sum.value))
if __name__ == "__main__":
# 输入列表
mylist = [1, 2, 3, 4]
# 创建int数据类型的数组,其中有4个整数的空格
result = multiprocessing.Array('i', 4)
# 创建int数据类型的值
square_sum = multiprocessing.Value('i')
# 创建新流程
p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
# 正在启动进程
p1.start()
# 等待进程完成
p1.join()
# 打印结果数组
print("Result(in main program): {}".format(result[:]))
# 打印square_sum值
print("Sum of squares(in main program): {}".format(square_sum.value))
运行结果:
Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30
让我们尝试逐行理解上面的代码:
-
首先,我们创建一个数组结果,如下所示: result = multiprocessing.Array('i', 4)
- 第一个参数是数据类型。“i”代表整数,而“d”代表浮点数据类型。
- 第二个参数是数组的大小。在这里,我们创建一个包含 4 个元素的数组。
同样,我们创建一个价值square_sum如下所示: square_sum = multiprocessing.Value('i')
在这里,我们只需要指定数据类型。该值可以给出一个初始值(例如10),如下所示: square_sum = multiprocessing.Value('i', 10)
-
其次,我们在创建 Process 对象时将结果和square_sum作为参数传递。 p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
-
通过指定数组元素的索引,为结果数组元素指定一个值。 for idx, num in enumerate(mylist):
result[idx] = num * num
square_sum通过使用其 value 属性为其赋值: square_sum.value = sum(result)
-
为了打印结果数组元素,我们使用 result[:] 来打印完整的数组。 print("Result(in process p1): {}".format(result[:]))
square_sum值简单地打印为: print("Sum of squares(in process p1): {}".format(square_sum.value))
下图描述了进程如何共享数组和值对象:
服务器进程
服务器进程 : 每当python程序启动时,服务器进程也会启动。从那时起,每当需要新进程时,父进程就会连接到服务器并请求它分叉新进程。 服务器进程可以保存Python对象,并允许其他进程使用代理操作它们。 多处理模块提供了一个管理器类,用于控制服务器进程。因此,经理提供了一种创建可在不同流程之间共享的数据的方法。
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型,如列表、字典、队列、值、数组等。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。
请考虑下面给出的示例:
import multiprocessing
def print_records(records):
"""用于打印记录(列表)中的记录(元组)的函数"""
for record in records:
print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
def insert_record(record, records):
"""向记录(列表)添加新记录的函数"""
records.append(record)
print("已添加新记录!\n")
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
# 在服务器进程内存中创建列表
records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)])
# 要插入到记录中的新记录
new_record = ('Jeff', 8)
# 创建新流程
p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
p2 = multiprocessing.Process(target=print_records, args=(records,))
# 运行进程p1以插入新记录
p1.start()
p1.join()
# 运行进程p2以打印记录
p2.start()
p2.join()
运行输出结果:
已添加新记录!
Name: Sam
Score: 10
Name: Adam
Score: 9
Name: Kevin
Score: 9
Name: Jeff
Score: 8
进程已结束,退出代码为 0
让我们尝试理解上面的代码段:
-
然后,我们使用以下命令在服务器进程内存中创建一个列表记录: records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
同样,您可以将字典创建为 manager.dict 方法。
- 最后,我们创建进程 p1(在记录列表中插入新记录)和 p2(打印记录),并在将记录作为参数之一传递时运行它们。
服务器进程的概念如下图所示:
进程之间的通信
有效使用多个流程通常需要它们之间进行一些沟通,以便可以划分工作并聚合结果。 多处理支持进程之间的两种类型的通信通道:
队列
-
队列: 在进程与多处理之间进行通信的一种简单方法是使用队列来回传递消息。任何Python对象都可以通过队列。 注意: 多处理。队列类是队列的近似克隆 。队列。 参考下面给出的示例程序: import multiprocessing
def square_list(mylist, q):
"""函数对给定列表进行平方运算"""
# 将mylist的方块附加到队列
for num in mylist:
q.put(num * num)
def print_queue(q):
"""打印队列元素的函数"""
print("队列元素:")
while not q.empty():
print(q.get())
print("队列现在为空!")
if __name__ == "__main__":
# 输入列表
mylist = [1, 2, 3, 4]
# 创建多进程队列
q = multiprocessing.Queue()
# 创建新流程
p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
p2 = multiprocessing.Process(target=print_queue, args=(q,))
# 将进程p1运行到列表
p1.start()
p1.join()
# 运行进程p2以获取队列元素
p2.start()
p2.join()
运行结果:
让我们尝试一步一步地理解上面的代码:
-
首先,我们使用以下命令创建一个多处理队列: q = multiprocessing.Queue()
-
然后,我们通过进程 p1 将空队列 q 传递给square_list函数。使用 put 方法将元素插入到队列中。 q.put(num * num)
-
为了打印队列元素,我们使用 get 方法,直到队列不为空。 while not q.empty():
print(q.get())
下面给出的是一个简单的图表,描述了队列上的操作:
管道
管道: 一个管道只能有两个端点。因此,当只需要双向通信时,它优先于队列。
多处理模块提供 Pipe() 函数,该函数返回一对由管道连接的连接对象。Pipe() 返回的两个连接对象表示管道的两端。每个连接对象都有 send() 和 recv() 方法(以及其他方法)。 考虑下面给出的程序:
import multiprocessing
def sender(conn, msgs):
"""用于将消息发送到管道另一端的函数"""
for msg in msgs:
conn.send(msg)
print("已发送消息: {}".format(msg))
conn.close()
def receiver(conn):
"""用于打印从管道另一端接收的消息的函数"""
while 1:
msg = conn.recv()
if msg == "END":
break
print("收到消息: {}".format(msg))
if __name__ == "__main__":
# 要发送的消息
msgs = ["hello", "hey", "hru?", "END"]
# 创建管道
parent_conn, child_conn = multiprocessing.Pipe()
# 创建新进程
p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
# 正在运行进程
p1.start()
p2.start()
# 等待进程完成
p1.join()
p2.join()
运行结果:
让我们尝试理解上面的代码:
-
管道是使用以下方法创建的: parent_conn, child_conn = multiprocessing.Pipe()
该函数为管道的两端返回了两个连接对象。 -
消息使用 send 方法从管道的一端发送到另一端。 conn.send(msg)
-
为了在管道的一端接收任何消息,我们使用 recv 方法。 msg = conn.recv()
-
在上面的程序中,我们将消息列表从一端发送到另一端。在另一端,我们阅读消息,直到收到“END”消息。
考虑下图,其中显示了黑白管道和过程的关系:
注意: 如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道不同端的进程不存在损坏的风险。另请注意,队列在进程之间执行适当的同步,但代价是增加了复杂性。因此,队列被认为是线程和进程安全的!
|