@@ -217,124 +217,120 @@ def CheckCallAndFilterAndHeader(args, always=False, **kwargs):
   return CheckCallAndFilter(args, **kwargs)
 
 
-def SoftClone(obj):
-  """Clones an object. copy.copy() doesn't work on 'file' objects."""
-  if obj.__class__.__name__ == 'SoftCloned':
-    return obj
-  class SoftCloned(object):
-    pass
-  new_obj = SoftCloned()
-  for member in dir(obj):
-    if member.startswith('_'):
-      continue
-    setattr(new_obj, member, getattr(obj, member))
-  return new_obj
+class Wrapper(object):
+  """Wraps an object, acting as a transparent proxy for all properties by
+  default.
+  """
+  def __init__(self, wrapped):
+    self._wrapped = wrapped
 
+  def __getattr__(self, name):
+    return getattr(self._wrapped, name)
 
-def MakeFileAutoFlush(fileobj, delay=10):
+
+class AutoFlush(Wrapper):
   """Creates a file object clone to automatically flush after N seconds."""
-  if hasattr(fileobj, 'last_flushed_at'):
-    # Already patched. Just update delay.
-    fileobj.delay = delay
-    return fileobj
+  def __init__(self, wrapped, delay):
+    super(AutoFlush, self).__init__(wrapped)
+    if not hasattr(self, 'lock'):
+      self.lock = threading.Lock()
+    self.__last_flushed_at = time.time()
+    self.delay = delay
 
+  @property
+  def autoflush(self):
+    return self
+
-  # Attribute 'XXX' defined outside __init__
-  # pylint: disable=W0201
-  new_fileobj = SoftClone(fileobj)
-  if not hasattr(new_fileobj, 'lock'):
-    new_fileobj.lock = threading.Lock()
-  new_fileobj.last_flushed_at = time.time()
-  new_fileobj.delay = delay
-  new_fileobj.old_auto_flush_write = new_fileobj.write
-  # Silence pylint.
-  new_fileobj.flush = fileobj.flush
-
-  def auto_flush_write(out):
-    new_fileobj.old_auto_flush_write(out)
+  def write(self, out, *args, **kwargs):
+    self._wrapped.write(out, *args, **kwargs)
     should_flush = False
-    new_fileobj.lock.acquire()
+    self.lock.acquire()
     try:
-      if (new_fileobj.delay and
-          (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay):
+      if self.delay and (time.time() - self.__last_flushed_at) > self.delay:
         should_flush = True
-        new_fileobj.last_flushed_at = time.time()
+        self.__last_flushed_at = time.time()
     finally:
-      new_fileobj.lock.release()
+      self.lock.release()
     if should_flush:
-      new_fileobj.flush()
-
-  new_fileobj.write = auto_flush_write
-  return new_fileobj
+      self.flush()
 
 
-def MakeFileAnnotated(fileobj, include_zero=False):
+class Annotated(Wrapper):
   """Creates a file object clone to automatically prepends every line in worker
-  threads with a NN> prefix."""
-  if hasattr(fileobj, 'output_buffers'):
-    # Already patched.
-    return fileobj
+  threads with a NN> prefix.
+  """
+  def __init__(self, wrapped, include_zero=False):
+    super(Annotated, self).__init__(wrapped)
+    if not hasattr(self, 'lock'):
+      self.lock = threading.Lock()
+    self.__output_buffers = {}
+    self.__include_zero = include_zero
 
-  # Attribute 'XXX' defined outside __init__
-  # pylint: disable=W0201
-  new_fileobj = SoftClone(fileobj)
-  if not hasattr(new_fileobj, 'lock'):
-    new_fileobj.lock = threading.Lock()
-  new_fileobj.output_buffers = {}
-  new_fileobj.old_annotated_write = new_fileobj.write
-
-  def annotated_write(out):
-    index = getattr(threading.currentThread(), 'index', None)
-    if index is None:
-      if not include_zero:
-        # Unindexed threads aren't buffered.
-        new_fileobj.old_annotated_write(out)
-        return
-      index = 0
-
-    new_fileobj.lock.acquire()
+  @property
+  def annotated(self):
+    return self
+
+  def write(self, out):
+    index = getattr(threading.currentThread(), 'index', 0)
+    if not index and not self.__include_zero:
+      # Unindexed threads aren't buffered.
+      return self._wrapped.write(out)
+
+    self.lock.acquire()
     try:
       # Use a dummy array to hold the string so the code can be lockless.
       # Strings are immutable, requiring to keep a lock for the whole dictionary
       # otherwise. Using an array is faster than using a dummy object.
-      if not index in new_fileobj.output_buffers:
-        obj = new_fileobj.output_buffers[index] = ['']
+      if not index in self.__output_buffers:
+        obj = self.__output_buffers[index] = ['']
       else:
-        obj = new_fileobj.output_buffers[index]
+        obj = self.__output_buffers[index]
     finally:
-      new_fileobj.lock.release()
+      self.lock.release()
 
     # Continue lockless.
     obj[0] += out
     while '\n' in obj[0]:
      line, remaining = obj[0].split('\n', 1)
      if line:
-        new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
+        self._wrapped.write('%d>%s\n' % (index, line))
      obj[0] = remaining
 
-  def full_flush():
+  def flush(self):
     """Flush buffered output."""
     orphans = []
-    new_fileobj.lock.acquire()
+    self.lock.acquire()
     try:
       # Detect threads no longer existing.
       indexes = (getattr(t, 'index', None) for t in threading.enumerate())
       indexes = filter(None, indexes)
-      for index in new_fileobj.output_buffers:
+      for index in self.__output_buffers:
         if not index in indexes:
-          orphans.append((index, new_fileobj.output_buffers[index][0]))
+          orphans.append((index, self.__output_buffers[index][0]))
       for orphan in orphans:
-        del new_fileobj.output_buffers[orphan[0]]
+        del self.__output_buffers[orphan[0]]
     finally:
-      new_fileobj.lock.release()
+      self.lock.release()
 
     # Don't keep the lock while writting. Will append \n when it shouldn't.
     for orphan in orphans:
       if orphan[1]:
-        new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))
+        self._wrapped.write('%d>%s\n' % (orphan[0], orphan[1]))
+    return self._wrapped.flush()
 
+
-  new_fileobj.write = annotated_write
-  new_fileobj.full_flush = full_flush
-  return new_fileobj
+def MakeFileAutoFlush(fileobj, delay=10):
+  autoflush = getattr(fileobj, 'autoflush', None)
+  if autoflush:
+    autoflush.delay = delay
+    return fileobj
+  return AutoFlush(fileobj, delay)
 
 
+def MakeFileAnnotated(fileobj, include_zero=False):
+  if getattr(fileobj, 'annotated', None):
+    return fileobj
+  return Annotated(fileobj)
+
+
 def CheckCallAndFilter(args, stdout=None, filter_fn=None,
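
Note (not part of the patch): a minimal Python 2 usage sketch of how a caller might stack the two new wrappers over a stream, assuming these helpers live in a module importable as gclient_utils; only functions added in the hunk above are used.

    # Illustrative sketch only; gclient_utils is an assumed import path.
    import sys

    import gclient_utils

    # AutoFlush wraps the raw stream, Annotated wraps the AutoFlush proxy;
    # Wrapper.__getattr__ forwards anything the wrappers do not override.
    stream = gclient_utils.MakeFileAutoFlush(sys.stdout, delay=10)
    stream = gclient_utils.MakeFileAnnotated(stream)

    # Re-wrapping is a no-op: the autoflush/annotated marker properties
    # return self, so getattr(fileobj, 'autoflush', None) and
    # getattr(fileobj, 'annotated', None) detect an already-wrapped stream.
    assert gclient_utils.MakeFileAnnotated(stream) is stream
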
@@ -638,7 +634,7 @@ class ExecutionQueue(object):
         self.running.append(t)
       else:
         t.join()
-        sys.stdout.full_flush()  # pylint: disable=E1101
+        sys.stdout.flush()
         if self.progress:
           self.progress.update(1, t.item.name)
         if t.item.name in self.ran:
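
Note (not part of the patch): the full_flush() call above can become a plain flush() because Annotated.flush() in the first hunk now performs the orphaned-buffer sweep that full_flush() used to do, then delegates to the wrapped stream. Below is a standalone Python 2 sketch of the NN> prefixing this relies on; StringIO stands in for sys.stdout, the worker thread is hand-rolled for illustration, and the index attribute mirrors what the buffering code looks up on each thread.

    # Illustrative sketch only; assumes Annotated from the first hunk is in scope.
    import threading
    from StringIO import StringIO

    out = Annotated(StringIO())

    def work():
      # Runs on an indexed thread, so each complete line gets an 'NN>' prefix.
      out.write('hello\n')

    worker = threading.Thread(target=work)
    worker.index = 3  # the index the per-thread buffering keys on
    worker.start()
    worker.join()

    out.flush()            # also sweeps buffers left behind by exited threads
    print out.getvalue()   # -> 3>hello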