diff --git a/gclient.py b/gclient.py
index aa80b8660a..4699d1f955 100644
--- a/gclient.py
+++ b/gclient.py
@@ -49,8 +49,9 @@ Hooks
   ]
 """
 
-__version__ = "0.5.2"
+__version__ = "0.6"
 
+import copy
 import logging
 import optparse
 import os
@@ -355,12 +356,13 @@ class Dependency(GClientKeywords, gclient_utils.WorkItem):
                      args + [self.parsed_url.GetFilename()],
                      self._file_list)
     else:
+      # Create a shallow copy so mutating options.revision stays local.
+      options = copy.copy(options)
       options.revision = revision_overrides.get(self.name)
       scm = gclient_scm.CreateSCM(self.parsed_url, self.root_dir(), self.name)
       scm.RunCommand(command, options, args, self._file_list)
       self._file_list = [os.path.join(self.name, f.strip())
                          for f in self._file_list]
-      options.revision = None
     self.processed = True
     if self.recursion_limit() > 0:
       # Then we can parse the DEPS file.
@@ -710,7 +712,7 @@ solutions = [
     pm = None
     if command == 'update' and not self._options.verbose:
       pm = Progress('Syncing projects', 1)
-    work_queue = gclient_utils.ExecutionQueue(pm)
+    work_queue = gclient_utils.ExecutionQueue(self._options.jobs, pm)
     for s in self.dependencies:
       work_queue.enqueue(s)
     work_queue.flush(self._options, revision_overrides, command, args,
@@ -755,7 +757,7 @@ solutions = [
     if not self.dependencies:
       raise gclient_utils.Error('No solution specified')
     # Load all the settings.
-    work_queue = gclient_utils.ExecutionQueue(None)
+    work_queue = gclient_utils.ExecutionQueue(self._options.jobs, None)
     for s in self.dependencies:
       work_queue.enqueue(s)
     work_queue.flush(self._options, {}, None, [], work_queue)
@@ -1168,6 +1170,8 @@ def Main(argv):
       ' %-10s %s' % (fn[3:], Command(fn[3:]).__doc__.split('\n')[0].strip())
       for fn in dir(sys.modules[__name__]) if fn.startswith('CMD')]))
   parser = optparse.OptionParser(version='%prog ' + __version__)
+  parser.add_option('-j', '--jobs', default=1, type='int',
+                    help='Specify how many SCM commands can run in parallel')
   parser.add_option('-v', '--verbose', action='count', default=0,
                     help='Produces additional output for diagnostics. Can be '
                          'used up to three times for more logging info.')
@@ -1186,6 +1190,8 @@ def Main(argv):
     logging.basicConfig(level=level,
         format='%(module)s(%(lineno)d) %(funcName)s:%(message)s')
   options.entries_filename = options.config_filename + '_entries'
+  if options.jobs < 1:
+    parser.error('--jobs must be 1 or higher')
 
   # These hacks need to die.
   if not hasattr(options, 'revisions'):
diff --git a/gclient_utils.py b/gclient_utils.py
index c530f4cb4c..b2b0093140 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -21,6 +21,7 @@ import re
 import stat
 import subprocess
 import sys
+import threading
 import time
 import threading
 import xml.dom.minidom
@@ -378,21 +379,30 @@ class WorkItem(object):
 
 
 class ExecutionQueue(object):
-  """Dependencies sometime needs to be run out of order due to From() keyword.
+  """Runs a set of WorkItems that have interdependencies and to which new
+  WorkItems can be added as they are processed.
 
-  This class manages that all the required dependencies are run before running
-  each one.
+  In gclient's case, Dependencies sometimes need to be run out of order due to
+  the From() keyword. This class ensures that all the required dependencies are
+  run before running each one.
 
-  Methods of this class are multithread safe.
+  Methods of this class are thread-safe.
   """
-  def __init__(self, progress):
-    self.lock = threading.Lock()
-    # List of WorkItem, Dependency inherits from WorkItem.
+  def __init__(self, jobs, progress):
+    """jobs specifies the number of concurrent tasks to allow. progress is a
+    Progress instance."""
+    # Set when a thread is done or a new item is enqueued.
+    self.ready_cond = threading.Condition()
+    # Maximum number of concurrent tasks.
+    self.jobs = jobs
+    # List of WorkItem; for gclient, these are Dependency instances.
     self.queued = []
     # List of strings representing each Dependency.name that was run.
     self.ran = []
     # List of items currently running.
     self.running = []
+    # Exceptions thrown, if any.
+    self.exceptions = []
     self.progress = progress
     if self.progress:
       self.progress.update()
@@ -402,71 +412,99 @@ class ExecutionQueue(object):
     satisfied.
     """
     assert isinstance(d, WorkItem)
+    self.ready_cond.acquire()
     try:
-      self.lock.acquire()
       self.queued.append(d)
       total = len(self.queued) + len(self.ran) + len(self.running)
+      logging.debug('enqueued(%s)' % d.name)
+      if self.progress:
+        self.progress._total = total + 1
+        self.progress.update(0)
+      self.ready_cond.notifyAll()
     finally:
-      self.lock.release()
-    if self.progress:
-      self.progress._total = total + 1
-      self.progress.update(0)
+      self.ready_cond.release()
 
   def flush(self, *args, **kwargs):
     """Runs all enqueued items until all are executed."""
-    while self._run_one_item(*args, **kwargs):
-      pass
-    queued = []
-    running = []
+    self.ready_cond.acquire()
     try:
-      self.lock.acquire()
-      if self.queued:
-        queued = self.queued
-        self.queued = []
-      if self.running:
-        running = self.running
-        self.running = []
+      while True:
+        # Check for a task to run first, then wait.
+        while True:
+          if self.exceptions:
+            # Systematically flush the queue when an exception is logged
+            # in self.exceptions.
+            self.queued = []
+          # Flush threads that have terminated.
+          self.running = [t for t in self.running if t.isAlive()]
+          if not self.queued and not self.running:
+            break
+          if self.jobs == len(self.running):
+            break
+          for i in xrange(len(self.queued)):
+            # Verify its requirements.
+            for r in self.queued[i].requirements:
+              if not r in self.ran:
+                # Requirement not met.
+                break
+            else:
+              # Start one work item: all its requirements are satisfied.
+              d = self.queued.pop(i)
+              new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
+              if self.jobs > 1:
+                # Start the thread.
+                self.running.append(new_thread)
+                new_thread.start()
+              else:
+                # Run the 'thread' inside the main thread.
+                new_thread.run()
+              break
+          else:
+            # Couldn't find an item that could run. Break out of the outer loop.
+            break
+        if not self.queued and not self.running:
+          break
+        # We need to poll here, otherwise Ctrl-C isn't processed.
+        self.ready_cond.wait(10)
+        # Something happened: self.enqueue() or a thread terminated. Loop again.
     finally:
-      self.lock.release()
+      self.ready_cond.release()
+    assert not self.running, 'Now guaranteed to be single-threaded'
+    if self.exceptions:
+      # TODO(maruel): Get back the original stack location.
+      raise self.exceptions.pop(0)
     if self.progress:
       self.progress.end()
-    if queued:
-      raise gclient_utils.Error('Entries still queued: %s' % str(queued))
-    if running:
-      raise gclient_utils.Error('Entries still queued: %s' % str(running))
 
-  def _run_one_item(self, *args, **kwargs):
-    """Removes one item from the queue that has all its requirements completed
-    and execute it.
+  class _Worker(threading.Thread):
+    """One thread to execute one WorkItem."""
+    def __init__(self, parent, item, args=(), kwargs=None):
+      threading.Thread.__init__(self, name=item.name or 'Worker')
+      self.args = args
+      self.kwargs = kwargs or {}
+      self.item = item
+      self.parent = parent
+
+    def run(self):
+      """Runs in its own thread."""
+      logging.debug('running(%s)' % self.item.name)
+      exception = None
+      try:
+        self.item.run(*self.args, **self.kwargs)
+      except Exception, e:
+        # TODO(maruel): Catch exception location.
+        exception = e
 
-    Returns False if no item could be run.
-    """
-    i = 0
-    d = None
-    try:
-      self.lock.acquire()
-      while i != len(self.queued) and not d:
-        d = self.queued.pop(i)
-        for r in d.requirements:
-          if not r in self.ran:
-            self.queued.insert(i, d)
-            d = None
-            break
-        i += 1
-      if not d:
-        return False
-      self.running.append(d)
-    finally:
-      self.lock.release()
-    d.run(*args, **kwargs)
-    try:
-      self.lock.acquire()
-      assert not d.name in self.ran
-      if not d.name in self.ran:
-        self.ran.append(d.name)
-      self.running.remove(d)
-      if self.progress:
-        self.progress.update(1)
-    finally:
-      self.lock.release()
-    return True
+      # This assumes the following code won't throw an exception. Bad.
+      self.parent.ready_cond.acquire()
+      try:
+        if exception:
+          self.parent.exceptions.append(exception)
+        if self.parent.progress:
+          self.parent.progress.update(1)
+        assert not self.item.name in self.parent.ran
+        if not self.item.name in self.parent.ran:
+          self.parent.ran.append(self.item.name)
+      finally:
+        self.parent.ready_cond.notifyAll()
+        self.parent.ready_cond.release()
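
For reference, below is a minimal usage sketch (not part of the patch) of how the reworked ExecutionQueue is driven. It assumes a depot_tools checkout on sys.path and that a WorkItem subclass only needs the three things the queue actually touches: a name, a requirements list, and a run() method. The PrintItem class is hypothetical and exists only for illustration.

import gclient_utils

class PrintItem(gclient_utils.WorkItem):
  """Toy work item: prints its name once its requirements have run."""
  def __init__(self, name, requirements):
    self.name = name                  # unique string; recorded in queue.ran
    self.requirements = requirements  # names that must have run first

  def run(self, *args, **kwargs):
    # flush() forwards its own arguments to every item's run().
    print 'running %s' % self.name

# Allow up to 4 items to run concurrently; pass a Progress instance instead
# of None to get the same 'Syncing projects' output gclient shows.
queue = gclient_utils.ExecutionQueue(4, None)
queue.enqueue(PrintItem('base', []))
queue.enqueue(PrintItem('dep', ['base']))  # held back until 'base' has run
queue.flush()  # blocks until done; re-raises the first worker exception

This is the same machinery gclient itself now exercises through the new flag, e.g. gclient sync --jobs 4; with the default of --jobs 1 the work items still run serially inside the main thread via new_thread.run().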