diff --git a/presubmit_support.py b/presubmit_support.py index c1edc9493..9f8115d3f 100755 --- a/presubmit_support.py +++ b/presubmit_support.py @@ -30,8 +30,10 @@ import os # Somewhat exposed through the API. import pickle # Exposed through the API. import random import re # Exposed through the API. +import signal import sys # Parts exposed through API. import tempfile # Exposed through the API. +import threading import time import traceback # Exposed through the API. import types @@ -66,11 +68,153 @@ class CommandData(object): def __init__(self, name, cmd, kwargs, message): self.name = name self.cmd = cmd + self.stdin = kwargs.get('stdin', None) self.kwargs = kwargs + self.kwargs['stdout'] = subprocess.PIPE + self.kwargs['stderr'] = subprocess.STDOUT + self.kwargs['stdin'] = subprocess.PIPE self.message = message self.info = None +# Adapted from +# https://github.com/google/gtest-parallel/blob/master/gtest_parallel.py#L37 +# +# An object that catches SIGINT sent to the Python process and notices +# if processes passed to wait() die by SIGINT (we need to look for +# both of those cases, because pressing Ctrl+C can result in either +# the main process or one of the subprocesses getting the signal). +# +# Before a SIGINT is seen, wait(p) will simply call p.wait() and +# return the result. Once a SIGINT has been seen (in the main process +# or a subprocess, including the one the current call is waiting for), +# wait(p) will call p.terminate() and raise ProcessWasInterrupted. +class SigintHandler(object): + class ProcessWasInterrupted(Exception): + pass + + sigint_returncodes = {-signal.SIGINT, # Unix + -1073741510, # Windows + } + def __init__(self): + self.__lock = threading.Lock() + self.__processes = set() + self.__got_sigint = False + signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt()) + + def __on_sigint(self): + self.__got_sigint = True + while self.__processes: + try: + self.__processes.pop().terminate() + except OSError: + pass + + def interrupt(self): + with self.__lock: + self.__on_sigint() + + def got_sigint(self): + with self.__lock: + return self.__got_sigint + + def wait(self, p, stdin): + with self.__lock: + if self.__got_sigint: + p.terminate() + self.__processes.add(p) + stdout, stderr = p.communicate(stdin) + code = p.returncode + with self.__lock: + self.__processes.discard(p) + if code in self.sigint_returncodes: + self.__on_sigint() + if self.__got_sigint: + raise self.ProcessWasInterrupted + return stdout, stderr + +sigint_handler = SigintHandler() + + +class ThreadPool(object): + def __init__(self, pool_size=None): + self._tests = [] + self._nonparallel_tests = [] + self._pool_size = pool_size or multiprocessing.cpu_count() + self._messages = [] + self._messages_lock = threading.Lock() + self._current_index = 0 + self._current_index_lock = threading.Lock() + + def CallCommand(self, test): + """Runs an external program. + + This function converts invocation of .py files and invocations of "python" + to vpython invocations. + """ + vpython = 'vpython.bat' if sys.platform == 'win32' else 'vpython' + + cmd = test.cmd + if cmd[0] == 'python': + cmd = list(cmd) + cmd[0] = vpython + elif cmd[0].endswith('.py'): + cmd = [vpython] + cmd + + try: + start = time.time() + p = subprocess.Popen(cmd, **test.kwargs) + stdout, _ = sigint_handler.wait(p, test.stdin) + duration = time.time() - start + except OSError as e: + duration = time.time() - start + return test.message( + '%s exec failure (%4.2fs)\n %s' % (test.name, duration, e)) + if p.returncode != 0: + return test.message( + '%s (%4.2fs) failed\n%s' % (test.name, duration, stdout)) + if test.info: + return test.info('%s (%4.2fs)' % (test.name, duration)) + + def AddTests(self, tests, parallel=True): + if parallel: + self._tests.extend(tests) + else: + self._nonparallel_tests.extend(tests) + + def RunAsync(self): + def _WorkerFn(): + while True: + test_index = None + with self._current_index_lock: + if self._current_index == len(self._tests): + break + test_index = self._current_index + self._current_index += 1 + result = self.CallCommand(self._tests[test_index]) + if result: + with self._messages_lock: + self._messages.append(result) + + def _StartDaemon(): + t = threading.Thread(target=_WorkerFn) + t.daemon = True + t.start() + return t + + for test in self._nonparallel_tests: + result = self.CallCommand(test) + if result: + self._messages.append(result) + + if self._tests: + threads = [_StartDaemon() for _ in range(self._pool_size)] + for worker in threads: + worker.join() + + return self._messages + + def normpath(path): '''Version of os.path.normpath that also changes backward slashes to forward slashes when not running on Windows. @@ -390,7 +534,7 @@ class InputApi(object): ) def __init__(self, change, presubmit_path, is_committing, - rietveld_obj, verbose, gerrit_obj=None, dry_run=None): + rietveld_obj, verbose, gerrit_obj=None, dry_run=None, thread_pool=None): """Builds an InputApi object. Args: @@ -413,6 +557,8 @@ class InputApi(object): if self.rietveld: self.host_url = self.rietveld.url + self.thread_pool = thread_pool or ThreadPool() + # We expose various modules and functions as attributes of the input_api # so that presubmit scripts don't have to import them. self.ast = ast @@ -452,12 +598,6 @@ class InputApi(object): self.cpu_count = multiprocessing.cpu_count() - # this is done here because in RunTests, the current working directory has - # changed, which causes Pool() to explode fantastically when run on windows - # (because it tries to load the __main__ module, which imports lots of - # things relative to the current working directory). - self._run_tests_pool = multiprocessing.Pool(self.cpu_count) - # The local path of the currently-being-processed presubmit script. self._current_presubmit_path = os.path.dirname(presubmit_path) @@ -635,27 +775,21 @@ class InputApi(object): return 'TBR' in self.change.tags or self.change.TBRsFromDescription() def RunTests(self, tests_mix, parallel=True): + # RunTests doesn't actually run tests. It adds them to a ThreadPool that + # will run all tests once all PRESUBMIT files are processed. tests = [] msgs = [] for t in tests_mix: - if isinstance(t, OutputApi.PresubmitResult): + if isinstance(t, OutputApi.PresubmitResult) and t: msgs.append(t) else: assert issubclass(t.message, _PresubmitResult) tests.append(t) if self.verbose: t.info = _PresubmitNotifyResult - if len(tests) > 1 and parallel: - # async recipe works around multiprocessing bug handling Ctrl-C - msgs.extend(self._run_tests_pool.map_async(CallCommand, tests).get(99999)) - else: - msgs.extend(map(CallCommand, tests)) - return [m for m in msgs if m] - - def ShutdownPool(self): - self._run_tests_pool.close() - self._run_tests_pool.join() - self._run_tests_pool = None + t.kwargs['cwd'] = self.PresubmitLocalPath() + self.thread_pool.AddTests(tests, parallel) + return msgs class _DiffCache(object): @@ -1273,7 +1407,7 @@ def DoPostUploadExecuter(change, class PresubmitExecuter(object): def __init__(self, change, committing, rietveld_obj, verbose, - gerrit_obj=None, dry_run=None): + gerrit_obj=None, dry_run=None, thread_pool=None): """ Args: change: The Change object. @@ -1289,6 +1423,7 @@ class PresubmitExecuter(object): self.verbose = verbose self.dry_run = dry_run self.more_cc = [] + self.thread_pool = thread_pool def ExecPresubmitScript(self, script_text, presubmit_path): """Executes a single presubmit script. @@ -1309,7 +1444,8 @@ class PresubmitExecuter(object): # Load the presubmit script into context. input_api = InputApi(self.change, presubmit_path, self.committing, self.rietveld, self.verbose, - gerrit_obj=self.gerrit, dry_run=self.dry_run) + gerrit_obj=self.gerrit, dry_run=self.dry_run, + thread_pool=self.thread_pool) output_api = OutputApi(self.committing) context = {} try: @@ -1344,8 +1480,6 @@ class PresubmitExecuter(object): else: result = () # no error since the script doesn't care about current event. - input_api.ShutdownPool() - # Return the process to the original working directory. os.chdir(main_path) return result @@ -1407,8 +1541,9 @@ def DoPresubmitChecks(change, if not presubmit_files and verbose: output.write("Warning, no PRESUBMIT.py found.\n") results = [] + thread_pool = ThreadPool() executer = PresubmitExecuter(change, committing, rietveld_obj, verbose, - gerrit_obj, dry_run) + gerrit_obj, dry_run, thread_pool) if default_presubmit: if verbose: output.write("Running default presubmit script.\n") @@ -1422,6 +1557,8 @@ def DoPresubmitChecks(change, presubmit_script = gclient_utils.FileRead(filename, 'rU') results += executer.ExecPresubmitScript(presubmit_script, filename) + results += thread_pool.RunAsync() + output.more_cc.extend(executer.more_cc) errors = [] notifications = [] @@ -1529,41 +1666,6 @@ def canned_check_filter(method_names): setattr(presubmit_canned_checks, name, method) -def CallCommand(cmd_data): - """Runs an external program, potentially from a child process created by the - multiprocessing module. - - multiprocessing needs a top level function with a single argument. - - This function converts invocation of .py files and invocations of "python" to - vpython invocations. - """ - vpython = 'vpython.bat' if sys.platform == 'win32' else 'vpython' - - cmd = cmd_data.cmd - if cmd[0] == 'python': - cmd = list(cmd) - cmd[0] = vpython - elif cmd[0].endswith('.py'): - cmd = [vpython] + cmd - - cmd_data.kwargs['stdout'] = subprocess.PIPE - cmd_data.kwargs['stderr'] = subprocess.STDOUT - try: - start = time.time() - (out, _), code = subprocess.communicate(cmd, **cmd_data.kwargs) - duration = time.time() - start - except OSError as e: - duration = time.time() - start - return cmd_data.message( - '%s exec failure (%4.2fs)\n %s' % (cmd_data.name, duration, e)) - if code != 0: - return cmd_data.message( - '%s (%4.2fs) failed\n%s' % (cmd_data.name, duration, out)) - if cmd_data.info: - return cmd_data.info('%s (%4.2fs)' % (cmd_data.name, duration)) - - def main(argv=None): parser = optparse.OptionParser(usage="%prog [options] ", version="%prog " + str(__version__)) diff --git a/tests/presubmit_unittest.py b/tests/presubmit_unittest.py index 63a1f0b5e..53ff26ea5 100755 --- a/tests/presubmit_unittest.py +++ b/tests/presubmit_unittest.py @@ -173,16 +173,19 @@ class PresubmitUnittest(PresubmitTestsBase): self.mox.ReplayAll() members = [ 'AffectedFile', 'Change', 'DoPostUploadExecuter', 'DoPresubmitChecks', - 'GetPostUploadExecuter', 'GitAffectedFile', 'CallCommand', 'CommandData', + 'GetPostUploadExecuter', 'GitAffectedFile', 'CommandData', 'GitChange', 'InputApi', 'ListRelevantPresubmitFiles', 'main', 'OutputApi', 'ParseFiles', 'PresubmitFailure', 'PresubmitExecuter', 'PresubmitOutput', 'ScanSubDirs', + 'SigintHandler', 'ThreadPool', 'ast', 'auth', 'cPickle', 'cpplint', 'cStringIO', 'contextlib', 'canned_check_filter', 'fix_encoding', 'fnmatch', 'gclient_utils', 'git_footers', 'glob', 'inspect', 'json', 'load_files', 'logging', 'marshal', 'normpath', 'optparse', 'os', 'owners', 'owners_finder', 'pickle', 'presubmit_canned_checks', 'random', 're', 'rietveld', 'scm', - 'subprocess', 'sys', 'tempfile', 'time', 'traceback', 'types', 'unittest', + 'sigint_handler', 'signal', + 'subprocess', 'sys', 'tempfile', 'threading', + 'time', 'traceback', 'types', 'unittest', 'urllib2', 'warn', 'multiprocessing', 'DoGetTryMasters', 'GetTryMastersExecuter', 'itertools', 'urlparse', 'gerrit_util', 'GerritAccessor', @@ -1002,7 +1005,6 @@ class InputApiUnittest(PresubmitTestsBase): 'PresubmitLocalPath', 'ReadFile', 'RightHandSideLines', - 'ShutdownPool', 'ast', 'basename', 'cPickle', @@ -1034,6 +1036,7 @@ class InputApiUnittest(PresubmitTestsBase): 'subprocess', 'tbr', 'tempfile', + 'thread_pool', 'time', 'traceback', 'unittest', @@ -1711,19 +1714,28 @@ class ChangeUnittest(PresubmitTestsBase): self.assertEquals('bar,baz,foo', change.TBR) -def CommHelper(input_api, cmd, ret=None, **kwargs): - ret = ret or (('', None), 0) - input_api.subprocess.communicate( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs - ).AndReturn(ret) - - class CannedChecksUnittest(PresubmitTestsBase): """Tests presubmit_canned_checks.py.""" + def CommHelper(self, input_api, cmd, stdin=None, ret=None, **kwargs): + ret = ret or (('', None), 0) + kwargs.setdefault('cwd', mox.IgnoreArg()) + kwargs.setdefault('stdin', subprocess.PIPE) + + mock_process = input_api.mox.CreateMockAnything() + mock_process.returncode = ret[1] + + input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) + input_api.subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs + ).AndReturn(mock_process) + + presubmit.sigint_handler.wait(mock_process, stdin).AndReturn(ret[0]) def MockInputApi(self, change, committing): # pylint: disable=no-self-use input_api = self.mox.CreateMock(presubmit.InputApi) + input_api.mox = self.mox + input_api.thread_pool = presubmit.ThreadPool() input_api.cStringIO = presubmit.cStringIO input_api.json = presubmit.json input_api.logging = logging @@ -1758,6 +1770,7 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api.Command = presubmit.CommandData input_api.RunTests = functools.partial( presubmit.InputApi.RunTests, input_api) + presubmit.sigint_handler = self.mox.CreateMock(presubmit.SigintHandler) return input_api def testMembersChanged(self): @@ -2268,30 +2281,33 @@ class CannedChecksUnittest(PresubmitTestsBase): def testRunPythonUnitTestsNoTest(self): input_api = self.MockInputApi(None, False) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, []) + results = input_api.thread_pool.RunAsync() self.assertEquals(results, []) def testRunPythonUnitTestsNonExistentUpload(self): input_api = self.MockInputApi(None, False) - CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], - ret=(('foo', None), 1), cwd=None, env=None) + self.CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], + ret=(('foo', None), 1), env=None) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['_non_existent_module']) + results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitNotifyResult) def testRunPythonUnitTestsNonExistentCommitting(self): input_api = self.MockInputApi(None, True) - CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], - ret=(('foo', None), 1), cwd=None, env=None) + self.CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], + ret=(('foo', None), 1), env=None) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['_non_existent_module']) + results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitError) @@ -2299,12 +2315,13 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api = self.MockInputApi(None, False) input_api.unittest = self.mox.CreateMock(unittest) input_api.cStringIO = self.mox.CreateMock(presubmit.cStringIO) - CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], - ret=(('foo', None), 1), cwd=None, env=None) + self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], + ret=(('foo', None), 1), env=None) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) + results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitNotifyResult) @@ -2312,12 +2329,13 @@ class CannedChecksUnittest(PresubmitTestsBase): def testRunPythonUnitTestsFailureCommitting(self): input_api = self.MockInputApi(None, True) - CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], - ret=(('foo', None), 1), cwd=None, env=None) + self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], + ret=(('foo', None), 1), env=None) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) + results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitError) self.assertEquals('test_module (0.00s) failed\nfoo', results[0]._message) @@ -2326,13 +2344,13 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api = self.MockInputApi(None, False) input_api.cStringIO = self.mox.CreateMock(presubmit.cStringIO) input_api.unittest = self.mox.CreateMock(unittest) - CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], - cwd=None, env=None) + self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], env=None) self.mox.ReplayAll() - results = presubmit_canned_checks.RunPythonUnitTests( + presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) - self.assertEquals(len(results), 0) + results = input_api.thread_pool.RunAsync() + self.assertEquals(results, []) def testCannedRunPylint(self): input_api = self.MockInputApi(None, True) @@ -2345,12 +2363,12 @@ class CannedChecksUnittest(PresubmitTestsBase): pylint = os.path.join(_ROOT, 'third_party', 'pylint.py') pylintrc = os.path.join(_ROOT, 'pylintrc') - CommHelper(input_api, + self.CommHelper(input_api, ['pyyyyython', pylint, '--args-on-stdin'], env=mox.IgnoreArg(), stdin= '--rcfile=%s\n--disable=cyclic-import\n--jobs=2\nfile1.py' % pylintrc) - CommHelper(input_api, + self.CommHelper(input_api, ['pyyyyython', pylint, '--args-on-stdin'], env=mox.IgnoreArg(), stdin= '--rcfile=%s\n--disable=all\n--enable=cyclic-import\nfile1.py' @@ -2359,6 +2377,8 @@ class CannedChecksUnittest(PresubmitTestsBase): results = presubmit_canned_checks.RunPylint( input_api, presubmit.OutputApi) + input_api.thread_pool.RunAsync() + self.assertEquals([], results) self.checkstdout('') @@ -2782,19 +2802,20 @@ class CannedChecksUnittest(PresubmitTestsBase): unit_tests = ['allo', 'bar.py'] input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) - CommHelper(input_api, ['allo', '--verbose'], cwd=self.fake_root_dir) + self.CommHelper(input_api, ['allo', '--verbose'], cwd=self.fake_root_dir) cmd = ['bar.py', '--verbose'] if input_api.platform == 'win32': cmd.insert(0, 'vpython.bat') else: cmd.insert(0, 'vpython') - CommHelper(input_api, cmd, cwd=self.fake_root_dir, ret=(('', None), 1)) + self.CommHelper(input_api, cmd, cwd=self.fake_root_dir, ret=(('', None), 1)) self.mox.ReplayAll() - results = presubmit_canned_checks.RunUnitTests( + presubmit_canned_checks.RunUnitTests( input_api, presubmit.OutputApi, unit_tests) + results = input_api.thread_pool.RunAsync() self.assertEqual(2, len(results)) self.assertEqual( presubmit.OutputApi.PresubmitNotifyResult, results[0].__class__) @@ -2813,19 +2834,20 @@ class CannedChecksUnittest(PresubmitTestsBase): path = presubmit.os.path.join(self.fake_root_dir, 'random_directory') input_api.os_listdir(path).AndReturn(['.', '..', 'a', 'b', 'c']) input_api.os_path.isfile = lambda x: not x.endswith('.') - CommHelper( + self.CommHelper( input_api, [presubmit.os.path.join('random_directory', 'b'), '--verbose'], cwd=self.fake_root_dir) input_api.logging.debug('Found 5 files, running 1 unit tests') self.mox.ReplayAll() - results = presubmit_canned_checks.RunUnitTestsInDirectory( + presubmit_canned_checks.RunUnitTestsInDirectory( input_api, presubmit.OutputApi, 'random_directory', whitelist=['^a$', '^b$'], blacklist=['a']) + results = input_api.thread_pool.RunAsync() self.assertEqual(1, len(results)) self.assertEqual( presubmit.OutputApi.PresubmitNotifyResult, results[0].__class__) @@ -2880,7 +2902,11 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api, presubmit.OutputApi, path='/path/to/foo') self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-ensure-file', '/path/to/foo']) - self.assertEquals(command.kwargs, {}) + self.assertEquals(command.kwargs, { + 'stdin': subprocess.PIPE, + 'stdout': subprocess.PIPE, + 'stderr': subprocess.STDOUT, + }) def testCheckCIPDManifest_content(self): input_api = self.MockInputApi(None, False) @@ -2891,7 +2917,12 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api, presubmit.OutputApi, content='manifest_content') self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-log-level', 'debug', '-ensure-file=-']) - self.assertEquals(command.kwargs, {'stdin': 'manifest_content'}) + self.assertEquals(command.stdin, 'manifest_content') + self.assertEquals(command.kwargs, { + 'stdin': subprocess.PIPE, + 'stdout': subprocess.PIPE, + 'stderr': subprocess.STDOUT, + }) def testCheckCIPDPackages(self): content = '\n'.join([ @@ -2913,7 +2944,12 @@ class CannedChecksUnittest(PresubmitTestsBase): }) self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-ensure-file=-']) - self.assertEquals(command.kwargs, {'stdin': content}) + self.assertEquals(command.stdin, content) + self.assertEquals(command.kwargs, { + 'stdin': subprocess.PIPE, + 'stdout': subprocess.PIPE, + 'stderr': subprocess.STDOUT, + }) def testCannedCheckVPythonSpec(self): change = presubmit.Change('a', 'b', self.fake_root_dir, None, 0, 0, None) @@ -2936,7 +2972,12 @@ class CannedChecksUnittest(PresubmitTestsBase): '-vpython-tool', 'verify' ]) self.assertDictEqual( - commands[0].kwargs, {'stderr': input_api.subprocess.STDOUT}) + commands[0].kwargs, + { + 'stderr': input_api.subprocess.STDOUT, + 'stdout': input_api.subprocess.PIPE, + 'stdin': input_api.subprocess.PIPE, + }) self.assertEqual(commands[0].message, presubmit.OutputApi.PresubmitError) self.assertIsNone(commands[0].info)