最新消息:XAMPP默认安装之后是很不安全的,我们只需要点击左方菜单的 "安全"选项,按照向导操作即可完成安全设置。

python 多进程间相互协调: Queue 队列和pipe管道[数据传递]

XAMPP案例 admin 348浏览 0评论

multiprocessing 多进程之间相互协调的方式有如下几种:

  • Lock:锁,
  • Semaphore:信号量
  • Queue:队列,
  • Event:事件
  • Pipe:管道

在多进程方式并发编程中,

  • 如果我们实在无法避免进行数据共享的话(通常应该会尽量避免使用共享数据的方式进行通讯)应尽可能使用消息传递和队列,这样我们可以避免处理复杂的同步和锁问题,

管道空间

多个进程使用同一空间进行IO操作,通常情况下,需要在内存中开辟一块空间,这个空间就是管道空间。

管道空间按照作用分类:

  • 双向管道:全双工,所有进程均可读写(默认)
  • 单向管道:半双工,一个只读,一个只写

PS:管道均由Pipe()实现

如果多个进程同时进入管道的话,数据依旧是乱序的,所以通常还是需要加锁保证数据的安全,而multiproce>ssing还提供了Queue,即队列,队列就是管道加锁的体现。

所以 Queue:队列是多进程安全的队列,通常可以使用Queue实现多进程之间的数据传递。

Queue常见应用场景:

如:生产者消费者模式中经常使用的方式:

  • 一个进程向队列写数据
  • 另一个进程从队列取数据

生产者和消费者模式是通过一个容器来来解决生产者和消费者之间的耦合度问题。

他们之间彼此不直接通信。当生产者生产完数据的时候,向队列写数据,然后消费者从队列取数据,有一个缓冲的作用,被称之为阻塞队列。可以起到缓冲或平衡的作用!

Queue模块的一些方法:

  • qsize(): 返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。

ps:注意,在 Unix 平台上,例如 Mac OS X ,这个方法可能会抛出 NotImplementedError  异常,因为该平台没有实现 sem_getvalue() 。

  • empty():如果队列是空的,返回 True ,反之返回 False 。由于多线程或多进程的环境,该状态是不可靠的。
  • full():如果队列是满的,返回 True ,反之返回 False 。由于多线程或多进程的环境,该状态是不可靠的。
  • put(obj[, block[, timeout]]):将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full  异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。
  • put_nowait(obj) 相当于 put(obj, False)。取不到值时触发异常:Empty;
  • get([block[, timeout]]):从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

PS:如果队列为空,则立即抛出Queue.Empty异常

  • get_nowait():相当于 get(False) 。
  • close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。

生产者和消费者的示例1:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     jincheng7
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/5/7
-------------------------------------------------
   修改描述-2021/5/7:         
-------------------------------------------------
"""

from multiprocessing import Process, Manager, Queue
import os
import time

def jincheng1(dict):
    print("子进程id:%s" % (os.getpid()))
    time.sleep(1)
    dict[os.getpid()] = "我子进程:"+str(os.getpid())
    print("子进程id:%s下的dict内容:" % (os.getpid()),dict)

def jincheng2(dict):
    print("子进程id:%s" % (os.getpid()))
    time.sleep(1)
    dict[os.getpid()] = "我子进程:"+str(os.getpid())
    print("子进程id:%s下的dict内容:" % (os.getpid()),dict)

def consume(queue):
    while True:
        if not queue.empty():
            mDict = queue.get()
            print('消费者的处理最终结果:')
            for key in dict(mDict):
                print("%s=%s" % (key, mDict[key]))
        else:
            continue

def produceProcess(mQueue):
    # 不断的创建的新的进程把消息放到队列里
    while True:
        start = time.time()

        manager = Manager()
        mDict = manager.dict()

        ps_1 = Process(target=jincheng1, args=(mDict,))
        ps_1.start()
        ps_1.join()
        ps_2 = Process(target=jincheng2, args=(mDict,))
        ps_2.start()
        ps_2.join()

        mQueue.put(mDict)
        print('主进程运行时间:%s' % (time.time() - start))

if __name__ == '__main__':
    # 创建一个队列存储共享数据
    mQueue = Queue()

    # 创建一个消费者进程从队列mQueue中读取数据
    ps_detect = Process(target=consume, args=(mQueue,))
    ps_detect.start()

    # 主进程做为生产者,将三个子进程的预处理结果存入队列mQueue
    produceProcess(mQueue)

输出的结果:

消费者的处理最终结果:
42336=我子进程:42336
39096=我子进程:39096
子进程id:32896
子进程id:32896下的dict内容: {32896: '我子进程:32896'}
子进程id:35964
子进程id:35964下的dict内容: {32896: '我子进程:32896', 35964: '我子进程:35964'}
主进程运行时间:7.566763877868652
消费者的处理最终结果:
32896=我子进程:32896
35964=我子进程:35964
子进程id:36356
子进程id:36356下的dict内容: {36356: '我子进程:36356'}
子进程id:43308
子进程id:43308下的dict内容: {43308: '我子进程:43308', 36356: '我子进程:36356'}
主进程运行时间:2.718729019165039
消费者的处理最终结果:
36356=我子进程:36356

多个生产者和多个消费者下的JoinableQueue

JoinableQueue 主要用于我们的Queue队列允许项目的消费者来通知生产者已经成功处理.

PS:通知进程是通过共享的信号和条件(添加一个添加到我们的队列里面。消费里面判断条件)

JoinableQueue是Queue的子类,额外添加了

  • task_done()
  • join()

JoinableQueue 类是 Queue 的子类,额外添加了 task_done() 和 join() 方法。

  • task_done() 指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用 get() 获取的任务,执行完成后调用 task_done() 告诉队列该任务已经处理完成。如果 join() 方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用 put() 放进队列中的所有对象都已经返回了对应的 task_done() ) 。

    如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。

  • join() 阻塞至队列中所有的元素都被接收和处理完毕。当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。

示例:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     jincheng9
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/5/7
-------------------------------------------------
   修改描述-2021/5/7:         
-------------------------------------------------
"""
import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue

def consumer(name, q):
    while True:
        result = q.get()
        time.sleep(random.randint(1, 3))
        print('消费者-->%s 购买了--->:%s' % (name, result))
        q.task_done()  # 发送信号给生产者的q.join(),通知已处理完从队列中拿走的一个项目

def producer(name, q):
    for i in range(5):
        time.sleep(random.randint(1, 2))  # 模拟生产时间
        result = '麦辣鸡编号为: %s ' % i
        q.put(result)
        print('生产者-->%s 生产了%s' % (name, result))
    q.join()  # 需等到消费者把自己放入队列中的所有项目都取走处理完后且收到调用task_done()之后,生产者才能结束

if __name__ == '__main__':
    q = JoinableQueue()

    p1 = Process(target=producer, args=('肯德基1号店', q))
    p2 = Process(target=producer, args=('肯德基2号店', q))

    c1 = Process(target=consumer, args=('客户1', q))
    c2 = Process(target=consumer, args=('客户2', q))
    c3 = Process(target=consumer, args=('客户3', q))
    # 设置为守护进程,随着主进程的退出而退出
    c1.daemon = True
    c2.daemon = True
    c3.daemon = True
    # 启动生产者
    p1.start()
    p2.start()
    # # 等待所有的生产者执行完成后才退出
    # p1.join()
    # p2.join()

    # 启动消费者
    c1.start()
    c2.start()
    c3.start()

    # 等待所有的生产者执行完成后才退出
    p1.join()
    p2.join()

输出结果为:

D:\ceshi>python jincheng9.py
生产者-->肯德基1号店 生产了麦辣鸡编号为: 0
生产者-->肯德基2号店 生产了麦辣鸡编号为: 0
生产者-->肯德基1号店 生产了麦辣鸡编号为:1
生产者-->肯德基2号店 生产了麦辣鸡编号为:1
消费者-->客户1 购买了--->:麦辣鸡编号为: 0
消费者-->客户2 购买了--->:麦辣鸡编号为: 0
生产者-->肯德基1号店 生产了麦辣鸡编号为:2
生产者-->肯德基2号店 生产了麦辣鸡编号为:2
消费者-->客户2 购买了--->:麦辣鸡编号为:2
消费者-->客户3 购买了--->:麦辣鸡编号为:1
消费者-->客户1 购买了--->:麦辣鸡编号为:1
生产者-->肯德基2号店 生产了麦辣鸡编号为:3
生产者-->肯德基1号店 生产了麦辣鸡编号为:3
消费者-->客户1 购买了--->:麦辣鸡编号为:3
生产者-->肯德基2号店 生产了麦辣鸡编号为:4
生产者-->肯德基1号店 生产了麦辣鸡编号为:4
消费者-->客户2 购买了--->:麦辣鸡编号为:2
消费者-->客户3 购买了--->:麦辣鸡编号为:3
消费者-->客户2 购买了--->:麦辣鸡编号为:4
消费者-->客户1 购买了--->:麦辣鸡编号为:4

管道Pipe

管道Pipe和Queue的作用大致差不多,也是实现进程间的通信

Pipe的本质是进程之间的数据传递,不是数据共享。

PS:如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据。所以这种方式通常不推荐使用了!!!

非进程安全

管道实例的创建:

使用 Pipe 实现进程通信,首先需要调用 multiprocessing.Pipe() 函数来创建一个管道。该函数的语法格式如下:

conn1, conn2 = multiprocessing.Pipe( [duplex=True] )

  • conn1 和 conn2 分别用来接收 Pipe 函数返回的 2 个端口;
  • duplex 参数默认为 True,表示该管道是双向的,即位于 2 个端口的进程既可以发送数据,也可以接受数据,
  • 如果将 duplex 值设为 False,则表示管道是单向的,conn1 只能用来接收数据,而 conn2 只能用来发送数据。

另外值得一提的是,conn1 和 conn2 都属于 PipeConnection 对象,它们还可以调用表 2 所示的这些方法。

管道方法介绍:

pipe()类似我们的socket的通讯,返回两个连接对象分别表示管道的两端, 每端都有

  • send()
  • recv()方法

其他方法:

  • conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
  • conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
  • conn1.fileno():返回连接使用的整数文件描述符
  • conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。
  • conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常, 并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  • conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

示例:

from multiprocessing import Process, Pipe
def fun1(conn):
    print('子进程发送消息:')
    conn.send('你好主进程:我是你孙子')
    print('子进程接受消息:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe() #关键点,pipe实例化生成一个双向管
    p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
    p.start()
    print('主进程接受消息:')
    print(conn1.recv())
    print('主进程发送消息:')
    conn1.send("你好子进程,:我是你大爷")
    p.join()
    print('结束')

输出结果:

D:\ceshi>python jincheng10.py
主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程:我是你孙子
主进程发送消息:
你好子进程,:我是你大爷
结束

转载请注明:XAMPP中文组官网 » python 多进程间相互协调: Queue 队列和pipe管道[数据传递]

您必须 登录 才能发表评论!