diff --git a/download_from_google_storage.py b/download_from_google_storage.py index 854caa614..3dcc86243 100755 --- a/download_from_google_storage.py +++ b/download_from_google_storage.py @@ -78,11 +78,10 @@ class Gsutil(object): RETRY_DELAY_MULTIPLE = 1.3 VPYTHON = 'vpython.bat' if GetNormalizedPlatform() == 'win32' else 'vpython' - def __init__(self, path, boto_path=None, timeout=None, version='4.28'): + def __init__(self, path, boto_path=None, version='4.28'): if not os.path.exists(path): raise FileNotFoundError('GSUtil not found in %s' % path) self.path = path - self.timeout = timeout self.boto_path = boto_path self.version = version @@ -103,7 +102,7 @@ class Gsutil(object): def call(self, *args): cmd = [self.VPYTHON, self.path, '--force-version', self.version] cmd.extend(args) - return subprocess2.call(cmd, env=self.get_sub_env(), timeout=self.timeout) + return subprocess2.call(cmd, env=self.get_sub_env()) def check_call(self, *args): cmd = [self.VPYTHON, self.path, '--force-version', self.version] @@ -112,8 +111,7 @@ class Gsutil(object): cmd, stdout=subprocess2.PIPE, stderr=subprocess2.PIPE, - env=self.get_sub_env(), - timeout=self.timeout) + env=self.get_sub_env()) # Parse output. status_code_match = re.search(b'status=([0-9]+)', err) diff --git a/subprocess2.py b/subprocess2.py index 34fdee828..dea1a2d6c 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -12,37 +12,27 @@ import errno import io import logging import os - -try: - import Queue -except ImportError: # For Py3 compatibility - import queue as Queue - import subprocess import sys -import time import threading # Cache the string-escape codec to ensure subprocess can find it later. # See crbug.com/912292#c2 for context. if sys.version_info.major == 2: + import Queue codecs.lookup('string-escape') - -# TODO(crbug.com/953884): Remove this when python3 migration is done. -try: - basestring -except NameError: +else: + import queue as Queue # pylint: disable=redefined-builtin - basestring = str + basestring = (str, bytes) # Constants forwarded from subprocess. PIPE = subprocess.PIPE STDOUT = subprocess.STDOUT # Sends stdout or stderr to os.devnull. -VOID = object() -# Error code when a process was killed because it timed out. -TIMED_OUT = -2001 +VOID = open(os.devnull, 'w') +VOID_INPUT = open(os.devnull, 'r') class CalledProcessError(subprocess.CalledProcessError): @@ -106,42 +96,6 @@ def get_english_env(env): return env -class NagTimer(object): - """ - Triggers a callback when a time interval passes without an event being fired. - - For example, the event could be receiving terminal output from a subprocess; - and the callback could print a warning to stderr that the subprocess appeared - to be hung. - """ - def __init__(self, interval, cb): - self.interval = interval - self.cb = cb - self.timer = threading.Timer(self.interval, self.fn) - self.last_output = self.previous_last_output = 0 - - def start(self): - self.last_output = self.previous_last_output = time.time() - self.timer.start() - - def event(self): - self.last_output = time.time() - - def fn(self): - now = time.time() - if self.last_output == self.previous_last_output: - self.cb(now - self.previous_last_output) - # Use 0.1 fudge factor, just in case - # (self.last_output - now) is very close to zero. - sleep_time = (self.last_output - now - 0.1) % self.interval - self.previous_last_output = self.last_output - self.timer = threading.Timer(sleep_time + 0.1, self.fn) - self.timer.start() - - def cancel(self): - self.timer.cancel() - - class Popen(subprocess.Popen): """Wraps subprocess.Popen() with various workarounds. @@ -184,33 +138,6 @@ class Popen(subprocess.Popen): tmp_str += '; cwd=%s' % kwargs['cwd'] logging.debug(tmp_str) - self.stdout_cb = None - self.stderr_cb = None - self.stdin_is_void = False - self.stdout_is_void = False - self.stderr_is_void = False - self.cmd_str = tmp_str - - if kwargs.get('stdin') is VOID: - kwargs['stdin'] = open(os.devnull, 'r') - self.stdin_is_void = True - - for stream in ('stdout', 'stderr'): - if kwargs.get(stream) in (VOID, os.devnull): - kwargs[stream] = open(os.devnull, 'w') - setattr(self, stream + '_is_void', True) - if callable(kwargs.get(stream)): - setattr(self, stream + '_cb', kwargs[stream]) - kwargs[stream] = PIPE - - self.start = time.time() - self.timeout = None - self.nag_timer = None - self.nag_max = None - self.shell = kwargs.get('shell', None) - # Silence pylint on MacOSX - self.returncode = None - try: with self.popen_lock: super(Popen, self).__init__(args, **kwargs) @@ -231,205 +158,25 @@ class Popen(subprocess.Popen): 'Check that %s or %s exist and have execution permission.' % (str(e), kwargs.get('cwd'), args[0])) - def _tee_threads(self, input): # pylint: disable=redefined-builtin - """Does I/O for a process's pipes using threads. - - It's the simplest and slowest implementation. Expect very slow behavior. - - If there is a callback and it doesn't keep up with the calls, the timeout - effectiveness will be delayed accordingly. - """ - # Queue of either of when done or (, data). In - # theory we would like to limit to ~64kb items to not cause large memory - # usage when the callback blocks. It is not done because it slows down - # processing on OSX10.6 by a factor of 2x, making it even slower than - # Windows! Revisit this decision if it becomes a problem, e.g. crash - # because of memory exhaustion. - queue = Queue.Queue() - done = threading.Event() - nag = None - - def write_stdin(): - try: - stdin_io = io.BytesIO(input) - while True: - data = stdin_io.read(1024) - if data: - self.stdin.write(data) - else: - self.stdin.close() - break - finally: - queue.put('stdin') - - def _queue_pipe_read(pipe, name): - """Queues characters read from a pipe into a queue.""" - try: - while True: - data = pipe.read(1) - if not data: - break - if nag: - nag.event() - queue.put((name, data)) - finally: - queue.put(name) - - def timeout_fn(): - try: - done.wait(self.timeout) - finally: - queue.put('timeout') - - def wait_fn(): - try: - self.wait() - finally: - queue.put('wait') - - # Starts up to 5 threads: - # Wait for the process to quit - # Read stdout - # Read stderr - # Write stdin - # Timeout - threads = { - 'wait': threading.Thread(target=wait_fn), - } - if self.timeout is not None: - threads['timeout'] = threading.Thread(target=timeout_fn) - if self.stdout_cb: - threads['stdout'] = threading.Thread( - target=_queue_pipe_read, args=(self.stdout, 'stdout')) - if self.stderr_cb: - threads['stderr'] = threading.Thread( - target=_queue_pipe_read, args=(self.stderr, 'stderr')) - if input: - threads['stdin'] = threading.Thread(target=write_stdin) - elif self.stdin: - # Pipe but no input, make sure it's closed. - self.stdin.close() - for t in threads.itervalues(): - t.start() - - if self.nag_timer: - def _nag_cb(elapsed): - logging.warn(' No output for %.0f seconds from command:' % elapsed) - logging.warn(' %s' % self.cmd_str) - if (self.nag_max and - int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max): - queue.put('timeout') - done.set() # Must do this so that timeout thread stops waiting. - nag = NagTimer(self.nag_timer, _nag_cb) - nag.start() - - timed_out = False - try: - # This thread needs to be optimized for speed. - while threads: - item = queue.get() - if item[0] == 'stdout': - self.stdout_cb(item[1]) - elif item[0] == 'stderr': - self.stderr_cb(item[1]) - else: - # A thread terminated. - if item in threads: - threads[item].join() - del threads[item] - if item == 'wait': - # Terminate the timeout thread if necessary. - done.set() - elif item == 'timeout' and not timed_out and self.poll() is None: - logging.debug('Timed out after %.0fs: killing' % ( - time.time() - self.start)) - self.kill() - timed_out = True - finally: - # Stop the threads. - done.set() - if nag: - nag.cancel() - if 'wait' in threads: - # Accelerate things, otherwise it would hang until the child process is - # done. - logging.debug('Killing child because of an exception') - self.kill() - # Join threads. - for thread in threads.itervalues(): - thread.join() - if timed_out: - self.returncode = TIMED_OUT - - # pylint: disable=arguments-differ,W0622 - def communicate(self, input=None, timeout=None, nag_timer=None, - nag_max=None): - """Adds timeout and callbacks support. - - Returns (stdout, stderr) like subprocess.Popen().communicate(). - - - The process will be killed after |timeout| seconds and returncode set to - TIMED_OUT. - - If the subprocess runs for |nag_timer| seconds without producing terminal - output, print a warning to stderr. - """ - self.timeout = timeout - self.nag_timer = nag_timer - self.nag_max = nag_max - if (not self.timeout and not self.nag_timer and - not self.stdout_cb and not self.stderr_cb): - return super(Popen, self).communicate(input) - - if self.timeout and self.shell: - raise TypeError( - 'Using timeout and shell simultaneously will cause a process leak ' - 'since the shell will be killed instead of the child process.') - - stdout = None - stderr = None - # Convert to a lambda to workaround python's deadlock. - # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait - # When the pipe fills up, it would deadlock this process. - if self.stdout and not self.stdout_cb and not self.stdout_is_void: - stdout = [] - self.stdout_cb = stdout.append - if self.stderr and not self.stderr_cb and not self.stderr_is_void: - stderr = [] - self.stderr_cb = stderr.append - self._tee_threads(input) - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) - return (stdout, stderr) - - -def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs): - """Wraps subprocess.Popen().communicate() and add timeout support. + +def communicate(args, **kwargs): + """Wraps subprocess.Popen().communicate(). Returns ((stdout, stderr), returncode). - - The process will be killed after |timeout| seconds and returncode set to - TIMED_OUT. - If the subprocess runs for |nag_timer| seconds without producing terminal output, print a warning to stderr. - Automatically passes stdin content as input so do not specify stdin=PIPE. """ - stdin = kwargs.pop('stdin', None) - if stdin is not None: - if isinstance(stdin, basestring): - # When stdin is passed as an argument, use it as the actual input data and - # set the Popen() parameter accordingly. - kwargs['stdin'] = PIPE - else: - kwargs['stdin'] = stdin - stdin = None + stdin = None + # When stdin is passed as an argument, use it as the actual input data and + # set the Popen() parameter accordingly. + if 'stdin' in kwargs and isinstance(kwargs['stdin'], basestring): + stdin = kwargs['stdin'] + kwargs['stdin'] = PIPE proc = Popen(args, **kwargs) - if stdin: - return proc.communicate(stdin, timeout, nag_timer), proc.returncode - else: - return proc.communicate(None, timeout, nag_timer), proc.returncode + return proc.communicate(stdin), proc.returncode def call(args, **kwargs): @@ -472,7 +219,7 @@ def capture(args, **kwargs): - Discards returncode. - Blocks stdin by default if not specified since no output will be visible. """ - kwargs.setdefault('stdin', VOID) + kwargs.setdefault('stdin', VOID_INPUT) # Like check_output, deny the caller from using stdout arg. return communicate(args, stdout=PIPE, **kwargs)[0][0] @@ -488,7 +235,7 @@ def check_output(args, **kwargs): - Blocks stdin by default if not specified since no output will be visible. - As per doc, "The stdout argument is not allowed as it is used internally." """ - kwargs.setdefault('stdin', VOID) + kwargs.setdefault('stdin', VOID_INPUT) if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it would be overridden.') return check_call_out(args, stdout=PIPE, **kwargs)[0] diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 1e3187bb8..57280b8df 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -30,8 +30,8 @@ from testing_support import auto_stub # Create aliases for subprocess2 specific tests. They shouldn't be used for # regression tests. -TIMED_OUT = subprocess2.TIMED_OUT VOID = subprocess2.VOID +VOID_INPUT = subprocess2.VOID_INPUT PIPE = subprocess2.PIPE STDOUT = subprocess2.STDOUT @@ -78,7 +78,7 @@ class DefaultsTest(auto_stub.TestCase): results['args'] = args @staticmethod # pylint: disable=redefined-builtin - def communicate(input=None, timeout=None, nag_max=None, nag_timer=None): + def communicate(input=None): return None, None self.mock(subprocess2, 'Popen', fake_Popen) return results @@ -113,7 +113,7 @@ class DefaultsTest(auto_stub.TestCase): expected = { 'args': ['foo'], 'a':True, - 'stdin': subprocess2.VOID, + 'stdin': subprocess2.VOID_INPUT, 'stdout': subprocess2.PIPE, } self.assertEquals(expected, results) @@ -149,7 +149,6 @@ class DefaultsTest(auto_stub.TestCase): env['LANGUAGE'] = 'en_US.UTF-8' expected['env'] = env self.assertEquals(expected, results) - self.assertTrue(time.time() >= proc.start) def test_check_output_defaults(self): results = self._fake_communicate() @@ -159,7 +158,7 @@ class DefaultsTest(auto_stub.TestCase): expected = { 'args': ['foo'], 'a':True, - 'stdin': subprocess2.VOID, + 'stdin': subprocess2.VOID_INPUT, 'stdout': subprocess2.PIPE, } self.assertEquals(expected, results) @@ -334,7 +333,7 @@ class RegressionTest(BaseTestCase): p1 = subprocess.Popen(cmd, stderr=subprocess.PIPE, shell=False) p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE, shell=False) r1 = p1.communicate() - r2 = p2.communicate(timeout=100) + r2 = p2.communicate() self.assertEquals(r1, r2) @@ -370,27 +369,6 @@ class S2Test(BaseTestCase): self.assertEquals(stdout, e.stdout) self.assertEquals(stderr, e.stderr) - def test_timeout(self): - # timeout doesn't exist in subprocess. - def fn(c, e, un): - res = subprocess2.communicate( - self.exe + ['--sleep_first', '--stdout'], - timeout=0.01, - stdout=PIPE, - shell=False) - self._check_res(res, '', None, TIMED_OUT) - self._run_test(fn) - - def test_timeout_shell_throws(self): - def fn(c, e, un): - try: - # With shell=True, it needs a string. - subprocess2.communicate(' '.join(self.exe), timeout=0.01, shell=True) - self.fail() - except TypeError: - pass - self._run_test(fn) - def test_stdin(self): def fn(c, e, un): stdin = '0123456789' @@ -422,17 +400,16 @@ class S2Test(BaseTestCase): self._run_test(fn) def test_stdin_void(self): - res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID) + res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID_INPUT) self._check_res(res, None, None, 0) - def test_stdin_void_stdout_timeout(self): - # Make sure a mix of VOID, PIPE and timeout works. + def test_stdin_void_stdout(self): + # Make sure a mix of VOID and PIPE works. def fn(c, e, un): res = subprocess2.communicate( e + ['--stdout', '--read'], - stdin=VOID, + stdin=VOID_INPUT, stdout=PIPE, - timeout=10, universal_newlines=un, shell=False) self._check_res(res, c('A\nBB\nCCC\n'), None, 0) @@ -468,155 +445,6 @@ class S2Test(BaseTestCase): self._check_res(res, None, None, 0) self._run_test(fn) - def test_tee_stderr(self): - def fn(c, e, un): - stderr = [] - res = subprocess2.communicate( - e + ['--stderr'], stderr=stderr.append, universal_newlines=un) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self._check_res(res, None, None, 0) - self._run_test(fn) - - def test_tee_stdout_stderr(self): - def fn(c, e, un): - stdout = [] - stderr = [] - res = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=stdout.append, - stderr=stderr.append, - universal_newlines=un) - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self._check_res(res, None, None, 0) - self._run_test(fn) - - def test_tee_stdin(self): - def fn(c, e, un): - # Mix of stdin input and stdout callback. - stdout = [] - stdin = '0123456789' - res = subprocess2.communicate( - e + ['--stdout', '--read'], - stdin=stdin, - stdout=stdout.append, - universal_newlines=un) - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) - self._check_res(res, None, None, 10) - self._run_test(fn) - - def test_tee_throw(self): - def fn(c, e, un): - # Make sure failure still returns stderr completely. - stderr = [] - try: - subprocess2.check_output( - e + ['--stderr', '--fail'], - stderr=stderr.append, - universal_newlines=un) - self.fail() - except subprocess2.CalledProcessError as exception: - self._check_exception(exception, '', None, 64) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self._run_test(fn) - - def test_tee_timeout_stdout_void(self): - def fn(c, e, un): - stderr = [] - res = subprocess2.communicate( - e + ['--stdout', '--stderr', '--fail'], - stdout=VOID, - stderr=stderr.append, - shell=False, - timeout=10, - universal_newlines=un) - self._check_res(res, None, None, 64) - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) - self._run_test(fn) - - def test_tee_timeout_stderr_void(self): - def fn(c, e, un): - stdout = [] - res = subprocess2.communicate( - e + ['--stdout', '--stderr', '--fail'], - stdout=stdout.append, - stderr=VOID, - shell=False, - timeout=10, - universal_newlines=un) - self._check_res(res, None, None, 64) - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) - self._run_test(fn) - - def test_tee_timeout_stderr_stdout(self): - def fn(c, e, un): - stdout = [] - res = subprocess2.communicate( - e + ['--stdout', '--stderr', '--fail'], - stdout=stdout.append, - stderr=STDOUT, - shell=False, - timeout=10, - universal_newlines=un) - self._check_res(res, None, None, 64) - # Ordering is random due to buffering. - self.assertEquals( - set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)), - set(''.join(stdout).splitlines(True))) - self._run_test(fn) - - def test_tee_large(self): - stdout = [] - # Read 128kb. On my workstation it takes >2s. Welcome to 2011. - res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append) - self.assertEquals(128*1024, len(''.join(stdout))) - self._check_res(res, None, None, 0) - - def test_tee_large_stdin(self): - stdout = [] - # Write 128kb. - stdin = '0123456789abcdef' * (8*1024) - res = subprocess2.communicate( - self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) - self.assertEquals(128*1024, len(''.join(stdout))) - # Windows return code is > 8 bits. - returncode = len(stdin) if sys.platform == 'win32' else 0 - self._check_res(res, None, None, returncode) - - def test_tee_cb_throw(self): - # Having a callback throwing up should not cause side-effects. It's a bit - # hard to measure. - class Blow(Exception): - pass - def blow(_): - raise Blow() - proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow) - try: - proc.communicate() - self.fail() - except Blow: - self.assertNotEquals(0, proc.returncode) - - def test_nag_timer(self): - w = [] - l = logging.getLogger() - class _Filter(logging.Filter): - def filter(self, record): - if record.levelno == logging.WARNING: - w.append(record.getMessage().lstrip()) - return 0 - f = _Filter() - l.addFilter(f) - proc = subprocess2.Popen( - self.exe + ['--stdout', '--sleep_first'], stdout=PIPE) - res = proc.communicate(nag_timer=3), proc.returncode - l.removeFilter(f) - self._check_res(res, 'A\nBB\nCCC\n', None, 0) - expected = ['No output for 3 seconds from command:', proc.cmd_str, - 'No output for 6 seconds from command:', proc.cmd_str, - 'No output for 9 seconds from command:', proc.cmd_str] - self.assertEquals(w, expected) - def child_main(args): if sys.platform == 'win32':