Python 多进程原理及实现
1 进程的基本概念
什么是进程?
"htmlcode">
# -*- coding: utf-8 -*- import os import time print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) pid = os.fork() if pid == 0: print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) time.sleep(5) else: print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) # pid表示回收的子进程的pid #pid, result = os.wait() # 回收子进程资源 阻塞 time.sleep(5) #print("父进程:回收的子进程pid=%d" % pid) #print("父进程:子进程退出时 result=%d" % result) # 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。 # 父进程中拿到的返回值是创建的子进程的pid,大于0 print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
2.1 父子进程如何区分"htmlcode">
import os
pid = os.fork() # 创建一个子进程
os.wait() # 等待子进程结束释放资源
pid为0的代表子进程。
import os pid = os.fork() # 创建一个子进程 os.wait() # 等待子进程结束释放资源 pid为0的代表子进程。
缺点:
"htmlcode">
# -*- coding: utf-8 -*- import os from multiprocessing import Process import time def fun(name): print("2 子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print("hello " + name) def test(): print('ssss') if __name__ == "__main__": print("1 主进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps = Process(target=fun, args=('jingsanpang', )) print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("3 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print(ps.is_alive()) # 启动之前 is_alive为False(系统未创建) ps.start() print(ps.is_alive()) # 启动之后,is_alive为True(系统已创建) print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("4 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.join() # 等待子进程完成任务 类似于os.wait() print(ps.is_alive()) print("5 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.terminate() #终断进程 print("6 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
特点:
"htmlcode">
import multiprocessing import time def work(msg): mult_proces_name = multiprocessing.current_process().name print('process: ' + mult_proces_name + '-' + msg) if __name__ == "__main__": pool = multiprocessing.Pool(processes=5) # 创建5个进程 for i in range(20): msg = "process %d" %(i) pool.apply_async(work, (msg, )) pool.close() # 关闭进程池,表示不能在往进程池中添加进程 pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用 print("Sub-process all done.")
上述代码中的pool.apply_async()是apply()函数的变体,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()既是Pool的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。
多个子进程并返回值
apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)。
import multiprocessing import time def func(msg): return multiprocessing.current_process().name + '-' + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 创建4个进程 results = [] for i in range(20): msg = "process %d" %(i) results.append(pool.apply_async(func, (msg, ))) pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用 pool.join() # 等待进程池中的所有进程执行完毕 print ("Sub-process(es) done.") for res in results: print (res.get())
与之前的输出不同,这次的输出是有序的。
"htmlcode">
import multiprocessing def foo(conn): conn.send('hello father') #向管道pipe发消息 print(conn.recv()) if __name__ == '__main__': conn1,conn2=multiprocessing.Pipe(True) #开辟两个口,都是能进能出,括号中如果False即单向通信 p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数 p.start() print(conn2.recv()) #主进程使用conn口接收,从管道(Pipe)中读取消息 conn2.send('hi son') #主进程使用conn口发送
(2)消息队列Queue
Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。
Queue的一些常用方法:
- Queue.qsize():返回当前队列包含的消息数量;
- Queue.empty():如果队列为空,返回True,反之False ;
- Queue.full():如果队列满了,返回True,反之False;
- Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
- Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
- Queue.put():将一个值添加进数列,可传参超时时长。
- Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。
案例:
from multiprocessing import Process, Queue import time def write(q): for i in ['A', 'B', 'C', 'D', 'E']: print('Put %s to queue' % i) q.put(i) time.sleep(0.5) def read(q): while True: v = q.get(True) print('get %s from queue' % v) if __name__ == '__main__': q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) print('write process = ', pw) print('read process = ', pr) pw.start() pr.start() pw.join() pr.join() pr.terminate() pw.terminate()
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
注:进程间通信应该尽量避免使用共享数据的方式
5 多进程实现生产者消费者
以下通过多进程实现生产者,消费者模式
import multiprocessing from multiprocessing import Process from time import sleep import time class MultiProcessProducer(multiprocessing.Process): def __init__(self, num, queue): """Constructor""" multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print('producer start ' + str(self.num)) for i in range(1000): self.queue.put((i, self.num)) # print 'producer put', i, self.num t2 = time.time() print('producer exit ' + str(self.num)) use_time = str(t2 - t1) print('producer ' + str(self.num) + ', use_time: '+ use_time) class MultiProcessConsumer(multiprocessing.Process): def __init__(self, num, queue): """Constructor""" multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print('consumer start ' + str(self.num)) while True: d = self.queue.get() if d != None: # print 'consumer get', d, self.num continue else: break t2 = time.time() print('consumer exit ' + str(self.num)) print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1)) def main(): # create queue queue = multiprocessing.Queue() # create processes producer = [] for i in range(5): producer.append(MultiProcessProducer(i, queue)) consumer = [] for i in range(5): consumer.append(MultiProcessConsumer(i, queue)) # start processes for i in range(len(producer)): producer[i].start() for i in range(len(consumer)): consumer[i].start() # wait for processs to exit for i in range(len(producer)): producer[i].join() for i in range(len(consumer)): queue.put(None) for i in range(len(consumer)): consumer[i].join() print('all done finish') if __name__ == "__main__": main()
6 总结
python中的多进程创建有以下两种方式:
(1)fork子进程
(2)采用 multiprocessing 这个库创建子进程
需要注意的是队列中queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue()
另外, 进程池使用 multiprocessing.Pool实现,pool = multiprocessing.Pool(processes = 3),产生一个进程池,pool.apply_async实现非租塞模式,pool.apply实现阻塞模式。
apply_async和 apply函数,前者是非阻塞的,后者是阻塞。可以看出运行时间相差的倍数正是进程池数量。
同时可以通过result.append(pool.apply_async(func, (msg, )))获取非租塞式调用结果信息的。
以上就是Python 多进程原理及实现的详细内容,更多关于python 多进程的资料请关注其它相关文章!
下一篇:Python lxml库的简单介绍及基本使用讲解