You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
309 lines
10 KiB
Python
309 lines
10 KiB
Python
#!/usr/bin/env vpython3
|
|
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import codecs
|
|
import copy
|
|
import os
|
|
import sys
|
|
import unittest
|
|
|
|
sys.path.insert(
|
|
0,
|
|
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
|
'recipes', 'recipe_modules', 'bot_update', 'resources'))
|
|
import bot_update
|
|
|
|
# TODO: Should fix these warnings.
|
|
# pylint: disable=line-too-long
|
|
|
|
|
|
class MockedPopen(object):
|
|
"""A fake instance of a called subprocess.
|
|
|
|
This is meant to be used in conjunction with MockedCall.
|
|
"""
|
|
def __init__(self, args=None, kwargs=None):
|
|
self.args = args or []
|
|
self.kwargs = kwargs or {}
|
|
self.return_value = None
|
|
self.fails = False
|
|
|
|
def returns(self, rv):
|
|
"""Set the return value when this popen is called.
|
|
|
|
rv can be a string, or a callable (eg function).
|
|
"""
|
|
self.return_value = rv
|
|
return self
|
|
|
|
def check(self, args, kwargs):
|
|
"""Check to see if the given args/kwargs call match this instance.
|
|
|
|
This does a partial match, so that a call to "git clone foo" will match
|
|
this instance if this instance was recorded as "git clone"
|
|
"""
|
|
if any(input_arg != expected_arg
|
|
for (input_arg, expected_arg) in zip(args, self.args)):
|
|
return False
|
|
return self.return_value
|
|
|
|
def __call__(self, args, kwargs):
|
|
"""Actually call this popen instance."""
|
|
if hasattr(self.return_value, '__call__'):
|
|
return self.return_value(*args, **kwargs)
|
|
return self.return_value
|
|
|
|
|
|
class MockedCall(object):
|
|
"""A fake instance of bot_update.call().
|
|
|
|
This object is pre-seeded with "answers" in self.expectations. The type
|
|
is a MockedPopen object, or any object with a __call__() and check() method.
|
|
The check() method is used to check to see if the correct popen object is
|
|
chosen (can be a partial match, eg a "git clone" popen module would match
|
|
a "git clone foo" call).
|
|
By default, if no answers have been pre-seeded, the call() returns successful
|
|
with an empty string.
|
|
"""
|
|
def __init__(self, fake_filesystem):
|
|
self.expectations = []
|
|
self.records = []
|
|
|
|
def expect(self, args=None, kwargs=None):
|
|
args = args or []
|
|
kwargs = kwargs or {}
|
|
popen = MockedPopen(args, kwargs)
|
|
self.expectations.append(popen)
|
|
return popen
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
self.records.append((args, kwargs))
|
|
for popen in self.expectations:
|
|
if popen.check(args, kwargs):
|
|
self.expectations.remove(popen)
|
|
return popen(args, kwargs)
|
|
return ''
|
|
|
|
|
|
class MockedGclientSync():
|
|
"""A class producing a callable instance of gclient sync."""
|
|
def __init__(self, fake_filesystem):
|
|
self.records = []
|
|
|
|
def __call__(self, *args, **_):
|
|
self.records.append(args)
|
|
|
|
|
|
class FakeFile():
|
|
def __init__(self):
|
|
self.contents = ''
|
|
|
|
def write(self, buf):
|
|
self.contents += buf
|
|
|
|
def read(self):
|
|
return self.contents
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, _, __, ___):
|
|
pass
|
|
|
|
|
|
class FakeFilesystem():
|
|
def __init__(self):
|
|
self.files = {}
|
|
|
|
def open(self, target, mode='r', encoding=None):
|
|
if 'w' in mode:
|
|
self.files[target] = FakeFile()
|
|
return self.files[target]
|
|
return self.files[target]
|
|
|
|
|
|
def fake_git(*args, **kwargs):
|
|
return bot_update.call('git', *args, **kwargs)
|
|
|
|
|
|
class BotUpdateUnittests(unittest.TestCase):
|
|
DEFAULT_PARAMS = {
|
|
'solutions': [{
|
|
'name': 'somename',
|
|
'url': 'https://fake.com'
|
|
}],
|
|
'revisions': {},
|
|
'first_sln': 'somename',
|
|
'target_os': None,
|
|
'target_os_only': None,
|
|
'target_cpu': None,
|
|
'patch_root': None,
|
|
'patch_refs': [],
|
|
'gerrit_rebase_patch_ref': None,
|
|
'no_fetch_tags': False,
|
|
'refs': [],
|
|
'git_cache_dir': '',
|
|
'cleanup_dir': None,
|
|
'gerrit_reset': None,
|
|
'enforce_fetch': False,
|
|
'experiments': [],
|
|
}
|
|
|
|
def setUp(self):
|
|
sys.platform = 'linux2' # For consistency, ya know?
|
|
self.filesystem = FakeFilesystem()
|
|
self.call = MockedCall(self.filesystem)
|
|
self.gclient = MockedGclientSync(self.filesystem)
|
|
self.call.expect((sys.executable, '-u', bot_update.GCLIENT_PATH,
|
|
'sync')).returns(self.gclient)
|
|
self.old_call = getattr(bot_update, 'call')
|
|
self.params = copy.deepcopy(self.DEFAULT_PARAMS)
|
|
setattr(bot_update, 'call', self.call)
|
|
setattr(bot_update, 'git', fake_git)
|
|
|
|
self.old_os_cwd = os.getcwd
|
|
setattr(os, 'getcwd', lambda: '/b/build/foo/build')
|
|
|
|
setattr(bot_update, 'open', self.filesystem.open)
|
|
self.old_codecs_open = codecs.open
|
|
setattr(codecs, 'open', self.filesystem.open)
|
|
|
|
def tearDown(self):
|
|
setattr(bot_update, 'call', self.old_call)
|
|
setattr(os, 'getcwd', self.old_os_cwd)
|
|
delattr(bot_update, 'open')
|
|
setattr(codecs, 'open', self.old_codecs_open)
|
|
|
|
def overrideSetupForWindows(self):
|
|
sys.platform = 'win'
|
|
self.call.expect((sys.executable, '-u', bot_update.GCLIENT_PATH,
|
|
'sync')).returns(self.gclient)
|
|
|
|
def testBasic(self):
|
|
bot_update.ensure_checkout(**self.params)
|
|
return self.call.records
|
|
|
|
def testBasicCachepackOffloading(self):
|
|
os.environ['PACKFILE_OFFLOADING'] = '1'
|
|
bot_update.ensure_checkout(**self.params)
|
|
os.environ.pop('PACKFILE_OFFLOADING')
|
|
return self.call.records
|
|
|
|
def testBasicRevision(self):
|
|
self.params['revisions'] = {
|
|
'src': 'HEAD',
|
|
'src/v8': 'deadbeef',
|
|
'somename': 'DNE'
|
|
}
|
|
bot_update.ensure_checkout(**self.params)
|
|
args = self.gclient.records[0]
|
|
idx_first_revision = args.index('--revision')
|
|
idx_second_revision = args.index('--revision', idx_first_revision + 1)
|
|
idx_third_revision = args.index('--revision', idx_second_revision + 1)
|
|
self.assertEqual(args[idx_first_revision + 1], 'somename@unmanaged')
|
|
self.assertEqual(args[idx_second_revision + 1],
|
|
'src@refs/remotes/origin/main')
|
|
self.assertEqual(args[idx_third_revision + 1], 'src/v8@deadbeef')
|
|
return self.call.records
|
|
|
|
def testTagsByDefault(self):
|
|
bot_update.ensure_checkout(**self.params)
|
|
found = False
|
|
for record in self.call.records:
|
|
args = record[0]
|
|
if args[:3] == ('git', 'cache', 'populate'):
|
|
self.assertFalse('--no-fetch-tags' in args)
|
|
found = True
|
|
self.assertTrue(found)
|
|
return self.call.records
|
|
|
|
def testNoTags(self):
|
|
params = self.params
|
|
params['no_fetch_tags'] = True
|
|
bot_update.ensure_checkout(**params)
|
|
found = False
|
|
for record in self.call.records:
|
|
args = record[0]
|
|
if args[:3] == ('git', 'cache', 'populate'):
|
|
self.assertTrue('--no-fetch-tags' in args)
|
|
found = True
|
|
self.assertTrue(found)
|
|
return self.call.records
|
|
|
|
def testGclientNoSyncExperiment(self):
|
|
ref = 'refs/changes/12/345/6'
|
|
repo = 'https://chromium.googlesource.com/v8/v8'
|
|
self.params['patch_refs'] = ['%s@%s' % (repo, ref)]
|
|
self.params['experiments'] = bot_update.EXP_NO_SYNC
|
|
bot_update.ensure_checkout(**self.params)
|
|
args = self.gclient.records[0]
|
|
idx = args.index('--experiment')
|
|
self.assertEqual(args[idx + 1], bot_update.EXP_NO_SYNC)
|
|
|
|
def testApplyPatchOnGclient(self):
|
|
ref = 'refs/changes/12/345/6'
|
|
repo = 'https://chromium.googlesource.com/v8/v8'
|
|
self.params['patch_refs'] = ['%s@%s' % (repo, ref)]
|
|
bot_update.ensure_checkout(**self.params)
|
|
args = self.gclient.records[0]
|
|
idx = args.index('--patch-ref')
|
|
self.assertEqual(args[idx + 1], self.params['patch_refs'][0])
|
|
self.assertNotIn('--patch-ref', args[idx + 1:])
|
|
# Assert we're not patching in bot_update.py
|
|
for record in self.call.records:
|
|
self.assertNotIn('git fetch ' + repo, ' '.join(record[0]))
|
|
|
|
def testPatchRefs(self):
|
|
self.params['patch_refs'] = [
|
|
'https://chromium.googlesource.com/chromium/src@refs/changes/12/345/6',
|
|
'https://chromium.googlesource.com/v8/v8@refs/changes/1/234/56'
|
|
]
|
|
bot_update.ensure_checkout(**self.params)
|
|
args = self.gclient.records[0]
|
|
patch_refs = set(args[i + 1] for i in range(len(args))
|
|
if args[i] == '--patch-ref' and i + 1 < len(args))
|
|
self.assertIn(self.params['patch_refs'][0], patch_refs)
|
|
self.assertIn(self.params['patch_refs'][1], patch_refs)
|
|
|
|
def testGitCheckoutBreaksLocks(self):
|
|
self.overrideSetupForWindows()
|
|
path = '/b/build/foo/build/.git'
|
|
lockfile = 'index.lock'
|
|
removed = []
|
|
old_os_walk = os.walk
|
|
old_os_remove = os.remove
|
|
setattr(os, 'walk', lambda _: [(path, None, [lockfile])])
|
|
setattr(os, 'remove', removed.append)
|
|
bot_update.ensure_checkout(**self.params)
|
|
setattr(os, 'walk', old_os_walk)
|
|
setattr(os, 'remove', old_os_remove)
|
|
self.assertTrue(os.path.join(path, lockfile) in removed)
|
|
|
|
def testParsesRevisions(self):
|
|
revisions = [
|
|
'f671d3baeb64d9dba628ad582e867cf1aebc0207',
|
|
'src@deadbeef',
|
|
'https://foo.googlesource.com/bar@12345',
|
|
'bar@refs/experimental/test@example.com/test',
|
|
]
|
|
expected_results = {
|
|
'root': 'f671d3baeb64d9dba628ad582e867cf1aebc0207',
|
|
'src': 'deadbeef',
|
|
'https://foo.googlesource.com/bar.git': '12345',
|
|
'bar': 'refs/experimental/test@example.com/test',
|
|
}
|
|
actual_results = bot_update.parse_revisions(revisions, 'root')
|
|
self.assertEqual(expected_results, actual_results)
|
|
|
|
|
|
class CallUnitTest(unittest.TestCase):
|
|
def testCall(self):
|
|
ret = bot_update.call(sys.executable, '-c', 'print(1)')
|
|
self.assertEqual(u'1\n', ret)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|