@ -114,68 +114,49 @@ OK = object()
FAIL = object ( )
class ProcessObservers ( object ) :
""" ProcessObservers allows monitoring of child process. """
def poke ( self ) :
""" poke is called when child process sent `BUF_SIZE` data to stdout. """
pass
def cancel ( self ) :
""" cancel is called once proc exists successfully. """
pass
class PsPrinter ( ProcessObservers ) :
def __init__ ( self , interval = 300 ) :
class RepeatingTimer ( threading . Thread ) :
""" Call a function every n seconds, unless reset. """
def __init__ ( self , interval , function , args = None , kwargs = None ) :
threading . Thread . __init__ ( self )
self . interval = interval
self . active = sys . platform . startswith ( ' linux2 ' )
self . thread = None
def print_pstree ( self ) :
""" Debugging function used to print " ps auxwwf " for stuck processes. """
self . function = function
self . args = args if args is not None else [ ]
self . kwargs = kwargs if kwargs is not None else { }
self . cond = threading . Condition ( )
self . is_shutdown = False
def reset ( self ) :
""" Resets timer interval. """
with self . cond :
self . is_reset = True
self . cond . notify_all ( )
def shutdown ( self ) :
""" Stops repeating timer. """
with self . cond :
self . is_shutdown = True
self . cond . notify_all ( )
def run ( self ) :
with self . cond :
while not self . is_shutdown :
self . cond . wait ( self . interval )
if not self . is_reset and not self . is_shutdown :
self . function ( * self . args , * * self . kwargs )
self . is_reset = False
def _print_pstree ( ) :
""" Debugging function used to print " ps auxwwf " for stuck processes. """
if sys . platform . startswith ( ' linux2 ' ) :
# Add new line for cleaner output
print ( )
subprocess . call ( [ ' ps ' , ' auxwwf ' ] )
# Restart timer, we want to continue printing until the process is
# terminated.
self . poke ( )
def poke ( self ) :
if self . active :
self . cancel ( )
self . thread = threading . Timer ( self . interval , self . print_pstree )
self . thread . start ( )
def cancel ( self ) :
if self . active and self . thread is not None :
self . thread . cancel ( )
self . thread = None
class StaleProcess ( ProcessObservers ) :
''' StaleProcess terminates process if there is no poke call in `interval`. '''
def __init__ ( self , interval , proc ) :
self . interval = interval
self . proc = proc
self . thread = None
def _terminate_process ( self ) :
print ( ' Terminating stale process... ' )
self . proc . terminate ( )
def poke ( self ) :
self . cancel ( )
if self . interval > 0 :
self . thread = threading . Timer ( self . interval , self . _terminate_process )
self . thread . start ( )
def cancel ( self ) :
if self . thread is not None :
self . thread . cancel ( )
self . thread = None
def _terminate_process ( proc ) :
print ( ' Terminating stale process... ' )
proc . terminate ( )
def call ( * args , * * kwargs ) : # pragma: no cover
@ -206,17 +187,17 @@ def call(*args, **kwargs): # pragma: no cover
proc . stdin . close ( )
stale_process_duration = env . get ( ' STALE_PROCESS_DURATION ' ,
STALE_PROCESS_DURATION )
observers = [ PsPrinter ( ) , StaleProcess ( int ( stale_process_duration ) , proc ) ]
observers = [
RepeatingTimer ( 300 , _print_pstree ) ,
RepeatingTimer ( int ( stale_process_duration ) , _terminate_process , [ proc ] ) ]
for observer in observers :
observer . start ( )
# This is here because passing 'sys.stdout' into stdout for proc will
# produce out of order output.
hanging_cr = False
while True :
for observer in observers :
try :
observer . poke ( )
except :
print ( ' failed to poke, active thread count is %d ' % threading . active_count ( ) )
raise
observer . reset ( )
buf = proc . stdout . read ( BUF_SIZE )
if not buf :
break
@ -232,7 +213,7 @@ def call(*args, **kwargs): # pragma: no cover
sys . stdout . write ( ' \n ' )
out . write ( ' \n ' )
for observer in observers :
observer . cancel ( )
observer . shutdown ( )
code = proc . wait ( )
elapsed_time = ( ( time . time ( ) - start_time ) / 60.0 )