diff --git a/gclient_scm.py b/gclient_scm.py index 26845b828..9ccc76f21 100644 --- a/gclient_scm.py +++ b/gclient_scm.py @@ -116,6 +116,9 @@ class SCMWrapper(object): This is the abstraction layer to bind to different SCM. """ + nag_timer = 30 + nag_max = 3 + def __init__(self, url=None, root_dir=None, relpath=None): self.url = url self._root_dir = root_dir @@ -195,6 +198,8 @@ class GitWrapper(SCMWrapper): gclient_utils.CheckCallAndFilter( ['git', 'diff', merge_base], cwd=self.checkout_path, + nag_timer=self.nag_timer, + nag_max=self.nag_max, filter_fn=GitDiffFilterer(self.relpath).Filter) def UpdateSubmoduleConfig(self): @@ -208,6 +213,8 @@ class GitWrapper(SCMWrapper): cmd4 = ['git', 'config', 'fetch.recurseSubmodules', 'false'] kwargs = {'cwd': self.checkout_path, 'print_stdout': False, + 'nag_timer': self.nag_timer, + 'nag_max': self.nag_max, 'filter_fn': lambda x: None} try: gclient_utils.CheckCallAndFilter(cmd, **kwargs) @@ -852,6 +859,8 @@ class GitWrapper(SCMWrapper): return subprocess2.check_output( ['git'] + args, stderr=subprocess2.PIPE, + nag_timer=self.nag_timer, + nag_max=self.nag_max, cwd=self.checkout_path).strip() def _UpdateBranchHeads(self, options, fetch=False): @@ -879,6 +888,8 @@ class GitWrapper(SCMWrapper): def _Run(self, args, options, **kwargs): kwargs.setdefault('cwd', self.checkout_path) kwargs.setdefault('print_stdout', True) + kwargs.setdefault('nag_timer', self.nag_timer) + kwargs.setdefault('nag_max', self.nag_max) stdout = kwargs.get('stdout', sys.stdout) stdout.write('\n________ running \'git %s\' in \'%s\'\n' % ( ' '.join(args), kwargs['cwd'])) @@ -928,6 +939,8 @@ class SVNWrapper(SCMWrapper): ['svn', 'diff', '-x', '--ignore-eol-style'] + args, cwd=self.checkout_path, print_stdout=False, + nag_timer=self.nag_timer, + nag_max=self.nag_max, filter_fn=SvnDiffFilterer(self.relpath).Filter) def update(self, options, args, file_list): @@ -1225,6 +1238,8 @@ class SVNWrapper(SCMWrapper): def _Run(self, args, options, **kwargs): """Runs a commands that goes to stdout.""" kwargs.setdefault('cwd', self.checkout_path) + kwargs.setdefault('nag_timer', self.nag_timer) + kwargs.setdefault('nag_max', self.nag_max) gclient_utils.CheckCallAndFilterAndHeader(['svn'] + args, always=options.verbose, **kwargs) diff --git a/gclient_utils.py b/gclient_utils.py index bf9803e86..d0fa201b2 100644 --- a/gclient_utils.py +++ b/gclient_utils.py @@ -375,7 +375,7 @@ class GClientChildren(object): def CheckCallAndFilter(args, stdout=None, filter_fn=None, print_stdout=None, call_filter_on_first_line=False, - **kwargs): + nag_timer=None, nag_max=None, **kwargs): """Runs a command and calls back a filter function if needed. Accepts all subprocess2.Popen() parameters plus: @@ -399,6 +399,21 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, # Do a flush of stdout before we begin reading from the subprocess2's stdout stdout.flush() + nag = None + if nag_timer: + # Hack thread.index to force correct annotation. + index = getattr(threading.currentThread(), 'index', 0) + def _nag_cb(elapsed): + setattr(threading.currentThread(), 'index', index) + stdout.write(' No output for %.0f seconds from command:\n' % elapsed) + stdout.write(' %s\n' % kid.cmd_str) + if (nag_max and + int('%.0f' % (elapsed / nag_timer)) >= nag_max): + stdout.write(' ... killing it!\n') + kid.kill() + nag = subprocess2.NagTimer(nag_timer, _nag_cb) + nag.start() + # Also, we need to forward stdout to prevent weird re-ordering of output. # This has to be done on a per byte basis to make sure it is not buffered: # normally buffering is done for each line, but if svn requests input, no @@ -406,6 +421,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, try: in_byte = kid.stdout.read(1) if in_byte: + if nag: + nag.event() if call_filter_on_first_line: filter_fn(None) in_line = '' @@ -422,6 +439,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, filter_fn(in_line) in_line = '' in_byte = kid.stdout.read(1) + if in_byte and nag: + nag.event() # Flush the rest of buffered output. This is only an issue with # stdout/stderr not ending with a \n. if len(in_line): @@ -435,6 +454,9 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, except KeyboardInterrupt: print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) raise + finally: + if nag: + nag.cancel() if rv: raise subprocess2.CalledProcessError( diff --git a/subprocess2.py b/subprocess2.py index e81798613..ac44555dc 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -132,6 +132,42 @@ def get_english_env(env): return env +class NagTimer(object): + """ + Triggers a callback when a time interval passes without an event being fired. + + For example, the event could be receiving terminal output from a subprocess; + and the callback could print a warning to stderr that the subprocess appeared + to be hung. + """ + def __init__(self, interval, cb): + self.interval = interval + self.cb = cb + self.timer = threading.Timer(self.interval, self.fn) + self.last_output = self.previous_last_output = 0 + + def start(self): + self.last_output = self.previous_last_output = time.time() + self.timer.start() + + def event(self): + self.last_output = time.time() + + def fn(self): + now = time.time() + if self.last_output == self.previous_last_output: + self.cb(now - self.previous_last_output) + # Use 0.1 fudge factor, just in case + # (self.last_output - now) is very close to zero. + sleep_time = (self.last_output - now - 0.1) % self.interval + self.previous_last_output = self.last_output + self.timer = threading.Timer(sleep_time + 0.1, self.fn) + self.timer.start() + + def cancel(self): + self.timer.cancel() + + class Popen(subprocess.Popen): """Wraps subprocess.Popen() with various workarounds. @@ -192,6 +228,7 @@ class Popen(subprocess.Popen): self.start = time.time() self.timeout = None self.nag_timer = None + self.nag_max = None self.shell = kwargs.get('shell', None) # Silence pylint on MacOSX self.returncode = None @@ -230,8 +267,7 @@ class Popen(subprocess.Popen): # because of memory exhaustion. queue = Queue.Queue() done = threading.Event() - timer = [] - last_output = [time.time()] * 2 + nag = None def write_stdin(): try: @@ -253,28 +289,12 @@ class Popen(subprocess.Popen): data = pipe.read(1) if not data: break - last_output[0] = time.time() + if nag: + nag.event() queue.put((name, data)) finally: queue.put(name) - def nag_fn(): - now = time.time() - if done.is_set(): - return - if last_output[0] == last_output[1]: - logging.warn(' No output for %.0f seconds from command:' % ( - now - last_output[1])) - logging.warn(' %s' % self.cmd_str) - # Use 0.1 fudge factor in case: - # now ~= last_output[0] + self.nag_timer - sleep_time = self.nag_timer + last_output[0] - now - 0.1 - while sleep_time < 0: - sleep_time += self.nag_timer - last_output[1] = last_output[0] - timer[0] = threading.Timer(sleep_time, nag_fn) - timer[0].start() - def timeout_fn(): try: done.wait(self.timeout) @@ -313,8 +333,15 @@ class Popen(subprocess.Popen): t.start() if self.nag_timer: - timer.append(threading.Timer(self.nag_timer, nag_fn)) - timer[0].start() + def _nag_cb(elapsed): + logging.warn(' No output for %.0f seconds from command:' % elapsed) + logging.warn(' %s' % self.cmd_str) + if (self.nag_max and + int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max): + queue.put('timeout') + done.set() # Must do this so that timeout thread stops waiting. + nag = NagTimer(self.nag_timer, _nag_cb) + nag.start() timed_out = False try: @@ -327,20 +354,22 @@ class Popen(subprocess.Popen): self.stderr_cb(item[1]) else: # A thread terminated. - threads[item].join() - del threads[item] + if item in threads: + threads[item].join() + del threads[item] if item == 'wait': # Terminate the timeout thread if necessary. done.set() elif item == 'timeout' and not timed_out and self.poll() is None: - logging.debug('Timed out after %fs: killing' % self.timeout) + logging.debug('Timed out after %.0fs: killing' % ( + time.time() - self.start)) self.kill() timed_out = True finally: # Stop the threads. done.set() - if timer: - timer[0].cancel() + if nag: + nag.cancel() if 'wait' in threads: # Accelerate things, otherwise it would hang until the child process is # done. @@ -353,7 +382,8 @@ class Popen(subprocess.Popen): self.returncode = TIMED_OUT # pylint: disable=W0221,W0622 - def communicate(self, input=None, timeout=None, nag_timer=None): + def communicate(self, input=None, timeout=None, nag_timer=None, + nag_max=None): """Adds timeout and callbacks support. Returns (stdout, stderr) like subprocess.Popen().communicate(). @@ -365,6 +395,7 @@ class Popen(subprocess.Popen): """ self.timeout = timeout self.nag_timer = nag_timer + self.nag_max = nag_max if (not self.timeout and not self.nag_timer and not self.stdout_cb and not self.stderr_cb): return super(Popen, self).communicate(input) @@ -393,7 +424,7 @@ class Popen(subprocess.Popen): return (stdout, stderr) -def communicate(args, timeout=None, nag_timer=None, **kwargs): +def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs): """Wraps subprocess.Popen().communicate() and add timeout support. Returns ((stdout, stderr), returncode). diff --git a/tests/gclient_scm_test.py b/tests/gclient_scm_test.py index f9a29bbfa..3640a3e25 100755 --- a/tests/gclient_scm_test.py +++ b/tests/gclient_scm_test.py @@ -107,6 +107,8 @@ class SVNWrapperTestCase(BaseTestCase): 'RunCommand', 'cleanup', 'diff', + 'nag_max', + 'nag_timer', 'pack', 'relpath', 'revert', @@ -496,6 +498,8 @@ class SVNWrapperTestCase(BaseTestCase): gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( ['svn', 'checkout', '--depth', 'empty', self.url, self.base_path], always=True, + nag_max=3, + nag_timer=30, cwd=self.root_dir) gclient_scm.scm.SVN.RunAndGetFileList( options.verbose, @@ -530,7 +534,7 @@ class SVNWrapperTestCase(BaseTestCase): files_list = self.mox.CreateMockAnything() gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( ['svn', 'export', join(self.url, 'DEPS'), join(self.base_path, 'DEPS')], - always=True, cwd=self.root_dir) + nag_timer=30, nag_max=3, always=True, cwd=self.root_dir) self.mox.ReplayAll() scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir, @@ -563,6 +567,8 @@ class SVNWrapperTestCase(BaseTestCase): gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( ['svn', 'checkout', '--depth', 'empty', self.url, self.base_path], always=True, + nag_max=3, + nag_timer=30, cwd=self.root_dir) gclient_scm.scm.SVN.RunAndGetFileList( options.verbose, @@ -787,6 +793,8 @@ class ManagedGitWrapperTestCase(BaseGitWrapperTestCase): 'RunCommand', 'cleanup', 'diff', + 'nag_max', + 'nag_timer', 'pack', 'UpdateSubmoduleConfig', 'relpath', diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 62034adf5..dc020f2b8 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -78,7 +78,7 @@ class DefaultsTest(auto_stub.TestCase): results['args'] = args @staticmethod # pylint: disable=W0622 - def communicate(input=None, timeout=None, nag_timer=None): + def communicate(input=None, timeout=None, nag_max=None, nag_timer=None): return None, None self.mock(subprocess2, 'Popen', fake_Popen) return results