Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

解决issue#1008,补充关于多进程的示例 #1030

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions Day01-15/13.进程和线程.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,73 @@ if __name__ == '__main__':
```

比较两段代码的执行结果(在我目前使用的MacBook上,上面的代码需要大概6秒左右的时间,而下面的代码只需要不到1秒的时间,再强调一次我们只是比较了运算的时间,不考虑列表创建及切片操作花费的时间),使用多进程后由于获得了更多的CPU执行时间以及更好的利用了CPU的多核特性,明显的减少了程序的执行时间,而且计算量越大效果越明显。当然,如果愿意还可以将多个进程部署在不同的计算机上,做成分布式进程,具体的做法就是通过`multiprocessing.managers`模块中提供的管理器将`Queue`对象通过网络共享出来(注册到网络上让其他计算机可以访问),这部分内容也留到爬虫的专题再进行讲解。
##### 上述例子的补充内容:
这部分的代码中的p.start()在启动进程后就马上地开始运算,这样导致后续 start = time()开始记时的时候有部分进程实际已经计算了一些数据。实际的代码这样写没有什么问题,这样写的好处简单易懂,但为了严谨一些,我们可以用下面的思路来解决这个问题。

我们先需要加入一些必要的库和说明
psutil 用于访问系统详情和进程控制,用来显示一些cpu和进程的信息。
Process 用于创建新的进程
Event 是一个用于进程间通信的同步原语,可以设置信号状态为真,让所有阻塞等待这个事件的进程继续运行。
思路:
1.我们可以先初始化进程但不启动它们:创建每个进程后,将它们加入到进程列表中,但不立即调用 p.start()。
2.接着设置一个全局的 Event:使用 Event 来控制所有进程的启动。所有进程将在 start_event.wait() 调用处阻塞,直到 start_event.set() 被调用。
3.统一启动所有进程:在主进程中,首先设置好计时开始的点 start = time(),然后通过 start_event.set() 释放所有子进程的阻塞状态,允许它们开始执行。这样使得所有进程的任务同步开始计算。
4.开始并同步所有进程:等待所有任务结束后,合并执行结果。

代码中还加入查看每个进程任务所花费的用时和以及进程分配的cpu信息(这样保证不同进程都执行差不多的时间),同时也有最终总执行的时间。
```python

import psutil
from multiprocessing import Process, Queue, Event
from time import time,sleep

def task_handler(curr_list, result_queue, start_event, cpu_id):
psutil.Process().cpu_affinity([cpu_id]) # 尝试设置CPU亲和性
start_event.wait() # 等待从主进程发出的开始信号
start_time = time() # 记录开始时间
total = sum(curr_list)
end_time = time() # 记录结束时间
# 将结果和时间放入队列
result_queue.put((total, start_time, end_time, cpu_id))

def main():
processes = []
number_list = [x for x in range(1, 100000001)]
result_queue = Queue()
start_event = Event() # 创建一个事件用于同步开始计算
cpus = psutil.cpu_count(logical=False) # 获取物理核心数
index = 0

# 创建并启动进程,尽量平均分配到不同的CPU核心上
for i in range(8):
cpu_id = i % cpus
p = Process(target=task_handler, args=(number_list[index:index + 12500000], result_queue, start_event, cpu_id))
index += 12500000
processes.append(p)
p.start()

# sleep(5)
start = time()

# 发送开始计算的信号
start_event.set()

# 等待所有进程完成
for p in processes:
p.join()

# 计算完成,合并执行结果
total = 0
while not result_queue.empty():
subtotal, proc_start_time, proc_end_time, cpu_id = result_queue.get()
total += subtotal
# 正确打印每个进程的执行时间和所在CPU
print(f'Process on CPU {cpu_id} took: {proc_end_time - proc_start_time:.3f} seconds')
end = time()
print(total)
print('Total Execution time: %.3fs' % (end - start))
print('Total execution time: ', (proc_end_time - proc_start_time), 's', sep='')

if __name__ == '__main__':
main()
```