From 65be6f6bcfa73bc016d816366e4b7431ad7c6a5a Mon Sep 17 00:00:00 2001 From: "maruel@chromium.org" Date: Wed, 9 Nov 2011 13:57:57 +0000 Subject: [PATCH] Add callback support for stdout and stderr. It's currently an inefficient thread implementation. Interestingly enough, callback support is significantly faster on cygwin than on native python. Writing an efficient implementation is punted for a later change, one per implementation. Stops using a temporary file since it's not necessary anymore. The goal is to reduce the number of places where a similar paradigm is used by having a canonical generic implementation. R=dpranke@chromium.org BUG= TEST=Tested manually on Windows, cygwin, linux, OSX 10.6 Review URL: http://codereview.chromium.org/8374026 git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@109239 0039d316-1c4b-4281-b951-d872f2087c98 --- subprocess2.py | 194 +++++++++++++++++++++++++++++++++----- tests/subprocess2_test.py | 110 +++++++++++++++++++-- 2 files changed, 270 insertions(+), 34 deletions(-) diff --git a/subprocess2.py b/subprocess2.py index 4708aadf3..810631ae2 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. """ from __future__ import with_statement +import cStringIO import errno import logging import os +import Queue import subprocess import sys -import tempfile import time import threading @@ -176,6 +177,9 @@ def Popen(args, **kwargs): # When the pipe fills up, it will deadlock this process. Using a real file # works around that issue. kwargs[stream] = open(os.devnull, 'w') + if callable(kwargs.get(stream)): + # Callable stdout/stderr should be used only with call() wrappers. + kwargs[stream] = PIPE fix('stdout') fix('stderr') @@ -198,6 +202,140 @@ def Popen(args, **kwargs): raise +def _queue_pipe_read(pipe, name, done, dest): + """Queue characters read from a pipe into a queue. + + Left outside the _tee_threads function to not introduce a function closure + to speed up variable lookup. + """ + while not done.isSet(): + data = pipe.read(1) + if not data: + break + dest.put((name, data)) + dest.put(name) + + +def _tee_threads(proc, timeout, start, stdin, args, kwargs): + """Does I/O for a process's pipes using thread. + + It's the simplest and slowest implementation. Expect very slow behavior. + + If there is a callback and it doesn't keep up with the calls, the timeout + effectiveness will be delayed accordingly. + """ + # TODO(maruel): Implement a select based implementation on POSIX and a Windows + # one using WaitForMultipleObjects(). + # + # Queue of either of when done or (, data). + # In theory we would like to limit to ~64kb items to not cause large memory + # usage when the callback blocks. It is not done because it slows down + # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! + # Revisit this decision if it becomes a problem, e.g. crash because of + # memory exhaustion. + queue = Queue.Queue() + done = threading.Event() + + def write_stdin(): + stdin_io = cStringIO.StringIO(stdin) + while not done.isSet(): + data = stdin_io.read(1024) + if data: + proc.stdin.write(data) + else: + proc.stdin.close() + break + queue.put('stdin') + + def timeout_fn(): + done.wait(timeout) + # No need to close the pipes since killing should be sufficient. + queue.put('timeout') + + # Starts up to 4 threads: + # Read stdout + # Read stderr + # Write stdin + # Timeout + threads = {} + if timeout is not None: + threads['timeout'] = threading.Thread(target=timeout_fn) + if callable(kwargs.get('stdout')): + threads['stdout'] = threading.Thread( + target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) + if callable(kwargs.get('stderr')): + threads['stderr'] = threading.Thread( + target=_queue_pipe_read, + args=(proc.stderr, 'stderr', done, queue)) + if isinstance(stdin, str): + threads['stdin'] = threading.Thread(target=write_stdin) + for t in threads.itervalues(): + t.daemon = True + t.start() + + timed_out = False + try: + while proc.returncode is None: + assert threads + proc.poll() + item = queue.get() + if isinstance(item, str): + threads[item].join() + del threads[item] + if item == 'timeout' and not timed_out and proc.poll() is None: + logging.debug('Timed out: killing') + proc.kill() + timed_out = True + if not threads: + # We won't be waken up anymore. Need to busy loop. + break + else: + kwargs[item[0]](item[1]) + finally: + # Stop the threads. + done.set() + # Join threads + for thread in threads.itervalues(): + thread.join() + + # Flush the queue. + try: + while True: + item = queue.get(False) + if isinstance(item, str): + if item == 'timeout': + # TODO(maruel): Does it make sense at that point? + if not timed_out and proc.poll() is None: + logging.debug('Timed out: killing') + proc.kill() + timed_out = True + else: + kwargs[item[0]](item[1]) + except Queue.Empty: + pass + + # Get the remainder. + if callable(kwargs.get('stdout')): + data = proc.stdout.read() + while data: + kwargs['stdout'](data) + data = proc.stdout.read() + if callable(kwargs.get('stderr')): + data = proc.stderr.read() + while data: + kwargs['stderr'](data) + data = proc.stderr.read() + + if proc.returncode is None: + # Usually happens when killed with timeout but not listening to pipes. + proc.wait() + + if timed_out: + return TIMED_OUT + + return proc.returncode + + def communicate(args, timeout=None, **kwargs): """Wraps subprocess.Popen().communicate(). @@ -207,6 +345,11 @@ def communicate(args, timeout=None, **kwargs): TIMED_OUT. - Automatically passes stdin content as input so do not specify stdin=PIPE. """ + if timeout and kwargs.get('shell'): + raise TypeError( + 'Using timeout and shell simultaneously will cause a process leak ' + 'since the shell will be killed instead of the child process.') + stdin = kwargs.pop('stdin', None) if stdin is not None: if stdin is VOID: @@ -218,36 +361,37 @@ def communicate(args, timeout=None, **kwargs): # set the Popen() parameter accordingly. kwargs['stdin'] = PIPE - if not timeout: + start = time.time() + proc = Popen(args, **kwargs) + need_buffering = (timeout or + callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) + + if not need_buffering: # Normal workflow. - proc = Popen(args, **kwargs) - if stdin is not None: + if stdin not in (None, VOID): return proc.communicate(stdin), proc.returncode else: return proc.communicate(), proc.returncode - # Create a temporary file to workaround python's deadlock. + stdout = None + stderr = None + # Convert to a lambda to workaround python's deadlock. # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait - # When the pipe fills up, it will deadlock this process. Using a real file - # works around that issue. - with tempfile.TemporaryFile() as buff: - start = time.time() - kwargs['stdout'] = buff - proc = Popen(args, **kwargs) - if stdin is not None: - proc.stdin.write(stdin) - while proc.returncode is None: - proc.poll() - if timeout and (time.time() - start) > timeout: - proc.kill() - proc.wait() - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. - proc.returncode = TIMED_OUT - time.sleep(0.001) - # Now that the process died, reset the cursor and read the file. - buff.seek(0) - out = [buff.read(), None] - return out, proc.returncode + # When the pipe fills up, it will deadlock this process. Using a thread + # works around that issue. No need for thread safe function since the call + # backs are guaranteed to be called from the main thread. + if kwargs.get('stdout') == PIPE: + stdout = [] + kwargs['stdout'] = stdout.append + if kwargs.get('stderr') == PIPE: + stderr = [] + kwargs['stderr'] = stderr.append + returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) + if not stdout is None: + stdout = ''.join(stdout) + if not stderr is None: + stderr = ''.join(stderr) + return (stdout, stderr), returncode def call(args, **kwargs): diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 9d30452c3..d6c3fb700 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -5,6 +5,7 @@ """Unit tests for subprocess2.py.""" +import logging import optparse import os import sys @@ -19,6 +20,14 @@ import subprocess2 # Method could be a function # pylint: disable=R0201 + +def convert(string): + """Converts string to CRLF on Windows.""" + if sys.platform == 'win32': + return string.replace('\n', '\r\n') + return string + + class Subprocess2Test(unittest.TestCase): # Can be mocked in a test. TO_SAVE = { @@ -147,13 +156,24 @@ class Subprocess2Test(unittest.TestCase): self.assertEquals(expected, results) def test_timeout(self): - # It'd be better to not discard stdout. out, returncode = subprocess2.communicate( self.exe + ['--sleep', '--stdout'], timeout=0.01, - stdout=subprocess2.PIPE) + stdout=subprocess2.PIPE, + shell=False) self.assertEquals(subprocess2.TIMED_OUT, returncode) - self.assertEquals(['', None], out) + self.assertEquals(('', None), out) + + def test_timeout_shell_throws(self): + try: + subprocess2.communicate( + self.exe + ['--sleep', '--stdout'], + timeout=0.01, + stdout=subprocess2.PIPE, + shell=True) + self.fail() + except TypeError: + pass def test_check_output_no_stdout(self): try: @@ -168,10 +188,7 @@ class Subprocess2Test(unittest.TestCase): stdout=subprocess2.VOID, stderr=subprocess2.PIPE) self.assertEquals(None, out) - expected = 'a\nbb\nccc\n' - if sys.platform == 'win32': - expected = expected.replace('\n', '\r\n') - self.assertEquals(expected, err) + self.assertEquals(convert('a\nbb\nccc\n'), err) self.assertEquals(0, code) def test_stderr_void(self): @@ -235,6 +252,66 @@ class Subprocess2Test(unittest.TestCase): self.assertEquals(None, e.stderr) self.assertEquals(64, e.returncode) + def test_check_output_tee_stderr(self): + stderr = [] + out, returncode = subprocess2.communicate( + self.exe + ['--stderr'], stderr=stderr.append) + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_stdout_stderr(self): + stdout = [] + stderr = [] + out, returncode = subprocess2.communicate( + self.exe + ['--stdout', '--stderr'], + stdout=stdout.append, + stderr=stderr.append) + self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_stdin(self): + stdout = [] + stdin = '0123456789' + out, returncode = subprocess2.communicate( + self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) + self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_throw(self): + stderr = [] + try: + subprocess2.check_output( + self.exe + ['--stderr', '--fail'], stderr=stderr.append) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals('', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + + def test_check_output_tee_large(self): + stdout = [] + # Read 128kb. On my workstation it takes >2s. Welcome to 2011. + out, returncode = subprocess2.communicate( + self.exe + ['--large'], stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + + def test_check_output_tee_large_stdin(self): + stdout = [] + # Write 128kb. + stdin = '0123456789abcdef' * (8*1024) + out, returncode = subprocess2.communicate( + self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + def child_main(args): parser = optparse.OptionParser() @@ -247,9 +324,13 @@ def child_main(args): parser.add_option('--stdout', action='store_true') parser.add_option('--stderr', action='store_true') parser.add_option('--sleep', action='store_true') + parser.add_option('--large', action='store_true') + parser.add_option('--read', action='store_true') options, args = parser.parse_args(args) if args: parser.error('Internal error') + if options.sleep: + time.sleep(10) def do(string): if options.stdout: @@ -260,12 +341,23 @@ def child_main(args): do('A') do('BB') do('CCC') - if options.sleep: - time.sleep(10) + if options.large: + # Print 128kb. + string = '0123456789abcdef' * (8*1024) + sys.stdout.write(string) + if options.read: + try: + while sys.stdin.read(): + pass + except OSError: + pass return options.return_value if __name__ == '__main__': + logging.basicConfig(level= + [logging.WARNING, logging.INFO, logging.DEBUG][ + min(2, sys.argv.count('-v'))]) if len(sys.argv) > 1 and sys.argv[1] == '--child': sys.exit(child_main(sys.argv[2:])) unittest.main()