在Popen.stdin上轮询/等待HUP

如何解决在Popen.stdin上轮询/等待HUP

我正在用Python编写Zenity的基于GNU dd(1)的状态监视器。 由于我要定位的系统的限制,包装程序必须在Python 2上运行,并且不能拉入外部库。

在Zenity的“取消”按钮中,如果尚未完成,则必须终止dd。

我必须立即执行以下操作(即立即触发/驱动/立即执行);如果同时满足/触发了以下多个条件,则应按列出的顺序执行它们:

  • Zenity退出时,终止dd
  • 当dd写入其stderr时,请munge +将数据转发给Zenity的stdin
  • 当dd退出时,如果其返回码不为0,则终止Zenity

但是,epoll对象似乎仅在dd的输出上触发;尽管我在Zenity的EPOLLHUP上注册了stdin,但它从未触发Zenity退出。

应该/如何做?我知道epoll是可用于正确触发dd输出(通过EPOLLIN)的唯一原语;我还了解到,这是一个很古怪的原语,可能不适合触发Zenity的退出。 (如果需要,我可以在该文件中实现更多的逻辑;这样做比引入任何第三方库都更可取,无论它多么小或“普通”。我重申,我了解epoll难以使用,可能需要大量的胶合逻辑。)

或者:如果epoll不是监视subprocess退出的正确原语,那么在Python 2兼容的情况下,监视子进程退出的同时监视子进程的输出的正确方法是什么?方式

(我并不是天生就需要多线程功能;按顺序执行所有操作将完全在规范范围内;不过,如果在这种情况下绝对需要多线程编程,以避免繁琐的操作,循环,就这样吧。)


下面是到目前为止我的完整代码。

#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,subprocess,re

def main(args=sys.argv[1:]):
    fname = parseifname(args)
    n = sizeof(fname)

    dcmd = ['dd'] + args + ['status=progress']
    zcmd = ['zenity','--progress','--time-remaining']

    #Launch dd
    dd = subprocess.Popen(dcmd,stderr=subprocess.PIPE)
    set_nonblocking(dd.stderr)

    #Launch Zenity
    zenity = subprocess.Popen(zcmd,stdin=subprocess.PIPE)
    set_direct(zenity.stdin)#TODO: why doesn't this line work?*

    #set title/status
    zenity.stdin.write(('#%s\n' % ' '.join(dcmd)).encode())
    zenity.stdin.flush()#*i.e. instances of this line shouldn't be necessary...

    #We want to trigger on all of the following:
    toPoll = [
        (dd.stderr,select.EPOLLIN       #dd status update
                  | select.EPOLLHUP),#dd exit
        (zenity.stdin,select.EPOLLHUP),#Zenity exit
    ]

    calcPercent = genCalcPercent(n)

    with ePoll(toPoll) as E:
        rBytes = re.compile(r'\r(\d+) bytes'.encode())
        while dd.poll() is None:
            evs = E.poll()#TODO: I'm not sure if this is blocking,or if I've induced a busy loop...
            for fn,ev in evs:
                if fn == dd.stderr.fileno():
                    if (ev & select.EPOLLIN):
                        #dd sent some output
                        line = dd.stderr.read()
                        m = rBytes.match(line)
                        #sys.stderr.buffer.write(line)
                        if m:
                            x = int(m.groups()[0])
                            zenity.stdin.write(('%f\n' % calcPercent(x)).encode())
                            zenity.stdin.flush()
                    if (ev & select.EPOLLHUP):
                        #dd exited
                        pass#The containing loop will handle this; don't need to take action
                if fn == zenity.stdin.fileno():
                    if (ev & select.EPOLLHUP):#TODO: WHY DOESN'T THIS ACTIVATE??
                        #Zenity exited
                        dd.terminate()
        if dd.returncode == 0:
            #dd exited successfully
            zenity.stdin.write('100\n'.encode())
            zenity.stdin.flush()
        else:
            zenity.terminate()

# Functions below here #

def parseifname(argv=sys.argv[:1],default='/dev/stdin'):
    '''Given dd's argument list,attempts to return the name of that file which dd would use as its input file'''
    M = re.compile(r'^if=(.*)$')
    ifname = default
    for x in argv:
        m = M.match(x)
        if m:
            ifname = m.groups()[0]
    return ifname

def sizeof(fname):
    '''Attempts to find the length,in bytes,of the given file or block device'''
    s = os.stat(fname)
    m = s.st_mode
    try:
        if stat.S_ISREG(m):
            #Regular File
            n = s.st_size
        elif stat.S_ISBLK(m):
            #Block Device
            n = int(subprocess.check_output(['lsblk','-b','-n','-l','-o','SIZE','-d',fname]))
        else:
            raise ValueError("file is neither a standard nor block file")
    except:
        #Unidentifiable
        n = None
    return n

def genCalcPercent(n):
    '''Given n,returns a function which,given x,returns either x as a percentage of n,or some sane stand-in for such'''
    if n:
        #Input file size was identified
        return lambda x: 100 * x / n
    else:
        #Input file size was unidentifiable,zero,or otherwise falsy
        #we'll at least  try to visually show progress
        return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))

def set_nonblocking(fd=sys.stdin):
    '''Appends os.O_NONBLOCK to the given file descriptor's flags.'''
    return fcntl.fcntl(
     fd,fcntl.F_SETFL,fcntl.fcntl(fd,fcntl.F_GETFL)
      | os.O_NONBLOCK
    )

def set_direct(fd=sys.stdout):
    '''Appends os.O_SYNC to the given file descriptor's flags.'''
    return fcntl.fcntl(
     fd,fcntl.F_GETFL)
      | os.O_SYNC
    )

class ePoll:
    '''Thin contextlib wrapper around select.epoll; allows tersely watching multiple events'''
    def __init__(self,fdSpecs):
        self._E = select.epoll()
        self._fds = []
        for fd,opt in fdSpecs:
            self._E.register(fd,opt)
            self._fds.append(fd)
    def __enter__(self):
        return self._E
    def __exit__(self,exc_type,exc_value,traceback):
        for fd in self._fds:
            self._E.unregister(fd)
        self._E.close()

if __name__=='__main__':
    main()

解决方法

事实证明答案很简单:使用EPOLLERR而不是EPOLLHUP

我对这种正确解决方案*表示严重怀疑,但是它确实可以正常工作

import select,subprocess,time

E = select.epoll()

p = subprocess.Popen(["sh","-c","sleep 3"],stdin=subprocess.PIPE)

#time.sleep(5) #Uncomment this line to convince yourself there is no race-condition here
E.register(p.stdin,select.EPOLLERR)

print("Polling...")
evs = E.poll()
print("Caught events!")

assert (p.stdin.fileno(),select.EPOLLERR) in evs

E.close()

*如果这不是正确的解决方案,那么即使在现在,我还是非常想发现正确的解决方案。


这里是原始问题的脚本the completed version,如果有人在意的话:

#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,re
from functools import reduce

#TODO: rewrite this whole program as a sub-512-byte Perl script

def main(args=sys.argv[1:]):

    dcmd = ['dd'] + args + ['status=progress']

    zcmd = ['zenity','--progress','--time-remaining','--title=dd','--text=%s' % '\t'.join(args),'--cancel-label=Abort','--ok-label=Done',]

    fname = parseifname(args)
    n = sizeof(fname)#or (parsebs(args) * parsecount(args))#TODO

    calcPercent = genCalcPercent(n)

    # https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n814
    rBytes = re.compile(r'^\r?(\d+) byte(?:s(?: \([^\)]+\))?)? copied'.encode())

    with epoll_with() as E:

        #Launch Zenity
        zenity = subprocess.Popen(zcmd,stdin=subprocess.PIPE)
        modfl(zenity.stdin,(+os.O_SYNC,))

        E.register(zenity.stdin,select.EPOLLHUP
        )

        if sys.version_info.major == 3:
            #TODO: why doesn't O_SYNC work on Python 3??? ARGH
            def w(*a,**k):
                i = zenity.stdin.write(*a,**k)
                zenity.stdin.flush()
                return i
        else:
            w = zenity.stdin.write

        #set title/status
        w(('#%s\n' % ' '.join(args)).encode())

        #Launch dd
        dd = subprocess.Popen(dcmd,stderr=subprocess.PIPE)
        modfl(dd.stderr,(+os.O_NONBLOCK,))

        E.register(dd.stderr,select.EPOLLIN
          | select.EPOLLHUP
        )

        break_main_loop = False
        while not break_main_loop:
            events = E.poll()
            for fn,event in events:

                if (fn == zenity.stdin.fileno()) and (event & (select.EPOLLHUP | select.EPOLLERR)):
                    #Zenity exited
                    dd.terminate()
                    break_main_loop = True
                    continue

                if (fn == dd.stderr.fileno()) and (event & select.EPOLLIN):
                    #dd sent some output
                    line = dd.stderr.read()

                    m = rBytes.match(line)
                    if m:
                        x = int(m.groups()[0])

                        w(('%f\n' % calcPercent(x)).encode())
                    #else:
                    #   print(line,file=sys.stderr)
                    continue

                if (fn == dd.stderr.fileno()) and (event & (select.EPOLLHUP | select.EPOLLERR)):
                    #dd exited
                    break_main_loop = True
                    continue

    #wait for dd to finish being terminated,if necessary; drain any of its final output
    dd.communicate()
    assert dd.returncode is not None

    if dd.returncode == 0:
        #dd exited successfully
        try:
            w('100\n'.encode())
            zenity.stdin.close()
        except BrokenPipeError:
            #Zenity was already exited for some reason
            pass
    else:
        #dd exited unsuccessfully
        #(almost certainly user-cancelled)
        zenity.terminate()

    return dd.returncode

# Functions below here #

def parseifname(args=sys.argv[:1],default='/dev/stdin'):
    '''Given dd's argument list,attempts to return the name of that file which dd would use as its input file'''
    M = re.compile(r'^if=(.*)$')
    ifname = default
    for arg in args:
        m = M.match(arg)
        if m:
            ifname = m.groups()[0]
    return ifname

def sizeof(fname):
    '''Attempts to find the length,in bytes,of the given file or block device'''
    s = os.stat(fname)
    m = s.st_mode
    try:
        if stat.S_ISREG(m):
            #Regular File
            return s.st_size
        elif stat.S_ISBLK(m):
            #Block Device
            return int(subprocess.check_output(['lsblk','-b','-n','-l','-o','SIZE','-d',fname]))
        else:
            raise ValueError("file is neither a standard nor block file")
    except Exception:
        #Also catches e.g. errors running lsblk
        return None

def genCalcPercent(n):
    '''Given n,returns a function which,given x,returns either x as a percentage of n,or some sane stand-in for such'''
    if n:
        #Input file size was identified
        return lambda x: 100 * x / n
    else:
        #Input file size was unidentifiable,zero,or otherwise falsy
        #we'll at least  try to visually show progress:
        # a continuous exponential approach to (100-1e-5)%
        # which passes 50% at 4GiB transferred
        return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))

def modfl(fd,flags):
    '''Adds all given positive flags to,and removes all given negative flags from,the given file descriptor'''
    # e.g.: modfl(f,-os.O_NONBLOCK))
    # would make f synchronous and blocking

    #OR-in positive flags; NAND-out negative ones
    ins = lambda fl,x: (fl | x) if x >= 0 else (fl &~ -x)

    # 1. Get the current flag field
    cur = fcntl.fcntl(fd,fcntl.F_GETFL)
    # 2. Calculate the new flag field
    new = reduce(ins,flags,cur)
    # 3. Apply the new flag field
    return fcntl.fcntl(fd,fcntl.F_SETFL,new)

if hasattr(select.epoll,'__enter__'):
    epoll_with = select.epoll
else:
    class epoll_with:
        '''contextlib wrapper for python2's epoll'''
        def __init__(self,*args,**kwargs):
            self._E = select.epoll(*args,**kwargs)
        def __enter__(self):
            return self._E
        def __exit__(self,exc_type,exc_value,traceback):
            self._E.close()

if __name__ == '__main__':
    sys.exit(main())

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res