depot_tools: Simplify subprocess2.

Remove support for unused features:
- timeout
- nag_timer and nag_max
- Popen start property
- callback functions for stdout and stderr

Bug: 984182
Change-Id: Ib2327119508a89d1e60f4a42c64b78d050f48092
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/1745406
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
Commit-Queue: Edward Lesmes <ehmaldonado@chromium.org>
changes/06/1745406/6
Edward Lemur 6 years ago committed by Commit Bot
parent 1b4c7e9f38
commit 1556fbc353

@ -78,11 +78,10 @@ class Gsutil(object):
RETRY_DELAY_MULTIPLE = 1.3
VPYTHON = 'vpython.bat' if GetNormalizedPlatform() == 'win32' else 'vpython'
def __init__(self, path, boto_path=None, timeout=None, version='4.28'):
def __init__(self, path, boto_path=None, version='4.28'):
if not os.path.exists(path):
raise FileNotFoundError('GSUtil not found in %s' % path)
self.path = path
self.timeout = timeout
self.boto_path = boto_path
self.version = version
@ -103,7 +102,7 @@ class Gsutil(object):
def call(self, *args):
cmd = [self.VPYTHON, self.path, '--force-version', self.version]
cmd.extend(args)
return subprocess2.call(cmd, env=self.get_sub_env(), timeout=self.timeout)
return subprocess2.call(cmd, env=self.get_sub_env())
def check_call(self, *args):
cmd = [self.VPYTHON, self.path, '--force-version', self.version]
@ -112,8 +111,7 @@ class Gsutil(object):
cmd,
stdout=subprocess2.PIPE,
stderr=subprocess2.PIPE,
env=self.get_sub_env(),
timeout=self.timeout)
env=self.get_sub_env())
# Parse output.
status_code_match = re.search(b'status=([0-9]+)', err)

@ -12,37 +12,27 @@ import errno
import io
import logging
import os
try:
import Queue
except ImportError: # For Py3 compatibility
import queue as Queue
import subprocess
import sys
import time
import threading
# Cache the string-escape codec to ensure subprocess can find it later.
# See crbug.com/912292#c2 for context.
if sys.version_info.major == 2:
import Queue
codecs.lookup('string-escape')
# TODO(crbug.com/953884): Remove this when python3 migration is done.
try:
basestring
except NameError:
else:
import queue as Queue
# pylint: disable=redefined-builtin
basestring = str
basestring = (str, bytes)
# Constants forwarded from subprocess.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
# Sends stdout or stderr to os.devnull.
VOID = object()
# Error code when a process was killed because it timed out.
TIMED_OUT = -2001
VOID = open(os.devnull, 'w')
VOID_INPUT = open(os.devnull, 'r')
class CalledProcessError(subprocess.CalledProcessError):
@ -106,42 +96,6 @@ def get_english_env(env):
return env
class NagTimer(object):
"""
Triggers a callback when a time interval passes without an event being fired.
For example, the event could be receiving terminal output from a subprocess;
and the callback could print a warning to stderr that the subprocess appeared
to be hung.
"""
def __init__(self, interval, cb):
self.interval = interval
self.cb = cb
self.timer = threading.Timer(self.interval, self.fn)
self.last_output = self.previous_last_output = 0
def start(self):
self.last_output = self.previous_last_output = time.time()
self.timer.start()
def event(self):
self.last_output = time.time()
def fn(self):
now = time.time()
if self.last_output == self.previous_last_output:
self.cb(now - self.previous_last_output)
# Use 0.1 fudge factor, just in case
# (self.last_output - now) is very close to zero.
sleep_time = (self.last_output - now - 0.1) % self.interval
self.previous_last_output = self.last_output
self.timer = threading.Timer(sleep_time + 0.1, self.fn)
self.timer.start()
def cancel(self):
self.timer.cancel()
class Popen(subprocess.Popen):
"""Wraps subprocess.Popen() with various workarounds.
@ -184,33 +138,6 @@ class Popen(subprocess.Popen):
tmp_str += '; cwd=%s' % kwargs['cwd']
logging.debug(tmp_str)
self.stdout_cb = None
self.stderr_cb = None
self.stdin_is_void = False
self.stdout_is_void = False
self.stderr_is_void = False
self.cmd_str = tmp_str
if kwargs.get('stdin') is VOID:
kwargs['stdin'] = open(os.devnull, 'r')
self.stdin_is_void = True
for stream in ('stdout', 'stderr'):
if kwargs.get(stream) in (VOID, os.devnull):
kwargs[stream] = open(os.devnull, 'w')
setattr(self, stream + '_is_void', True)
if callable(kwargs.get(stream)):
setattr(self, stream + '_cb', kwargs[stream])
kwargs[stream] = PIPE
self.start = time.time()
self.timeout = None
self.nag_timer = None
self.nag_max = None
self.shell = kwargs.get('shell', None)
# Silence pylint on MacOSX
self.returncode = None
try:
with self.popen_lock:
super(Popen, self).__init__(args, **kwargs)
@ -231,205 +158,25 @@ class Popen(subprocess.Popen):
'Check that %s or %s exist and have execution permission.'
% (str(e), kwargs.get('cwd'), args[0]))
def _tee_threads(self, input): # pylint: disable=redefined-builtin
"""Does I/O for a process's pipes using threads.
It's the simplest and slowest implementation. Expect very slow behavior.
If there is a callback and it doesn't keep up with the calls, the timeout
effectiveness will be delayed accordingly.
"""
# Queue of either of <threadname> when done or (<threadname>, data). In
# theory we would like to limit to ~64kb items to not cause large memory
# usage when the callback blocks. It is not done because it slows down
# processing on OSX10.6 by a factor of 2x, making it even slower than
# Windows! Revisit this decision if it becomes a problem, e.g. crash
# because of memory exhaustion.
queue = Queue.Queue()
done = threading.Event()
nag = None
def write_stdin():
try:
stdin_io = io.BytesIO(input)
while True:
data = stdin_io.read(1024)
if data:
self.stdin.write(data)
else:
self.stdin.close()
break
finally:
queue.put('stdin')
def _queue_pipe_read(pipe, name):
"""Queues characters read from a pipe into a queue."""
try:
while True:
data = pipe.read(1)
if not data:
break
if nag:
nag.event()
queue.put((name, data))
finally:
queue.put(name)
def timeout_fn():
try:
done.wait(self.timeout)
finally:
queue.put('timeout')
def wait_fn():
try:
self.wait()
finally:
queue.put('wait')
# Starts up to 5 threads:
# Wait for the process to quit
# Read stdout
# Read stderr
# Write stdin
# Timeout
threads = {
'wait': threading.Thread(target=wait_fn),
}
if self.timeout is not None:
threads['timeout'] = threading.Thread(target=timeout_fn)
if self.stdout_cb:
threads['stdout'] = threading.Thread(
target=_queue_pipe_read, args=(self.stdout, 'stdout'))
if self.stderr_cb:
threads['stderr'] = threading.Thread(
target=_queue_pipe_read, args=(self.stderr, 'stderr'))
if input:
threads['stdin'] = threading.Thread(target=write_stdin)
elif self.stdin:
# Pipe but no input, make sure it's closed.
self.stdin.close()
for t in threads.itervalues():
t.start()
if self.nag_timer:
def _nag_cb(elapsed):
logging.warn(' No output for %.0f seconds from command:' % elapsed)
logging.warn(' %s' % self.cmd_str)
if (self.nag_max and
int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
queue.put('timeout')
done.set() # Must do this so that timeout thread stops waiting.
nag = NagTimer(self.nag_timer, _nag_cb)
nag.start()
timed_out = False
try:
# This thread needs to be optimized for speed.
while threads:
item = queue.get()
if item[0] == 'stdout':
self.stdout_cb(item[1])
elif item[0] == 'stderr':
self.stderr_cb(item[1])
else:
# A thread terminated.
if item in threads:
threads[item].join()
del threads[item]
if item == 'wait':
# Terminate the timeout thread if necessary.
done.set()
elif item == 'timeout' and not timed_out and self.poll() is None:
logging.debug('Timed out after %.0fs: killing' % (
time.time() - self.start))
self.kill()
timed_out = True
finally:
# Stop the threads.
done.set()
if nag:
nag.cancel()
if 'wait' in threads:
# Accelerate things, otherwise it would hang until the child process is
# done.
logging.debug('Killing child because of an exception')
self.kill()
# Join threads.
for thread in threads.itervalues():
thread.join()
if timed_out:
self.returncode = TIMED_OUT
# pylint: disable=arguments-differ,W0622
def communicate(self, input=None, timeout=None, nag_timer=None,
nag_max=None):
"""Adds timeout and callbacks support.
Returns (stdout, stderr) like subprocess.Popen().communicate().
- The process will be killed after |timeout| seconds and returncode set to
TIMED_OUT.
- If the subprocess runs for |nag_timer| seconds without producing terminal
output, print a warning to stderr.
"""
self.timeout = timeout
self.nag_timer = nag_timer
self.nag_max = nag_max
if (not self.timeout and not self.nag_timer and
not self.stdout_cb and not self.stderr_cb):
return super(Popen, self).communicate(input)
if self.timeout and self.shell:
raise TypeError(
'Using timeout and shell simultaneously will cause a process leak '
'since the shell will be killed instead of the child process.')
stdout = None
stderr = None
# Convert to a lambda to workaround python's deadlock.
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
# When the pipe fills up, it would deadlock this process.
if self.stdout and not self.stdout_cb and not self.stdout_is_void:
stdout = []
self.stdout_cb = stdout.append
if self.stderr and not self.stderr_cb and not self.stderr_is_void:
stderr = []
self.stderr_cb = stderr.append
self._tee_threads(input)
if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)
return (stdout, stderr)
def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
"""Wraps subprocess.Popen().communicate() and add timeout support.
def communicate(args, **kwargs):
"""Wraps subprocess.Popen().communicate().
Returns ((stdout, stderr), returncode).
- The process will be killed after |timeout| seconds and returncode set to
TIMED_OUT.
- If the subprocess runs for |nag_timer| seconds without producing terminal
output, print a warning to stderr.
- Automatically passes stdin content as input so do not specify stdin=PIPE.
"""
stdin = kwargs.pop('stdin', None)
if stdin is not None:
if isinstance(stdin, basestring):
# When stdin is passed as an argument, use it as the actual input data and
# set the Popen() parameter accordingly.
kwargs['stdin'] = PIPE
else:
kwargs['stdin'] = stdin
stdin = None
stdin = None
# When stdin is passed as an argument, use it as the actual input data and
# set the Popen() parameter accordingly.
if 'stdin' in kwargs and isinstance(kwargs['stdin'], basestring):
stdin = kwargs['stdin']
kwargs['stdin'] = PIPE
proc = Popen(args, **kwargs)
if stdin:
return proc.communicate(stdin, timeout, nag_timer), proc.returncode
else:
return proc.communicate(None, timeout, nag_timer), proc.returncode
return proc.communicate(stdin), proc.returncode
def call(args, **kwargs):
@ -472,7 +219,7 @@ def capture(args, **kwargs):
- Discards returncode.
- Blocks stdin by default if not specified since no output will be visible.
"""
kwargs.setdefault('stdin', VOID)
kwargs.setdefault('stdin', VOID_INPUT)
# Like check_output, deny the caller from using stdout arg.
return communicate(args, stdout=PIPE, **kwargs)[0][0]
@ -488,7 +235,7 @@ def check_output(args, **kwargs):
- Blocks stdin by default if not specified since no output will be visible.
- As per doc, "The stdout argument is not allowed as it is used internally."
"""
kwargs.setdefault('stdin', VOID)
kwargs.setdefault('stdin', VOID_INPUT)
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it would be overridden.')
return check_call_out(args, stdout=PIPE, **kwargs)[0]

@ -30,8 +30,8 @@ from testing_support import auto_stub
# Create aliases for subprocess2 specific tests. They shouldn't be used for
# regression tests.
TIMED_OUT = subprocess2.TIMED_OUT
VOID = subprocess2.VOID
VOID_INPUT = subprocess2.VOID_INPUT
PIPE = subprocess2.PIPE
STDOUT = subprocess2.STDOUT
@ -78,7 +78,7 @@ class DefaultsTest(auto_stub.TestCase):
results['args'] = args
@staticmethod
# pylint: disable=redefined-builtin
def communicate(input=None, timeout=None, nag_max=None, nag_timer=None):
def communicate(input=None):
return None, None
self.mock(subprocess2, 'Popen', fake_Popen)
return results
@ -113,7 +113,7 @@ class DefaultsTest(auto_stub.TestCase):
expected = {
'args': ['foo'],
'a':True,
'stdin': subprocess2.VOID,
'stdin': subprocess2.VOID_INPUT,
'stdout': subprocess2.PIPE,
}
self.assertEquals(expected, results)
@ -149,7 +149,6 @@ class DefaultsTest(auto_stub.TestCase):
env['LANGUAGE'] = 'en_US.UTF-8'
expected['env'] = env
self.assertEquals(expected, results)
self.assertTrue(time.time() >= proc.start)
def test_check_output_defaults(self):
results = self._fake_communicate()
@ -159,7 +158,7 @@ class DefaultsTest(auto_stub.TestCase):
expected = {
'args': ['foo'],
'a':True,
'stdin': subprocess2.VOID,
'stdin': subprocess2.VOID_INPUT,
'stdout': subprocess2.PIPE,
}
self.assertEquals(expected, results)
@ -334,7 +333,7 @@ class RegressionTest(BaseTestCase):
p1 = subprocess.Popen(cmd, stderr=subprocess.PIPE, shell=False)
p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE, shell=False)
r1 = p1.communicate()
r2 = p2.communicate(timeout=100)
r2 = p2.communicate()
self.assertEquals(r1, r2)
@ -370,27 +369,6 @@ class S2Test(BaseTestCase):
self.assertEquals(stdout, e.stdout)
self.assertEquals(stderr, e.stderr)
def test_timeout(self):
# timeout doesn't exist in subprocess.
def fn(c, e, un):
res = subprocess2.communicate(
self.exe + ['--sleep_first', '--stdout'],
timeout=0.01,
stdout=PIPE,
shell=False)
self._check_res(res, '', None, TIMED_OUT)
self._run_test(fn)
def test_timeout_shell_throws(self):
def fn(c, e, un):
try:
# With shell=True, it needs a string.
subprocess2.communicate(' '.join(self.exe), timeout=0.01, shell=True)
self.fail()
except TypeError:
pass
self._run_test(fn)
def test_stdin(self):
def fn(c, e, un):
stdin = '0123456789'
@ -422,17 +400,16 @@ class S2Test(BaseTestCase):
self._run_test(fn)
def test_stdin_void(self):
res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID)
res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID_INPUT)
self._check_res(res, None, None, 0)
def test_stdin_void_stdout_timeout(self):
# Make sure a mix of VOID, PIPE and timeout works.
def test_stdin_void_stdout(self):
# Make sure a mix of VOID and PIPE works.
def fn(c, e, un):
res = subprocess2.communicate(
e + ['--stdout', '--read'],
stdin=VOID,
stdin=VOID_INPUT,
stdout=PIPE,
timeout=10,
universal_newlines=un,
shell=False)
self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
@ -468,155 +445,6 @@ class S2Test(BaseTestCase):
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stderr(self):
def fn(c, e, un):
stderr = []
res = subprocess2.communicate(
e + ['--stderr'], stderr=stderr.append, universal_newlines=un)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stdout_stderr(self):
def fn(c, e, un):
stdout = []
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=stdout.append,
stderr=stderr.append,
universal_newlines=un)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stdin(self):
def fn(c, e, un):
# Mix of stdin input and stdout callback.
stdout = []
stdin = '0123456789'
res = subprocess2.communicate(
e + ['--stdout', '--read'],
stdin=stdin,
stdout=stdout.append,
universal_newlines=un)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._check_res(res, None, None, 10)
self._run_test(fn)
def test_tee_throw(self):
def fn(c, e, un):
# Make sure failure still returns stderr completely.
stderr = []
try:
subprocess2.check_output(
e + ['--stderr', '--fail'],
stderr=stderr.append,
universal_newlines=un)
self.fail()
except subprocess2.CalledProcessError as exception:
self._check_exception(exception, '', None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stdout_void(self):
def fn(c, e, un):
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=VOID,
stderr=stderr.append,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stderr_void(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=VOID,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._run_test(fn)
def test_tee_timeout_stderr_stdout(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=STDOUT,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
# Ordering is random due to buffering.
self.assertEquals(
set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)),
set(''.join(stdout).splitlines(True)))
self._run_test(fn)
def test_tee_large(self):
stdout = []
# Read 128kb. On my workstation it takes >2s. Welcome to 2011.
res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
self._check_res(res, None, None, 0)
def test_tee_large_stdin(self):
stdout = []
# Write 128kb.
stdin = '0123456789abcdef' * (8*1024)
res = subprocess2.communicate(
self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
# Windows return code is > 8 bits.
returncode = len(stdin) if sys.platform == 'win32' else 0
self._check_res(res, None, None, returncode)
def test_tee_cb_throw(self):
# Having a callback throwing up should not cause side-effects. It's a bit
# hard to measure.
class Blow(Exception):
pass
def blow(_):
raise Blow()
proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow)
try:
proc.communicate()
self.fail()
except Blow:
self.assertNotEquals(0, proc.returncode)
def test_nag_timer(self):
w = []
l = logging.getLogger()
class _Filter(logging.Filter):
def filter(self, record):
if record.levelno == logging.WARNING:
w.append(record.getMessage().lstrip())
return 0
f = _Filter()
l.addFilter(f)
proc = subprocess2.Popen(
self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
res = proc.communicate(nag_timer=3), proc.returncode
l.removeFilter(f)
self._check_res(res, 'A\nBB\nCCC\n', None, 0)
expected = ['No output for 3 seconds from command:', proc.cmd_str,
'No output for 6 seconds from command:', proc.cmd_str,
'No output for 9 seconds from command:', proc.cmd_str]
self.assertEquals(w, expected)
def child_main(args):
if sys.platform == 'win32':

Loading…
Cancel
Save