正文
多进程—进程同步控制,IPC
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
multiprocessing包—Process模块开启多进程的两种方式,Process的方法,守护进程
进程同步控制—multiprocessing.Lock multiprocessing.Semaphore multiprocessing.Event
进程间通信(IPC)— multiprocessing.Pipe multiprocessing.Queue
进程间的数据共享— multiprocessing.Manager
1.是运行中的程序; 2.是系统最小的资源分配单位 3.为多个任务之间的数据安全和内存隔离做约束
进程特点
multiprocessing:提到多进程就要想到它
multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。
一、multiprocessing.Process开启多进程:
#process方法: start,join,terminate,isalive 属性 name,pid,daemon(True为守护进程)
#除了子进程代码,其他都是主进程
#注意进程的三态转换 就绪态-(时间片)-执行态,阻塞态
#process方法: start,join,terminate,isalive 属性 name,pid,daemon(True为守护进程)
import time
from multiprocessing import Process def func(): #在子进程中执行的代码,因为被注册了
print('开启一个子进程')
time.sleep(1)
print('证明,print1和print*是一起启动的') #其中p是一个主进程,实例化一个进程对象,
# p.start()这句话为主进程代码。(但是它告诉系统启动子进程,操作系统创建新进程,执行进程中的代码)
if __name__ == '__main__':#·只有在windows下开启子进程,才需要这句话,其他系统不用
p = Process(target=func) #注册函数。 可以理解为,把func这个函数,放到一个新的py文件里执行
#p是一个进程对象,告诉系统开启一个子进程
p.start() #启动子进程,子进程启动起来后,进入就绪排队状态,延迟0.01还是0.001秒看排队长度
print('*'*5) #其中 这里的print和p.start是一起启动的,不管有多少个print,都同步启动 #假设如果我们直接调用func()函数,那么print('*'*5)则与函数此时是同步的,要等函数运行完才能执行print('*'*5)
#但是此时我们用子进程p.start告诉系统,要开启一个子进程去执行func,然后系统就会给func一个进程id,开始运行func.同时主程序会继续运行自己的代码print('*'*5)
# 此时程序变为了异步了,不影响p.start后面的代码运行,也就是说,p.start告诉系统开启子进程调用func后,马上运行
#print('*'*5),然后进程开启后,就绪状态进入了执行状态,再运行 func。
# 此时运行func和执行print('*'*5),变为了异步,不是同步了。这两段代码已经是在两条马路上跑的两辆车了 def f(name): #这里是子进程代码
print('hello', name)
print('我是子进程') if __name__ == '__main__': #以下全部都是主进程代码,包括import模块那
p = Process(target=f, args=('bob',)) #主进程
p.start() #开启了一个子进程,理解为把f放到新的py里执行,但是实际上他还有后续的主进程代码【print('执行主进程的内容了')】要执行
time.sleep(1) #主进程的代码
print('执行主进程的内容了') #主进程的代码
#搭配上面的例子食用更佳。如果上面的sleep是0.1秒,就会先执行上面的print2。如果是1秒,则肯定是先执行,print('执行主进程的内容了')
#此时主进程的代码,和子进程要启动的代码。就是异步执行
Process初识
#pid processid进程id ppid :parent process id import os,time
from multiprocessing import Process
# print('py中的进程',os.getpid())#执行这整段代码,这句话会运行两次第一次指向父进程,第二次指向子进程
def func(name):
print('参数是',name)
print('子进程',os.getpid())
print('子进程的父进程',os.getppid()) #就是下面这一段的pid
time.sleep(1)
# name = input('111111111') #EOFError: EOF when reading a line
print('子进程,调皮一下') if __name__ == '__main__':
p = Process(target=func,args=('gkx',)) #注意传参数时候 args=('gkx',) 逗号一定要加,参数可以是*args,不能是**kwargs
p.start()
print('父进程',os.getpid())
print('父进程的父进程',os.getppid()) #这个就是pycharm的进程id
进程的os.getpid()
# 进程的生命周期
# 主进程
# 子进程
# 开启了子进程的主进程 :
# 主进程自己的代码如果长,等待自己的代码执行结束,
# 子进程的执行时间长,主进程会在主进程代码执行完毕之后等待子进程执行完毕之后 主进程才结束 #···子进程不一定要依赖运行着的父进程
#至于父进程如果关闭了 但是子进程没运行完,
#比如 控制台 python XXX.py 运行了py文件,此时如果关闭了控制台,py文件是否会跟着关闭?
#在linux中 如果 python XXX.py & 后面跟上&则 py文件会一直在后台运行
#所以,关闭父进程,子进程是否关闭要看是怎么规定的
进程周期!!!
import os
from multiprocessing import Process class MyProcess(Process):
def __init__(self,arg1,arg2): #如果属性都有默认值,那么在实例化的时候可以不传属性,类型函数的传值方式
super().__init__()#父类的属性都有默认值所以不用列出
self.arg1 = arg1
self.arg2 = arg2 def run(self):
print(self.name)
print(self.pid)
print('arg1,arg2',self.arg1,self.arg2)
# print('run参数',name)
print(os.getpid()) if __name__ == '__main__':
print('主进程',os.getpid())
p1 = MyProcess('','')
# p1.run('gkx')
p1.start()
p2 = MyProcess('','')
p2.start() #~~~第二种方法的条件:
#1.自定义类,继承Process类
#2.定义个名为 run的方法,run方法中是在子进程中执行的代码
#3.子进程的参数,通过 __init__的方法初始化,并调用父类的__init__
开启多进程的第二种方法
#注意进程间是数据隔离的,所以才会有进程间的通信需求,在博客的后面会提到
from multiprocessing import Process
def func():
num = input('>>>') #EOFError: EOF when reading a line
print(num) if __name__ == '__main__':
Process(target=func).start() #子进程中不能有input
#因为在pycharm中开启多个进程,软件帮你优化后,显示在同一个控制台
#但是实际上它还是两个进程的,感受不到子进程的input,所以报错
#因此,子进程是不能input 的
#不同于聊天,QQ用的是UDP,是客户端和客户端之间的通信
子进程中不能有input
(1)Process模块的几个方法:
import time,os
from multiprocessing import Process def func(arg1,arg2):
print('*'*arg1)
# time.sleep(2) #有种效果:让时间片轮转一下
print('='*arg2) if __name__ == '__main__':
p = Process(target=func ,args=(10,20,))
p.start()
print('哈哈哈哈哈')
p.join() #感知一个子进程的结束,从异步变成了同步。有点像把子进程代码拼进主进程
print('====== 运行完了')
Process的几个方法
# 多进程代码
# from multiprocessing import Process ,一定要在 if __name__ == '__main__':下运行
# 方法
# 进程对象.start() 开启一个子进程
# 进程对象.join() 感知一个子进程的结束
# 进程对象.terminate() 结束一个子进程
# 进程对象.is_alive() 查看某个子进程是否还在运行
# 属性
# 进程对象.name 进程名
# 进程对象.pid 进程号
# 进程对象.daemon 值为True的时候,表示新的子进程是一个守护进程
# 守护进程 随着主进程代码的执行结束而结束
# 一定在start之前设置
(2)开启多个子进程
使用 空列表 [p.join() for p in p_lst] 保证多个子进程运行完后,才运行主进程
import time,os
from multiprocessing import Process def func(arg1,arg2):
print('*'*arg1)
print('!'*arg1)
# time.sleep(2) #由于是异步执行,这里sleep2秒,有点像几条马路上的一个红路灯,把所有子进程拦住,红绿灯一消失,所有进程就开始执行
#所以开启多个子进程,也只会等待2秒而已
print('='*arg2) if __name__ == '__main__':
p_lst = []
for i in range(10): #按顺序告诉系统要开启多个子进程,但是系统并不会按顺序执行!有些是执行态,有些会是就绪态
p = Process(target=func ,args=(10*i,20*i,))
p_lst.append(p)
p.start()
#p.join()如果在for循环里加了p.join则类似变为了同步
[p.join() for p in p_lst] #保证print('====== 运行完了')肯定在最后运行
# p.join() #由于系统执行顺序不确定,print('====== 运行完了') 会出现在不确定的地方
print('+++++++ 运行完了')
开启多个子进程
from multiprocessing import Process
import os def func(filename,content):
with open(filename,'w') as f:
f.write(content*10*'*') if __name__ == '__main__':
p_lst = []
for i in range(5):
p = Process(target=func,args=('info%s'%i,i)) #这里进程是同时启动的,但是有些进入执行态,有些进入就绪态
p_lst.append(p)
p.start()
# p.join() i=0等待文件写入,i=1 等待文件写入........
[p.join() for p in p_lst] #等待大家跑步结束,之前的所有进程必须在这里都执行完才能继续往 下执行
#这里, i=1,i=2,i=3,i=4,i=5是同时写入的,全部写入完,才打印文件名
print([i for i in os.walk(os.getcwd())]) #在使用for生成进程的时候,进程是几乎同时进行的,浪费的时间只是for i in range(5) 这极短的时间
#这个例子适合场景: 所有的子进程需要异步执行,但是需要所有子进程做完了的结果,返回给主进程
join深解
在用for循环的时候,肯定是按顺序的。但是for循环只是极短的时间,生成的子进程,此时有些进程进入就绪态,有些进入执行态,所以顺序不一
(3)守护进程:
#守护进程---> 主进程结束的时候,子进程也主动结束
import time
from multiprocessing import Process def func(): # 守护进程 会 随着 主进程的代码执行完毕 而 结束
while True:
time.sleep(0.2)
print('我还活着') def fun2():
print('in the func22222222222')
time.sleep(8)
print('func2 finished')
if __name__ == '__main__':
p = Process(target=func)
p.daemon=True #设置子进程为守护进程,一定要在 start方法前 设置好
p.start()
p.terminate()#强制结束一个子进程
time.sleep(0.0000000000000000000001)
print(p.is_alive()) #检验子进程是否或者。虽然上一个语句结束了,但是这里操作系统正在回收,但是此时响应时间非常短,所以会返回True。sleep一下就是false了
p2 = Process(target=fun2)
p2.start()
# p2.join()
i = 0
while i<5:
print('我是socket server.')
time.sleep(1)
i +=1 # 守护进程 会 随着 主进程的代码执行完毕 而 结束
# 在主进程内结束一个子进程 p.terminate()
# 结束一个进程不是在执行方法之后立即生效,需要一个操作系统响应的过程
# 检验一个进程是否活着的状态 p.is_alive()
# p.name p.pid 这个进程的名字和进程号
守护进程
(4)使用多进程实现socket服务端的并发
import socket
from multiprocessing import Process def server(conn):
conn.send('你好'.encode('utf-8'))
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close() if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen() while True:
conn,addr = sk.accept()
p = Process(target=server,args=(conn,))
p.start() sk.close()
server端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8')
print(msg)
ret = input('>>>> ')
sk.send(ret.encode('utf8')) sk.close()
client端
二、进程的同步控制
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
(1)multiprocessing.Lock 同步锁/互斥锁。使用锁来保证数据安全:
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
加锁的作用
#会造成数据不安全的地方 要加锁 lock.acquire lock.release
from multiprocessing import Process,Lock
import json,time def tick(): with open('ticket_num',) as f:
ret = json.load(f)
time.sleep(0.1)
print('余票: %s'%ret['ticket'])
def buy_tick(i,lock): #with lock
lock.acquire() #相当于拿钥匙才能进门。一旦有进程拿了钥匙,其他进程在这里拿不到,就会阻塞
with open('ticket_num',) as f: #阻塞到有人还钥匙为止
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0:
dic['ticket'] -= 1
print('\033[32m%s买到票啦\033[0m'%i)
else:
print('\033[31m没有买到票%s\033[0m'%i)
time.sleep(0.1)
with open('ticket_num','w') as f:
json.dump(dic,f)
lock.release() #还钥匙 if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=tick)
p_lst.append(p)
p.start() [p.join() for p in p_lst]
print('\033[34;42m开始买票了\033[0m')
lock = Lock() for i in range(10):
p2 = Process(target=buy_tick,args=(i,lock))
p2.start() # class MyTicket(Process):
# def __init__(self,i,lock):
# super().__init__()
# self.i = i
# self.lock = lock
#
# def run(self):
# self.lock.acquire() # 相当于拿钥匙才能进门。一旦有进程拿了钥匙,其他进程在这里拿不到,就会阻塞
# with open('ticket_num', ) as f: # 阻塞到有人还钥匙为止
# dic = json.load(f)
# time.sleep(0.1)
# if dic['ticket'] > 0:
# dic['ticket'] -= 1
# print('\033[32m%s买到票啦\033[0m' % self.i)
# else:
# print('\033[31m没有买到票%s\033[0m' % self.i)
# time.sleep(0.1)
# with open('ticket_num', 'w') as f:
# json.dump(dic, f)
# self.lock.release() # 还钥匙
#
# if __name__ == '__main__':
# lock = Lock()
# for i in 'abcdefghijk':
# p = MyTicket(i,lock)
# p.start()
抢票例子-Lock
当不加锁的时候,会出现多个人同时抢到1张票,这明显是不合理的,所以需要加锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
死锁
用递归锁解决:from multiprocessing import RLock (具体参见线程中的RLock)
(2)multiprocessing.Semaphore 信号量, 也就是加了计数器的锁,即同一时间只能让指定数目的程序执行任务。以下以移动KTV举例:
互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
信号量概念和作用
#信号量 semaphore sem = Semaphore(int) ,也是锁的概念,只是内部加了个计数器
import time,random
from multiprocessing import Semaphore
from multiprocessing import Process # sem = Semaphore(2)
# sem.acquire()
# print('第一把钥匙')
# sem.acquire()
# print('第二把钥匙')
# sem.acquire()
# print('第三把钥匙') def ktv(i,sem):
sem.acquire()
print('%s走进去KTV了'%i)
time.sleep(random.randint(1,5))
print('%s走出来了'%i)
sem.release() if __name__ == '__main__':
sem = Semaphore(5)
for i in range(20):
p = Process(target=ktv,args=(i,sem))
p.start()
移动KTV例子-Semaphore
(3)multiprocessing.Event 事件:事件的阻塞与否是要看 event.wait()是True还是False
其中 wait(time) 可以传参数,wait(2)表示等待2秒后,就不阻塞了
from multiprocessing import Event
#代码要执行,要得到一个信号通知wait(event),同一时间只能由一个进程执行这段代码lock,同一时间只能有指定数量的进程执行这段代码semaphore
#一个信号可以使所有的进程都进入阻塞状态
#也可以控制所有的进程解除阻塞
#一个事件被创建之后,默认是阻塞状态
e = Event() #创建了一个事件对象
print('事件默认状态',e.is_set()) #查看一个事件的状态,默认设置为阻塞
e.set() #将Flag改为True,解除阻塞
print('set之后',e.is_set())
e.clear() #将Flag改为False,继续阻塞
print('clear之后',e.is_set())
e.wait() #是依据 e.is_set()的值来决定是否阻塞
#这里的wait是指,依据事件的状态来决定是否在wait处阻塞
print(123456) #is_set()为 False 那么阻塞,默认阻塞。
# set 将Flag改为True,解除阻塞
#clear 将Flag改为False,继续阻塞
Event细说
# 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,
# 如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
#e.wait set clear
#wait要配套 set和clear使用。单单只有 set和clear 是不阻塞的
#set和clear是告诉 wait,要不要阻塞,真正的阻塞机制还是在 wait这
事件处理的机制
import time
import random
from multiprocessing import Event,Process
def cars(e,i):
if not e.is_set():
print('car%i在等待'%i)
e.wait() # 阻塞 直到得到一个 事件状态变成 True 的信号
print('\033[0;32;40mcar%i通过\033[0m' % i) def light(e):
while True:
if e.is_set(): #如果e.is_set()真才走下面这句,但是此时 e.is_set默认是False,所以先绿灯亮了
e.clear()
print('\033[31m红灯亮了\033[0m')
else: #默认false,所以先走这里
e.set() #设置 e.is_set() 为True ,此时有车子正在生成
print('\033[32m绿灯亮了\033[0m')
time.sleep(2) if __name__ == '__main__':
e = Event()
traffic = Process(target=light,args=(e,))
traffic.start()
for i in range(20):
car = Process(target=cars, args=(e,i))
car.start()
time.sleep(random.random()) #思路整理:
#当事件传入的时候,首先判断了e.is_set()是false,然后改变 e.is_set()的状态。此时car正在生成,等待事件为 0-1的随机小数
#大部分情况这个小数是足够大,让e.is_set()状态改变的,但是有时候,car随机的小数非常小,小到e.set还在就绪态,所以有很小的几率,会默认 e.is_set()是 false
#让车辆等待和同行显示正常的关键点,在 等待时间要明确
著名的红绿灯例子—Event
三.进程间的通信 — IPC (Inter-Process Communication)
多进程之间,如果不通过特殊的手段,不会共享数据。即数据隔离:
#多进程之间,如果不通过特殊的手段,不会共享数据。即数据隔离
import os
from multiprocessing import Process # n = 50 #因为在这个py里,这两行会被执行两次
# print(n)
def func():
global n
n = 0
print('子pid %s'%os.getpid(),n)
# n = 100
# func()
# print(n)
if __name__ == '__main__':
n = 100
p = Process(target=func)
p.start()
p.join()
print('父pid %s' % os.getpid(), n) #n 还是100 说明,子进程和主进程,全局不一样,即内存是隔离的
进程间的数据隔离
#以后工作中,一般不会直接用Queue,会使用更强大的几个模块:
kafka:大数据的 消息中间键 可保留数据
rebbitmq
memcache:不可保留
(1) mulitiprocessing.Queue 队列。
队列可以理解为:基于管道和锁实现的一个数据在线程/进程之间互相通信的容器
#最主要的四个方法 put get full(满了返回True) empty (为空返回True) 还有q.get_nowait() 具体见最下方
from multiprocessing import Queue,Process def produce(q):
q.put('hello')
print('放进去了') def consume(q):
ret = q.get()
print('收到了',ret) if __name__ == '__main__':
q = Queue() #不设置最大值 Queue(5)最大存储5
p = Process(target=produce,args=(q,))
p.start()
c = Process(target=consume,args=(q,))
c.start()
Queue-多进程
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( )
同q.get(False)方法。 q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。 q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。 方法介绍
multiprocessing.Queue方法
'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
''' from multiprocessing import Queue
q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
# 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
print('队列已经满了') # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了 print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
print('队列已经空了') print(q.empty()) #空了 单看队列用法
但看Queue的使用
队列中最出名的模型:生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。 什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者和消费者模式-概念
#首先判断生产者是否生产完毕,不能用empty和get_nowait,因为有可能消费者消费得快,队列被取光了,此时队列为空,但是生产者随时可能再put进来
#其次,多个进程取队列里取值,比如此时put一个None让进程break,但是其中一个进程拿到了None退出了,另一个却继续阻塞
#所以需要put多个None
# 队列
# 生产者消费者模型 #首先判断生产者是否生产完毕,不能用empty和get_nowait,因为有可能消费者消费得快,队列被取光了,此时队列为空,但是生产者随时可能再put进来
#其次,多个进程取队列里取值,比如此时put一个None让进程break,但是其中一个进程拿到了None退出了,另一个却继续阻塞
#所以需要put多个None
import time
import random
from multiprocessing import Process,Queue def producer(name,food,q):
for i in range(10):
time.sleep(random.randint(1, 3))
f = '%s生产了%s%s'%(name,food,i)
q.put(f)
print(f) def consumer(name,q):
while True:
food = q.get()
if food == None:
print('没了~~~~~~')
break
print('\033[31m%s消费了%s\033[0m'%(name,food))
time.sleep(random.randint(1,3)) if __name__ == '__main__':
q = Queue(20)
p1 = Process(target=producer,args=('egg','包子',q))
p2 = Process(target=producer,args=('wusir','Cake',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=('alex',q))
c2 = Process(target=consumer,args=('jin',q))
c1.start()
c2.start()
p1.join()
p2.join()
q.put(None)
q.put(None) # def consumer(q,name):
# while True:
# food = q.get()
# if food is None:
# print('%s获取到了一个空'%name)
# break
# print('\033[31m%s消费了%s\033[0m' % (name,food))
# time.sleep(random.randint(1,3))
#
# def producer(name,food,q):
# for i in range(4):
# time.sleep(random.randint(1,3))
# f = '%s生产了%s%s'%(name,food,i)
# print(f)
# q.put(f)
#
# if __name__ == '__main__':
# q = Queue(20)
# p1 = Process(target=producer,args=('Egon','包子',q))
# p2 = Process(target=producer, args=('wusir','泔水', q))
# c1 = Process(target=consumer, args=(q,'alex'))
# c2 = Process(target=consumer, args=(q,'jinboss'))
# p1.start()
# p2.start()
# c1.start()
# c2.start()
# p1.join()
# p2.join()
# q.put(None)
# q.put(None)
生产者消费者模型-put None
使用multiprocessing.JoinableQueue的生产者消费者模型详解:
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法: q.task_done()
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。 q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
方法介绍
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
while True:
food = q.get()
print('\033[31m%s消费了%s\033[0m' % (name,food))
time.sleep(random.randint(1,3))
q.task_done() # count - 1 每处理一个数据,就获得一个task_done记录,最后获取一个join信号,进行判断,是否全部都已经task_done
#可以理解为,join信号发送过来说我生产了20个东西,task_done对比以下记录,如果也有20个记录则表示处理完毕 def producer(name,food,q):
for i in range(4):
time.sleep(random.randint(1,3))
f = '%s生产了%s%s'%(name,food,i)
print(f)
q.put(f)
q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕。原本只要生产完就行,现在要等到包子都吃完才行
#当全部生产完毕,会发送一个join信号
#队列生产过程被拉长了,从生产完就结束,变为了要等 q.task_done() 完才能结束
#也就是说,执行q.task_done() 都会记录下来,直到队列中所有的数据,都有task_done记录 if __name__ == '__main__':
q = JoinableQueue(20)
p1 = Process(target=producer,args=('Egon','包子',q))
p2 = Process(target=producer, args=('wusir','泔水', q))
c1 = Process(target=consumer, args=(q,'alex'))
c2 = Process(target=consumer, args=(q,'jinboss'))
p1.start()
p2.start()
c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join() # 感知一个子进程的结束
#首先通过 q.join感知consumer的结束,然后通过 p1.join感知 producer的结束。再然后通过结束主进程,来结束consumer的子进程(设置了守护进程) # 在消费者这一端:
# 每次获取一个数据
# 处理一个数据
# 发送一个记号 : 标志一个数据被处理成功 # 在生产者这一端:
# 每一次生产一个数据,
# 且每一次生产的数据都放在队列中
# 在队列中刻上一个记号
# 当生产者全部生产完毕之后,
# join信号 : 已经停止生产数据了
# 且要等待之前被刻上的记号都被消费完
# 当数据都被处理完时,join阻塞结束 # consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
JoinableQueue
(2) mulitiprocessing.Pipe 管道。
Pipe:进程数据的不安全性
from multiprocessing import Process,Pipe def func(conn1,conn2):
conn2.close()
while True:
# msg = conn1.recv()
# print(msg)
try:
msg = conn1.recv()
print(msg)
except EOFError: #只有当主进程2个口都关闭,子进程关闭了其中一个口,且管道内没数据了,才会报错EOFError
conn1.close() #因为程序要判定,任何一个口都不会给我发消息了
break
管道—EOFError
#队列 管道+锁,所以以后会比较多使用队列
#管道 更底层的 #IPC(Inter-Process Communication)进程间通信
#Pipe:进程数据的不安全性,在只有一个生产者和只有一个消费者时,不会有问题
#但是存在这种问题:同时有几个进程往管道要数据,有一条数据被准备取走了,但是另一个进程,此时又来取,就会出现问题。
#多个消费者取同一个数据
#所以要通过枷锁解决
from multiprocessing import Process,Pipe,Lock
import time
import random def producer(con,pro,name,food):
con.close()
for i in range(6):
f = '%s生产了%s%s'%(name,food,i)
print(f)
pro.send(f)
time.sleep(0.5)
pro.send(None)
pro.send(None)
pro.close() def consumer(con,pro,name,lock):
pro.close()
while True:
lock.acquire()
msg = con.recv()
lock.release()
time.sleep(0.5)
if msg:
print('\033[32m%s消费了%s\033[0m' % (name, msg))
else:
con.close()
break
# try:
# lock.acquire()
# msg = con.recv()
# lock.release()
# print('\033[32m%s消费了%s\033[0m'%(name,msg))
# time.sleep(0.5)
# except EOFError: #子进程,主进程的通道关闭,且管道里没值了,就会引发EOFError
# con.close
# print('meile')
# lock.release()
# break if __name__ == '__main__':
con,pro = Pipe() #
lock = Lock()
p1 = Process(target=producer,args=(con,pro,'egg','baozi',))
c1 = Process(target=consumer,args=(con,pro,'aaaa',lock))
c2 = Process(target=consumer,args=(con,pro,'bbbb',lock))
p1.start()
c1.start()
c2.start()
con.close() #注意要引发 EFOError ,主进程得con和pro也要关闭
pro.close()
Pipe—生产者消费者模型
四.进程间的数据共享
#Manager
# 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
# 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
# A manager returned by Manager() will support types
# list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. #但是manager的数据共享是不安全性的,同管道,可能两个进程取了同一个数据
#加锁解决 from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
d['count']-=1 if __name__ == '__main__':
lock=Lock()
with Manager() as m:
dic=m.dict({'count':100})
p_l=[]
for i in range(100):
p=Process(target=work,args=(dic,lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
Manager
数据共享请参考此博客链接