diff --git a/recipes/recipe_modules/bot_update/resources/bot_update.py b/recipes/recipe_modules/bot_update/resources/bot_update.py index 808e606d3..058dcaf71 100755 --- a/recipes/recipe_modules/bot_update/resources/bot_update.py +++ b/recipes/recipe_modules/bot_update/resources/bot_update.py @@ -114,68 +114,49 @@ OK = object() FAIL = object() -class ProcessObservers(object): - """ProcessObservers allows monitoring of child process.""" - - def poke(self): - """poke is called when child process sent `BUF_SIZE` data to stdout.""" - pass - - def cancel(self): - """cancel is called once proc exists successfully.""" - pass - - -class PsPrinter(ProcessObservers): - def __init__(self, interval=300): +class RepeatingTimer(threading.Thread): + """Call a function every n seconds, unless reset.""" + def __init__(self, interval, function, args=None, kwargs=None): + threading.Thread.__init__(self) self.interval = interval - self.active = sys.platform.startswith('linux2') - self.thread = None - - def print_pstree(self): - """Debugging function used to print "ps auxwwf" for stuck processes.""" + self.function = function + self.args = args if args is not None else [] + self.kwargs = kwargs if kwargs is not None else {} + self.cond = threading.Condition() + self.is_shutdown = False + + def reset(self): + """Resets timer interval.""" + with self.cond: + self.is_reset = True + self.cond.notify_all() + + def shutdown(self): + """Stops repeating timer.""" + with self.cond: + self.is_shutdown = True + self.cond.notify_all() + + def run(self): + with self.cond: + while not self.is_shutdown: + self.cond.wait(self.interval) + if not self.is_reset and not self.is_shutdown: + self.function(*self.args, **self.kwargs) + self.is_reset = False + + +def _print_pstree(): + """Debugging function used to print "ps auxwwf" for stuck processes.""" + if sys.platform.startswith('linux2'): # Add new line for cleaner output print() subprocess.call(['ps', 'auxwwf']) - # Restart timer, we want to continue printing until the process is - # terminated. - self.poke() - - def poke(self): - if self.active: - self.cancel() - self.thread = threading.Timer(self.interval, self.print_pstree) - self.thread.start() - - def cancel(self): - if self.active and self.thread is not None: - self.thread.cancel() - self.thread = None - -class StaleProcess(ProcessObservers): - '''StaleProcess terminates process if there is no poke call in `interval`. ''' - - def __init__(self, interval, proc): - self.interval = interval - self.proc = proc - self.thread = None - - def _terminate_process(self): - print('Terminating stale process...') - self.proc.terminate() - - def poke(self): - self.cancel() - if self.interval > 0: - self.thread = threading.Timer(self.interval, self._terminate_process) - self.thread.start() - - def cancel(self): - if self.thread is not None: - self.thread.cancel() - self.thread = None +def _terminate_process(proc): + print('Terminating stale process...') + proc.terminate() def call(*args, **kwargs): # pragma: no cover @@ -206,17 +187,17 @@ def call(*args, **kwargs): # pragma: no cover proc.stdin.close() stale_process_duration = env.get('STALE_PROCESS_DURATION', STALE_PROCESS_DURATION) - observers = [PsPrinter(), StaleProcess(int(stale_process_duration), proc)] + observers = [ + RepeatingTimer(300, _print_pstree), + RepeatingTimer(int(stale_process_duration), _terminate_process, [proc])] + for observer in observers: + observer.start() # This is here because passing 'sys.stdout' into stdout for proc will # produce out of order output. hanging_cr = False while True: for observer in observers: - try: - observer.poke() - except: - print('failed to poke, active thread count is %d' % threading.active_count()) - raise + observer.reset() buf = proc.stdout.read(BUF_SIZE) if not buf: break @@ -232,7 +213,7 @@ def call(*args, **kwargs): # pragma: no cover sys.stdout.write('\n') out.write('\n') for observer in observers: - observer.cancel() + observer.shutdown() code = proc.wait() elapsed_time = ((time.time() - start_time) / 60.0)