You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			148 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			148 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
#!/usr/bin/env vpython3
 | 
						|
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import sys
 | 
						|
 | 
						|
# Parent of the directory that contains this file.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 | 
						|
# Path of the 'gclient' entry directly under ROOT_DIR; the tests execute it.
GCLIENT_PATH = os.path.join(ROOT_DIR, 'gclient')
 | 
						|
# Put ROOT_DIR first on the module search path before the imports that follow
# (presumably so that subprocess2 and testing_support resolve from ROOT_DIR).
sys.path.insert(0, ROOT_DIR)
 | 
						|
 | 
						|
import subprocess2
 | 
						|
from testing_support import fake_repos
 | 
						|
 | 
						|
 | 
						|
class GClientSmokeBase(fake_repos.FakeReposTestBase):
  """Base class for smoke tests that drive the gclient script.

  Provides helpers to run the script at GCLIENT_PATH in a subprocess and to
  split its stdout into blocks that can be compared against expectations.
  """

  def setUp(self):
    """Prepares the environment passed to every gclient subprocess."""
    super(GClientSmokeBase, self).setUp()
    # Make sure it doesn't try to auto update when testing!
    self.env = os.environ.copy()
    self.env['DEPOT_TOOLS_UPDATE'] = '0'
    self.env['DEPOT_TOOLS_METRICS'] = '0'
    # Suppress Python 3 warnings and other test undesirables.
    self.env['GCLIENT_TEST'] = '1'
    self.env['GCLIENT_PY3'] = '0' if sys.version_info.major == 2 else '1'
    # Show complete diffs when an assertion fails.
    self.maxDiff = None

  def gclient(self, cmd, cwd=None, error_ok=False):
    """Runs gclient with the given arguments and captures its output.

    Args:
      cmd: list of command-line arguments placed after GCLIENT_PATH.
      cwd: directory to run in; self.root_dir is used when this is falsy.
      error_ok: when False, a non-zero exit code fails the test, with the
          captured stderr as the failure message.

    Returns:
      A (stdout, stderr, returncode) tuple. CRLF line endings in both streams
      are converted to LF.
    """
    cwd = cwd or self.root_dir
    cmd = [GCLIENT_PATH] + cmd
    process = subprocess2.Popen(
        cmd, cwd=cwd, env=self.env, stdout=subprocess2.PIPE,
        stderr=subprocess2.PIPE, universal_newlines=True)
    (stdout, stderr) = process.communicate()
    # Lazy logging arguments: the message is only formatted when debug
    # logging is enabled.
    logging.debug("XXX: %s\n%s\nXXX", ' '.join(cmd), stdout)
    logging.debug("YYY: %s\n%s\nYYY", ' '.join(cmd), stderr)

    if not error_ok:
      self.assertEqual(0, process.returncode, stderr)

    return (stdout.replace('\r\n', '\n'), stderr.replace('\r\n', '\n'),
            process.returncode)

  def untangle(self, stdout):
    """Separates output based on thread IDs.

    Lines of the form '<digits>>text' are grouped by their numeric ID. A line
    without such a prefix is attributed to the most recently seen ID. The test
    fails if a prefixed line shows up after unprefixed lines that preceded any
    prefixed line.

    Args:
      stdout: the interleaved output to reorder.

    Returns:
      The text of all lines, grouped by ascending ID with the prefixes
      removed, followed by any lines that could not be attributed to an ID,
      joined with newlines.
    """
    tasks = {}
    remaining = []
    # None means "no prefixed line seen yet"; this keeps an ID of 0 usable.
    task_id = None
    for line in stdout.splitlines(False):
      # The whole run of digits has to sit inside the group. With '(\d)+' only
      # the last digit is captured, which merges e.g. IDs 1, 11 and 21.
      m = re.match(r'^(\d+)>(.*)$', line)
      if m:
        self.assertEqual([], remaining)
        task_id = int(m.group(1))
        tasks.setdefault(task_id, []).append(m.group(2))
      elif task_id is not None:
        # Lines broken with carriage breaks don't have a thread ID, but belong
        # to the last seen thread ID.
        tasks.setdefault(task_id, []).append(line)
      else:
        remaining.append(line)
    out = []
    for key in sorted(tasks):
      out.extend(tasks[key])
    out.extend(remaining)
    return '\n'.join(out)

  def parseGclient(self, cmd, items, expected_stderr='', untangle=False):
    """Parse gclient's output to make it easier to test.

    Runs gclient (a non-zero exit code fails the test), compares its stderr
    with expected_stderr and checks its stdout against items.

    Args:
      cmd: list of gclient command-line arguments.
      items: expected blocks, in the format accepted by checkBlock().
      expected_stderr: the stderr text the command is expected to produce.
      untangle: if True, tries to sort out the output from parallel checkout
          before it is split into blocks.

    Returns:
      The parsed blocks, as returned by checkBlock().
    """
    (stdout, stderr, _) = self.gclient(cmd)
    if untangle:
      stdout = self.untangle(stdout)
    self.checkString(expected_stderr, stderr)
    return self.checkBlock(stdout, items)

  def splitBlock(self, stdout):
    """Split gclient's output into logical execution blocks.

    ________ running 'foo' in '/bar'
    (...)
    ________ running 'baz' in '/bar'
    (...)

    will result in 2 items. Each item is a list that starts with the parsed
    header, here ['running', 'foo', '/bar'], followed by the non-empty lines
    of its (...) body.

    A '_____ <name> is missing, syncing instead' line starts an item of its
    own whose first element is the raw line. Any other line starting with
    '__' that is not recognized fails the test.

    Args:
      stdout: the gclient output to split.

    Returns:
      The list of items described above.
    """
    results = []
    for line in stdout.splitlines(False):
      # Intentionally skips empty lines.
      if not line:
        continue
      if not line.startswith('__'):
        if results:
          results[-1].append(line)
        # TODO(maruel): gclient's git stdout is inconsistent.
        # Lines seen before the first header are silently dropped here.
        # This should fail the test instead!!
        continue

      match = re.match(r'^________ ([a-z]+) \'(.*)\' in \'(.*)\'$', line)
      if match:
        results.append([[match.group(1), match.group(2), match.group(3)]])
        continue

      match = re.match(r'^_____ (.*) is missing, syncing instead$', line)
      if match:
        # Blah, it's when a dependency is deleted, we should probably not
        # output this message.
        results.append([line])
        continue

      # These two regexps are a bit too broad, they are necessary only for git
      # checkouts.
      if (re.match(r'_____ [^ ]+ at [^ ]+', line) or
          re.match(r'_____ [^ ]+ : Attempting rebase onto [0-9a-f]+...', line)):
        continue

      # Fail for any unrecognized lines that start with '__'.
      self.fail(line)
    return results

  def checkBlock(self, stdout, items):
    """Checks the blocks parsed from stdout against the expected items.

    Args:
      stdout: gclient output, split with splitBlock().
      items: expected blocks, in order. Each entry is either a verb string, in
          which case the expected path is self.root_dir, or a list/tuple
          whose first two elements are the verb and the path.

    Returns:
      The blocks returned by splitBlock().
    """
    results = self.splitBlock(stdout)
    # Compare the common prefix pairwise first so that a content mismatch is
    # reported before the length check below.
    for i, (result, item) in enumerate(zip(results, items)):
      if isinstance(item, (list, tuple)):
        verb = item[0]
        path = item[1]
      else:
        verb = item
        path = self.root_dir
      header = result[0]
      self.checkString(header[0], verb, (i, header[0], verb))
      actual_path = header[2]
      if sys.platform == 'win32':
        # Make path lower case since casing can change randomly.
        actual_path = actual_path.lower()
        path = path.lower()
      self.checkString(actual_path, path, (i, actual_path, path))
    self.assertEqual(len(results), len(items), (stdout, items, len(results)))
    return results
 | 
						|
 | 
						|
 |