如何优雅的通知 multiprocessing.Pool 中的进程退出? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Monad
V2EX    Python

如何优雅的通知 multiprocessing.Pool 中的进程退出?

  •  
  •   Monad 2018 年 4 月 15 日 7769 次点击
    这是一个创建于 2836 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目的是想在主进程中随时终止子进程的执行 目前的代码长这样 有个问题是如果我 Ctrl-C 的话 所有的子进程虽然会正常退出 但是主进程会一直挂起在 pool.join()上 求解决方案~

    #!/usr/bin/env python2 from __future__ import print_function import time import signal import logging from multiprocessing import Manager from multiprocessing.pool import Pool from multiprocessing.queues import Queue def Fn(n, q, ns): if ns.done: return try: q.put(n) finally: return def main(): handler = signal.signal(signal.SIGINT, signal.SIG_IGN) pool = Pool(processes=5) signal.signal(signal.SIGINT, handler) total = 10000 manager = Manager() q = manager.Queue() ns = manager.Namespace() ns.dOne= False for n in range(total): pool.apply_async(Fn, args=(n, q, ns)) pool.close() try: received = 0 while received != total: n = q.get(60) print('get {} from queue'.format(n)) received += 1 except KeyboardInterrupt: ns.dOne= True pass finally: pool.join() if __name__ == '__main__': main() 
    15 条回复    2018-09-12 15:17:33 +08:00
    lolizeppelin
        1
    lolizeppelin  
       2018 年 4 月 15 日
    openstack 里缩水的代码, 稍微简化了下
    https://github.com/lolizeppelin/simpleservice/blob/master/simpleservice/base.py

    看懂了就知道怎么处理了
    lolizeppelin
        2
    lolizeppelin  
       2018 年 4 月 15 日
    def _pipe_watcher(self):
    # This will block until the write end is closed when the parent
    # dies unexpectedly
    self.readpipe.read(1)

    LOG.info('Parent process has died unexpectedly, exiting')

    if self.launcher:
    self.launcher.stop()
    sys.exit(1)
    Monad
        3
    Monad  
    OP
       2018 年 4 月 15 日
    @lolizeppelin #2 这段代码在主进程创建 pipe, 然后设置进程退出时的 close fd, 通过这种方式来通知子进程 read 返回吧
    但是我现在是 hang 在了主进程的 join 呢 而且目前来看 子进程(应该)是都正常退出了 主进程没有返回
    lolizeppelin
        4
    lolizeppelin  
       2018 年 4 月 15 日 via Android
    pipe 是主进程退出 子进程也收到能退出 是个退出保险

    主进程里确认子进程退出用 waitpid
    lolizeppelin
        5
    lolizeppelin  
       2018 年 4 月 15 日
    啥叫“应该” 子进程是否结束是可以看到

    multiprocessing 我记得默认是用 socket 来父子进程通信的
    join 里应该是取了 socket 数据 并 wait 子进程结束

    好好看看处理信号的部分就知道怎么让子进程 exit 了

    当然如果你代码是 win 上的当我上面的的都没说
    Monad
        6
    Monad  
    OP
       2018 年 4 月 15 日
    @lolizeppelin #5 我的意思是 ##应该正常##退出了 子进程是肯定退出了 ps 看过 至于是不是异常退出的 并没有确定
    如果不用 Pool 手动创建管理的话 waitpid 应该是 OK 的
    但是用 Pool 的话 惯用法应该不会去 os.getpid 然后手动 waitpid 吧
    lolizeppelin
        7
    lolizeppelin  
       2018 年 4 月 15 日 via Android
    你代码有问题 子进程是不是正常退出的都不知道 直接 try 包一层都好啊

    子进程有信号处理没
    主进程收到 ctrl c 信号以后 给所有子进程发终止信号不就行了
    lolizeppelin
        8
    lolizeppelin  
       2018 年 4 月 15 日 via Android
    打日志 好歹你要知道子进程怎么退出的
    Monad
        9
    Monad  
    OP
       2018 年 4 月 15 日
    @lolizeppelin #7 不是子进程正不正常退出的问题 这只是一个描述问题 不用纠结 我用 try 能保证它退出 并且我上面贴的代码就是这样了

    现在的问题是 子进程全部退出了 父进程仍然在 pool.join()没有返回 这个问题
    Monad
        10
    Monad  
    OP
       2018 年 4 月 15 日
    @lolizeppelin #8 按照我的理解 如果用 C 的做法实现 pool.join 子进程无论怎么正常 /异常退出 父进程都应该能够通过 waitpid 感知到子进程退出 所有退出之后 join 就可以返回了 所以 python 现在这个现象让我很不理解
    lolizeppelin
        11
    lolizeppelin  
       2018 年 4 月 15 日
    看了下, 和父子进程一点关系都没.....

    自进程
    ns.done 没有捕获异常只是小问题

    主要在这 3 有问题
    manager = Manager()
    q = manager.Queue()
    ns = manager.Namespace()

    要解决得慢慢折腾里面代码 我随便弄了下不想弄了, 折腾 multiprocessing 不如自己写多进程代码还好控一点
    lolizeppelin
        12
    lolizeppelin  
       2018 年 4 月 16 日
    @Monad
    大致搞定了 给你代码弄蒙了

    一开始叫你看信号是没错的,你信号用错了.................

    except KeyboardInterrupt 这是不对的,你注册正确的拦截信号以后,是不会收到这个错误的

    信号要处理 2 次,一次是在 fork 前,就是 multiprocessing 创建任务之前,拦截 SIGINT,拦截执行内容
    def empty(signo, frame):
    print 'do nothing!!!'
    这里的目的是让 multiprocessing 里的代码不会因为收到 SIGINT 抛出异常

    第二次处理时在 fork 后,拦截内容
    def stop(signo, frame):
    print 'stoped'
    ns.dOne= True

    这里拦截到信号以后设置 ns.done
    lolizeppelin
        13
    lolizeppelin  
       2018 年 4 月 16 日
    顺便...这个 pool 用的有点问题 Fn 返回后还会生成新的 Fn 塞进去....具体你看看怎么停掉 pool 我就不看了
    fool079
        14
    fool079  
       2018 年 4 月 16 日
    我印象中 setDaemon=True
    也就是设为守护进程就可以随着父进程的关闭而关闭了。。
    itfanr
        15
    itfanr  
       2018 年 9 月 12 日
    参考 Samba 和 ctdb 代码吧
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3536 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 00:52 PVG 08:52 LAX 16:52 JFK 19:52
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86