From 2f8e924184e7d6dff625ae8c7e54ed7c2d9a30a0 Mon Sep 17 00:00:00 2001 From: Andrii Shyshkalov Date: Mon, 23 Jan 2017 19:20:19 +0100 Subject: [PATCH] git cl status: use smarter parallel processing. Previously, 1 CL was queried first, followed by all remaining ones in parallel. The purpose was to ensure that valid authentication token is present. However, it still required one call to Rietveld or Gerrit, needlessly increasing latency. This CL first loops over all CLs and ensures that credentials are present, refreshing refresh tokens for Rietveld if necessary. Then, all CLs are queried in parallel. R=sergiyb@chromium.org,clemensh@chromium.org BUG=681704 Change-Id: Ic125ac7c2a684d6f3c34e4e8b899192abbed01bb Reviewed-on: https://chromium-review.googlesource.com/431033 Reviewed-by: Sergiy Byelozyorov Commit-Queue: Andrii Shyshkalov --- git_cl.py | 89 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/git_cl.py b/git_cl.py index 3b92b9034..d86902c04 100755 --- a/git_cl.py +++ b/git_cl.py @@ -1799,11 +1799,13 @@ class _ChangelistCodereviewBase(object): failed.""" raise NotImplementedError() - def EnsureAuthenticated(self, force): + def EnsureAuthenticated(self, force, refresh=False): """Best effort check that user is authenticated with codereview server. Arguments: force: whether to skip confirmation questions. + refresh: whether to attempt to refresh credentials. Ignored if not + applicable. """ raise NotImplementedError() @@ -1852,13 +1854,15 @@ class _RietveldChangelistImpl(_ChangelistCodereviewBase): self._rietveld_server = settings.GetDefaultServerUrl() return self._rietveld_server - def EnsureAuthenticated(self, force): + def EnsureAuthenticated(self, force, refresh=False): """Best effort check that user is authenticated with Rietveld server.""" if self._auth_config.use_oauth2: authenticator = auth.get_authenticator_for_host( self.GetCodereviewServer(), self._auth_config) if not authenticator.has_cached_credentials(): raise auth.LoginRequiredError(self.GetCodereviewServer()) + if refresh: + authenticator.get_access_token() def FetchDescription(self): issue = self.GetIssue() @@ -2315,7 +2319,7 @@ class _GerritChangelistImpl(_ChangelistCodereviewBase): def CodereviewServerConfigKey(cls): return 'gerritserver' - def EnsureAuthenticated(self, force): + def EnsureAuthenticated(self, force, refresh=None): """Best effort check that user is authenticated with Gerrit server.""" if settings.GetGerritSkipEnsureAuthenticated(): # For projects with unusual authentication schemes. @@ -3469,48 +3473,51 @@ def get_cl_statuses(changes, fine_grained, max_processes=None): # Silence upload.py otherwise it becomes unwieldy. upload.verbosity = 0 - if fine_grained: - # Process one branch synchronously to work through authentication, then - # spawn processes to process all the other branches in parallel. - if changes: - def fetch(cl): - try: - return (cl, cl.GetStatus()) - except: - # See http://crbug.com/629863. - logging.exception('failed to fetch status for %s:', cl) - raise - yield fetch(changes[0]) - - changes_to_fetch = changes[1:] - if not changes_to_fetch: - # Exit early if there was only one branch to fetch. - return - - pool = ThreadPool( - min(max_processes, len(changes_to_fetch)) - if max_processes is not None - else max(len(changes_to_fetch), 1)) - - fetched_cls = set() - it = pool.imap_unordered(fetch, changes_to_fetch).__iter__() - while True: - try: - row = it.next(timeout=5) - except multiprocessing.TimeoutError: - break - - fetched_cls.add(row[0]) - yield row - - # Add any branches that failed to fetch. - for cl in set(changes_to_fetch) - fetched_cls: - yield (cl, 'error') + if not changes: + raise StopIteration() - else: + if not fine_grained: + # Fast path which doesn't involve querying codereview servers. # Do not use GetApprovingReviewers(), since it requires an HTTP request. for cl in changes: yield (cl, 'waiting' if cl.GetIssueURL() else 'error') + return + + # First, sort out authentication issues. + logging.debug('ensuring credentials exist') + for cl in changes: + cl.EnsureAuthenticated(force=False, refresh=True) + + def fetch(cl): + try: + return (cl, cl.GetStatus()) + except: + # See http://crbug.com/629863. + logging.exception('failed to fetch status for %s:', cl) + raise + + threads_count = len(changes) + if max_processes: + threads_count = max(1, min(threads_count, max_processes)) + logging.debug('querying %d CLs using %d threads', len(changes), threads_count) + + pool = ThreadPool(threads_count) + fetched_cls = set() + try: + it = pool.imap_unordered(fetch, changes).__iter__() + while True: + try: + cl, status = it.next(timeout=5) + except multiprocessing.TimeoutError: + break + fetched_cls.add(cl) + yield cl, status + finally: + pool.close() + + # Add any branches that failed to fetch. + for cl in set(changes) - fetched_cls: + yield (cl, 'error') def upload_branch_deps(cl, args):