深入 Python 进程间通信原理与实战图文版 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
shellquery
V2EX    程序员

深入 Python 进程间通信原理与实战图文版

 
  •   shellquery 2018 年 5 月 29 日 1809 次点击
    这是一个创建于 2852 天前的主题,其中的信息可能已经有所发展或是发生改变。

    继上节使用原生多进程并行运行,基于 Redis 作为消息队列完成了圆周率的计算 t/458357 本节我们使用原生操作系统消息队列来替换 Redis。

    文件

    使用文件进行通信是最简单的一种通信方式,子进程将结果输出到临时文件,父进程从文件中读出来。文件名使用子进程的进程 id 来命名。进程随时都可以通过os.getpid()来获取自己的进程 id。

    # coding: utf-8 import os import sys import math def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子进程开始计算 with open("%d" % os.getpid(), "w") as f: f.write(str(s)) sys.exit(0) # 子进程结束 sums = [] for pid in pids: os.waitpid(pid, 0) # 等待子进程结束 with open("%d" % pid, "r") as f: sums.append(float(f.read())) os.remove("%d" % pid) # 删除通信的文件 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    输出

    3.14159262176 

    管道 pipe

    管道是 Unix 进程间通信最常用的方法之一,它通过在父子进程之间开通读写通道来进行双工交流。我们通过 os.read()和 os.write()来对文件描述符进行读写操作,使用 os.close()关闭描述符。

    上图为单进程的管道

    上图为父子进程分离后的管道

    # coding: utf-8 import os import sys import math def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = {} unit = n / 10 for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit r, w = os.pipe() pid = os.fork() if pid > 0: childs[pid] = r # 将子进程的 pid 和读描述符存起来 os.close(w) # 父进程关闭写描述符,只读 else: os.close(r) # 子进程关闭读描述符,只写 s = slice(mink, maxk) # 子进程开始计算 os.write(w, str(s)) os.close(w) # 写完了,关闭写描述符 sys.exit(0) # 子进程结束 sums = [] for pid, r in childs.items(): sums.append(float(os.read(r, 1024))) os.close(r) # 读完了,关闭读描述符 os.waitpid(pid, 0) # 等待子进程结束 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    输出

    3.14159262176 

    Unix 域套接字

    当同一个机器的多个进程使用普通套接字进行通信时,需要经过网络协议栈,这非常浪费,因为同一个机器根本没有必要走网络。所以 Unix 提供了一个套接字的特殊版本,它使用和套接字一摸一样的 api,但是地址不再是网络端口,而是文件。相当于我们通过某个特殊文件来进行套接字通信。

    # coding: utf-8 import os import sys import math import socket def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): server_address = "/tmp/pi_sock" # 套接字对应的文件名 childs = [] unit = n / 10 servsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) servsock.bind(server_address) servsock.listen(10) # 监听子进程连接请求 for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: childs.append(pid) else: servsock.close() # 子进程要关闭 servsock 引用 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(server_address) # 连接父进程套接字 s = slice(mink, maxk) # 子进程开始计算 sock.sendall(str(s)) sock.close() # 关闭连接 sys.exit(0) # 子进程结束 sums = [] for pid in childs: conn, _ = servsock.accept() # 接收子进程连接 sums.append(float(conn.recv(1024))) conn.close() # 关闭连接 for pid in childs: os.waitpid(pid, 0) # 等待子进程结束 servsock.close() # 关闭套接字 os.unlink(server_address) # 移除套接字文件 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    无名套接字 socketpair

    我们知道跨网络通信免不了要通过套接字进行通信,但是本例的多进程是在同一个机器上,用不着跨网络,使用普通套接字进行通信有点浪费。

    上图为单进程的 socketpair

    上图为父子进程分离后的 socketpair

    为了解决这个问题,Unix 系统提供了无名套接字 socketpair,不需要端口也可以创建套接字,父子进程通过 socketpair 来进行全双工通信。

    socketpair 返回两个套接字对象,一个用于读一个用于写,它有点类似于 pipe,只不过 pipe 返回的是两个文件描述符,都是整数。所以写起代码形式上跟 pipe 几乎没有什么区别。

    我们使用 sock.send()和 sock.recv()来对套接字进行读写,通过 sock.close()来关闭套接字对象。

    # coding: utf-8 import os import sys import math import socket def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = {} unit = n / 10 for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit rsock, wsock = socket.socketpair() pid = os.fork() if pid > 0: childs[pid] = rsock wsock.close() else: rsock.close() s = slice(mink, maxk) # 子进程开始计算 wsock.send(str(s)) wsock.close() sys.exit(0) # 子进程结束 sums = [] for pid, rsock in childs.items(): sums.append(float(rsock.recv(1024))) rsock.close() os.waitpid(pid, 0) # 等待子进程结束 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    输出

    3.14159262176 

    OS 消息队列

    操作系统也提供了跨进程的消息队列对象可以让我们直接使用,只不过 python 没有默认提供包装好的 api 来直接使用。我们必须使用第三方扩展来完成 OS 消息队列通信。第三方扩展是通过使用 Python 包装的 C 实现来完成的。

    OS 消息队列有两种形式,一种是 posix 消息队列,另一种是 systemv 消息队列,有些操作系统两者都支持,有些只支持其中的一个,比如 macos 仅支持 systemv 消息队列,我本地的 python 的 docker 镜像是 debian linux,它仅支持 posix 消息队列。

    posix 消息队列 我们先使用 posix 消息队列来完成圆周率的计算,posix 消息队列需要提供一个唯一的名称,它必须是/开头。close()方法仅仅是减少内核消息队列对象的引用,而不是彻底关闭它。unlink()方法才能彻底销毁它。O_CREAT 选项表示如果不存在就创建。向队列里塞消息使用 send 方法,收取消息使用 receive 方法,receive 方法返回一个 tuple,tuple 的第一个值是消息的内容,第二个值是消息的优先级。之所以有优先级,是因为 posix 消息队列支持消息的排序,在 send 方法的第二个参数可以提供优先级整数值,默认为 0,越大优先级越高。

    # coding: utf-8 import os import sys import math from posix_ipc import MessageQueue as Queue def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 q = Queue("/pi", flags=os.O_CREAT) for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子进程开始计算 q.send(str(s)) q.close() sys.exit(0) # 子进程结束 sums = [] for pid in pids: sums.append(float(q.receive()[0])) os.waitpid(pid, 0) # 等待子进程结束 q.close() q.unlink() # 彻底销毁队列 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    输出

    3.14159262176 

    systemv 消息队列 systemv 消息队列和 posix 消息队列用起来有所不同。systemv 的消息队列是以整数 key 作为名称,如果不指定,它就创建一个唯一的未占用的整数 key。它还提供消息类型的整数参数,但是不支持消息优先级。

    # coding: utf-8 import os import sys import math import sysv_ipc from sysv_ipc import MessageQueue as Queue def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 q = Queue(key=None, flags=sysv_ipc.IPC_CREX) for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子进程开始计算 q.send(str(s)) sys.exit(0) # 子进程结束 sums = [] for pid in pids: sums.append(float(q.receive()[0])) os.waitpid(pid, 0) # 等待子进程结束 q.remove() # 销毁消息队列 return math.sqrt(sum(sums) * 8) print pi(10000000) 

    输出

    3.14159262176 

    共享内存

    共享内存也是非常常见的多进程通信方式,操作系统负责将同一份物理地址的内存映射到多个进程的不同的虚拟地址空间中。进而每个进程都可以操作这份内存。考虑到物理内存的唯一性,它属于临界区资源,需要在进程访问时搞好并发控制,比如使用信号量。我们通过一个信号量来控制所有子进程的顺序读写共享内存。我们分配一个 8 字节 double 类型的共享内存用来存储极限的和,每次从共享内存中读出来时,要使用 struct 进行反序列化(unpack),将新的值写进去之前也要使用 struct 进行序列化(pack)。每次读写操作都需要将读写指针移动到内存开头位置(lseek)。

    # coding: utf-8 import os import sys import math import struct import posix_ipc from posix_ipc import Semaphore from posix_ipc import SharedMemory as Memory def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1) # 使用一个信号量控制多个进程互斥访问共享内存 memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX) os.lseek(memory.fd, 0, os.SEEK_SET) # 初始化和为 0.0 的 double 值 os.write(memory.fd, struct.pack('d', 0.0)) for i in range(10): # 分 10 个子进程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子进程开始计算 sem_lock.acquire() try: os.lseek(memory.fd, 0, os.SEEK_SET) bs = os.read(memory.fd, 8) # 从共享内存读出来当前值 cur_val, = struct.unpack('d', bs) # 反序列化,逗号不能少 cur_val += s # 加上当前进程的计算结果 bs = struct.pack('d', cur_val) # 序列化 os.lseek(memory.fd, 0, os.SEEK_SET) os.write(memory.fd, bs) # 写进共享内存 memory.close_fd() finally: sem_lock.release() sys.exit(0) # 子进程结束 sums = [] for pid in pids: os.waitpid(pid, 0) # 等待子进程结束 os.lseek(memory.fd, 0, os.SEEK_SET) bs = os.read(memory.fd, 8) # 读出最终这结果 sums, = struct.unpack('d', bs) # 反序列化 memory.close_fd() # 关闭共享内存 memory.unlink() # 销毁共享内存 sem_lock.unlink() # 销毁信号量 return math.sqrt(sums * 8) print pi(10000000) 

    输出

    3.14159262176 

    阅读更多 Python 高级文章,请关注公众号「码洞」

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2859 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 14:11 PVG 22:11 LAX 07:11 JFK 10:11
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86