如何解决在Popen.stdin上轮询/等待HUP
我正在用Python编写Zenity的基于GNU dd(1)
的状态监视器。
由于我要定位的系统的限制,包装程序必须在Python 2上运行,并且不能拉入外部库。
在Zenity的“取消”按钮中,如果尚未完成,则必须终止dd。
我必须立即执行以下操作(即立即触发/驱动/立即执行);如果同时满足/触发了以下多个条件,则应按列出的顺序执行它们:
- Zenity退出时,终止dd
- 当dd写入其
stderr
时,请munge +将数据转发给Zenity的stdin
- 当dd退出时,如果其返回码不为0,则终止Zenity
但是,epoll对象似乎仅在dd的输出上触发;尽管我在Zenity的EPOLLHUP
上注册了stdin
,但它从未触发Zenity退出。
应该/如何做?我知道epoll是可用于正确触发dd
输出(通过EPOLLIN
)的唯一原语;我还了解到,这是一个很古怪的原语,可能不适合触发Zenity的退出。 (如果需要,我可以在该文件中实现更多的逻辑;这样做比引入任何第三方库都更可取,无论它多么小或“普通”。我重申,我了解epoll难以使用,可能需要大量的胶合逻辑。)
或者:如果epoll不是监视subprocess
退出的正确原语,那么在Python 2兼容的情况下,监视子进程退出的同时监视子进程的输出的正确方法是什么?方式?
(我并不是天生就需要多线程功能;按顺序执行所有操作将完全在规范范围内;不过,如果在这种情况下绝对需要多线程编程,以避免繁琐的操作,循环,就这样吧。)
下面是到目前为止我的完整代码。
#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,subprocess,re
def main(args=sys.argv[1:]):
fname = parseifname(args)
n = sizeof(fname)
dcmd = ['dd'] + args + ['status=progress']
zcmd = ['zenity','--progress','--time-remaining']
#Launch dd
dd = subprocess.Popen(dcmd,stderr=subprocess.PIPE)
set_nonblocking(dd.stderr)
#Launch Zenity
zenity = subprocess.Popen(zcmd,stdin=subprocess.PIPE)
set_direct(zenity.stdin)#TODO: why doesn't this line work?*
#set title/status
zenity.stdin.write(('#%s\n' % ' '.join(dcmd)).encode())
zenity.stdin.flush()#*i.e. instances of this line shouldn't be necessary...
#We want to trigger on all of the following:
toPoll = [
(dd.stderr,select.EPOLLIN #dd status update
| select.EPOLLHUP),#dd exit
(zenity.stdin,select.EPOLLHUP),#Zenity exit
]
calcPercent = genCalcPercent(n)
with ePoll(toPoll) as E:
rBytes = re.compile(r'\r(\d+) bytes'.encode())
while dd.poll() is None:
evs = E.poll()#TODO: I'm not sure if this is blocking,or if I've induced a busy loop...
for fn,ev in evs:
if fn == dd.stderr.fileno():
if (ev & select.EPOLLIN):
#dd sent some output
line = dd.stderr.read()
m = rBytes.match(line)
#sys.stderr.buffer.write(line)
if m:
x = int(m.groups()[0])
zenity.stdin.write(('%f\n' % calcPercent(x)).encode())
zenity.stdin.flush()
if (ev & select.EPOLLHUP):
#dd exited
pass#The containing loop will handle this; don't need to take action
if fn == zenity.stdin.fileno():
if (ev & select.EPOLLHUP):#TODO: WHY DOESN'T THIS ACTIVATE??
#Zenity exited
dd.terminate()
if dd.returncode == 0:
#dd exited successfully
zenity.stdin.write('100\n'.encode())
zenity.stdin.flush()
else:
zenity.terminate()
# Functions below here #
def parseifname(argv=sys.argv[:1],default='/dev/stdin'):
'''Given dd's argument list,attempts to return the name of that file which dd would use as its input file'''
M = re.compile(r'^if=(.*)$')
ifname = default
for x in argv:
m = M.match(x)
if m:
ifname = m.groups()[0]
return ifname
def sizeof(fname):
'''Attempts to find the length,in bytes,of the given file or block device'''
s = os.stat(fname)
m = s.st_mode
try:
if stat.S_ISREG(m):
#Regular File
n = s.st_size
elif stat.S_ISBLK(m):
#Block Device
n = int(subprocess.check_output(['lsblk','-b','-n','-l','-o','SIZE','-d',fname]))
else:
raise ValueError("file is neither a standard nor block file")
except:
#Unidentifiable
n = None
return n
def genCalcPercent(n):
'''Given n,returns a function which,given x,returns either x as a percentage of n,or some sane stand-in for such'''
if n:
#Input file size was identified
return lambda x: 100 * x / n
else:
#Input file size was unidentifiable,zero,or otherwise falsy
#we'll at least try to visually show progress
return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))
def set_nonblocking(fd=sys.stdin):
'''Appends os.O_NONBLOCK to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,fcntl.F_SETFL,fcntl.fcntl(fd,fcntl.F_GETFL)
| os.O_NONBLOCK
)
def set_direct(fd=sys.stdout):
'''Appends os.O_SYNC to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,fcntl.F_GETFL)
| os.O_SYNC
)
class ePoll:
'''Thin contextlib wrapper around select.epoll; allows tersely watching multiple events'''
def __init__(self,fdSpecs):
self._E = select.epoll()
self._fds = []
for fd,opt in fdSpecs:
self._E.register(fd,opt)
self._fds.append(fd)
def __enter__(self):
return self._E
def __exit__(self,exc_type,exc_value,traceback):
for fd in self._fds:
self._E.unregister(fd)
self._E.close()
if __name__=='__main__':
main()
解决方法
事实证明答案很简单:使用EPOLLERR
而不是EPOLLHUP
。
我对这种正确解决方案*表示严重怀疑,但是它确实可以正常工作 :
import select,subprocess,time
E = select.epoll()
p = subprocess.Popen(["sh","-c","sleep 3"],stdin=subprocess.PIPE)
#time.sleep(5) #Uncomment this line to convince yourself there is no race-condition here
E.register(p.stdin,select.EPOLLERR)
print("Polling...")
evs = E.poll()
print("Caught events!")
assert (p.stdin.fileno(),select.EPOLLERR) in evs
E.close()
*如果这不是正确的解决方案,那么即使在现在,我还是非常想发现正确的解决方案。
这里是原始问题的脚本the completed version,如果有人在意的话:
#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,re
from functools import reduce
#TODO: rewrite this whole program as a sub-512-byte Perl script
def main(args=sys.argv[1:]):
dcmd = ['dd'] + args + ['status=progress']
zcmd = ['zenity','--progress','--time-remaining','--title=dd','--text=%s' % '\t'.join(args),'--cancel-label=Abort','--ok-label=Done',]
fname = parseifname(args)
n = sizeof(fname)#or (parsebs(args) * parsecount(args))#TODO
calcPercent = genCalcPercent(n)
# https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n814
rBytes = re.compile(r'^\r?(\d+) byte(?:s(?: \([^\)]+\))?)? copied'.encode())
with epoll_with() as E:
#Launch Zenity
zenity = subprocess.Popen(zcmd,stdin=subprocess.PIPE)
modfl(zenity.stdin,(+os.O_SYNC,))
E.register(zenity.stdin,select.EPOLLHUP
)
if sys.version_info.major == 3:
#TODO: why doesn't O_SYNC work on Python 3??? ARGH
def w(*a,**k):
i = zenity.stdin.write(*a,**k)
zenity.stdin.flush()
return i
else:
w = zenity.stdin.write
#set title/status
w(('#%s\n' % ' '.join(args)).encode())
#Launch dd
dd = subprocess.Popen(dcmd,stderr=subprocess.PIPE)
modfl(dd.stderr,(+os.O_NONBLOCK,))
E.register(dd.stderr,select.EPOLLIN
| select.EPOLLHUP
)
break_main_loop = False
while not break_main_loop:
events = E.poll()
for fn,event in events:
if (fn == zenity.stdin.fileno()) and (event & (select.EPOLLHUP | select.EPOLLERR)):
#Zenity exited
dd.terminate()
break_main_loop = True
continue
if (fn == dd.stderr.fileno()) and (event & select.EPOLLIN):
#dd sent some output
line = dd.stderr.read()
m = rBytes.match(line)
if m:
x = int(m.groups()[0])
w(('%f\n' % calcPercent(x)).encode())
#else:
# print(line,file=sys.stderr)
continue
if (fn == dd.stderr.fileno()) and (event & (select.EPOLLHUP | select.EPOLLERR)):
#dd exited
break_main_loop = True
continue
#wait for dd to finish being terminated,if necessary; drain any of its final output
dd.communicate()
assert dd.returncode is not None
if dd.returncode == 0:
#dd exited successfully
try:
w('100\n'.encode())
zenity.stdin.close()
except BrokenPipeError:
#Zenity was already exited for some reason
pass
else:
#dd exited unsuccessfully
#(almost certainly user-cancelled)
zenity.terminate()
return dd.returncode
# Functions below here #
def parseifname(args=sys.argv[:1],default='/dev/stdin'):
'''Given dd's argument list,attempts to return the name of that file which dd would use as its input file'''
M = re.compile(r'^if=(.*)$')
ifname = default
for arg in args:
m = M.match(arg)
if m:
ifname = m.groups()[0]
return ifname
def sizeof(fname):
'''Attempts to find the length,in bytes,of the given file or block device'''
s = os.stat(fname)
m = s.st_mode
try:
if stat.S_ISREG(m):
#Regular File
return s.st_size
elif stat.S_ISBLK(m):
#Block Device
return int(subprocess.check_output(['lsblk','-b','-n','-l','-o','SIZE','-d',fname]))
else:
raise ValueError("file is neither a standard nor block file")
except Exception:
#Also catches e.g. errors running lsblk
return None
def genCalcPercent(n):
'''Given n,returns a function which,given x,returns either x as a percentage of n,or some sane stand-in for such'''
if n:
#Input file size was identified
return lambda x: 100 * x / n
else:
#Input file size was unidentifiable,zero,or otherwise falsy
#we'll at least try to visually show progress:
# a continuous exponential approach to (100-1e-5)%
# which passes 50% at 4GiB transferred
return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))
def modfl(fd,flags):
'''Adds all given positive flags to,and removes all given negative flags from,the given file descriptor'''
# e.g.: modfl(f,-os.O_NONBLOCK))
# would make f synchronous and blocking
#OR-in positive flags; NAND-out negative ones
ins = lambda fl,x: (fl | x) if x >= 0 else (fl &~ -x)
# 1. Get the current flag field
cur = fcntl.fcntl(fd,fcntl.F_GETFL)
# 2. Calculate the new flag field
new = reduce(ins,flags,cur)
# 3. Apply the new flag field
return fcntl.fcntl(fd,fcntl.F_SETFL,new)
if hasattr(select.epoll,'__enter__'):
epoll_with = select.epoll
else:
class epoll_with:
'''contextlib wrapper for python2's epoll'''
def __init__(self,*args,**kwargs):
self._E = select.epoll(*args,**kwargs)
def __enter__(self):
return self._E
def __exit__(self,exc_type,exc_value,traceback):
self._E.close()
if __name__ == '__main__':
sys.exit(main())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。