diff --git a/presubmit_support.py b/presubmit_support.py index 89f7d5ad3..0a2d69129 100755 --- a/presubmit_support.py +++ b/presubmit_support.py @@ -30,10 +30,8 @@ import os # Somewhat exposed through the API. import pickle # Exposed through the API. import random import re # Exposed through the API. -import signal import sys # Parts exposed through API. import tempfile # Exposed through the API. -import threading import time import traceback # Exposed through the API. import types @@ -66,153 +64,11 @@ class CommandData(object): def __init__(self, name, cmd, kwargs, message): self.name = name self.cmd = cmd - self.stdin = kwargs.get('stdin', None) self.kwargs = kwargs - self.kwargs['stdout'] = subprocess.PIPE - self.kwargs['stderr'] = subprocess.STDOUT - self.kwargs['stdin'] = subprocess.PIPE self.message = message self.info = None -# Adapted from -# https://github.com/google/gtest-parallel/blob/master/gtest_parallel.py#L37 -# -# An object that catches SIGINT sent to the Python process and notices -# if processes passed to wait() die by SIGINT (we need to look for -# both of those cases, because pressing Ctrl+C can result in either -# the main process or one of the subprocesses getting the signal). -# -# Before a SIGINT is seen, wait(p) will simply call p.wait() and -# return the result. Once a SIGINT has been seen (in the main process -# or a subprocess, including the one the current call is waiting for), -# wait(p) will call p.terminate() and raise ProcessWasInterrupted. -class SigintHandler(object): - class ProcessWasInterrupted(Exception): - pass - - sigint_returncodes = {-signal.SIGINT, # Unix - -1073741510, # Windows - } - def __init__(self): - self.__lock = threading.Lock() - self.__processes = set() - self.__got_sigint = False - signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt()) - - def __on_sigint(self): - self.__got_sigint = True - while self.__processes: - try: - self.__processes.pop().terminate() - except OSError: - pass - - def interrupt(self): - with self.__lock: - self.__on_sigint() - - def got_sigint(self): - with self.__lock: - return self.__got_sigint - - def wait(self, p, stdin): - with self.__lock: - if self.__got_sigint: - p.terminate() - self.__processes.add(p) - stdout, stderr = p.communicate(stdin) - code = p.returncode - with self.__lock: - self.__processes.discard(p) - if code in self.sigint_returncodes: - self.__on_sigint() - if self.__got_sigint: - raise self.ProcessWasInterrupted - return stdout, stderr - -sigint_handler = SigintHandler() - - -class ThreadPool(object): - def __init__(self, pool_size=None): - self._tests = [] - self._nonparallel_tests = [] - self._pool_size = pool_size or multiprocessing.cpu_count() - self._messages = [] - self._messages_lock = threading.Lock() - self._current_index = 0 - self._current_index_lock = threading.Lock() - - def CallCommand(self, test): - """Runs an external program. - - This function converts invocation of .py files and invocations of "python" - to vpython invocations. - """ - vpython = 'vpython.bat' if sys.platform == 'win32' else 'vpython' - - cmd = test.cmd - if cmd[0] == 'python': - cmd = list(cmd) - cmd[0] = vpython - elif cmd[0].endswith('.py'): - cmd = [vpython] + cmd - - try: - start = time.time() - p = subprocess.Popen(cmd, **test.kwargs) - stdout, _ = sigint_handler.wait(p, test.stdin) - duration = time.time() - start - except OSError as e: - duration = time.time() - start - return test.message( - '%s exec failure (%4.2fs)\n %s' % (test.name, duration, e)) - if p.returncode != 0: - return test.message( - '%s (%4.2fs) failed\n%s' % (test.name, duration, stdout)) - if test.info: - return test.info('%s (%4.2fs)' % (test.name, duration)) - - def AddTests(self, tests, parallel=True): - if parallel: - self._tests.extend(tests) - else: - self._nonparallel_tests.extend(tests) - - def RunAsync(self): - def _WorkerFn(): - while True: - test_index = None - with self._current_index_lock: - if self._current_index == len(self._tests): - break - test_index = self._current_index - self._current_index += 1 - result = self.CallCommand(self._tests[test_index]) - if result: - with self._messages_lock: - self._messages.append(result) - - def _StartDaemon(): - t = threading.Thread(target=_WorkerFn) - t.daemon = True - t.start() - return t - - for test in self._nonparallel_tests: - result = self.CallCommand(test) - if result: - self._messages.append(result) - - if self._tests: - threads = [_StartDaemon() for _ in range(self._pool_size)] - for worker in threads: - worker.join() - - return self._messages - - def normpath(path): '''Version of os.path.normpath that also changes backward slashes to forward slashes when not running on Windows. @@ -532,7 +388,7 @@ class InputApi(object): ) def __init__(self, change, presubmit_path, is_committing, - verbose, gerrit_obj, dry_run=None, thread_pool=None): + verbose, gerrit_obj, dry_run=None): """Builds an InputApi object. Args: @@ -549,8 +405,6 @@ class InputApi(object): self.gerrit = gerrit_obj self.dry_run = dry_run - self.thread_pool = thread_pool or ThreadPool() - # We expose various modules and functions as attributes of the input_api # so that presubmit scripts don't have to import them. self.ast = ast @@ -590,6 +444,12 @@ class InputApi(object): self.cpu_count = multiprocessing.cpu_count() + # this is done here because in RunTests, the current working directory has + # changed, which causes Pool() to explode fantastically when run on windows + # (because it tries to load the __main__ module, which imports lots of + # things relative to the current working directory). + self._run_tests_pool = multiprocessing.Pool(self.cpu_count) + # The local path of the currently-being-processed presubmit script. self._current_presubmit_path = os.path.dirname(presubmit_path) @@ -767,21 +627,27 @@ class InputApi(object): return 'TBR' in self.change.tags or self.change.TBRsFromDescription() def RunTests(self, tests_mix, parallel=True): - # RunTests doesn't actually run tests. It adds them to a ThreadPool that - # will run all tests once all PRESUBMIT files are processed. tests = [] msgs = [] for t in tests_mix: - if isinstance(t, OutputApi.PresubmitResult) and t: + if isinstance(t, OutputApi.PresubmitResult): msgs.append(t) else: assert issubclass(t.message, _PresubmitResult) tests.append(t) if self.verbose: t.info = _PresubmitNotifyResult - t.kwargs['cwd'] = self.PresubmitLocalPath() - self.thread_pool.AddTests(tests, parallel) - return msgs + if len(tests) > 1 and parallel: + # async recipe works around multiprocessing bug handling Ctrl-C + msgs.extend(self._run_tests_pool.map_async(CallCommand, tests).get(99999)) + else: + msgs.extend(map(CallCommand, tests)) + return [m for m in msgs if m] + + def ShutdownPool(self): + self._run_tests_pool.close() + self._run_tests_pool.join() + self._run_tests_pool = None class _DiffCache(object): @@ -1399,7 +1265,7 @@ def DoPostUploadExecuter(change, class PresubmitExecuter(object): def __init__(self, change, committing, verbose, - gerrit_obj, dry_run=None, thread_pool=None): + gerrit_obj, dry_run=None): """ Args: change: The Change object. @@ -1413,7 +1279,6 @@ class PresubmitExecuter(object): self.verbose = verbose self.dry_run = dry_run self.more_cc = [] - self.thread_pool = thread_pool def ExecPresubmitScript(self, script_text, presubmit_path): """Executes a single presubmit script. @@ -1434,7 +1299,7 @@ class PresubmitExecuter(object): # Load the presubmit script into context. input_api = InputApi(self.change, presubmit_path, self.committing, self.verbose, gerrit_obj=self.gerrit, - dry_run=self.dry_run, thread_pool=self.thread_pool) + dry_run=self.dry_run) output_api = OutputApi(self.committing) context = {} try: @@ -1469,6 +1334,8 @@ class PresubmitExecuter(object): else: result = () # no error since the script doesn't care about current event. + input_api.ShutdownPool() + # Return the process to the original working directory. os.chdir(main_path) return result @@ -1528,9 +1395,8 @@ def DoPresubmitChecks(change, if not presubmit_files and verbose: output.write("Warning, no PRESUBMIT.py found.\n") results = [] - thread_pool = ThreadPool() executer = PresubmitExecuter(change, committing, verbose, - gerrit_obj, dry_run, thread_pool) + gerrit_obj, dry_run) if default_presubmit: if verbose: output.write("Running default presubmit script.\n") @@ -1544,8 +1410,6 @@ def DoPresubmitChecks(change, presubmit_script = gclient_utils.FileRead(filename, 'rU') results += executer.ExecPresubmitScript(presubmit_script, filename) - results += thread_pool.RunAsync() - output.more_cc.extend(executer.more_cc) errors = [] notifications = [] @@ -1653,6 +1517,41 @@ def canned_check_filter(method_names): setattr(presubmit_canned_checks, name, method) +def CallCommand(cmd_data): + """Runs an external program, potentially from a child process created by the + multiprocessing module. + + multiprocessing needs a top level function with a single argument. + + This function converts invocation of .py files and invocations of "python" to + vpython invocations. + """ + vpython = 'vpython.bat' if sys.platform == 'win32' else 'vpython' + + cmd = cmd_data.cmd + if cmd[0] == 'python': + cmd = list(cmd) + cmd[0] = vpython + elif cmd[0].endswith('.py'): + cmd = [vpython] + cmd + + cmd_data.kwargs['stdout'] = subprocess.PIPE + cmd_data.kwargs['stderr'] = subprocess.STDOUT + try: + start = time.time() + (out, _), code = subprocess.communicate(cmd, **cmd_data.kwargs) + duration = time.time() - start + except OSError as e: + duration = time.time() - start + return cmd_data.message( + '%s exec failure (%4.2fs)\n %s' % (cmd_data.name, duration, e)) + if code != 0: + return cmd_data.message( + '%s (%4.2fs) failed\n%s' % (cmd_data.name, duration, out)) + if cmd_data.info: + return cmd_data.info('%s (%4.2fs)' % (cmd_data.name, duration)) + + def main(argv=None): parser = optparse.OptionParser(usage="%prog [options] ", version="%prog " + str(__version__)) diff --git a/tests/presubmit_unittest.py b/tests/presubmit_unittest.py index 3f3e5d4d3..02bb10418 100755 --- a/tests/presubmit_unittest.py +++ b/tests/presubmit_unittest.py @@ -172,18 +172,16 @@ class PresubmitUnittest(PresubmitTestsBase): self.mox.ReplayAll() members = [ 'AffectedFile', 'Change', 'DoPostUploadExecuter', 'DoPresubmitChecks', - 'GetPostUploadExecuter', 'GitAffectedFile', 'CommandData', + 'GetPostUploadExecuter', 'GitAffectedFile', 'CallCommand', 'CommandData', 'GitChange', 'InputApi', 'ListRelevantPresubmitFiles', 'main', 'OutputApi', 'ParseFiles', 'PresubmitFailure', 'PresubmitExecuter', 'PresubmitOutput', 'ScanSubDirs', - 'SigintHandler', 'ThreadPool', 'ast', 'cPickle', 'cpplint', 'cStringIO', 'contextlib', 'canned_check_filter', 'fix_encoding', 'fnmatch', 'gclient_utils', 'git_footers', 'glob', 'inspect', 'json', 'load_files', 'logging', 'marshal', 'normpath', 'optparse', 'os', 'owners', 'owners_finder', 'pickle', 'presubmit_canned_checks', 'random', 're', 'scm', - 'sigint_handler', 'signal', - 'subprocess', 'sys', 'tempfile', 'threading', + 'subprocess', 'sys', 'tempfile', 'time', 'traceback', 'types', 'unittest', 'urllib2', 'warn', 'multiprocessing', 'DoGetTryMasters', 'GetTryMastersExecuter', 'itertools', 'urlparse', 'gerrit_util', @@ -1004,6 +1002,7 @@ class InputApiUnittest(PresubmitTestsBase): 'PresubmitLocalPath', 'ReadFile', 'RightHandSideLines', + 'ShutdownPool', 'ast', 'basename', 'cPickle', @@ -1033,7 +1032,6 @@ class InputApiUnittest(PresubmitTestsBase): 'subprocess', 'tbr', 'tempfile', - 'thread_pool', 'time', 'traceback', 'unittest', @@ -1710,28 +1708,19 @@ class ChangeUnittest(PresubmitTestsBase): self.assertEquals('bar,baz,foo', change.TBR) -class CannedChecksUnittest(PresubmitTestsBase): - """Tests presubmit_canned_checks.py.""" - def CommHelper(self, input_api, cmd, stdin=None, ret=None, **kwargs): - ret = ret or (('', None), 0) - kwargs.setdefault('cwd', mox.IgnoreArg()) - kwargs.setdefault('stdin', subprocess.PIPE) - - mock_process = input_api.mox.CreateMockAnything() - mock_process.returncode = ret[1] +def CommHelper(input_api, cmd, ret=None, **kwargs): + ret = ret or (('', None), 0) + input_api.subprocess.communicate( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs + ).AndReturn(ret) - input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) - input_api.subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs - ).AndReturn(mock_process) - presubmit.sigint_handler.wait(mock_process, stdin).AndReturn(ret[0]) +class CannedChecksUnittest(PresubmitTestsBase): + """Tests presubmit_canned_checks.py.""" def MockInputApi(self, change, committing): # pylint: disable=no-self-use input_api = self.mox.CreateMock(presubmit.InputApi) - input_api.mox = self.mox - input_api.thread_pool = presubmit.ThreadPool() input_api.cStringIO = presubmit.cStringIO input_api.json = presubmit.json input_api.logging = logging @@ -1764,7 +1753,6 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api.Command = presubmit.CommandData input_api.RunTests = functools.partial( presubmit.InputApi.RunTests, input_api) - presubmit.sigint_handler = self.mox.CreateMock(presubmit.SigintHandler) return input_api def testMembersChanged(self): @@ -2274,33 +2262,30 @@ class CannedChecksUnittest(PresubmitTestsBase): def testRunPythonUnitTestsNoTest(self): input_api = self.MockInputApi(None, False) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, []) - results = input_api.thread_pool.RunAsync() self.assertEquals(results, []) def testRunPythonUnitTestsNonExistentUpload(self): input_api = self.MockInputApi(None, False) - self.CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], - ret=(('foo', None), 1), env=None) + CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], + ret=(('foo', None), 1), cwd=None, env=None) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['_non_existent_module']) - results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitNotifyResult) def testRunPythonUnitTestsNonExistentCommitting(self): input_api = self.MockInputApi(None, True) - self.CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], - ret=(('foo', None), 1), env=None) + CommHelper(input_api, ['pyyyyython', '-m', '_non_existent_module'], + ret=(('foo', None), 1), cwd=None, env=None) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['_non_existent_module']) - results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitError) @@ -2308,13 +2293,12 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api = self.MockInputApi(None, False) input_api.unittest = self.mox.CreateMock(unittest) input_api.cStringIO = self.mox.CreateMock(presubmit.cStringIO) - self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], - ret=(('foo', None), 1), env=None) + CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], + ret=(('foo', None), 1), cwd=None, env=None) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) - results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitNotifyResult) @@ -2322,13 +2306,12 @@ class CannedChecksUnittest(PresubmitTestsBase): def testRunPythonUnitTestsFailureCommitting(self): input_api = self.MockInputApi(None, True) - self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], - ret=(('foo', None), 1), env=None) + CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], + ret=(('foo', None), 1), cwd=None, env=None) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) - results = input_api.thread_pool.RunAsync() self.assertEquals(len(results), 1) self.assertEquals(results[0].__class__, presubmit.OutputApi.PresubmitError) self.assertEquals('test_module (0.00s) failed\nfoo', results[0]._message) @@ -2337,13 +2320,13 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api = self.MockInputApi(None, False) input_api.cStringIO = self.mox.CreateMock(presubmit.cStringIO) input_api.unittest = self.mox.CreateMock(unittest) - self.CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], env=None) + CommHelper(input_api, ['pyyyyython', '-m', 'test_module'], + cwd=None, env=None) self.mox.ReplayAll() - presubmit_canned_checks.RunPythonUnitTests( + results = presubmit_canned_checks.RunPythonUnitTests( input_api, presubmit.OutputApi, ['test_module']) - results = input_api.thread_pool.RunAsync() - self.assertEquals(results, []) + self.assertEquals(len(results), 0) def testCannedRunPylint(self): input_api = self.MockInputApi(None, True) @@ -2356,12 +2339,12 @@ class CannedChecksUnittest(PresubmitTestsBase): pylint = os.path.join(_ROOT, 'third_party', 'pylint.py') pylintrc = os.path.join(_ROOT, 'pylintrc') - self.CommHelper(input_api, + CommHelper(input_api, ['pyyyyython', pylint, '--args-on-stdin'], env=mox.IgnoreArg(), stdin= '--rcfile=%s\n--disable=cyclic-import\n--jobs=2\nfile1.py' % pylintrc) - self.CommHelper(input_api, + CommHelper(input_api, ['pyyyyython', pylint, '--args-on-stdin'], env=mox.IgnoreArg(), stdin= '--rcfile=%s\n--disable=all\n--enable=cyclic-import\nfile1.py' @@ -2370,8 +2353,6 @@ class CannedChecksUnittest(PresubmitTestsBase): results = presubmit_canned_checks.RunPylint( input_api, presubmit.OutputApi) - input_api.thread_pool.RunAsync() - self.assertEquals([], results) self.checkstdout('') @@ -2764,20 +2745,19 @@ class CannedChecksUnittest(PresubmitTestsBase): unit_tests = ['allo', 'bar.py'] input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) input_api.PresubmitLocalPath().AndReturn(self.fake_root_dir) - self.CommHelper(input_api, ['allo', '--verbose'], cwd=self.fake_root_dir) + CommHelper(input_api, ['allo', '--verbose'], cwd=self.fake_root_dir) cmd = ['bar.py', '--verbose'] if input_api.platform == 'win32': cmd.insert(0, 'vpython.bat') else: cmd.insert(0, 'vpython') - self.CommHelper(input_api, cmd, cwd=self.fake_root_dir, ret=(('', None), 1)) + CommHelper(input_api, cmd, cwd=self.fake_root_dir, ret=(('', None), 1)) self.mox.ReplayAll() - presubmit_canned_checks.RunUnitTests( + results = presubmit_canned_checks.RunUnitTests( input_api, presubmit.OutputApi, unit_tests) - results = input_api.thread_pool.RunAsync() self.assertEqual(2, len(results)) self.assertEqual( presubmit.OutputApi.PresubmitNotifyResult, results[0].__class__) @@ -2796,20 +2776,19 @@ class CannedChecksUnittest(PresubmitTestsBase): path = presubmit.os.path.join(self.fake_root_dir, 'random_directory') input_api.os_listdir(path).AndReturn(['.', '..', 'a', 'b', 'c']) input_api.os_path.isfile = lambda x: not x.endswith('.') - self.CommHelper( + CommHelper( input_api, [presubmit.os.path.join('random_directory', 'b'), '--verbose'], cwd=self.fake_root_dir) input_api.logging.debug('Found 5 files, running 1 unit tests') self.mox.ReplayAll() - presubmit_canned_checks.RunUnitTestsInDirectory( + results = presubmit_canned_checks.RunUnitTestsInDirectory( input_api, presubmit.OutputApi, 'random_directory', whitelist=['^a$', '^b$'], blacklist=['a']) - results = input_api.thread_pool.RunAsync() self.assertEqual(1, len(results)) self.assertEqual( presubmit.OutputApi.PresubmitNotifyResult, results[0].__class__) @@ -2864,11 +2843,7 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api, presubmit.OutputApi, path='/path/to/foo') self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-ensure-file', '/path/to/foo']) - self.assertEquals(command.kwargs, { - 'stdin': subprocess.PIPE, - 'stdout': subprocess.PIPE, - 'stderr': subprocess.STDOUT, - }) + self.assertEquals(command.kwargs, {}) def testCheckCIPDManifest_content(self): input_api = self.MockInputApi(None, False) @@ -2879,12 +2854,7 @@ class CannedChecksUnittest(PresubmitTestsBase): input_api, presubmit.OutputApi, content='manifest_content') self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-log-level', 'debug', '-ensure-file=-']) - self.assertEquals(command.stdin, 'manifest_content') - self.assertEquals(command.kwargs, { - 'stdin': subprocess.PIPE, - 'stdout': subprocess.PIPE, - 'stderr': subprocess.STDOUT, - }) + self.assertEquals(command.kwargs, {'stdin': 'manifest_content'}) def testCheckCIPDPackages(self): content = '\n'.join([ @@ -2906,12 +2876,7 @@ class CannedChecksUnittest(PresubmitTestsBase): }) self.assertEquals(command.cmd, ['cipd', 'ensure-file-verify', '-ensure-file=-']) - self.assertEquals(command.stdin, content) - self.assertEquals(command.kwargs, { - 'stdin': subprocess.PIPE, - 'stdout': subprocess.PIPE, - 'stderr': subprocess.STDOUT, - }) + self.assertEquals(command.kwargs, {'stdin': content}) def testCannedCheckVPythonSpec(self): change = presubmit.Change('a', 'b', self.fake_root_dir, None, 0, 0, None) @@ -2934,12 +2899,7 @@ class CannedChecksUnittest(PresubmitTestsBase): '-vpython-tool', 'verify' ]) self.assertDictEqual( - commands[0].kwargs, - { - 'stderr': input_api.subprocess.STDOUT, - 'stdout': input_api.subprocess.PIPE, - 'stdin': input_api.subprocess.PIPE, - }) + commands[0].kwargs, {'stderr': input_api.subprocess.STDOUT}) self.assertEqual(commands[0].message, presubmit.OutputApi.PresubmitError) self.assertIsNone(commands[0].info)