diff --git a/subprocess2.py b/subprocess2.py index 810631ae2..430010c14 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -13,11 +13,16 @@ import errno import logging import os import Queue +import select import subprocess import sys import time import threading +if sys.platform != 'win32': + import fcntl + + # Constants forwarded from subprocess. PIPE = subprocess.PIPE STDOUT = subprocess.STDOUT @@ -203,7 +208,7 @@ def Popen(args, **kwargs): def _queue_pipe_read(pipe, name, done, dest): - """Queue characters read from a pipe into a queue. + """Queues characters read from a pipe into a queue. Left outside the _tee_threads function to not introduce a function closure to speed up variable lookup. @@ -336,6 +341,80 @@ def _tee_threads(proc, timeout, start, stdin, args, kwargs): return proc.returncode +def _read_pipe(handles, pipe, out_fn): + """Reads bytes from a pipe and calls the output callback.""" + data = pipe.read() + if not data: + del handles[pipe] + else: + out_fn(data) + + +def _tee_posix(proc, timeout, start, stdin, args, kwargs): + """Polls a process and its pipe using select.select(). + + TODO(maruel): Implement a non-polling method for OSes that support it. + """ + handles_r = {} + if callable(kwargs.get('stdout')): + handles_r[proc.stdout] = lambda: _read_pipe( + handles_r, proc.stdout, kwargs['stdout']) + if callable(kwargs.get('stderr')): + handles_r[proc.stderr] = lambda: _read_pipe( + handles_r, proc.stderr, kwargs['stderr']) + + handles_w = {} + if isinstance(stdin, str): + stdin_io = cStringIO.StringIO(stdin) + def write_stdin(): + data = stdin_io.read(1) + if data: + proc.stdin.write(data) + else: + del handles_w[proc.stdin] + proc.stdin.close() + handles_w[proc.stdin] = write_stdin + else: + # TODO(maruel): Fix me, it could be VOID. + assert stdin is None + + # Make all the file objects of the child process non-blocking file. + # TODO(maruel): Test if a pipe is handed to the child process. + for pipe in (proc.stdin, proc.stdout, proc.stderr): + fileno = pipe and getattr(pipe, 'fileno', lambda: None)() + if fileno: + # Note: making a pipe non-blocking means the C stdio could act wrong. In + # particular, readline() cannot be used. Work around is to use os.read(). + fl = fcntl.fcntl(fileno, fcntl.F_GETFL) + fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + timed_out = False + while handles_r or handles_w or (timeout and proc.poll() is None): + period = None + if timeout: + period = max(0, timeout - (time.time() - start)) + if not period and not timed_out: + proc.kill() + timed_out = True + if timed_out: + period = 0.001 + + # It reconstructs objects on each loop, not very efficient. + reads, writes, _, = select.select( + handles_r.keys(), handles_w.keys(), [], period) + for read in reads: + handles_r[read]() + for write in writes: + handles_w[write]() + + # No pipe open anymore and if there was a time out, the child process was + # killed already. + proc.wait() + if timed_out: + return TIMED_OUT + return proc.returncode + + def communicate(args, timeout=None, **kwargs): """Wraps subprocess.Popen().communicate(). @@ -386,7 +465,12 @@ def communicate(args, timeout=None, **kwargs): if kwargs.get('stderr') == PIPE: stderr = [] kwargs['stderr'] = stderr.append - returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) + if sys.platform == 'win32': + # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, + # doesn't exist so _tee_win() cannot be used yet. + returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) + else: + returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) if not stdout is None: stdout = ''.join(stdout) if not stderr is None: diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 6d31d1882..cb390751e 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -12,6 +12,11 @@ import sys import time import unittest +try: + import fcntl +except ImportError: + fcntl = None + ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, ROOT_DIR) @@ -182,6 +187,16 @@ class S2Test(unittest.TestCase): super(S2Test, self).setUp() self.exe_path = __file__ self.exe = [sys.executable, self.exe_path, '--child'] + self.states = {} + if fcntl: + for v in (sys.stdin, sys.stdout, sys.stderr): + fileno = v.fileno() + self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) + + def tearDown(self): + for fileno, fl in self.states.iteritems(): + self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) + super(S2Test, self).tearDown() def _run_test(self, function): """Runs tests in 6 combinations: