@ -297,8 +297,10 @@ def CheckCallAndFilterAndHeader(args, always=False, **kwargs):
def SoftClone ( obj ) :
""" Clones an object. copy.copy() doesn ' t work on ' file ' objects. """
class NewObject ( object ) : pass
new_obj = NewObject ( )
if obj . __class__ . __name__ == ' SoftCloned ' :
return obj
class SoftCloned ( object ) : pass
new_obj = SoftCloned ( )
for member in dir ( obj ) :
if member . startswith ( ' _ ' ) :
continue
@ -314,10 +316,11 @@ def MakeFileAutoFlush(fileobj, delay=10):
return fileobj
new_fileobj = SoftClone ( fileobj )
new_fileobj . lock = threading . Lock ( )
if not hasattr ( new_fileobj , ' lock ' ) :
new_fileobj . lock = threading . Lock ( )
new_fileobj . last_flushed_at = time . time ( )
new_fileobj . delay = delay
new_fileobj . old_auto_flush_write = fileobj. write
new_fileobj . old_auto_flush_write = new_ fileobj. write
# Silence pylint.
new_fileobj . flush = fileobj . flush
@ -339,27 +342,68 @@ def MakeFileAutoFlush(fileobj, delay=10):
return new_fileobj
class StdoutAnnotated ( object ) :
""" Prepends every line with a string."""
def __init__ ( self , prepend , stdout ) :
self . prepend = prepend
self . buf = ' '
self . stdout = stdout
def MakeFileAnnotated ( fileobj ) :
""" Creates a file object clone to automatically prepends every line in worker
threads with a NN > prefix . """
if hasattr ( fileobj , ' output_buffers ' ) :
# Already patched.
return fileobj
def write ( self , out ) :
self . buf + = out
while ' \n ' in self . buf :
line , self . buf = self . buf . split ( ' \n ' , 1 )
self . stdout . write ( self . prepend + line + ' \n ' )
new_fileobj = SoftClone ( fileobj )
if not hasattr ( new_fileobj , ' lock ' ) :
new_fileobj . lock = threading . Lock ( )
new_fileobj . output_buffers = { }
new_fileobj . old_annotated_write = new_fileobj . write
def annotated_write ( out ) :
index = getattr ( threading . currentThread ( ) , ' index ' , None )
if index is None :
# Undexed threads aren't buffered.
new_fileobj . old_annotated_write ( out )
return
def flush ( self ) :
pass
new_fileobj . lock . acquire ( )
try :
# Use a dummy array to hold the string so the code can be lockless.
# Strings are immutable, requiring to keep a lock for the whole dictionary
# otherwise. Using an array is faster than using a dummy object.
if not index in new_fileobj . output_buffers :
obj = new_fileobj . output_buffers [ index ] = [ ' ' ]
else :
obj = new_fileobj . output_buffers [ index ]
finally :
new_fileobj . lock . release ( )
def full_flush ( self ) :
if self . buf :
self . stdout . write ( self . prepend + self . buf )
self . stdout . flush ( )
self . buf = ' '
# Continue lockless.
obj [ 0 ] + = out
while ' \n ' in obj [ 0 ] :
line , remaining = obj [ 0 ] . split ( ' \n ' , 1 )
new_fileobj . old_annotated_write ( ' %d > %s \n ' % ( index , line ) )
obj [ 0 ] = remaining
def full_flush ( ) :
""" Flush buffered output. """
orphans = [ ]
new_fileobj . lock . acquire ( )
try :
# Detect threads no longer existing.
indexes = ( getattr ( t , ' index ' , None ) for t in threading . enumerate ( ) )
indexed = filter ( None , indexes )
for index in new_fileobj . output_buffers :
if not index in indexes :
orphans . append ( ( index , new_fileobj . output_buffers [ index ] [ 0 ] ) )
for orphan in orphans :
del new_fileobj . output_buffers [ orphan [ 0 ] ]
finally :
new_fileobj . lock . release ( )
# Don't keep the lock while writting. Will append \n when it shouldn't.
for orphan in orphans :
new_fileobj . old_annotated_write ( ' %d > %s \n ' % ( orphan [ 0 ] , orphan [ 1 ] ) )
new_fileobj . write = annotated_write
new_fileobj . full_flush = full_flush
return new_fileobj
def CheckCallAndFilter ( args , stdout = None , filter_fn = None ,
@ -628,12 +672,10 @@ class ExecutionQueue(object):
if self . jobs > 1 :
# Start the thread.
index = len ( self . ran ) + len ( self . running ) + 1
# Copy 'options' and add annotated stdout .
# Copy 'options' .
task_kwargs = kwargs . copy ( )
task_kwargs [ ' options ' ] = copy . copy ( task_kwargs [ ' options ' ] )
task_kwargs [ ' options ' ] . stdout = StdoutAnnotated (
' %d > ' % index , task_kwargs [ ' options ' ] . stdout )
new_thread = self . _Worker ( task_item , args , task_kwargs )
new_thread = self . _Worker ( task_item , index , args , task_kwargs )
self . running . append ( new_thread )
new_thread . start ( )
else :
@ -646,10 +688,11 @@ class ExecutionQueue(object):
class _Worker ( threading . Thread ) :
""" One thread to execute one WorkItem. """
def __init__ ( self , item , args, kwargs ) :
def __init__ ( self , item , index, args, kwargs ) :
threading . Thread . __init__ ( self , name = item . name or ' Worker ' )
logging . info ( item . name )
self . item = item
self . index = index
self . args = args
self . kwargs = kwargs