From 9e5317ac389249633ba1886bf656a915e83a8d11 Mon Sep 17 00:00:00 2001
From: "maruel@chromium.org"
Date: Fri, 13 Aug 2010 20:35:11 +0000
Subject: [PATCH] Add --jobs support to gclient.

--jobs=1 is still the default for now.

Huge thanks to piman@ for working on a patch. I chose a different design but he
gave me motivation and ideas. Sorry for not accepting his patch earlier; this
was mostly due to the broken gclient implementation itself.

gclient can now run an unlimited number of parallel checkouts while always
keeping the checkout coherency correct. --jobs=1 is single-threaded as before,
albeit with a different code path.

Issues:
- Using --jobs with a value other than 1 will result in mangled output.
- Exceptions thrown in a thread will have the wrong stack trace.

TEST=gclient sync -j 99 in a ssh:// chromiumos checkout is dramatically faster.
---
Here's the perf on Linux on an i7-860 for a Chromium checkout with a warm
cache. A cold cache will result in significantly reduced improvements, so this
is the best-case improvement. The sync was a no-op every time except where
noted. All executions were with "time gclient sync " + args. The 'sys' column
isn't included since it was statistically insignificant and highly correlated
with 'user'.

              runs with -f         runs with -m        without -f nor -m
args          real      user       real      user      real      user
-j 12         20.59s    18.00s     5.64s     7.95s     5.86s     8.10s
       #1     1m05.26s  20.02s     5.20s     7.94s     5.10s     8.09s
              22.79s    18.17s
-j 1   #2     1m47.00s  16.72s     9.69s     5.72s     12.35s    5.96s
              1m31.28s  17.06s     9.54s     5.85s     10.51s    6.20s
              1m31.79s  16.39s
before #3     1m30.94s  16.74s     9.77s     5.83s     10.45s    5.77s
              1m30.17s  17.30s     10.36s    5.68s     10.16s    5.88s
hook   #4     8.52s     7.93s      8.73s     8.13s

#1 This particular run synced to r56023, a WebKit roll updating layout tests.
   It's still faster than a no-op sync without parallel checkout.
#2 Maybe there was a sync or computer hiccup; I didn't notice.
#3 This is depot_tools@56020.
#4 Since -f implies runhooks, I ran the hook 'python src/build/gyp_chromium'
   manually to compare. Hooks are still run in a single thread. I didn't test
   'gclient runhooks'.

I tried to use a ssh:// checkout of the Chromium OS tree, but it timed out
every time I tried to sync, so I couldn't get data points. I expect an order of
magnitude of improvement or more.

Review URL: http://codereview.chromium.org/3135014

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@56079 0039d316-1c4b-4281-b951-d872f2087c98
---
 gclient.py       |  14 ++--
 gclient_utils.py | 162 +++++++++++++++++++++++++++++------------------
 2 files changed, 110 insertions(+), 66 deletions(-)

diff --git a/gclient.py b/gclient.py
index aa80b8660a..4699d1f955 100644
--- a/gclient.py
+++ b/gclient.py
@@ -49,8 +49,9 @@ Hooks
   ]
 """

-__version__ = "0.5.2"
+__version__ = "0.6"

+import copy
 import logging
 import optparse
 import os
@@ -355,12 +356,13 @@ class Dependency(GClientKeywords, gclient_utils.WorkItem):
                                               args + [self.parsed_url.GetFilename()],
                                               self._file_list)
       else:
+        # Create a shallow copy to mutate revision.
+        options = copy.copy(options)
         options.revision = revision_overrides.get(self.name)
         scm = gclient_scm.CreateSCM(self.parsed_url, self.root_dir(), self.name)
         scm.RunCommand(command, options, args, self._file_list)
         self._file_list = [os.path.join(self.name, f.strip())
                            for f in self._file_list]
-        options.revision = None
       self.processed = True
       if self.recursion_limit() > 0:
         # Then we can parse the DEPS file.
@@ -710,7 +712,7 @@ solutions = [
     pm = None
     if command == 'update' and not self._options.verbose:
       pm = Progress('Syncing projects', 1)
-    work_queue = gclient_utils.ExecutionQueue(pm)
+    work_queue = gclient_utils.ExecutionQueue(self._options.jobs, pm)
     for s in self.dependencies:
       work_queue.enqueue(s)
     work_queue.flush(self._options, revision_overrides, command, args,
@@ -755,7 +757,7 @@ solutions = [
     if not self.dependencies:
       raise gclient_utils.Error('No solution specified')
     # Load all the settings.
-    work_queue = gclient_utils.ExecutionQueue(None)
+    work_queue = gclient_utils.ExecutionQueue(self._options.jobs, None)
     for s in self.dependencies:
       work_queue.enqueue(s)
     work_queue.flush(self._options, {}, None, [], work_queue)
@@ -1168,6 +1170,8 @@ def Main(argv):
       ' %-10s %s' % (fn[3:], Command(fn[3:]).__doc__.split('\n')[0].strip())
       for fn in dir(sys.modules[__name__]) if fn.startswith('CMD')]))
   parser = optparse.OptionParser(version='%prog ' + __version__)
+  parser.add_option('-j', '--jobs', default=1, type='int',
+                    help='Specify how many SCM commands can run in parallel')
   parser.add_option('-v', '--verbose', action='count', default=0,
                     help='Produces additional output for diagnostics. Can be '
                          'used up to three times for more logging info.')
@@ -1186,6 +1190,8 @@ def Main(argv):
   logging.basicConfig(level=level,
       format='%(module)s(%(lineno)d) %(funcName)s:%(message)s')
   options.entries_filename = options.config_filename + '_entries'
+  if options.jobs < 1:
+    parser.error('--jobs must be 1 or higher')

   # These hacks need to die.
   if not hasattr(options, 'revisions'):
diff --git a/gclient_utils.py b/gclient_utils.py
index c530f4cb4c..b2b0093140 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -21,6 +21,7 @@ import re
 import stat
 import subprocess
 import sys
+import threading
 import time
 import threading
 import xml.dom.minidom
@@ -378,21 +379,30 @@ class WorkItem(object):


 class ExecutionQueue(object):
-  """Dependencies sometime needs to be run out of order due to From() keyword.
+  """Runs a set of WorkItems that have interdependencies and where WorkItems
+  are added as they are processed.

-  This class manages that all the required dependencies are run before running
-  each one.
+  In gclient's case, Dependencies sometimes need to be run out of order due to
+  the From() keyword. This class manages that all the required dependencies are
+  run before running each one.

-  Methods of this class are multithread safe.
+  Methods of this class are thread safe.
   """
-  def __init__(self, progress):
-    self.lock = threading.Lock()
-    # List of WorkItem, Dependency inherits from WorkItem.
+  def __init__(self, jobs, progress):
+    """jobs specifies the number of concurrent tasks to allow. progress is a
+    Progress instance."""
+    # Set when a thread is done or a new item is enqueued.
+    self.ready_cond = threading.Condition()
+    # Maximum number of concurrent tasks.
+    self.jobs = jobs
+    # List of WorkItems; for gclient, these are Dependency instances.
     self.queued = []
     # List of strings representing each Dependency.name that was run.
     self.ran = []
     # List of items currently running.
     self.running = []
+    # Exceptions thrown, if any.
+    self.exceptions = []
     self.progress = progress
     if self.progress:
       self.progress.update()
@@ -402,71 +412,99 @@ class ExecutionQueue(object):
     satisfied.
""" assert isinstance(d, WorkItem) + self.ready_cond.acquire() try: - self.lock.acquire() self.queued.append(d) total = len(self.queued) + len(self.ran) + len(self.running) + logging.debug('enqueued(%s)' % d.name) + if self.progress: + self.progress._total = total + 1 + self.progress.update(0) + self.ready_cond.notifyAll() finally: - self.lock.release() - if self.progress: - self.progress._total = total + 1 - self.progress.update(0) + self.ready_cond.release() def flush(self, *args, **kwargs): """Runs all enqueued items until all are executed.""" - while self._run_one_item(*args, **kwargs): - pass - queued = [] - running = [] + self.ready_cond.acquire() try: - self.lock.acquire() - if self.queued: - queued = self.queued - self.queued = [] - if self.running: - running = self.running - self.running = [] + while True: + # Check for task to run first, then wait. + while True: + if self.exceptions: + # Systematically flush the queue when there is an exception logged + # in. + self.queued = [] + # Flush threads that have terminated. + self.running = [t for t in self.running if t.isAlive()] + if not self.queued and not self.running: + break + if self.jobs == len(self.running): + break + for i in xrange(len(self.queued)): + # Verify its requirements. + for r in self.queued[i].requirements: + if not r in self.ran: + # Requirement not met. + break + else: + # Start one work item: all its requirements are satisfied. + d = self.queued.pop(i) + new_thread = self._Worker(self, d, args=args, kwargs=kwargs) + if self.jobs > 1: + # Start the thread. + self.running.append(new_thread) + new_thread.start() + else: + # Run the 'thread' inside the main thread. + new_thread.run() + break + else: + # Couldn't find an item that could run. Break out the outher loop. + break + if not self.queued and not self.running: + break + # We need to poll here otherwise Ctrl-C isn't processed. + self.ready_cond.wait(10) + # Something happened: self.enqueue() or a thread terminated. Loop again. finally: - self.lock.release() + self.ready_cond.release() + assert not self.running, 'Now guaranteed to be single-threaded' + if self.exceptions: + # TODO(maruel): Get back the original stack location. + raise self.exceptions.pop(0) if self.progress: self.progress.end() - if queued: - raise gclient_utils.Error('Entries still queued: %s' % str(queued)) - if running: - raise gclient_utils.Error('Entries still queued: %s' % str(running)) - def _run_one_item(self, *args, **kwargs): - """Removes one item from the queue that has all its requirements completed - and execute it. + class _Worker(threading.Thread): + """One thread to execute one WorkItem.""" + def __init__(self, parent, item, args=(), kwargs=None): + threading.Thread.__init__(self, name=item.name or 'Worker') + self.args = args + self.kwargs = kwargs or {} + self.item = item + self.parent = parent + + def run(self): + """Runs in its own thread.""" + logging.debug('running(%s)' % self.item.name) + exception = None + try: + self.item.run(*self.args, **self.kwargs) + except Exception, e: + # TODO(maruel): Catch exception location. + exception = e - Returns False if no item could be run. 
-    """
-    i = 0
-    d = None
-    try:
-      self.lock.acquire()
-      while i != len(self.queued) and not d:
-        d = self.queued.pop(i)
-        for r in d.requirements:
-          if not r in self.ran:
-            self.queued.insert(i, d)
-            d = None
-            break
-        i += 1
-      if not d:
-        return False
-      self.running.append(d)
-    finally:
-      self.lock.release()
-    d.run(*args, **kwargs)
-    try:
-      self.lock.acquire()
-      assert not d.name in self.ran
-      if not d.name in self.ran:
-        self.ran.append(d.name)
-      self.running.remove(d)
-      if self.progress:
-        self.progress.update(1)
-    finally:
-      self.lock.release()
-    return True
+      # This assumes the following code won't throw an exception. Bad.
+      self.parent.ready_cond.acquire()
+      try:
+        if exception:
+          self.parent.exceptions.append(exception)
+        if self.parent.progress:
+          self.parent.progress.update(1)
+        assert not self.item.name in self.parent.ran
+        if not self.item.name in self.parent.ran:
+          self.parent.ran.append(self.item.name)
+      finally:
+        self.parent.ready_cond.notifyAll()
+        self.parent.ready_cond.release()
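
Editor's note (not part of the patch): for readers who want the scheduling idea
in isolation, here is a minimal, hypothetical sketch of the pattern
ExecutionQueue implements above: a Condition-guarded queue where an item starts
only once all of its named requirements have finished, at most `jobs` items run
at a time, and the scheduler uses a timed wait so Ctrl-C is still processed.
The Item and run_queue names are illustrative only, not gclient APIs; exception
collection and dependency-cycle detection are omitted for brevity.

    import threading

    class Item(object):
      def __init__(self, name, requirements, work):
        self.name = name                  # unique identifier, like Dependency.name
        self.requirements = requirements  # names that must have finished first
        self.work = work                  # callable performing the actual task

    def run_queue(items, jobs):
      cond = threading.Condition()        # guards queued/ran/running, like ready_cond
      queued = list(items)
      ran = set()
      running = []

      def worker(item):
        item.work()                       # the task itself runs outside the lock
        with cond:
          ran.add(item.name)
          cond.notify_all()               # wake the scheduling loop

      with cond:
        while queued or running:
          # Drop threads that have terminated.
          running[:] = [t for t in running if t.is_alive()]
          started = False
          if len(running) < jobs:
            for i, item in enumerate(queued):
              if all(r in ran for r in item.requirements):
                # All requirements met: start this item.
                queued.pop(i)
                t = threading.Thread(target=worker, args=(item,))
                running.append(t)
                t.start()
                started = True
                break
          if not started:
            # Nothing runnable right now; poll so Ctrl-C is still processed.
            cond.wait(10)

For example, run_queue([Item('a', [], do_a), Item('b', ['a'], do_b)], jobs=4)
(with do_a and do_b being placeholder callables) runs do_a first and starts
do_b only after it has finished, mirroring how gclient orders dependencies.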