#!/usr/bin/env vpython3 # Copyright (c) 2020 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import os import re import sys ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) GCLIENT_PATH = os.path.join(ROOT_DIR, 'gclient') sys.path.insert(0, ROOT_DIR) import subprocess2 from testing_support import fake_repos class GClientSmokeBase(fake_repos.FakeReposTestBase): def setUp(self): super(GClientSmokeBase, self).setUp() # Make sure it doesn't try to auto update when testing! self.env = os.environ.copy() self.env['DEPOT_TOOLS_UPDATE'] = '0' self.env['DEPOT_TOOLS_METRICS'] = '0' # Suppress Python 3 warnings and other test undesirables. self.env['GCLIENT_TEST'] = '1' self.env['GCLIENT_PY3'] = '0' if sys.version_info.major == 2 else '1' self.maxDiff = None def gclient(self, cmd, cwd=None, error_ok=False): if not cwd: cwd = self.root_dir cmd = [GCLIENT_PATH] + cmd process = subprocess2.Popen( cmd, cwd=cwd, env=self.env, stdout=subprocess2.PIPE, stderr=subprocess2.PIPE, universal_newlines=True) (stdout, stderr) = process.communicate() logging.debug("XXX: %s\n%s\nXXX" % (' '.join(cmd), stdout)) logging.debug("YYY: %s\n%s\nYYY" % (' '.join(cmd), stderr)) if not error_ok: self.assertEqual(0, process.returncode, stderr) return (stdout.replace('\r\n', '\n'), stderr.replace('\r\n', '\n'), process.returncode) def untangle(self, stdout): """Separates output based on thread IDs.""" tasks = {} remaining = [] task_id = 0 for line in stdout.splitlines(False): m = re.match(r'^(\d)+>(.*)$', line) if not m: if task_id: # Lines broken with carriage breaks don't have a thread ID, but belong # to the last seen thread ID. tasks.setdefault(task_id, []).append(line) else: remaining.append(line) else: self.assertEqual([], remaining) task_id = int(m.group(1)) tasks.setdefault(task_id, []).append(m.group(2)) out = [] for key in sorted(tasks.keys()): out.extend(tasks[key]) out.extend(remaining) return '\n'.join(out) def parseGclient(self, cmd, items, expected_stderr='', untangle=False): """Parse gclient's output to make it easier to test. If untangle is True, tries to sort out the output from parallel checkout.""" (stdout, stderr, _) = self.gclient(cmd) if untangle: stdout = self.untangle(stdout) self.checkString(expected_stderr, stderr) return self.checkBlock(stdout, items) def splitBlock(self, stdout): """Split gclient's output into logical execution blocks. ___ running 'foo' at '/bar' (...) ___ running 'baz' at '/bar' (...) will result in 2 items of len((...).splitlines()) each. """ results = [] for line in stdout.splitlines(False): # Intentionally skips empty lines. if not line: continue if not line.startswith('__'): if results: results[-1].append(line) else: # TODO(maruel): gclient's git stdout is inconsistent. # This should fail the test instead!! pass continue match = re.match(r'^________ ([a-z]+) \'(.*)\' in \'(.*)\'$', line) if match: results.append([[match.group(1), match.group(2), match.group(3)]]) continue match = re.match(r'^_____ (.*) is missing, syncing instead$', line) if match: # Blah, it's when a dependency is deleted, we should probably not # output this message. results.append([line]) continue # These two regexps are a bit too broad, they are necessary only for git # checkouts. if (re.match(r'_____ [^ ]+ at [^ ]+', line) or re.match(r'_____ [^ ]+ : Attempting rebase onto [0-9a-f]+...', line)): continue # Fail for any unrecognized lines that start with '__'. self.fail(line) return results def checkBlock(self, stdout, items): results = self.splitBlock(stdout) for i in range(min(len(results), len(items))): if isinstance(items[i], (list, tuple)): verb = items[i][0] path = items[i][1] else: verb = items[i] path = self.root_dir self.checkString(results[i][0][0], verb, (i, results[i][0][0], verb)) if sys.platform == 'win32': # Make path lower case since casing can change randomly. self.checkString( results[i][0][2].lower(), path.lower(), (i, results[i][0][2].lower(), path.lower())) else: self.checkString(results[i][0][2], path, (i, results[i][0][2], path)) self.assertEqual(len(results), len(items), (stdout, items, len(results))) return results