From 049bcedffee62a6118ea6a5e0c0702a035dc6da6 Mon Sep 17 00:00:00 2001 From: "maruel@chromium.org" Date: Thu, 12 Aug 2010 13:37:20 +0000 Subject: [PATCH] Add new class ExecutionQueue to take care of out-of-order execution. ExecutionQueue takes care of reordering execution depending on each Dependency requirements. There is no need to sort anymore. ExecutionQueue also manages the Progress instance and is multithread safe, in preparation to parallel checkout. Review URL: http://codereview.chromium.org/3112002 git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@55882 0039d316-1c4b-4281-b951-d872f2087c98 --- gclient.py | 160 +++++++++++++++++++++++++++++-------- tests/gclient_smoketest.py | 8 +- 2 files changed, 130 insertions(+), 38 deletions(-) diff --git a/gclient.py b/gclient.py index 41dc58b9c..4170f5fc6 100644 --- a/gclient.py +++ b/gclient.py @@ -49,7 +49,7 @@ Hooks ] """ -__version__ = "0.5" +__version__ = "0.5.1" import logging import optparse @@ -59,6 +59,7 @@ import pprint import re import subprocess import sys +import threading import urlparse import urllib @@ -79,6 +80,110 @@ def attr(attr, data): ## GClient implementation. +class WorkItem(object): + """One work item.""" + requirements = [] + name = None + + def run(self): + pass + + +class ExecutionQueue(object): + """Dependencies sometime needs to be run out of order due to From() keyword. + + This class manages that all the required dependencies are run before running + each one. + + Methods of this class are multithread safe. + """ + def __init__(self, progress): + self.lock = threading.Lock() + # List of Dependency. + self.queued = [] + # List of strings representing each Dependency.name that was run. + self.ran = [] + # List of items currently running. + self.running = [] + self.progress = progress + if self.progress: + self.progress.update() + + def enqueue(self, d): + """Enqueue one Dependency to be executed later once its requirements are + satisfied. + """ + assert isinstance(d, WorkItem) + try: + self.lock.acquire() + self.queued.append(d) + total = len(self.queued) + len(self.ran) + len(self.running) + finally: + self.lock.release() + if self.progress: + self.progress._total = total + 1 + self.progress.update(0) + + def flush(self, *args, **kwargs): + """Runs all enqueued items until all are executed.""" + while self._run_one_item(*args, **kwargs): + pass + queued = [] + running = [] + try: + self.lock.acquire() + if self.queued: + queued = self.queued + self.queued = [] + if self.running: + running = self.running + self.running = [] + finally: + self.lock.release() + if self.progress: + self.progress.end() + if queued: + raise gclient_utils.Error('Entries still queued: %s' % str(queued)) + if running: + raise gclient_utils.Error('Entries still queued: %s' % str(running)) + + def _run_one_item(self, *args, **kwargs): + """Removes one item from the queue that has all its requirements completed + and execute it. + + Returns False if no item could be run. + """ + i = 0 + d = None + try: + self.lock.acquire() + while i != len(self.queued) and not d: + d = self.queued.pop(i) + for r in d.requirements: + if not r in self.ran: + self.queued.insert(i, d) + d = None + break + i += 1 + if not d: + return False + self.running.append(d) + finally: + self.lock.release() + d.run(*args, **kwargs) + try: + self.lock.acquire() + # TODO(maruel): http://crbug.com/51711 + #assert not d.name in self.ran + if not d.name in self.ran: + self.ran.append(d.name) + self.running.remove(d) + if self.progress: + self.progress.update(1) + finally: + self.lock.release() + return True + class GClientKeywords(object): class FromImpl(object): @@ -134,7 +239,7 @@ class GClientKeywords(object): raise gclient_utils.Error("Var is not defined: %s" % var_name) -class Dependency(GClientKeywords): +class Dependency(GClientKeywords, WorkItem): """Object that represents a dependency checkout.""" DEPS_FILE = 'DEPS' @@ -320,16 +425,9 @@ class Dependency(GClientKeywords): name) self.dependencies.append(Dependency(self, name, url, None, None, None, None)) - # Sorting by name would in theory make the whole thing coherent, since - # subdirectories will be sorted after the parent directory, but that doens't - # work with From() that fetch from a dependency with a name being sorted - # later. But if this would be removed right now, many projects wouldn't be - # able to sync anymore. - self.dependencies.sort(key=lambda x: x.name) logging.debug('Loaded: %s' % str(self)) - def RunCommandRecursively(self, options, revision_overrides, - command, args, pm): + def run(self, options, revision_overrides, command, args, work_queue): """Runs 'command' before parsing the DEPS in case it's a initial checkout or a revert.""" assert self._file_list == [] @@ -357,21 +455,13 @@ class Dependency(GClientKeywords): for f in self._file_list] options.revision = None self.processed = True - if pm: - # The + 1 comes from the fact that .gclient is considered a step in - # itself, .i.e. this code is called one time for the .gclient. This is not - # conceptually correct but it simplifies code. - pm._total = len(self.tree(False)) + 1 - pm.update() if self.recursion_limit(): # Then we can parse the DEPS file. self.ParseDepsFile(True) - if pm: - pm._total = len(self.tree(False)) + 1 - pm.update(0) # Adjust the implicit dependency requirement; e.g. if a DEPS file contains # both src/foo and src/foo/bar, src/foo/bar is implicitly dependent of - # src/foo. Yes, it's O(n^2)... + # src/foo. Yes, it's O(n^2)... It's important to do that before + # enqueueing them. for s in self.dependencies: for s2 in self.dependencies: if s is s2: @@ -381,13 +471,10 @@ class Dependency(GClientKeywords): # Parse the dependencies of this dependency. for s in self.dependencies: - # TODO(maruel): All these can run concurrently! No need for threads, - # just buffer stdout&stderr on pipes and flush as they complete. - # Watch out for stdin. - s.RunCommandRecursively(options, revision_overrides, command, args, pm) + work_queue.enqueue(s) def RunHooksRecursively(self, options): - """Evaluates all hooks, running actions as needed. RunCommandRecursively() + """Evaluates all hooks, running actions as needed. run() must have been called before to load the DEPS.""" # If "--force" was specified, run all hooks regardless of what files have # changed. @@ -493,7 +580,7 @@ class Dependency(GClientKeywords): out = [] for i in ('name', 'url', 'parsed_url', 'safesync_url', 'custom_deps', 'custom_vars', 'deps_hooks', '_file_list', 'processed', - 'hooks_ran', 'deps_parsed', 'requirements'): + 'hooks_ran', 'deps_parsed', 'requirements', 'direct_reference'): # 'deps_file' if self.__dict__[i]: out.append('%s: %s' % (i, self.__dict__[i])) @@ -601,6 +688,8 @@ solutions = [ 'incomplete: %s' % s) # .gclient can have hooks. self.deps_hooks = config_dict.get('hooks', []) + self.direct_reference = True + self.deps_parsed = True def SaveConfig(self): gclient_utils.FileWrite(os.path.join(self.root_dir(), @@ -705,11 +794,12 @@ solutions = [ revision_overrides = self._EnforceRevisions() pm = None if command == 'update' and not self._options.verbose: - pm = Progress('Syncing projects', len(self.tree(False)) + 1) - self.RunCommandRecursively(self._options, revision_overrides, - command, args, pm) - if pm: - pm.end() + pm = Progress('Syncing projects', 1) + work_queue = ExecutionQueue(pm) + for s in self.dependencies: + work_queue.enqueue(s) + work_queue.flush(self._options, revision_overrides, command, args, + work_queue) # Once all the dependencies have been processed, it's now safe to run the # hooks. @@ -750,7 +840,10 @@ solutions = [ if not self.dependencies: raise gclient_utils.Error('No solution specified') # Load all the settings. - self.RunCommandRecursively(self._options, {}, None, [], None) + work_queue = ExecutionQueue(None) + for s in self.dependencies: + work_queue.enqueue(s) + work_queue.flush(self._options, {}, None, [], work_queue) def GetURLAndRev(dep): """Returns the revision-qualified SCM url for a Dependency.""" @@ -809,8 +902,7 @@ solutions = [ def ParseDepsFile(self, direct_reference): """No DEPS to parse for a .gclient file.""" - self.direct_reference = True - self.deps_parsed = True + raise gclient_utils.Error('Internal error') def root_dir(self): """Root directory of gclient checkout.""" diff --git a/tests/gclient_smoketest.py b/tests/gclient_smoketest.py index b83035016..d1251e19e 100755 --- a/tests/gclient_smoketest.py +++ b/tests/gclient_smoketest.py @@ -373,17 +373,17 @@ class GClientSmokeSVN(GClientSmokeBase): # So verify it works with --verbose. out = self.parseGclient(['status', '--deps', 'mac', '--verbose'], [['running', join(self.root_dir, 'src')], - ['running', join(self.root_dir, 'src', 'other')], ['running', join(self.root_dir, 'src', 'third_party', 'fpp')], + ['running', join(self.root_dir, 'src', 'other')], ['running', join(self.root_dir, 'src', 'third_party', 'prout')]]) out = self.svnBlockCleanup(out) self.checkString('other', out[0][1]) self.checkString(join('third_party', 'fpp'), out[0][2]) self.checkString(join('third_party', 'prout'), out[0][3]) - self.checkString('hi', out[1][1]) + self.checkString('hi', out[2][1]) self.assertEquals(4, len(out[0])) - self.assertEquals(2, len(out[1])) - self.assertEquals(1, len(out[2])) + self.assertEquals(1, len(out[1])) + self.assertEquals(2, len(out[2])) self.assertEquals(1, len(out[3])) self.assertEquals(4, len(out))