From 94c712fa1b22489ddf80cd686a00eb2a85920890 Mon Sep 17 00:00:00 2001 From: "maruel@chromium.org" Date: Thu, 1 Dec 2011 15:04:57 +0000 Subject: [PATCH] Reimplement r109239 but using Popen.communicate() instead. Enables threaded callback handler for subprocess.communicate(). R=dpranke@chromium.org BUG= TEST= Review URL: http://codereview.chromium.org/8749015 git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@112465 0039d316-1c4b-4281-b951-d872f2087c98 --- subprocess2.py | 197 +++++++++++++++++++++++++++++++------- tests/subprocess2_test.py | 160 ++++++++++++++++++++++++++++--- 2 files changed, 308 insertions(+), 49 deletions(-) diff --git a/subprocess2.py b/subprocess2.py index c2f393a99..3ad804791 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. """ from __future__ import with_statement +import cStringIO import errno import logging import os +import Queue import subprocess import sys -import tempfile import time import threading @@ -170,6 +171,10 @@ class Popen(subprocess.Popen): tmp_str += '; cwd=%s' % kwargs['cwd'] logging.debug(tmp_str) + self.stdout_cb = None + self.stderr_cb = None + self.stdout_void = False + self.stderr_void = False def fix(stream): if kwargs.get(stream) in (VOID, os.devnull): # Replaces VOID with handle to /dev/null. @@ -178,11 +183,17 @@ class Popen(subprocess.Popen): # When the pipe fills up, it will deadlock this process. Using a real # file works around that issue. kwargs[stream] = open(os.devnull, 'w') + setattr(self, stream + '_void', True) + if callable(kwargs.get(stream)): + # Callable stdout/stderr should be used only with call() wrappers. + setattr(self, stream + '_cb', kwargs[stream]) + kwargs[stream] = PIPE fix('stdout') fix('stderr') self.start = time.time() + self.timeout = None self.shell = kwargs.get('shell', None) # Silence pylint on MacOSX self.returncode = None @@ -205,6 +216,152 @@ class Popen(subprocess.Popen): # through raise + def _tee_threads(self, input): # pylint: disable=W0622 + """Does I/O for a process's pipes using threads. + + It's the simplest and slowest implementation. Expect very slow behavior. + + If there is a callback and it doesn't keep up with the calls, the timeout + effectiveness will be delayed accordingly. + """ + # Queue of either of when done or (, data). In + # theory we would like to limit to ~64kb items to not cause large memory + # usage when the callback blocks. It is not done because it slows down + # processing on OSX10.6 by a factor of 2x, making it even slower than + # Windows! Revisit this decision if it becomes a problem, e.g. crash + # because of memory exhaustion. + queue = Queue.Queue() + done = threading.Event() + + def write_stdin(): + try: + stdin_io = cStringIO.StringIO(input) + while True: + data = stdin_io.read(1024) + if data: + self.stdin.write(data) + else: + self.stdin.close() + break + finally: + queue.put('stdin') + + def _queue_pipe_read(pipe, name): + """Queues characters read from a pipe into a queue.""" + try: + while True: + data = pipe.read(1) + if not data: + break + queue.put((name, data)) + finally: + queue.put(name) + + def timeout_fn(): + try: + done.wait(self.timeout) + finally: + queue.put('timeout') + + def wait_fn(): + try: + self.wait() + finally: + queue.put('wait') + + # Starts up to 5 threads: + # Wait for the process to quit + # Read stdout + # Read stderr + # Write stdin + # Timeout + threads = { + 'wait': threading.Thread(target=wait_fn), + } + if self.timeout is not None: + threads['timeout'] = threading.Thread(target=timeout_fn) + if self.stdout_cb: + threads['stdout'] = threading.Thread( + target=_queue_pipe_read, args=(self.stdout, 'stdout')) + if self.stderr_cb: + threads['stderr'] = threading.Thread( + target=_queue_pipe_read, args=(self.stderr, 'stderr')) + if input: + threads['stdin'] = threading.Thread(target=write_stdin) + for t in threads.itervalues(): + t.start() + + timed_out = False + try: + # This thread needs to be optimized for speed. + while threads: + item = queue.get() + if item[0] is 'stdout': + self.stdout_cb(item[1]) + elif item[0] is 'stderr': + self.stderr_cb(item[1]) + else: + # A thread terminated. + threads[item].join() + del threads[item] + if item == 'wait': + # Terminate the timeout thread if necessary. + done.set() + elif item == 'timeout' and not timed_out and self.poll() is None: + logging.debug('Timed out after %fs: killing' % self.timeout) + self.kill() + timed_out = True + finally: + # Stop the threads. + done.set() + if 'wait' in threads: + # Accelerate things, otherwise it would hang until the child process is + # done. + logging.debug('Killing child because of an exception') + self.kill() + # Join threads. + for thread in threads.itervalues(): + thread.join() + if timed_out: + self.returncode = TIMED_OUT + + def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 + """Adds timeout and callbacks support. + + Returns (stdout, stderr) like subprocess.Popen().communicate(). + + - The process will be killed after |timeout| seconds and returncode set to + TIMED_OUT. + """ + self.timeout = timeout + if not self.timeout and not self.stdout_cb and not self.stderr_cb: + return super(Popen, self).communicate(input) + + if self.timeout and self.shell: + raise TypeError( + 'Using timeout and shell simultaneously will cause a process leak ' + 'since the shell will be killed instead of the child process.') + + stdout = None + stderr = None + # Convert to a lambda to workaround python's deadlock. + # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait + # When the pipe fills up, it will deadlock this process. Using a thread + # works around that issue. No need for thread safe function since the call + # backs are guaranteed to be called from the main thread. + if self.stdout and not self.stdout_cb and not self.stdout_void: + stdout = cStringIO.StringIO() + self.stdout_cb = stdout.write + if self.stderr and not self.stderr_cb and not self.stderr_void: + stderr = cStringIO.StringIO() + self.stderr_cb = stderr.write + self._tee_threads(input) + if stdout: + stdout = stdout.getvalue() + if stderr: + stderr = stderr.getvalue() + return (stdout, stderr) + def communicate(args, timeout=None, **kwargs): """Wraps subprocess.Popen().communicate() and add timeout support. @@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs): # set the Popen() parameter accordingly. kwargs['stdin'] = PIPE - if not timeout: - # Normal workflow. - proc = Popen(args, **kwargs) - if stdin is not None: - return proc.communicate(stdin), proc.returncode - else: - return proc.communicate(), proc.returncode - - # Create a temporary file to workaround python's deadlock. - # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait - # When the pipe fills up, it will deadlock this process. Using a real file - # works around that issue. - with tempfile.TemporaryFile() as buff: - kwargs['stdout'] = buff - proc = Popen(args, **kwargs) - if proc.shell: - raise TypeError( - 'Using timeout and shell simultaneously will cause a process leak ' - 'since the shell will be killed instead of the child process.') - if stdin is not None: - proc.stdin.write(stdin) - while proc.returncode is None: - proc.poll() - if timeout and (time.time() - proc.start) > timeout: - proc.kill() - proc.wait() - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. - proc.returncode = TIMED_OUT - time.sleep(0.001) - # Now that the process died, reset the cursor and read the file. - buff.seek(0) - out = (buff.read(), None) - return out, proc.returncode + proc = Popen(args, **kwargs) + if stdin not in (None, VOID): + return proc.communicate(stdin, timeout), proc.returncode + else: + return proc.communicate(None, timeout), proc.returncode def call(args, **kwargs): diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 55936e558..1a413e587 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -77,7 +77,7 @@ class DefaultsTest(auto_stub.TestCase): results.update(kwargs) results['args'] = args @staticmethod - def communicate(): + def communicate(input=None, timeout=None): # pylint: disable=W0622 return None, None self.mock(subprocess2, 'Popen', fake_Popen) return results @@ -180,6 +180,12 @@ class BaseTestCase(unittest.TestCase): self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) super(BaseTestCase, self).tearDown() + def _check_res(self, res, stdout, stderr, returncode): + (out, err), code = res + self.assertEquals(stdout, out) + self.assertEquals(stderr, err) + self.assertEquals(returncode, code) + class RegressionTest(BaseTestCase): # Regression tests to ensure that subprocess and subprocess2 have the same @@ -299,6 +305,27 @@ class RegressionTest(BaseTestCase): except subp.CalledProcessError, e: self._check_exception(subp, e, None, None, 64) + def test_redirect_stderr_to_stdout_pipe(self): + def fn(c, e, un, subp): + # stderr output into stdout. + proc = subp.Popen( + e + ['--stderr'], + stdout=subp.PIPE, + stderr=subp.STDOUT, + universal_newlines=un) + res = proc.communicate(), proc.returncode + self._check_res(res, c('a\nbb\nccc\n'), None, 0) + self._run_test(fn) + + def test_redirect_stderr_to_stdout(self): + def fn(c, e, un, subp): + # stderr output into stdout but stdout is not piped. + proc = subp.Popen( + e + ['--stderr'], stderr=STDOUT, universal_newlines=un) + res = proc.communicate(), proc.returncode + self._check_res(res, None, None, 0) + self._run_test(fn) + class S2Test(BaseTestCase): # Tests that can only run in subprocess2, e.g. new functionalities. @@ -326,11 +353,11 @@ class S2Test(BaseTestCase): function(noop, self.exe + ['--cr'], True) function(noop, self.exe + ['--crlf'], True) - def _check_res(self, res, stdout, stderr, returncode): - (out, err), code = res - self.assertEquals(stdout, out) - self.assertEquals(stderr, err) - self.assertEquals(returncode, code) + def _check_exception(self, e, stdout, stderr, returncode): + """On exception, look if the exception members are set correctly.""" + self.assertEquals(returncode, e.returncode) + self.assertEquals(stdout, e.stdout) + self.assertEquals(stderr, e.stderr) def test_timeout(self): # timeout doesn't exist in subprocess. @@ -383,25 +410,128 @@ class S2Test(BaseTestCase): self._check_res(res, None, None, 0) self._run_test(fn) - def test_check_output_redirect_stderr_to_stdout_pipe(self): + def test_tee_stderr(self): def fn(c, e, un): - # stderr output into stdout. + stderr = [] res = subprocess2.communicate( - e + ['--stderr'], - stdout=PIPE, - stderr=STDOUT, + e + ['--stderr'], stderr=stderr.append, universal_newlines=un) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self._check_res(res, None, None, 0) + self._run_test(fn) + + def test_tee_stdout_stderr(self): + def fn(c, e, un): + stdout = [] + stderr = [] + res = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=stdout.append, + stderr=stderr.append, universal_newlines=un) - self._check_res(res, c('a\nbb\nccc\n'), None, 0) + self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self._check_res(res, None, None, 0) self._run_test(fn) - def test_check_output_redirect_stderr_to_stdout(self): + def test_tee_stdin(self): def fn(c, e, un): - # stderr output into stdout but stdout is not piped. + stdout = [] + stdin = '0123456789' res = subprocess2.communicate( - e + ['--stderr'], stderr=STDOUT, universal_newlines=un) + e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, + universal_newlines=un) + self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) self._check_res(res, None, None, 0) self._run_test(fn) + def test_tee_throw(self): + def fn(c, e, un): + stderr = [] + try: + subprocess2.check_output( + e + ['--stderr', '--fail'], stderr=stderr.append, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self._check_exception(e, '', None, 64) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self._run_test(fn) + + def test_tee_timeout_stdout_void(self): + def fn(c, e, un): + stderr = [] + res = subprocess2.communicate( + e + ['--stdout', '--stderr', '--fail'], + stdout=VOID, + stderr=stderr.append, + shell=False, + timeout=10, + universal_newlines=un) + self._check_res(res, None, None, 64) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self._run_test(fn) + + def test_tee_timeout_stderr_void(self): + def fn(c, e, un): + stdout = [] + res = subprocess2.communicate( + e + ['--stdout', '--stderr', '--fail'], + stdout=stdout.append, + stderr=VOID, + shell=False, + timeout=10, + universal_newlines=un) + self._check_res(res, None, None, 64) + self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) + self._run_test(fn) + + def test_tee_timeout_stderr_stdout(self): + def fn(c, e, un): + stdout = [] + res = subprocess2.communicate( + e + ['--stdout', '--stderr', '--fail'], + stdout=stdout.append, + stderr=STDOUT, + shell=False, + timeout=10, + universal_newlines=un) + self._check_res(res, None, None, 64) + # Ordering is random due to buffering. + self.assertEquals( + set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)), + set(''.join(stdout).splitlines(True))) + self._run_test(fn) + + def test_tee_large(self): + stdout = [] + # Read 128kb. On my workstation it takes >2s. Welcome to 2011. + res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self._check_res(res, None, None, 0) + + def test_tee_large_stdin(self): + stdout = [] + # Write 128kb. + stdin = '0123456789abcdef' * (8*1024) + res = subprocess2.communicate( + self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) + self.assertEquals(128*1024, len(''.join(stdout))) + self._check_res(res, None, None, 0) + + def test_tee_cb_throw(self): + # Having a callback throwing up should not cause side-effects. It's a bit + # hard to measure. + class Blow(Exception): + pass + def blow(_): + raise Blow() + proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow) + try: + proc.communicate() + self.fail() + except Blow: + self.assertNotEquals(0, proc.returncode) + def child_main(args): if sys.platform == 'win32':