Add new class ExecutionQueue to take care of out-of-order execution.

ExecutionQueue takes care of reordering execution depending on each Dependency's
requirements. There is no need to sort anymore.

ExecutionQueue also manages the Progress instance and is multithread safe, in preparation to parallel checkout.

Review URL: http://codereview.chromium.org/3112002

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@55882 0039d316-1c4b-4281-b951-d872f2087c98
experimental/szager/collated-output
maruel@chromium.org 15 years ago
parent 05d5a80a2e
commit 049bcedffe

@ -49,7 +49,7 @@ Hooks
] ]
""" """
__version__ = "0.5" __version__ = "0.5.1"
import logging import logging
import optparse import optparse
@ -59,6 +59,7 @@ import pprint
import re import re
import subprocess import subprocess
import sys import sys
import threading
import urlparse import urlparse
import urllib import urllib
@ -79,6 +80,110 @@ def attr(attr, data):
## GClient implementation. ## GClient implementation.
class WorkItem(object):
  """One work item."""
  # Names (WorkItem.name strings) that must have completed before this
  # item is allowed to run.
  requirements = []
  # Unique identifier for this item; recorded in ExecutionQueue.ran once done.
  name = None

  def run(self):
    """Executes the task; meant to be overridden by subclasses."""
    pass


class ExecutionQueue(object):
  """Dependencies sometimes need to be run out of order due to the From()
  keyword. This class makes sure all the required dependencies are run
  before running each one.

  Methods of this class are multithread safe.
  """
  def __init__(self, progress):
    """Args:
      progress: a Progress instance to keep updated as items complete, or
          None to disable progress reporting.
    """
    self.lock = threading.Lock()
    # List of WorkItem (Dependency) waiting to be run.
    self.queued = []
    # List of strings representing each Dependency.name that was run.
    self.ran = []
    # List of items currently running.
    self.running = []
    self.progress = progress
    if self.progress:
      self.progress.update()

  def enqueue(self, d):
    """Enqueues one WorkItem to be executed later once its requirements are
    satisfied.
    """
    assert isinstance(d, WorkItem)
    with self.lock:
      self.queued.append(d)
      total = len(self.queued) + len(self.ran) + len(self.running)
    if self.progress:
      # +1 because the root item (.gclient itself) is not tracked in the
      # queue but counts as one step.
      self.progress._total = total + 1
      self.progress.update(0)

  def flush(self, *args, **kwargs):
    """Runs all enqueued items until all are executed.

    Raises gclient_utils.Error if items remain whose requirements can never
    be satisfied (e.g. a dependency cycle or a missing requirement).
    """
    while self._run_one_item(*args, **kwargs):
      pass
    queued = []
    running = []
    with self.lock:
      if self.queued:
        queued = self.queued
        self.queued = []
      if self.running:
        running = self.running
        self.running = []
    if self.progress:
      self.progress.end()
    if queued:
      raise gclient_utils.Error('Entries still queued: %s' % str(queued))
    if running:
      # Fixed copy-paste bug: this message used to say 'queued' too, which
      # made the two failure modes indistinguishable.
      raise gclient_utils.Error('Entries still running: %s' % str(running))

  def _run_one_item(self, *args, **kwargs):
    """Removes one item from the queue whose requirements are all completed
    and executes it.

    Returns False if no item could be run.
    """
    i = 0
    d = None
    with self.lock:
      # Scan the queue for the first item whose requirements have all run.
      while i != len(self.queued) and not d:
        d = self.queued.pop(i)
        for r in d.requirements:
          if not r in self.ran:
            # Not ready yet; put it back at the same position and keep
            # scanning.
            self.queued.insert(i, d)
            d = None
            break
        i += 1
      if not d:
        return False
      self.running.append(d)
    # Run outside the lock so other threads can pick up work concurrently.
    d.run(*args, **kwargs)
    with self.lock:
      # TODO(maruel): http://crbug.com/51711
      #assert not d.name in self.ran
      if not d.name in self.ran:
        self.ran.append(d.name)
      self.running.remove(d)
      if self.progress:
        self.progress.update(1)
    return True
class GClientKeywords(object): class GClientKeywords(object):
class FromImpl(object): class FromImpl(object):
@ -134,7 +239,7 @@ class GClientKeywords(object):
raise gclient_utils.Error("Var is not defined: %s" % var_name) raise gclient_utils.Error("Var is not defined: %s" % var_name)
class Dependency(GClientKeywords): class Dependency(GClientKeywords, WorkItem):
"""Object that represents a dependency checkout.""" """Object that represents a dependency checkout."""
DEPS_FILE = 'DEPS' DEPS_FILE = 'DEPS'
@ -320,16 +425,9 @@ class Dependency(GClientKeywords):
name) name)
self.dependencies.append(Dependency(self, name, url, None, None, None, self.dependencies.append(Dependency(self, name, url, None, None, None,
None)) None))
# Sorting by name would in theory make the whole thing coherent, since
# subdirectories will be sorted after the parent directory, but that doesn't
# work with From() that fetch from a dependency with a name being sorted
# later. But if this would be removed right now, many projects wouldn't be
# able to sync anymore.
self.dependencies.sort(key=lambda x: x.name)
logging.debug('Loaded: %s' % str(self)) logging.debug('Loaded: %s' % str(self))
def RunCommandRecursively(self, options, revision_overrides, def run(self, options, revision_overrides, command, args, work_queue):
command, args, pm):
"""Runs 'command' before parsing the DEPS in case it's a initial checkout """Runs 'command' before parsing the DEPS in case it's a initial checkout
or a revert.""" or a revert."""
assert self._file_list == [] assert self._file_list == []
@ -357,21 +455,13 @@ class Dependency(GClientKeywords):
for f in self._file_list] for f in self._file_list]
options.revision = None options.revision = None
self.processed = True self.processed = True
if pm:
# The + 1 comes from the fact that .gclient is considered a step in
# itself, .i.e. this code is called one time for the .gclient. This is not
# conceptually correct but it simplifies code.
pm._total = len(self.tree(False)) + 1
pm.update()
if self.recursion_limit(): if self.recursion_limit():
# Then we can parse the DEPS file. # Then we can parse the DEPS file.
self.ParseDepsFile(True) self.ParseDepsFile(True)
if pm:
pm._total = len(self.tree(False)) + 1
pm.update(0)
# Adjust the implicit dependency requirement; e.g. if a DEPS file contains # Adjust the implicit dependency requirement; e.g. if a DEPS file contains
# both src/foo and src/foo/bar, src/foo/bar is implicitly dependent of # both src/foo and src/foo/bar, src/foo/bar is implicitly dependent of
# src/foo. Yes, it's O(n^2)... # src/foo. Yes, it's O(n^2)... It's important to do that before
# enqueueing them.
for s in self.dependencies: for s in self.dependencies:
for s2 in self.dependencies: for s2 in self.dependencies:
if s is s2: if s is s2:
@ -381,13 +471,10 @@ class Dependency(GClientKeywords):
# Parse the dependencies of this dependency. # Parse the dependencies of this dependency.
for s in self.dependencies: for s in self.dependencies:
# TODO(maruel): All these can run concurrently! No need for threads, work_queue.enqueue(s)
# just buffer stdout&stderr on pipes and flush as they complete.
# Watch out for stdin.
s.RunCommandRecursively(options, revision_overrides, command, args, pm)
def RunHooksRecursively(self, options): def RunHooksRecursively(self, options):
"""Evaluates all hooks, running actions as needed. RunCommandRecursively() """Evaluates all hooks, running actions as needed. run()
must have been called before to load the DEPS.""" must have been called before to load the DEPS."""
# If "--force" was specified, run all hooks regardless of what files have # If "--force" was specified, run all hooks regardless of what files have
# changed. # changed.
@ -493,7 +580,7 @@ class Dependency(GClientKeywords):
out = [] out = []
for i in ('name', 'url', 'parsed_url', 'safesync_url', 'custom_deps', for i in ('name', 'url', 'parsed_url', 'safesync_url', 'custom_deps',
'custom_vars', 'deps_hooks', '_file_list', 'processed', 'custom_vars', 'deps_hooks', '_file_list', 'processed',
'hooks_ran', 'deps_parsed', 'requirements'): 'hooks_ran', 'deps_parsed', 'requirements', 'direct_reference'):
# 'deps_file' # 'deps_file'
if self.__dict__[i]: if self.__dict__[i]:
out.append('%s: %s' % (i, self.__dict__[i])) out.append('%s: %s' % (i, self.__dict__[i]))
@ -601,6 +688,8 @@ solutions = [
'incomplete: %s' % s) 'incomplete: %s' % s)
# .gclient can have hooks. # .gclient can have hooks.
self.deps_hooks = config_dict.get('hooks', []) self.deps_hooks = config_dict.get('hooks', [])
self.direct_reference = True
self.deps_parsed = True
def SaveConfig(self): def SaveConfig(self):
gclient_utils.FileWrite(os.path.join(self.root_dir(), gclient_utils.FileWrite(os.path.join(self.root_dir(),
@ -705,11 +794,12 @@ solutions = [
revision_overrides = self._EnforceRevisions() revision_overrides = self._EnforceRevisions()
pm = None pm = None
if command == 'update' and not self._options.verbose: if command == 'update' and not self._options.verbose:
pm = Progress('Syncing projects', len(self.tree(False)) + 1) pm = Progress('Syncing projects', 1)
self.RunCommandRecursively(self._options, revision_overrides, work_queue = ExecutionQueue(pm)
command, args, pm) for s in self.dependencies:
if pm: work_queue.enqueue(s)
pm.end() work_queue.flush(self._options, revision_overrides, command, args,
work_queue)
# Once all the dependencies have been processed, it's now safe to run the # Once all the dependencies have been processed, it's now safe to run the
# hooks. # hooks.
@ -750,7 +840,10 @@ solutions = [
if not self.dependencies: if not self.dependencies:
raise gclient_utils.Error('No solution specified') raise gclient_utils.Error('No solution specified')
# Load all the settings. # Load all the settings.
self.RunCommandRecursively(self._options, {}, None, [], None) work_queue = ExecutionQueue(None)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, {}, None, [], work_queue)
def GetURLAndRev(dep): def GetURLAndRev(dep):
"""Returns the revision-qualified SCM url for a Dependency.""" """Returns the revision-qualified SCM url for a Dependency."""
@ -809,8 +902,7 @@ solutions = [
def ParseDepsFile(self, direct_reference): def ParseDepsFile(self, direct_reference):
"""No DEPS to parse for a .gclient file.""" """No DEPS to parse for a .gclient file."""
self.direct_reference = True raise gclient_utils.Error('Internal error')
self.deps_parsed = True
def root_dir(self): def root_dir(self):
"""Root directory of gclient checkout.""" """Root directory of gclient checkout."""

@ -373,17 +373,17 @@ class GClientSmokeSVN(GClientSmokeBase):
# So verify it works with --verbose. # So verify it works with --verbose.
out = self.parseGclient(['status', '--deps', 'mac', '--verbose'], out = self.parseGclient(['status', '--deps', 'mac', '--verbose'],
[['running', join(self.root_dir, 'src')], [['running', join(self.root_dir, 'src')],
['running', join(self.root_dir, 'src', 'other')],
['running', join(self.root_dir, 'src', 'third_party', 'fpp')], ['running', join(self.root_dir, 'src', 'third_party', 'fpp')],
['running', join(self.root_dir, 'src', 'other')],
['running', join(self.root_dir, 'src', 'third_party', 'prout')]]) ['running', join(self.root_dir, 'src', 'third_party', 'prout')]])
out = self.svnBlockCleanup(out) out = self.svnBlockCleanup(out)
self.checkString('other', out[0][1]) self.checkString('other', out[0][1])
self.checkString(join('third_party', 'fpp'), out[0][2]) self.checkString(join('third_party', 'fpp'), out[0][2])
self.checkString(join('third_party', 'prout'), out[0][3]) self.checkString(join('third_party', 'prout'), out[0][3])
self.checkString('hi', out[1][1]) self.checkString('hi', out[2][1])
self.assertEquals(4, len(out[0])) self.assertEquals(4, len(out[0]))
self.assertEquals(2, len(out[1])) self.assertEquals(1, len(out[1]))
self.assertEquals(1, len(out[2])) self.assertEquals(2, len(out[2]))
self.assertEquals(1, len(out[3])) self.assertEquals(1, len(out[3]))
self.assertEquals(4, len(out)) self.assertEquals(4, len(out))

Loading…
Cancel
Save