最新消息:XAMPP默认安装之后是很不安全的,我们只需要点击左方菜单的 "安全"选项,按照向导操作即可完成安全设置。

Python基础入门学习之多进程示例

XAMPP案例 admin 660浏览 0评论

0Python

目录1、多进程示例:创建多进程示例:创建多进程获取子进程任务结果2、进程池同步异步多个不同的任务方法执行同一个数据源返回不同的结果多个数据源,执行相同的任务方法校验,发现不合格,就退出不再执行回调函数-接收子进程返回结果,实时处理

1、多进程

示例:创建多进程
import os
import time
from multiprocessing import Process


def task(num):
   print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}")
   time.sleep(3)


if __name__ == '__main__':
   start = time.time()
   p_list = []
   print("当前环境进程:", os.getpid())
   for i in range(5):
       p = Process(group=None, target=task, args=(i,), kwargs={}, name=f"进程名:校长{i}")
       p.start()
       p_list.append(p)
  [o.join() for o in p_list]
   print(f"父进程等待每个子进程任务结束,总耗时:{time.time()-start}")
   
   
"""
结果如下:
当前环境进程:14276
父进程:14276-创建子进程8492,执行任务:0
父进程:14276-创建子进程15080,执行任务:1
父进程:14276-创建子进程14920,执行任务:2
父进程:14276-创建子进程13528,执行任务:4
父进程:14276-创建子进程13452,执行任务:3
父进程等待每个子进程任务结束,总耗时:3.257822036743164

因为每个子进程内存空间是隔离的,所以此时是无法得到每个任务的结果的,一般想得到结果可以采用队列的方式保存!
"""  
示例:创建多进程获取子进程任务结果
import os
from multiprocessing import (Process, Queue)


def task(num, q):
   print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}")
   q.put(f"任务:{os.getpid()}执行结果:{num}")


if __name__ == '__main__':
   q_obj = Queue()
   p_list = []
   for i in range(5):
       p = Process(target=task, args=(i, q_obj))
       p.start()
       p_list.append(p)

   # 遍历每个子进程,确认其执行完毕
  [o.join() for o in p_list]
   # 遍历每个子进程,获取对应子进程执行任务结果
   response = [q_obj.get() for j in p_list]
   print(f"子进程执行的结果集为:{response}")

"""
执行结果为:
父进程:14472-创建子进程10172,执行任务:1
父进程:14472-创建子进程13464,执行任务:0
父进程:14472-创建子进程1548,执行任务:3
父进程:14472-创建子进程10692,执行任务:4
父进程:14472-创建子进程13412,执行任务:2
子进程执行的结果集为:['任务:10172执行结果:1', '任务:13464执行结果:0', '任务:1548执行结果:3', '任务:10692执行结果:4', '任务:13412执行结果:2']
"""

2、进程池

特点:同时开启指定数量的进程(一般CPU个数),并行执行任务,用于高计算,并行,有任务执行返回值

同步

任务结果顺序是按照提交任务结果的顺序,同步也就是按进程创建的顺序

import os
import time
from multiprocessing import Pool

# 获取cpu个数
print(os.cpu_count())


def task(flag):
   print(f"进程:{os.getppid()},创建子进程:{os.getpid()},执行任务{flag}")
   time.sleep(2)
   return flag


if __name__ == '__main__':
   start = time.time()
   pool = Pool(processes=os.cpu_count())
   job_list = range(10)
   results = []
   for i in job_list:
       # apply的方式
       ret = pool.apply(task, (i,))  # 同步,进程池是有返回值的
       
       # map的方式
       # ret = p.map(task, (job,)) # 同步,一共执行10个任务
       
       results.append(ret)
   pool.close()   # 关闭 并不是进程池中的进程不工作了,而是关闭了进程池,让任务不能再继续提交了
   pool.join()    # 等待这个池中提交的任务都执行完,表示等待所有子进程中的代码都执行完 主进程才结束
   
   print(f"apply-任务执行结果合集:{results}")   ----> 任务执行结果合集:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
   print(f"apply-总耗时:{time.time()-start}")  ----> 总耗时:20.55962562561035
       
   print(f"map-执行的结果为:{result_list}")   ---> 执行的结果为:[[0], [1], [4], [9], [16], [25], [36], [49], [64], [81]]
   print(f"map-总耗时:{time.time() - start}") ---> 总耗时:20.336746215820312
异步

任务结果是按照子进程提交任务的顺序,结果顺序不可控,要求任务关联性不高

import os
import time
from multiprocessing import Pool

# 获取cpu个数
print(os.cpu_count())


def task(flag):
   print(f"进程:{os.getppid()},创建子进程:{os.getpid()},执行任务{flag}")
   time.sleep(2)
   return flag


if __name__ == '__main__':
   start = time.time()
   pool = Pool(processes=os.cpu_count())
   job_list = range(10)
   results = []
   for i in job_list:
       # apply_async方式
       ret = pool.apply_async(task, (i,))  # 异步,一次并行执行池子里配置的数量的任务
       
       # map_async方式
       ret = p.map_async(task, (job,))  # 同步,一共执行5个任务
       results.append(ret)
   pool.close()
   pool.join()
   ret = [job.get() for job in results]   # 异步需要调用get方法
   print(f"apply_async-任务执行结果合集:{ret}")        ----> 异步时,任务执行结果合集:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
   print(f"apply_async-总耗时:{time.time() - start}") ----> 异步时,总耗时:4.462151765823364  
       
   print(f"map_async-执行的结果为:{result_list}")   ---> 执行的结果为:[[0], [1], [4], [9], [16], [25], [36], [49], [64], [81]]
   print(f"map_async-总耗时:{time.time() - start}") ---> 总耗时:4.353601694107056    
# 在异步提交中,可以不用join(),主进程会执行完代码,但会等待进程池中的任务结束,才结束
多个不同的任务方法执行同一个数据源返回不同的结果
# 对同一份请求报文,执行不同的校验,返回对应的结果
import os
from multiprocessing import Pool


def task1(data):
   print(f"子进程:{os.getpid()},执行任务")
   print(data.get("num"))  # 5
   code = 1
   if data["num"] > 10:
       code = 0
   return dict(code=code)


def task2(data):
   print(f"子进程:{os.getpid()},执行任务")
   code = 1
   if data["num"] < 9:
       code = 0
   return dict(code=code)


def task3(data):
   print(f"子进程:{os.getpid()},执行任务")
   code = 1
   if 2 < data["num"] < 9:
       code = 0
   return dict(code=code)


if __name__ == '__main__':
   
   pool = Pool(os.cpu_count())
   jobs = [task1, task2, task3]
   results = []
   data = {"num": 5}
   for i in jobs:
       # apply_async方式
       ret = pool.apply_async(i, (data,))
       
       results.append(ret)
   pool.close()
   pool.join()
   print(f"结果集为:{[j.get() for j in results]}")  ---> 结果集为:[{'code': 1}, {'code': 0}, {'code': 0}]
   
   # map_async方式
   pool = Pool(os.cpu_count())
   jobs = [task1, task2, task3]
   results = []
   data = {"num": 5}
   for i in jobs:
       ret = pool.map_async(job, (data,))
       results.append(ret)
   pool.close()
   pool.join()
   print(f"结果集为:{[j.get() for j in results]}")  ---> 结果集为:[[{'code': 1}], [{'code': 0}], [{'code': 0}]
多个数据源,执行相同的任务方法校验,发现不合格,就退出不再执行

只要得到自己想要的结果,就结束,节约资源

import os
import time
from queue import Queue
from multiprocessing import Pool

"""不同的数据源,执行相同的任务"""


def task(data):
   time.sleep(0.05)
   print(data)
   if data == 5:
       return False
   else:
       return True


if __name__ == '__main__':
   pool = Pool(os.cpu_count())
   q = Queue()
   data_list = range(1000)
   for i in data_list:
       ret = pool.apply_async(task, args=(i,))
       q.put(ret)
   pool.close()
   while True:
       p_result_obj = q.get()
       flag = p_result_obj.get()
       print(flag)
       if flag is False:
           # 如果校验失败,则退出不再执行后面的
           print("发现校验失败,结束进程池中的所有子进程")
           pool.terminate()
           break

   pool.join()
   print("执行后面的逻辑")
   
# 升级版本:多进程+多线程
import os
import time
from queue import Queue
from threading import Thread,get_ident
from multiprocessing import Pool


def task(data):
   time.sleep(0.05)
   print(f"主进程{os.getppid()},创建子进程{os.getpid()},执行任务{data}")
   print(data)
   if data == 5:
       return False
   else:
       return True


def pool_th(data_list, q, pool):
   for i in data_list:
       # 创建多个子进程,异步只是将任务添加到队列,还没有执行完
       q.put(pool.apply_async(task, args=(i,)))


def result_th(q, p):
   while True:
       flag = q.get().get()  # 获取子进程结果
       print(f"在子线程{get_ident()}中获取子进程执行的结果进行判断:{flag}")
       if not flag:
           p.terminate()  # 结束所有子进程
           break


if __name__ == '__main__':
   result_q = Queue()
   pool = Pool()
   data_list = range(1000)
   # 开启多线程
   t1 = Thread(target=pool_th, args=(data_list, result_q, pool))
   t2 = Thread(target=result_th, args=(result_q, pool))
   t1.start()
   t2.start()
   t1.join()
   t2.join()
   pool.join()
   print("执行后面的逻辑")
   
# 这里有个通病,那就是结束进程池子进程任务时,有的子进程已经执行了,因为每个子进程执行的结果插入顺序不是有序的,而我们需要的那个结果也许虽然按任务来说在前面,但它执行的时间并不一定在前面

回调函数-接收子进程返回值,实时处理

定义:将一个进程的执行结果的返回值,会当callback参数来执行配置的callback函数,从而减少获取子进程结果等I/O操作浪费的时间

作用:进程池中的任何一个任务一旦处理完了,就立即告知主进程,我已执行完毕,你可以处理我的结果了,主进程则调用一个函数【你配置的回调函数】去处理该结果。

注意:回调函数是没有返回值,所以回调函数一般可用于对子进程结果的判断后,然后写库等等操作

from multiprocessing import Pool
import time
import os


def task(num):
    time.sleep(2)
    print(f"父进程{os.getppid()},子进程:{os.getpid()}执行任务{num}")
    if num > 3:
        res = {"job_num": num, "result": 0}
    else:
        res = {"job_num": num, "result": 1}
    return res


def back(args):
    print(f"回调即时处理每个子进程返回的结果{args},其父进程为:{os.getpid()}")
    if args["result"] == 0:
        print("写库")
    else:
        print(f"记录日志,任务{args['job_num']},执行失败,结果为:{args['result']}")


if __name__ == '__main__':
    print(f"当前进程:{os.getpid()},父:{os.getppid()}")
    jobs = range(10)
    p = Pool(os.cpu_count())
    for job in jobs:
        p.apply_async(func=task, args=(job,), callback=back)
    p.close()
    p.join()

转载请注明:XAMPP中文组官网 » Python基础入门学习之多进程示例

您必须 登录 才能发表评论!