diff --git a/subprocess2.py b/subprocess2.py index 430010c14..4708aadf3 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -8,21 +8,15 @@ In theory you shouldn't need anything else in subprocess, or this module failed. """ from __future__ import with_statement -import cStringIO import errno import logging import os -import Queue -import select import subprocess import sys +import tempfile import time import threading -if sys.platform != 'win32': - import fcntl - - # Constants forwarded from subprocess. PIPE = subprocess.PIPE STDOUT = subprocess.STDOUT @@ -182,9 +176,6 @@ def Popen(args, **kwargs): # When the pipe fills up, it will deadlock this process. Using a real file # works around that issue. kwargs[stream] = open(os.devnull, 'w') - if callable(kwargs.get(stream)): - # Callable stdout/stderr should be used only with call() wrappers. - kwargs[stream] = PIPE fix('stdout') fix('stderr') @@ -207,214 +198,6 @@ def Popen(args, **kwargs): raise -def _queue_pipe_read(pipe, name, done, dest): - """Queues characters read from a pipe into a queue. - - Left outside the _tee_threads function to not introduce a function closure - to speed up variable lookup. - """ - while not done.isSet(): - data = pipe.read(1) - if not data: - break - dest.put((name, data)) - dest.put(name) - - -def _tee_threads(proc, timeout, start, stdin, args, kwargs): - """Does I/O for a process's pipes using thread. - - It's the simplest and slowest implementation. Expect very slow behavior. - - If there is a callback and it doesn't keep up with the calls, the timeout - effectiveness will be delayed accordingly. - """ - # TODO(maruel): Implement a select based implementation on POSIX and a Windows - # one using WaitForMultipleObjects(). - # - # Queue of either of when done or (, data). - # In theory we would like to limit to ~64kb items to not cause large memory - # usage when the callback blocks. It is not done because it slows down - # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! - # Revisit this decision if it becomes a problem, e.g. crash because of - # memory exhaustion. - queue = Queue.Queue() - done = threading.Event() - - def write_stdin(): - stdin_io = cStringIO.StringIO(stdin) - while not done.isSet(): - data = stdin_io.read(1024) - if data: - proc.stdin.write(data) - else: - proc.stdin.close() - break - queue.put('stdin') - - def timeout_fn(): - done.wait(timeout) - # No need to close the pipes since killing should be sufficient. - queue.put('timeout') - - # Starts up to 4 threads: - # Read stdout - # Read stderr - # Write stdin - # Timeout - threads = {} - if timeout is not None: - threads['timeout'] = threading.Thread(target=timeout_fn) - if callable(kwargs.get('stdout')): - threads['stdout'] = threading.Thread( - target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) - if callable(kwargs.get('stderr')): - threads['stderr'] = threading.Thread( - target=_queue_pipe_read, - args=(proc.stderr, 'stderr', done, queue)) - if isinstance(stdin, str): - threads['stdin'] = threading.Thread(target=write_stdin) - for t in threads.itervalues(): - t.daemon = True - t.start() - - timed_out = False - try: - while proc.returncode is None: - assert threads - proc.poll() - item = queue.get() - if isinstance(item, str): - threads[item].join() - del threads[item] - if item == 'timeout' and not timed_out and proc.poll() is None: - logging.debug('Timed out: killing') - proc.kill() - timed_out = True - if not threads: - # We won't be waken up anymore. Need to busy loop. - break - else: - kwargs[item[0]](item[1]) - finally: - # Stop the threads. - done.set() - # Join threads - for thread in threads.itervalues(): - thread.join() - - # Flush the queue. - try: - while True: - item = queue.get(False) - if isinstance(item, str): - if item == 'timeout': - # TODO(maruel): Does it make sense at that point? - if not timed_out and proc.poll() is None: - logging.debug('Timed out: killing') - proc.kill() - timed_out = True - else: - kwargs[item[0]](item[1]) - except Queue.Empty: - pass - - # Get the remainder. - if callable(kwargs.get('stdout')): - data = proc.stdout.read() - while data: - kwargs['stdout'](data) - data = proc.stdout.read() - if callable(kwargs.get('stderr')): - data = proc.stderr.read() - while data: - kwargs['stderr'](data) - data = proc.stderr.read() - - if proc.returncode is None: - # Usually happens when killed with timeout but not listening to pipes. - proc.wait() - - if timed_out: - return TIMED_OUT - - return proc.returncode - - -def _read_pipe(handles, pipe, out_fn): - """Reads bytes from a pipe and calls the output callback.""" - data = pipe.read() - if not data: - del handles[pipe] - else: - out_fn(data) - - -def _tee_posix(proc, timeout, start, stdin, args, kwargs): - """Polls a process and its pipe using select.select(). - - TODO(maruel): Implement a non-polling method for OSes that support it. - """ - handles_r = {} - if callable(kwargs.get('stdout')): - handles_r[proc.stdout] = lambda: _read_pipe( - handles_r, proc.stdout, kwargs['stdout']) - if callable(kwargs.get('stderr')): - handles_r[proc.stderr] = lambda: _read_pipe( - handles_r, proc.stderr, kwargs['stderr']) - - handles_w = {} - if isinstance(stdin, str): - stdin_io = cStringIO.StringIO(stdin) - def write_stdin(): - data = stdin_io.read(1) - if data: - proc.stdin.write(data) - else: - del handles_w[proc.stdin] - proc.stdin.close() - handles_w[proc.stdin] = write_stdin - else: - # TODO(maruel): Fix me, it could be VOID. - assert stdin is None - - # Make all the file objects of the child process non-blocking file. - # TODO(maruel): Test if a pipe is handed to the child process. - for pipe in (proc.stdin, proc.stdout, proc.stderr): - fileno = pipe and getattr(pipe, 'fileno', lambda: None)() - if fileno: - # Note: making a pipe non-blocking means the C stdio could act wrong. In - # particular, readline() cannot be used. Work around is to use os.read(). - fl = fcntl.fcntl(fileno, fcntl.F_GETFL) - fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - timed_out = False - while handles_r or handles_w or (timeout and proc.poll() is None): - period = None - if timeout: - period = max(0, timeout - (time.time() - start)) - if not period and not timed_out: - proc.kill() - timed_out = True - if timed_out: - period = 0.001 - - # It reconstructs objects on each loop, not very efficient. - reads, writes, _, = select.select( - handles_r.keys(), handles_w.keys(), [], period) - for read in reads: - handles_r[read]() - for write in writes: - handles_w[write]() - - # No pipe open anymore and if there was a time out, the child process was - # killed already. - proc.wait() - if timed_out: - return TIMED_OUT - return proc.returncode - - def communicate(args, timeout=None, **kwargs): """Wraps subprocess.Popen().communicate(). @@ -424,11 +207,6 @@ def communicate(args, timeout=None, **kwargs): TIMED_OUT. - Automatically passes stdin content as input so do not specify stdin=PIPE. """ - if timeout and kwargs.get('shell'): - raise TypeError( - 'Using timeout and shell simultaneously will cause a process leak ' - 'since the shell will be killed instead of the child process.') - stdin = kwargs.pop('stdin', None) if stdin is not None: if stdin is VOID: @@ -440,42 +218,36 @@ def communicate(args, timeout=None, **kwargs): # set the Popen() parameter accordingly. kwargs['stdin'] = PIPE - start = time.time() - proc = Popen(args, **kwargs) - need_buffering = (timeout or - callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) - - if not need_buffering: + if not timeout: # Normal workflow. - if stdin not in (None, VOID): + proc = Popen(args, **kwargs) + if stdin is not None: return proc.communicate(stdin), proc.returncode else: return proc.communicate(), proc.returncode - stdout = None - stderr = None - # Convert to a lambda to workaround python's deadlock. + # Create a temporary file to workaround python's deadlock. # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait - # When the pipe fills up, it will deadlock this process. Using a thread - # works around that issue. No need for thread safe function since the call - # backs are guaranteed to be called from the main thread. - if kwargs.get('stdout') == PIPE: - stdout = [] - kwargs['stdout'] = stdout.append - if kwargs.get('stderr') == PIPE: - stderr = [] - kwargs['stderr'] = stderr.append - if sys.platform == 'win32': - # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, - # doesn't exist so _tee_win() cannot be used yet. - returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) - else: - returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) - if not stdout is None: - stdout = ''.join(stdout) - if not stderr is None: - stderr = ''.join(stderr) - return (stdout, stderr), returncode + # When the pipe fills up, it will deadlock this process. Using a real file + # works around that issue. + with tempfile.TemporaryFile() as buff: + start = time.time() + kwargs['stdout'] = buff + proc = Popen(args, **kwargs) + if stdin is not None: + proc.stdin.write(stdin) + while proc.returncode is None: + proc.poll() + if timeout and (time.time() - start) > timeout: + proc.kill() + proc.wait() + # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. + proc.returncode = TIMED_OUT + time.sleep(0.001) + # Now that the process died, reset the cursor and read the file. + buff.seek(0) + out = [buff.read(), None] + return out, proc.returncode def call(args, **kwargs): diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index cb390751e..9d30452c3 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -5,18 +5,12 @@ """Unit tests for subprocess2.py.""" -import logging import optparse import os import sys import time import unittest -try: - import fcntl -except ImportError: - fcntl = None - ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, ROOT_DIR) @@ -25,25 +19,7 @@ import subprocess2 # Method could be a function # pylint: disable=R0201 - -def convert_to_crlf(string): - """Unconditionally convert LF to CRLF.""" - return string.replace('\n', '\r\n') - - -def convert_to_cr(string): - """Unconditionally convert LF to CR.""" - return string.replace('\n', '\r') - - -def convert_win(string): - """Converts string to CRLF on Windows only.""" - if sys.platform == 'win32': - return string.replace('\n', '\r\n') - return string - - -class DefaultsTest(unittest.TestCase): +class Subprocess2Test(unittest.TestCase): # Can be mocked in a test. TO_SAVE = { subprocess2: [ @@ -52,6 +28,8 @@ class DefaultsTest(unittest.TestCase): } def setUp(self): + self.exe_path = __file__ + self.exe = [sys.executable, self.exe_path, '--child'] self.saved = {} for module, names in self.TO_SAVE.iteritems(): self.saved[module] = dict( @@ -168,67 +146,14 @@ class DefaultsTest(unittest.TestCase): } self.assertEquals(expected, results) - def test_timeout_shell_throws(self): - # Never called. - _ = self._fake_Popen() - try: - subprocess2.communicate( - sys.executable, - timeout=0.01, - stdout=subprocess2.PIPE, - shell=True) - self.fail() - except TypeError: - pass - - -class S2Test(unittest.TestCase): - def setUp(self): - super(S2Test, self).setUp() - self.exe_path = __file__ - self.exe = [sys.executable, self.exe_path, '--child'] - self.states = {} - if fcntl: - for v in (sys.stdin, sys.stdout, sys.stderr): - fileno = v.fileno() - self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) - - def tearDown(self): - for fileno, fl in self.states.iteritems(): - self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) - super(S2Test, self).tearDown() - - def _run_test(self, function): - """Runs tests in 6 combinations: - - LF output with universal_newlines=False - - CR output with universal_newlines=False - - CRLF output with universal_newlines=False - - LF output with universal_newlines=True - - CR output with universal_newlines=True - - CRLF output with universal_newlines=True - - First |function| argument is the convertion for the origianl expected LF - string to the right EOL. - Second |function| argument is the executable and initial flag to run, to - control what EOL is used by the child process. - Third |function| argument is universal_newlines value. - """ - noop = lambda x: x - function(noop, self.exe, False) - function(convert_to_cr, self.exe + ['--cr'], False) - function(convert_to_crlf, self.exe + ['--crlf'], False) - function(noop, self.exe, True) - function(noop, self.exe + ['--cr'], True) - function(noop, self.exe + ['--crlf'], True) - def test_timeout(self): + # It'd be better to not discard stdout. out, returncode = subprocess2.communicate( - self.exe + ['--sleep_first', '--stdout'], + self.exe + ['--sleep', '--stdout'], timeout=0.01, - stdout=subprocess2.PIPE, - shell=False) + stdout=subprocess2.PIPE) self.assertEquals(subprocess2.TIMED_OUT, returncode) - self.assertEquals(('', None), out) + self.assertEquals(['', None], out) def test_check_output_no_stdout(self): try: @@ -238,78 +163,68 @@ class S2Test(unittest.TestCase): pass def test_stdout_void(self): - def fn(c, e, un): - (out, err), code = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=subprocess2.VOID, - stderr=subprocess2.PIPE, - universal_newlines=un) - self.assertEquals(None, out) - self.assertEquals(c('a\nbb\nccc\n'), err) - self.assertEquals(0, code) - self._run_test(fn) + (out, err), code = subprocess2.communicate( + self.exe + ['--stdout', '--stderr'], + stdout=subprocess2.VOID, + stderr=subprocess2.PIPE) + self.assertEquals(None, out) + expected = 'a\nbb\nccc\n' + if sys.platform == 'win32': + expected = expected.replace('\n', '\r\n') + self.assertEquals(expected, err) + self.assertEquals(0, code) def test_stderr_void(self): - def fn(c, e, un): - (out, err), code = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=subprocess2.PIPE, - stderr=subprocess2.VOID, - universal_newlines=un) - self.assertEquals(c('A\nBB\nCCC\n'), out) - self.assertEquals(None, err) - self.assertEquals(0, code) - self._run_test(fn) + (out, err), code = subprocess2.communicate( + self.exe + ['--stdout', '--stderr'], + universal_newlines=True, + stdout=subprocess2.PIPE, + stderr=subprocess2.VOID) + self.assertEquals('A\nBB\nCCC\n', out) + self.assertEquals(None, err) + self.assertEquals(0, code) def test_check_output_throw_stdout(self): - def fn(c, e, un): - try: - subprocess2.check_output( - e + ['--fail', '--stdout'], universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) - self._run_test(fn) + try: + subprocess2.check_output( + self.exe + ['--fail', '--stdout'], universal_newlines=True) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('A\nBB\nCCC\n', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) def test_check_output_throw_no_stderr(self): - def fn(c, e, un): - try: - subprocess2.check_output( - e + ['--fail', '--stderr'], universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals(c(''), e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) - self._run_test(fn) + try: + subprocess2.check_output( + self.exe + ['--fail', '--stderr'], universal_newlines=True) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) def test_check_output_throw_stderr(self): - def fn(c, e, un): - try: - subprocess2.check_output( - e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, - universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('', e.stdout) - self.assertEquals(c('a\nbb\nccc\n'), e.stderr) - self.assertEquals(64, e.returncode) - self._run_test(fn) + try: + subprocess2.check_output( + self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, + universal_newlines=True) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('', e.stdout) + self.assertEquals('a\nbb\nccc\n', e.stderr) + self.assertEquals(64, e.returncode) def test_check_output_throw_stderr_stdout(self): - def fn(c, e, un): - try: - subprocess2.check_output( - e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, - universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals(c('a\nbb\nccc\n'), e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) - self._run_test(fn) + try: + subprocess2.check_output( + self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, + universal_newlines=True) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('a\nbb\nccc\n', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) def test_check_call_throw(self): try: @@ -320,87 +235,8 @@ class S2Test(unittest.TestCase): self.assertEquals(None, e.stderr) self.assertEquals(64, e.returncode) - def test_check_output_tee_stderr(self): - def fn(c, e, un): - stderr = [] - out, returncode = subprocess2.communicate( - e + ['--stderr'], stderr=stderr.append, - universal_newlines=un) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) - self._run_test(fn) - - def test_check_output_tee_stdout_stderr(self): - def fn(c, e, un): - stdout = [] - stderr = [] - out, returncode = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=stdout.append, - stderr=stderr.append, - universal_newlines=un) - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) - self._run_test(fn) - - def test_check_output_tee_stdin(self): - def fn(c, e, un): - stdout = [] - stdin = '0123456789' - out, returncode = subprocess2.communicate( - e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, - universal_newlines=un) - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) - self._run_test(fn) - - def test_check_output_tee_throw(self): - def fn(c, e, un): - stderr = [] - try: - subprocess2.check_output( - e + ['--stderr', '--fail'], stderr=stderr.append, - universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals('', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) - self._run_test(fn) - - def test_check_output_tee_large(self): - stdout = [] - # Read 128kb. On my workstation it takes >2s. Welcome to 2011. - out, returncode = subprocess2.communicate( - self.exe + ['--large'], stdout=stdout.append) - self.assertEquals(128*1024, len(''.join(stdout))) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) - - def test_check_output_tee_large_stdin(self): - stdout = [] - # Write 128kb. - stdin = '0123456789abcdef' * (8*1024) - out, returncode = subprocess2.communicate( - self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) - self.assertEquals(128*1024, len(''.join(stdout))) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) - def child_main(args): - if sys.platform == 'win32': - # Annoying, make sure the output is not translated on Windows. - # pylint: disable=E1101,F0401 - import msvcrt - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) - msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) - parser = optparse.OptionParser() parser.add_option( '--fail', @@ -408,52 +244,28 @@ def child_main(args): action='store_const', default=0, const=64) - parser.add_option( - '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') - parser.add_option( - '--cr', action='store_const', const='\r', dest='eol') parser.add_option('--stdout', action='store_true') parser.add_option('--stderr', action='store_true') - parser.add_option('--sleep_first', action='store_true') - parser.add_option('--sleep_last', action='store_true') - parser.add_option('--large', action='store_true') - parser.add_option('--read', action='store_true') + parser.add_option('--sleep', action='store_true') options, args = parser.parse_args(args) if args: parser.error('Internal error') - if options.sleep_first: - time.sleep(10) def do(string): if options.stdout: - sys.stdout.write(string.upper()) - sys.stdout.write(options.eol) + print >> sys.stdout, string.upper() if options.stderr: - sys.stderr.write(string.lower()) - sys.stderr.write(options.eol) + print >> sys.stderr, string.lower() do('A') do('BB') do('CCC') - if options.large: - # Print 128kb. - string = '0123456789abcdef' * (8*1024) - sys.stdout.write(string) - if options.read: - try: - while sys.stdin.read(): - pass - except OSError: - pass - if options.sleep_last: + if options.sleep: time.sleep(10) return options.return_value if __name__ == '__main__': - logging.basicConfig(level= - [logging.WARNING, logging.INFO, logging.DEBUG][ - min(2, sys.argv.count('-v'))]) if len(sys.argv) > 1 and sys.argv[1] == '--child': sys.exit(child_main(sys.argv[2:])) unittest.main()