diff --git a/subprocess2.py b/subprocess2.py index 4708aadf3..810631ae2 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. """ from __future__ import with_statement +import cStringIO import errno import logging import os +import Queue import subprocess import sys -import tempfile import time import threading @@ -176,6 +177,9 @@ def Popen(args, **kwargs): # When the pipe fills up, it will deadlock this process. Using a real file # works around that issue. kwargs[stream] = open(os.devnull, 'w') + if callable(kwargs.get(stream)): + # Callable stdout/stderr should be used only with call() wrappers. + kwargs[stream] = PIPE fix('stdout') fix('stderr') @@ -198,6 +202,140 @@ def Popen(args, **kwargs): raise +def _queue_pipe_read(pipe, name, done, dest): + """Queue characters read from a pipe into a queue. + + Left outside the _tee_threads function to not introduce a function closure + to speed up variable lookup. + """ + while not done.isSet(): + data = pipe.read(1) + if not data: + break + dest.put((name, data)) + dest.put(name) + + +def _tee_threads(proc, timeout, start, stdin, args, kwargs): + """Does I/O for a process's pipes using thread. + + It's the simplest and slowest implementation. Expect very slow behavior. + + If there is a callback and it doesn't keep up with the calls, the timeout + effectiveness will be delayed accordingly. + """ + # TODO(maruel): Implement a select based implementation on POSIX and a Windows + # one using WaitForMultipleObjects(). + # + # Queue of either of when done or (, data). + # In theory we would like to limit to ~64kb items to not cause large memory + # usage when the callback blocks. It is not done because it slows down + # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! + # Revisit this decision if it becomes a problem, e.g. crash because of + # memory exhaustion. + queue = Queue.Queue() + done = threading.Event() + + def write_stdin(): + stdin_io = cStringIO.StringIO(stdin) + while not done.isSet(): + data = stdin_io.read(1024) + if data: + proc.stdin.write(data) + else: + proc.stdin.close() + break + queue.put('stdin') + + def timeout_fn(): + done.wait(timeout) + # No need to close the pipes since killing should be sufficient. + queue.put('timeout') + + # Starts up to 4 threads: + # Read stdout + # Read stderr + # Write stdin + # Timeout + threads = {} + if timeout is not None: + threads['timeout'] = threading.Thread(target=timeout_fn) + if callable(kwargs.get('stdout')): + threads['stdout'] = threading.Thread( + target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) + if callable(kwargs.get('stderr')): + threads['stderr'] = threading.Thread( + target=_queue_pipe_read, + args=(proc.stderr, 'stderr', done, queue)) + if isinstance(stdin, str): + threads['stdin'] = threading.Thread(target=write_stdin) + for t in threads.itervalues(): + t.daemon = True + t.start() + + timed_out = False + try: + while proc.returncode is None: + assert threads + proc.poll() + item = queue.get() + if isinstance(item, str): + threads[item].join() + del threads[item] + if item == 'timeout' and not timed_out and proc.poll() is None: + logging.debug('Timed out: killing') + proc.kill() + timed_out = True + if not threads: + # We won't be waken up anymore. Need to busy loop. + break + else: + kwargs[item[0]](item[1]) + finally: + # Stop the threads. + done.set() + # Join threads + for thread in threads.itervalues(): + thread.join() + + # Flush the queue. + try: + while True: + item = queue.get(False) + if isinstance(item, str): + if item == 'timeout': + # TODO(maruel): Does it make sense at that point? + if not timed_out and proc.poll() is None: + logging.debug('Timed out: killing') + proc.kill() + timed_out = True + else: + kwargs[item[0]](item[1]) + except Queue.Empty: + pass + + # Get the remainder. + if callable(kwargs.get('stdout')): + data = proc.stdout.read() + while data: + kwargs['stdout'](data) + data = proc.stdout.read() + if callable(kwargs.get('stderr')): + data = proc.stderr.read() + while data: + kwargs['stderr'](data) + data = proc.stderr.read() + + if proc.returncode is None: + # Usually happens when killed with timeout but not listening to pipes. + proc.wait() + + if timed_out: + return TIMED_OUT + + return proc.returncode + + def communicate(args, timeout=None, **kwargs): """Wraps subprocess.Popen().communicate(). @@ -207,6 +345,11 @@ def communicate(args, timeout=None, **kwargs): TIMED_OUT. - Automatically passes stdin content as input so do not specify stdin=PIPE. """ + if timeout and kwargs.get('shell'): + raise TypeError( + 'Using timeout and shell simultaneously will cause a process leak ' + 'since the shell will be killed instead of the child process.') + stdin = kwargs.pop('stdin', None) if stdin is not None: if stdin is VOID: @@ -218,36 +361,37 @@ def communicate(args, timeout=None, **kwargs): # set the Popen() parameter accordingly. kwargs['stdin'] = PIPE - if not timeout: + start = time.time() + proc = Popen(args, **kwargs) + need_buffering = (timeout or + callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) + + if not need_buffering: # Normal workflow. - proc = Popen(args, **kwargs) - if stdin is not None: + if stdin not in (None, VOID): return proc.communicate(stdin), proc.returncode else: return proc.communicate(), proc.returncode - # Create a temporary file to workaround python's deadlock. + stdout = None + stderr = None + # Convert to a lambda to workaround python's deadlock. # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait - # When the pipe fills up, it will deadlock this process. Using a real file - # works around that issue. - with tempfile.TemporaryFile() as buff: - start = time.time() - kwargs['stdout'] = buff - proc = Popen(args, **kwargs) - if stdin is not None: - proc.stdin.write(stdin) - while proc.returncode is None: - proc.poll() - if timeout and (time.time() - start) > timeout: - proc.kill() - proc.wait() - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. - proc.returncode = TIMED_OUT - time.sleep(0.001) - # Now that the process died, reset the cursor and read the file. - buff.seek(0) - out = [buff.read(), None] - return out, proc.returncode + # When the pipe fills up, it will deadlock this process. Using a thread + # works around that issue. No need for thread safe function since the call + # backs are guaranteed to be called from the main thread. + if kwargs.get('stdout') == PIPE: + stdout = [] + kwargs['stdout'] = stdout.append + if kwargs.get('stderr') == PIPE: + stderr = [] + kwargs['stderr'] = stderr.append + returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) + if not stdout is None: + stdout = ''.join(stdout) + if not stderr is None: + stderr = ''.join(stderr) + return (stdout, stderr), returncode def call(args, **kwargs): diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 9d30452c3..d6c3fb700 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -5,6 +5,7 @@ """Unit tests for subprocess2.py.""" +import logging import optparse import os import sys @@ -19,6 +20,14 @@ import subprocess2 # Method could be a function # pylint: disable=R0201 + +def convert(string): + """Converts string to CRLF on Windows.""" + if sys.platform == 'win32': + return string.replace('\n', '\r\n') + return string + + class Subprocess2Test(unittest.TestCase): # Can be mocked in a test. TO_SAVE = { @@ -147,13 +156,24 @@ class Subprocess2Test(unittest.TestCase): self.assertEquals(expected, results) def test_timeout(self): - # It'd be better to not discard stdout. out, returncode = subprocess2.communicate( self.exe + ['--sleep', '--stdout'], timeout=0.01, - stdout=subprocess2.PIPE) + stdout=subprocess2.PIPE, + shell=False) self.assertEquals(subprocess2.TIMED_OUT, returncode) - self.assertEquals(['', None], out) + self.assertEquals(('', None), out) + + def test_timeout_shell_throws(self): + try: + subprocess2.communicate( + self.exe + ['--sleep', '--stdout'], + timeout=0.01, + stdout=subprocess2.PIPE, + shell=True) + self.fail() + except TypeError: + pass def test_check_output_no_stdout(self): try: @@ -168,10 +188,7 @@ class Subprocess2Test(unittest.TestCase): stdout=subprocess2.VOID, stderr=subprocess2.PIPE) self.assertEquals(None, out) - expected = 'a\nbb\nccc\n' - if sys.platform == 'win32': - expected = expected.replace('\n', '\r\n') - self.assertEquals(expected, err) + self.assertEquals(convert('a\nbb\nccc\n'), err) self.assertEquals(0, code) def test_stderr_void(self): @@ -235,6 +252,66 @@ class Subprocess2Test(unittest.TestCase): self.assertEquals(None, e.stderr) self.assertEquals(64, e.returncode) + def test_check_output_tee_stderr(self): + stderr = [] + out, returncode = subprocess2.communicate( + self.exe + ['--stderr'], stderr=stderr.append) + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_stdout_stderr(self): + stdout = [] + stderr = [] + out, returncode = subprocess2.communicate( + self.exe + ['--stdout', '--stderr'], + stdout=stdout.append, + stderr=stderr.append) + self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_stdin(self): + stdout = [] + stdin = '0123456789' + out, returncode = subprocess2.communicate( + self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) + self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_throw(self): + stderr = [] + try: + subprocess2.check_output( + self.exe + ['--stderr', '--fail'], stderr=stderr.append) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals('', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + + def test_check_output_tee_large(self): + stdout = [] + # Read 128kb. On my workstation it takes >2s. Welcome to 2011. + out, returncode = subprocess2.communicate( + self.exe + ['--large'], stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_large_stdin(self): + stdout = [] + # Write 128kb. + stdin = '0123456789abcdef' * (8*1024) + out, returncode = subprocess2.communicate( + self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + def child_main(args): parser = optparse.OptionParser() @@ -247,9 +324,13 @@ def child_main(args): parser.add_option('--stdout', action='store_true') parser.add_option('--stderr', action='store_true') parser.add_option('--sleep', action='store_true') + parser.add_option('--large', action='store_true') + parser.add_option('--read', action='store_true') options, args = parser.parse_args(args) if args: parser.error('Internal error') + if options.sleep: + time.sleep(10) def do(string): if options.stdout: @@ -260,12 +341,23 @@ def child_main(args): do('A') do('BB') do('CCC') - if options.sleep: - time.sleep(10) + if options.large: + # Print 128kb. + string = '0123456789abcdef' * (8*1024) + sys.stdout.write(string) + if options.read: + try: + while sys.stdin.read(): + pass + except OSError: + pass return options.return_value if __name__ == '__main__': + logging.basicConfig(level= + [logging.WARNING, logging.INFO, logging.DEBUG][ + min(2, sys.argv.count('-v'))]) if len(sys.argv) > 1 and sys.argv[1] == '--child': sys.exit(child_main(sys.argv[2:])) unittest.main()