git cl status: use smarter parallel processing.

Previously, 1 CL was queried first, followed by all remaining ones
in parallel. The purpose was to ensure that valid authentication
token is present. However, it still required one call to Rietveld
or Gerrit, needlessly increasing latency.

This CL first loops over all CLs and ensures that credentials are
present, refreshing refresh tokens for Rietveld if necessary. Then,
all CLs are queried in parallel.

R=sergiyb@chromium.org,clemensh@chromium.org
BUG=681704

Change-Id: Ic125ac7c2a684d6f3c34e4e8b899192abbed01bb
Reviewed-on: https://chromium-review.googlesource.com/431033
Reviewed-by: Sergiy Byelozyorov <sergiyb@chromium.org>
Commit-Queue: Andrii Shyshkalov <tandrii@chromium.org>
changes/33/431033/4
Andrii Shyshkalov 9 years ago committed by Commit Bot
parent 3c86cee33a
commit 2f8e924184

@ -1799,11 +1799,13 @@ class _ChangelistCodereviewBase(object):
failed.""" failed."""
raise NotImplementedError() raise NotImplementedError()
def EnsureAuthenticated(self, force): def EnsureAuthenticated(self, force, refresh=False):
"""Best effort check that user is authenticated with codereview server. """Best effort check that user is authenticated with codereview server.
Arguments: Arguments:
force: whether to skip confirmation questions. force: whether to skip confirmation questions.
refresh: whether to attempt to refresh credentials. Ignored if not
applicable.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -1852,13 +1854,15 @@ class _RietveldChangelistImpl(_ChangelistCodereviewBase):
self._rietveld_server = settings.GetDefaultServerUrl() self._rietveld_server = settings.GetDefaultServerUrl()
return self._rietveld_server return self._rietveld_server
def EnsureAuthenticated(self, force): def EnsureAuthenticated(self, force, refresh=False):
"""Best effort check that user is authenticated with Rietveld server.""" """Best effort check that user is authenticated with Rietveld server."""
if self._auth_config.use_oauth2: if self._auth_config.use_oauth2:
authenticator = auth.get_authenticator_for_host( authenticator = auth.get_authenticator_for_host(
self.GetCodereviewServer(), self._auth_config) self.GetCodereviewServer(), self._auth_config)
if not authenticator.has_cached_credentials(): if not authenticator.has_cached_credentials():
raise auth.LoginRequiredError(self.GetCodereviewServer()) raise auth.LoginRequiredError(self.GetCodereviewServer())
if refresh:
authenticator.get_access_token()
def FetchDescription(self): def FetchDescription(self):
issue = self.GetIssue() issue = self.GetIssue()
@ -2315,7 +2319,7 @@ class _GerritChangelistImpl(_ChangelistCodereviewBase):
def CodereviewServerConfigKey(cls): def CodereviewServerConfigKey(cls):
return 'gerritserver' return 'gerritserver'
def EnsureAuthenticated(self, force): def EnsureAuthenticated(self, force, refresh=None):
"""Best effort check that user is authenticated with Gerrit server.""" """Best effort check that user is authenticated with Gerrit server."""
if settings.GetGerritSkipEnsureAuthenticated(): if settings.GetGerritSkipEnsureAuthenticated():
# For projects with unusual authentication schemes. # For projects with unusual authentication schemes.
@ -3469,10 +3473,21 @@ def get_cl_statuses(changes, fine_grained, max_processes=None):
# Silence upload.py otherwise it becomes unwieldy. # Silence upload.py otherwise it becomes unwieldy.
upload.verbosity = 0 upload.verbosity = 0
if fine_grained: if not changes:
# Process one branch synchronously to work through authentication, then raise StopIteration()
# spawn processes to process all the other branches in parallel.
if changes: if not fine_grained:
# Fast path which doesn't involve querying codereview servers.
# Do not use GetApprovingReviewers(), since it requires an HTTP request.
for cl in changes:
yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
return
# First, sort out authentication issues.
logging.debug('ensuring credentials exist')
for cl in changes:
cl.EnsureAuthenticated(force=False, refresh=True)
def fetch(cl): def fetch(cl):
try: try:
return (cl, cl.GetStatus()) return (cl, cl.GetStatus())
@ -3480,38 +3495,30 @@ def get_cl_statuses(changes, fine_grained, max_processes=None):
# See http://crbug.com/629863. # See http://crbug.com/629863.
logging.exception('failed to fetch status for %s:', cl) logging.exception('failed to fetch status for %s:', cl)
raise raise
yield fetch(changes[0])
changes_to_fetch = changes[1:]
if not changes_to_fetch:
# Exit early if there was only one branch to fetch.
return
pool = ThreadPool( threads_count = len(changes)
min(max_processes, len(changes_to_fetch)) if max_processes:
if max_processes is not None threads_count = max(1, min(threads_count, max_processes))
else max(len(changes_to_fetch), 1)) logging.debug('querying %d CLs using %d threads', len(changes), threads_count)
pool = ThreadPool(threads_count)
fetched_cls = set() fetched_cls = set()
it = pool.imap_unordered(fetch, changes_to_fetch).__iter__() try:
it = pool.imap_unordered(fetch, changes).__iter__()
while True: while True:
try: try:
row = it.next(timeout=5) cl, status = it.next(timeout=5)
except multiprocessing.TimeoutError: except multiprocessing.TimeoutError:
break break
fetched_cls.add(cl)
fetched_cls.add(row[0]) yield cl, status
yield row finally:
pool.close()
# Add any branches that failed to fetch. # Add any branches that failed to fetch.
for cl in set(changes_to_fetch) - fetched_cls: for cl in set(changes) - fetched_cls:
yield (cl, 'error') yield (cl, 'error')
else:
# Do not use GetApprovingReviewers(), since it requires an HTTP request.
for cl in changes:
yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
def upload_branch_deps(cl, args): def upload_branch_deps(cl, args):
"""Uploads CLs of local branches that are dependents of the current branch. """Uploads CLs of local branches that are dependents of the current branch.

Loading…
Cancel
Save