You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			294 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			294 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
#!/usr/bin/env vpython3
 | 
						|
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
 | 
						|
import codecs
 | 
						|
import copy
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import unittest
 | 
						|
 | 
						|
sys.path.insert(0, os.path.join(
 | 
						|
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
 | 
						|
    'recipes', 'recipe_modules', 'bot_update', 'resources'))
 | 
						|
import bot_update
 | 
						|
 | 
						|
 | 
						|
class MockedPopen(object):
 | 
						|
  """A fake instance of a called subprocess.
 | 
						|
 | 
						|
  This is meant to be used in conjunction with MockedCall.
 | 
						|
  """
 | 
						|
  def __init__(self, args=None, kwargs=None):
 | 
						|
    self.args = args or []
 | 
						|
    self.kwargs = kwargs or {}
 | 
						|
    self.return_value = None
 | 
						|
    self.fails = False
 | 
						|
 | 
						|
  def returns(self, rv):
 | 
						|
    """Set the return value when this popen is called.
 | 
						|
 | 
						|
    rv can be a string, or a callable (eg function).
 | 
						|
    """
 | 
						|
    self.return_value = rv
 | 
						|
    return self
 | 
						|
 | 
						|
  def check(self, args, kwargs):
 | 
						|
    """Check to see if the given args/kwargs call match this instance.
 | 
						|
 | 
						|
    This does a partial match, so that a call to "git clone foo" will match
 | 
						|
    this instance if this instance was recorded as "git clone"
 | 
						|
    """
 | 
						|
    if any(input_arg != expected_arg
 | 
						|
           for (input_arg, expected_arg) in zip(args, self.args)):
 | 
						|
      return False
 | 
						|
    return self.return_value
 | 
						|
 | 
						|
  def __call__(self, args, kwargs):
 | 
						|
    """Actually call this popen instance."""
 | 
						|
    if hasattr(self.return_value, '__call__'):
 | 
						|
      return self.return_value(*args, **kwargs)
 | 
						|
    return self.return_value
 | 
						|
 | 
						|
 | 
						|
class MockedCall(object):
 | 
						|
  """A fake instance of bot_update.call().
 | 
						|
 | 
						|
  This object is pre-seeded with "answers" in self.expectations.  The type
 | 
						|
  is a MockedPopen object, or any object with a __call__() and check() method.
 | 
						|
  The check() method is used to check to see if the correct popen object is
 | 
						|
  chosen (can be a partial match, eg a "git clone" popen module would match
 | 
						|
  a "git clone foo" call).
 | 
						|
  By default, if no answers have been pre-seeded, the call() returns successful
 | 
						|
  with an empty string.
 | 
						|
  """
 | 
						|
  def __init__(self, fake_filesystem):
 | 
						|
    self.expectations = []
 | 
						|
    self.records = []
 | 
						|
 | 
						|
  def expect(self, args=None, kwargs=None):
 | 
						|
    args = args or []
 | 
						|
    kwargs = kwargs or {}
 | 
						|
    popen = MockedPopen(args, kwargs)
 | 
						|
    self.expectations.append(popen)
 | 
						|
    return popen
 | 
						|
 | 
						|
  def __call__(self, *args, **kwargs):
 | 
						|
    self.records.append((args, kwargs))
 | 
						|
    for popen in self.expectations:
 | 
						|
      if popen.check(args, kwargs):
 | 
						|
        self.expectations.remove(popen)
 | 
						|
        return popen(args, kwargs)
 | 
						|
    return ''
 | 
						|
 | 
						|
 | 
						|
class MockedGclientSync():
 | 
						|
  """A class producing a callable instance of gclient sync."""
 | 
						|
  def __init__(self, fake_filesystem):
 | 
						|
    self.records = []
 | 
						|
 | 
						|
  def __call__(self, *args, **_):
 | 
						|
    self.records.append(args)
 | 
						|
 | 
						|
 | 
						|
class FakeFile():
 | 
						|
  def __init__(self):
 | 
						|
    self.contents = ''
 | 
						|
 | 
						|
  def write(self, buf):
 | 
						|
    self.contents += buf
 | 
						|
 | 
						|
  def read(self):
 | 
						|
    return self.contents
 | 
						|
 | 
						|
  def __enter__(self):
 | 
						|
    return self
 | 
						|
 | 
						|
  def __exit__(self, _, __, ___):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class FakeFilesystem():
 | 
						|
  def __init__(self):
 | 
						|
    self.files = {}
 | 
						|
 | 
						|
  def open(self, target, mode='r', encoding=None):
 | 
						|
    if 'w' in mode:
 | 
						|
      self.files[target] = FakeFile()
 | 
						|
      return self.files[target]
 | 
						|
    return self.files[target]
 | 
						|
 | 
						|
 | 
						|
def fake_git(*args, **kwargs):
 | 
						|
  return bot_update.call('git', *args, **kwargs)
 | 
						|
 | 
						|
 | 
						|
class BotUpdateUnittests(unittest.TestCase):
 | 
						|
  DEFAULT_PARAMS = {
 | 
						|
      'solutions': [{
 | 
						|
          'name': 'somename',
 | 
						|
          'url': 'https://fake.com'
 | 
						|
      }],
 | 
						|
      'revisions': {},
 | 
						|
      'first_sln': 'somename',
 | 
						|
      'target_os': None,
 | 
						|
      'target_os_only': None,
 | 
						|
      'target_cpu': None,
 | 
						|
      'patch_root': None,
 | 
						|
      'patch_refs': [],
 | 
						|
      'gerrit_rebase_patch_ref': None,
 | 
						|
      'no_fetch_tags': False,
 | 
						|
      'refs': [],
 | 
						|
      'git_cache_dir': '',
 | 
						|
      'cleanup_dir': None,
 | 
						|
      'gerrit_reset': None,
 | 
						|
      'enforce_fetch': False,
 | 
						|
      'experiments': [],
 | 
						|
  }
 | 
						|
 | 
						|
  def setUp(self):
 | 
						|
    sys.platform = 'linux2'  # For consistency, ya know?
 | 
						|
    self.filesystem = FakeFilesystem()
 | 
						|
    self.call = MockedCall(self.filesystem)
 | 
						|
    self.gclient = MockedGclientSync(self.filesystem)
 | 
						|
    self.call.expect(
 | 
						|
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
 | 
						|
    ).returns(self.gclient)
 | 
						|
    self.old_call = getattr(bot_update, 'call')
 | 
						|
    self.params = copy.deepcopy(self.DEFAULT_PARAMS)
 | 
						|
    setattr(bot_update, 'call', self.call)
 | 
						|
    setattr(bot_update, 'git', fake_git)
 | 
						|
 | 
						|
    self.old_os_cwd = os.getcwd
 | 
						|
    setattr(os, 'getcwd', lambda: '/b/build/foo/build')
 | 
						|
 | 
						|
    setattr(bot_update, 'open', self.filesystem.open)
 | 
						|
    self.old_codecs_open = codecs.open
 | 
						|
    setattr(codecs, 'open', self.filesystem.open)
 | 
						|
 | 
						|
  def tearDown(self):
 | 
						|
    setattr(bot_update, 'call', self.old_call)
 | 
						|
    setattr(os, 'getcwd', self.old_os_cwd)
 | 
						|
    delattr(bot_update, 'open')
 | 
						|
    setattr(codecs, 'open', self.old_codecs_open)
 | 
						|
 | 
						|
  def overrideSetupForWindows(self):
 | 
						|
    sys.platform = 'win'
 | 
						|
    self.call.expect(
 | 
						|
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
 | 
						|
    ).returns(self.gclient)
 | 
						|
 | 
						|
  def testBasic(self):
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    return self.call.records
 | 
						|
 | 
						|
  def testBasicCachepackOffloading(self):
 | 
						|
    os.environ['PACKFILE_OFFLOADING'] = '1'
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    os.environ.pop('PACKFILE_OFFLOADING')
 | 
						|
    return self.call.records
 | 
						|
 | 
						|
  def testBasicRevision(self):
 | 
						|
    self.params['revisions'] = {
 | 
						|
        'src': 'HEAD', 'src/v8': 'deadbeef', 'somename': 'DNE'}
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    args = self.gclient.records[0]
 | 
						|
    idx_first_revision = args.index('--revision')
 | 
						|
    idx_second_revision = args.index(
 | 
						|
        '--revision', idx_first_revision+1)
 | 
						|
    idx_third_revision = args.index('--revision', idx_second_revision+1)
 | 
						|
    self.assertEqual(args[idx_first_revision+1], 'somename@unmanaged')
 | 
						|
    self.assertEqual(
 | 
						|
        args[idx_second_revision+1], 'src@refs/remotes/origin/main')
 | 
						|
    self.assertEqual(args[idx_third_revision+1], 'src/v8@deadbeef')
 | 
						|
    return self.call.records
 | 
						|
 | 
						|
  def testTagsByDefault(self):
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    found = False
 | 
						|
    for record in self.call.records:
 | 
						|
      args = record[0]
 | 
						|
      if args[:3] == ('git', 'cache', 'populate'):
 | 
						|
        self.assertFalse('--no-fetch-tags' in args)
 | 
						|
        found = True
 | 
						|
    self.assertTrue(found)
 | 
						|
    return self.call.records
 | 
						|
 | 
						|
  def testNoTags(self):
 | 
						|
    params = self.params
 | 
						|
    params['no_fetch_tags'] = True
 | 
						|
    bot_update.ensure_checkout(**params)
 | 
						|
    found = False
 | 
						|
    for record in self.call.records:
 | 
						|
      args = record[0]
 | 
						|
      if args[:3] == ('git', 'cache', 'populate'):
 | 
						|
        self.assertTrue('--no-fetch-tags' in args)
 | 
						|
        found = True
 | 
						|
    self.assertTrue(found)
 | 
						|
    return self.call.records
 | 
						|
 | 
						|
  def testApplyPatchOnGclient(self):
 | 
						|
    ref = 'refs/changes/12/345/6'
 | 
						|
    repo = 'https://chromium.googlesource.com/v8/v8'
 | 
						|
    self.params['patch_refs'] = ['%s@%s' % (repo, ref)]
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    args = self.gclient.records[0]
 | 
						|
    idx = args.index('--patch-ref')
 | 
						|
    self.assertEqual(args[idx+1], self.params['patch_refs'][0])
 | 
						|
    self.assertNotIn('--patch-ref', args[idx+1:])
 | 
						|
    # Assert we're not patching in bot_update.py
 | 
						|
    for record in self.call.records:
 | 
						|
      self.assertNotIn('git fetch ' + repo,
 | 
						|
                       ' '.join(record[0]))
 | 
						|
 | 
						|
  def testPatchRefs(self):
 | 
						|
    self.params['patch_refs'] = [
 | 
						|
        'https://chromium.googlesource.com/chromium/src@refs/changes/12/345/6',
 | 
						|
        'https://chromium.googlesource.com/v8/v8@refs/changes/1/234/56']
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    args = self.gclient.records[0]
 | 
						|
    patch_refs = set(
 | 
						|
        args[i+1] for i in range(len(args))
 | 
						|
        if args[i] == '--patch-ref' and i+1 < len(args))
 | 
						|
    self.assertIn(self.params['patch_refs'][0], patch_refs)
 | 
						|
    self.assertIn(self.params['patch_refs'][1], patch_refs)
 | 
						|
 | 
						|
  def testGitCheckoutBreaksLocks(self):
 | 
						|
    self.overrideSetupForWindows()
 | 
						|
    path = '/b/build/foo/build/.git'
 | 
						|
    lockfile = 'index.lock'
 | 
						|
    removed = []
 | 
						|
    old_os_walk = os.walk
 | 
						|
    old_os_remove = os.remove
 | 
						|
    setattr(os, 'walk', lambda _: [(path, None, [lockfile])])
 | 
						|
    setattr(os, 'remove', removed.append)
 | 
						|
    bot_update.ensure_checkout(**self.params)
 | 
						|
    setattr(os, 'walk', old_os_walk)
 | 
						|
    setattr(os, 'remove', old_os_remove)
 | 
						|
    self.assertTrue(os.path.join(path, lockfile) in removed)
 | 
						|
 | 
						|
  def testParsesRevisions(self):
 | 
						|
    revisions = [
 | 
						|
      'f671d3baeb64d9dba628ad582e867cf1aebc0207',
 | 
						|
      'src@deadbeef',
 | 
						|
      'https://foo.googlesource.com/bar@12345',
 | 
						|
      'bar@refs/experimental/test@example.com/test',
 | 
						|
    ]
 | 
						|
    expected_results = {
 | 
						|
      'root': 'f671d3baeb64d9dba628ad582e867cf1aebc0207',
 | 
						|
      'src': 'deadbeef',
 | 
						|
      'https://foo.googlesource.com/bar.git': '12345',
 | 
						|
      'bar': 'refs/experimental/test@example.com/test',
 | 
						|
    }
 | 
						|
    actual_results = bot_update.parse_revisions(revisions, 'root')
 | 
						|
    self.assertEqual(expected_results, actual_results)
 | 
						|
 | 
						|
class CallUnitTest(unittest.TestCase):
 | 
						|
  def testCall(self):
 | 
						|
    ret = bot_update.call(sys.executable, '-c', 'print(1)')
 | 
						|
    self.assertEqual(u'1\n', ret)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
  unittest.main()
 |