Python并行处理数据


作者Lou Xiao创建时间2020-05-03 21:16:00更新时间2020-05-03 21:23:00

机器配置

  • CPU: AMD Ryzen 5 3600 6-Core Processor
  • RAM: 32GiB
  • OS: CentOS Linux 7 (Core)

(1)普通版本

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 import time
2
3 # 这个任务计算是独立,和其它完全不相关(不依赖、不影响)
4 def work(a, b, c):
5 return 3.0 * a + 5.0 * b + 7.0 * c
6
7
8 if __name__ == '__main__':
9 total_sum = 0.0
10 start_time = time.time()
11 for i in range(1, 10):
12 for j in range(1, 100):
13 for k in range(1, 100):
14 total_sum += work(i, j, k)
15 end_time = time.time()
16 total_time = end_time - start_time
17 print("sum: %d ; time: %10.2fs" % (total_sum, total_time))

运行结果

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 sum: 54248535 ; time: 0.03s

(2)并行化版本1

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 import time
2 import multiprocessing
3
4
5 # python 3.6.10
6
7 def work(a, b, c):
8 return 3.0 * a + 5.0 * b + 7.0 * c
9
10
11 if __name__ == '__main__':
12 start_time = time.time()
13 pool = multiprocessing.Pool(4)
14 # 限制任务队列最大数量
15 pool._taskqueue.maxsize = 1000
16 future_list = []
17 for i in range(1, 10):
18 for j in range(1, 100):
19 for k in range(1, 100):
20 # 注意此处,并行运行任务
21 future = pool.apply_async(work, args=(i, j, k))
22 future_list.append(future)
23 # 收集计算结果
24 total_sum = 0.0
25 for future in future_list:
26 # 得到 work的计算结果
27 result = future.get()
28 total_sum += result
29 pool.close()
30 end_time = time.time()
31 total_time = end_time - start_time
32 print("sum: %d ; time: %10.2fs" % (total_sum, total_time))

运行结果

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 sum: 54248535 ; time: 6.37s

(3)并行化版本2(改进版本)

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 import time
2 import multiprocessing
3
4
5 # python 3.6.10
6
7
8 def work(a):
9 _sum = 0.0
10 for j in range(1, 100):
11 for k in range(1, 100):
12 _sum += 3.0 * a + 5.0 * j + 7.0 * k
13 return _sum
14
15
16 if __name__ == '__main__':
17 start_time = time.time()
18 pool = multiprocessing.Pool(4)
19 # 限制任务队列最大数量
20 pool._taskqueue.maxsize = 1000
21 future_list = []
22 for i in range(1, 10):
23 # 注意此处,并行运行任务
24 future = pool.apply_async(work, args=(i,))
25 future_list.append(future)
26 # 收集计算结果
27 total_sum = 0.0
28 for future in future_list:
29 # 得到 work的计算结果
30 result = future.get()
31 total_sum += result
32 pool.close()
33 end_time = time.time()
34 total_time = end_time - start_time
35 print("sum: %d ; time: %10.2fs" % (total_sum, total_time))

运行结果

1.双击鼠标左键复制此行;2.单击复制所有代码。
                                
                                    
1 sum: 54248535 ; time: 0.01s

结论

  • 演示如果把串行任务改成并行
  • 并行化版本1计算时间显示:太小的计算任务使用并行化反而更慢,说明并行化是有额外损耗的。并行化要考虑具体情况。
  • 子任务计算量要划分合适,不能太小。
文章目录