You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/tests/gclient_scm_test.py

1043 lines
38 KiB
Python

#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for gclient_scm.py."""
# pylint: disable=E1103
# Import before super_mox to keep valid references.
from shutil import rmtree
from subprocess import Popen, PIPE, STDOUT
import logging
import os
import re
import sys
import tempfile
import threading
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from testing_support.super_mox import mox, StdoutCheck, SuperMoxTestBase
from testing_support.super_mox import TestCaseUtils
import gclient_scm
import git_cache
import subprocess2
# Disable the global git cache by clearing the mirror cache path before any
# test runs.
git_cache.Mirror.SetCachePath(None)
# Module-level alias for gclient_scm.os.path.join, which is used throughout
# this file to build checkout paths.
join = gclient_scm.os.path.join
# Matches a leading "[H:MM:SS] " or "[HH:MM:SS] " prefix and captures the rest
# of the line. DOTALL lets the capture include the trailing newline.
TIMESTAMP_RE = re.compile(r'\[[0-9]{1,2}:[0-9]{2}:[0-9]{2}\] (.*)', re.DOTALL)


def strip_timestamps(value):
  """Returns |value| with the leading '[H:MM:SS] ' prefix removed per line.

  Lines that do not start with a timestamp are returned unchanged. Line
  endings are preserved.
  """
  stripped = []
  # splitlines(True) keeps the line terminators so the text can be rejoined
  # without altering it.
  for line in value.splitlines(True):
    m = TIMESTAMP_RE.match(line)
    stripped.append(m.group(1) if m else line)
  return ''.join(stripped)
# Access to a protected member XXX of a client class
# pylint: disable=W0212
class GCBaseTestCase(object):
  """Mixin providing a gclient-specific assertion helper.

  It relies on the assertEquals() and fail() methods of the class it is mixed
  into.
  """

  def assertRaisesError(self, msg, fn, *args, **kwargs):
    """Like unittest's assertRaises() but checks for Gclient.Error.

    Args:
      msg: value expected as the first argument of the raised error.
      fn: callable under test.
      *args: positional arguments forwarded to fn.
      **kwargs: keyword arguments forwarded to fn.

    Fails the test if fn returns without raising gclient_utils.Error, or if the
    error's first argument differs from msg.
    """
    # pylint: disable=E1101
    try:
      fn(*args, **kwargs)
    except gclient_scm.gclient_utils.Error as e:
      self.assertEquals(e.args[0], msg)
    else:
      self.fail('%s not raised' % msg)
class BaseTestCase(GCBaseTestCase, SuperMoxTestBase):
  """Base class for the mox-based tests in this file.

  setUp() replaces the gclient_utils, scm.SVN and subprocess2 entry points
  listed below with mox mocks, and makes both SCM wrappers report that their
  binary exists.
  """

  def setUp(self):
    """Installs the mox stubs and the fake BinaryExists() implementations."""
    SuperMoxTestBase.setUp(self)
    for name in ('CheckCallAndFilter', 'CheckCallAndFilterAndHeader',
                 'FileRead', 'FileWrite', 'rmtree'):
      self.mox.StubOutWithMock(gclient_scm.gclient_utils, name)
    for name in ('Capture', '_CaptureInfo', 'CaptureStatus',
                 'RunAndGetFileList'):
      self.mox.StubOutWithMock(gclient_scm.scm.SVN, name)
    for name in ('communicate', 'Popen'):
      self.mox.StubOutWithMock(subprocess2, name)
    self._scm_wrapper = gclient_scm.CreateSCM
    gclient_scm.scm.SVN.current_version = None
    # Remember the real implementations so tearDown() can put them back.
    self._original_SVNBinaryExists = gclient_scm.SVNWrapper.BinaryExists
    self._original_GitBinaryExists = gclient_scm.GitWrapper.BinaryExists
    gclient_scm.SVNWrapper.BinaryExists = staticmethod(lambda : True)
    gclient_scm.GitWrapper.BinaryExists = staticmethod(lambda : True)
    # Absolute path of the fake checkout directory.
    self.base_path = join(self.root_dir, self.relpath)

  def tearDown(self):
    """Runs the parent tearDown and always restores BinaryExists()."""
    try:
      SuperMoxTestBase.tearDown(self)
    finally:
      # Restore even when the parent tearDown raises, so a failing test cannot
      # leave the wrappers patched for the tests that follow.
      gclient_scm.SVNWrapper.BinaryExists = self._original_SVNBinaryExists
      gclient_scm.GitWrapper.BinaryExists = self._original_GitBinaryExists
class BasicTests(SuperMoxTestBase):
  """Tests for SCMWrapper helpers, with scm.GIT.Capture mocked out."""

  def setUp(self):
    SuperMoxTestBase.setUp(self)

  def testGetFirstRemoteUrl(self):
    """_get_first_remote_url() returns the URL from the git config output."""
    fake_path = '/fake/path'
    # Pairs of (mocked `git config` output, URL expected to be returned).
    remote_strings = [
        ('remote.origin.url E:\\foo\\bar', 'E:\\foo\\bar'),
        ('remote.origin.url /b/foo/bar', '/b/foo/bar'),
        ('remote.origin.url https://foo/bar', 'https://foo/bar'),
        ('remote.origin.url E:\\Fo Bar\\bax', 'E:\\Fo Bar\\bax'),
        ('remote.origin.url git://what/"do', 'git://what/"do'),
    ]

    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'Capture')
    # Record one expected Capture() call per sample, in order.
    for config_output, _ in remote_strings:
      gclient_scm.scm.GIT.Capture(
          ['config', '--local', '--get-regexp', r'remote.*.url'],
          cwd=fake_path).AndReturn(config_output)
    self.mox.ReplayAll()

    # Replay: each call consumes the next recorded output.
    for _, expected_url in remote_strings:
      actual_url = gclient_scm.SCMWrapper._get_first_remote_url(fake_path)
      self.assertEquals(actual_url, expected_url)

  def tearDown(self):
    SuperMoxTestBase.tearDown(self)
class BaseGitWrapperTestCase(GCBaseTestCase, StdoutCheck, TestCaseUtils,
                             unittest.TestCase):
  """This class doesn't use pymox.

  setUp() builds a real git repository in a temporary directory from
  sample_git_import; tearDown() deletes it.
  """

  class OptionsObject(object):
    """Plain attribute bag standing in for the gclient options object."""

    def __init__(self, verbose=False, revision=None):
      self.auto_rebase = False
      self.verbose = verbose
      self.revision = revision
      self.manually_grab_svn_rev = True
      self.deps_os = None
      self.force = False
      self.reset = False
      self.nohooks = False
      self.no_history = False
      self.upstream = False
      self.cache_dir = None
      self.merge = False
      self.jobs = 1
      self.break_repo_locks = False
      self.delete_unversioned_trees = False

  # `git fast-import` stream used by CreateGitRepo() to populate the repo.
  sample_git_import = """blob
mark :1
data 6
Hello
blob
mark :2
data 4
Bye
reset refs/heads/master
commit refs/heads/master
mark :3
author Bob <bob@example.com> 1253744361 -0700
committer Bob <bob@example.com> 1253744361 -0700
data 8
A and B
M 100644 :1 a
M 100644 :2 b
blob
mark :4
data 10
Hello
You
blob
mark :5
data 8
Bye
You
commit refs/heads/origin
mark :6
author Alice <alice@example.com> 1253744424 -0700
committer Alice <alice@example.com> 1253744424 -0700
data 13
Personalized
from :3
M 100644 :4 a
M 100644 :5 b
blob
mark :7
data 5
Mooh
commit refs/heads/feature
mark :8
author Bob <bob@example.com> 1390311986 -0000
committer Bob <bob@example.com> 1390311986 -0000
data 6
Add C
from :3
M 100644 :7 c
reset refs/heads/master
from :3
"""

  def Options(self, *args, **kwargs):
    """Returns a new OptionsObject built from the given arguments."""
    return self.OptionsObject(*args, **kwargs)

  def checkstdout(self, expected):
    """Closes sys.stdout and asserts its timestamp-free content is expected."""
    captured = sys.stdout.getvalue()
    sys.stdout.close()
    # pylint: disable=E1101
    self.assertEquals(expected, strip_timestamps(captured))

  @staticmethod
  def CreateGitRepo(git_import, path):
    """Do it for real.

    Initializes a git repository in |path|, feeds |git_import| to
    `git fast-import` and configures the origin remote and user identity.

    Returns:
      False when the first git invocation raises OSError (git unavailable),
      True otherwise. Exit codes of the git commands are not checked.
    """
    def run_git(args, stdin_data=None):
      # Output is captured and discarded; stdin is only piped when there is
      # data to send.
      popen_kwargs = {'stdout': PIPE, 'stderr': STDOUT, 'cwd': path}
      if stdin_data is not None:
        popen_kwargs['stdin'] = PIPE
      Popen(['git'] + args, **popen_kwargs).communicate(input=stdin_data)

    try:
      run_git(['init', '-q'])
    except OSError:
      # git is not available, skip this test.
      return False
    run_git(['fast-import', '--quiet'], stdin_data=git_import)
    for args in (
        ['checkout', '-q'],
        ['remote', 'add', '-f', 'origin', '.'],
        ['checkout', '-b', 'new', 'origin/master', '-q'],
        ['push', 'origin', 'origin/origin:origin/master', '-q'],
        ['config', '--unset', 'remote.origin.fetch'],
        ['config', 'user.email', 'someuser@chromium.org'],
        ['config', 'user.name', 'Some User']):
      run_git(args)
    return True

  def _GetAskForDataCallback(self, expected_prompt, return_value):
    """Returns a fake prompt function that checks the prompt and answers.

    The returned callable asserts it was called with |expected_prompt| and
    returns |return_value|.
    """
    def AskForData(prompt, options):
      self.assertEquals(prompt, expected_prompt)
      return return_value
    return AskForData

  def setUp(self):
    """Creates the temporary git repository and fakes the SCM binaries."""
    TestCaseUtils.setUp(self)
    unittest.TestCase.setUp(self)
    self.url = 'git://foo'
    # The .git suffix allows gclient_scm to recognize the dir as a git repo
    # when cloning it locally
    self.root_dir = tempfile.mkdtemp('.git')
    self.relpath = '.'
    self.base_path = join(self.root_dir, self.relpath)
    # False when git could not be run; tests return early in that case.
    self.enabled = self.CreateGitRepo(self.sample_git_import, self.base_path)
    StdoutCheck.setUp(self)
    # Keep the real implementations around for tearDown().
    self._original_GitBinaryExists = gclient_scm.GitWrapper.BinaryExists
    self._original_SVNBinaryExists = gclient_scm.SVNWrapper.BinaryExists
    gclient_scm.GitWrapper.BinaryExists = staticmethod(lambda : True)
    gclient_scm.SVNWrapper.BinaryExists = staticmethod(lambda : True)

  def tearDown(self):
    """Deletes the temporary repository and restores BinaryExists()."""
    try:
      rmtree(self.root_dir)
      StdoutCheck.tearDown(self)
      TestCaseUtils.tearDown(self)
      unittest.TestCase.tearDown(self)
    finally:
      # TODO(maruel): Use auto_stub.TestCase.
      gclient_scm.GitWrapper.BinaryExists = self._original_GitBinaryExists
      gclient_scm.SVNWrapper.BinaryExists = self._original_SVNBinaryExists
class ManagedGitWrapperTestCase(BaseGitWrapperTestCase):
  """GitWrapper tests that run against the real temporary git repository.

  Every test returns early when self.enabled is false, i.e. when git could
  not be run while the repository was being created.
  """

  def testRevertMissing(self):
    """revert() restores a deleted tracked file and reports it."""
    if not self.enabled:
      return
    options = self.Options()
    file_path = join(self.base_path, 'a')
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, None, file_list)
    gclient_scm.os.remove(file_path)
    file_list = []
    scm.revert(options, self.args, file_list)
    self.assertEquals(file_list, [file_path])
    file_list = []
    scm.diff(options, self.args, file_list)
    self.assertEquals(file_list, [])
    sys.stdout.close()

  def testRevertNone(self):
    """revert() on a clean checkout reports no files."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, None, file_list)
    file_list = []
    scm.revert(options, self.args, file_list)
    self.assertEquals(file_list, [])
    self.assertEquals(scm.revinfo(options, self.args, None),
                      'a7142dc9f0009350b96a11f372b6ea658592aa95')
    sys.stdout.close()

  def testRevertModified(self):
    """revert() undoes a local modification of a tracked file."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, None, file_list)
    file_path = join(self.base_path, 'a')
    # Close the file explicitly so the change is on disk before git runs.
    with open(file_path, 'a') as f:
      f.write('touched\n')
    file_list = []
    scm.revert(options, self.args, file_list)
    self.assertEquals(file_list, [file_path])
    file_list = []
    scm.diff(options, self.args, file_list)
    self.assertEquals(file_list, [])
    self.assertEquals(scm.revinfo(options, self.args, None),
                      'a7142dc9f0009350b96a11f372b6ea658592aa95')
    sys.stdout.close()

  def testRevertNew(self):
    """revert() reports a newly added (staged) file."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, None, file_list)
    file_path = join(self.base_path, 'c')
    with open(file_path, 'w') as f:
      f.write('new\n')
    Popen(['git', 'add', 'c'], stdout=PIPE,
          stderr=STDOUT, cwd=self.base_path).communicate()
    file_list = []
    scm.revert(options, self.args, file_list)
    self.assertEquals(file_list, [file_path])
    file_list = []
    scm.diff(options, self.args, file_list)
    self.assertEquals(file_list, [])
    self.assertEquals(scm.revinfo(options, self.args, None),
                      'a7142dc9f0009350b96a11f372b6ea658592aa95')
    sys.stdout.close()

  def testStatusNew(self):
    """status() lists one modified file and prints the git diff header."""
    if not self.enabled:
      return
    options = self.Options()
    file_path = join(self.base_path, 'a')
    with open(file_path, 'a') as f:
      f.write('touched\n')
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.status(options, self.args, file_list)
    self.assertEquals(file_list, [file_path])
    self.checkstdout(
        ('\n________ running \'git diff --name-status '
         '069c602044c5388d2d15c3f875b057c852003458\' in \'%s\'\nM\ta\n') %
        join(self.root_dir, '.'))

  def testStatus2New(self):
    """status() lists both files when two tracked files are modified."""
    if not self.enabled:
      return
    options = self.Options()
    expected_file_list = [join(self.base_path, x) for x in ['a', 'b']]
    for file_path in expected_file_list:
      with open(file_path, 'a') as f:
        f.write('touched\n')
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.status(options, self.args, file_list)
    self.assertEquals(sorted(file_list), expected_file_list)
    self.checkstdout(
        ('\n________ running \'git diff --name-status '
         '069c602044c5388d2d15c3f875b057c852003458\' in \'%s\'\nM\ta\nM\tb\n') %
        join(self.root_dir, '.'))

  def testUpdateUpdate(self):
    """update() on the default branch reports files a and b."""
    if not self.enabled:
      return
    options = self.Options()
    expected_file_list = [join(self.base_path, x) for x in ['a', 'b']]
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, (), file_list)
    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      'a7142dc9f0009350b96a11f372b6ea658592aa95')
    sys.stdout.close()

  def testUpdateMerge(self):
    """update() with options.merge creates a merge with origin/master."""
    if not self.enabled:
      return
    options = self.Options()
    options.merge = True
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    scm._Run(['checkout', '-q', 'feature'], options)
    rev = scm.revinfo(options, (), None)
    file_list = []
    scm.update(options, (), file_list)
    self.assertEquals(file_list, [join(self.base_path, x)
                                  for x in ['a', 'b', 'c']])
    # The actual commit that is created is unstable, so we verify its tree and
    # parents instead.
    self.assertEquals(scm._Capture(['rev-parse', 'HEAD:']),
                      'd2e35c10ac24d6c621e14a1fcadceb533155627d')
    self.assertEquals(scm._Capture(['rev-parse', 'HEAD^1']), rev)
    self.assertEquals(scm._Capture(['rev-parse', 'HEAD^2']),
                      scm._Capture(['rev-parse', 'origin/master']))
    sys.stdout.close()

  def testUpdateRebase(self):
    """update() rebases onto origin/master when the prompt answers 'y'."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    scm._Run(['checkout', '-q', 'feature'], options)
    file_list = []
    # Fake a 'y' key press.
    scm._AskForData = self._GetAskForDataCallback(
        'Cannot fast-forward merge, attempt to rebase? '
        '(y)es / (q)uit / (s)kip : ', 'y')
    scm.update(options, (), file_list)
    self.assertEquals(file_list, [join(self.base_path, x)
                                  for x in ['a', 'b', 'c']])
    # The actual commit that is created is unstable, so we verify its tree and
    # parent instead.
    self.assertEquals(scm._Capture(['rev-parse', 'HEAD:']),
                      'd2e35c10ac24d6c621e14a1fcadceb533155627d')
    self.assertEquals(scm._Capture(['rev-parse', 'HEAD^']),
                      scm._Capture(['rev-parse', 'origin/master']))
    sys.stdout.close()

  def testUpdateReset(self):
    """update() with options.reset keeps unversioned files and directories."""
    if not self.enabled:
      return
    options = self.Options()
    options.reset = True
    dir_path = join(self.base_path, 'c')
    os.mkdir(dir_path)
    with open(join(dir_path, 'nested'), 'w') as f:
      f.write('new\n')
    file_path = join(self.base_path, 'file')
    with open(file_path, 'w') as f:
      f.write('new\n')
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, (), file_list)
    self.assert_(gclient_scm.os.path.isdir(dir_path))
    self.assert_(gclient_scm.os.path.isfile(file_path))
    sys.stdout.close()

  def testUpdateResetDeleteUnversionedTrees(self):
    """With delete_unversioned_trees, the extra dir goes but the file stays."""
    if not self.enabled:
      return
    options = self.Options()
    options.reset = True
    options.delete_unversioned_trees = True
    dir_path = join(self.base_path, 'dir')
    os.mkdir(dir_path)
    with open(join(dir_path, 'nested'), 'w') as f:
      f.write('new\n')
    file_path = join(self.base_path, 'file')
    with open(file_path, 'w') as f:
      f.write('new\n')
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    scm.update(options, (), file_list)
    self.assert_(not gclient_scm.os.path.isdir(dir_path))
    self.assert_(gclient_scm.os.path.isfile(file_path))
    sys.stdout.close()

  def testUpdateUnstagedConflict(self):
    """update() raises when an unstaged local change conflicts."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_path = join(self.base_path, 'b')
    with open(file_path, 'w') as f:
      f.write('conflict\n')
    try:
      scm.update(options, (), [])
      self.fail()
    except (gclient_scm.gclient_utils.Error, subprocess2.CalledProcessError):
      # The exact exception text varies across git versions so it's not worth
      # verifying it. It's fine as long as it throws.
      pass
    # Manually flush stdout since we can't verify it's content accurately across
    # git versions.
    sys.stdout.getvalue()
    sys.stdout.close()

  def testUpdateLocked(self):
    """update() fails while .git/index.lock exists."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_path = join(self.base_path, '.git', 'index.lock')
    # Create an empty lock file.
    with open(file_path, 'w'):
      pass
    with self.assertRaisesRegexp(subprocess2.CalledProcessError,
                                 'Unable to create.*/index.lock'):
      scm.update(options, (), [])
    sys.stdout.close()

  def testUpdateLockedBreak(self):
    """With break_repo_locks, update() removes a stale index.lock."""
    if not self.enabled:
      return
    options = self.Options()
    options.break_repo_locks = True
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_path = join(self.base_path, '.git', 'index.lock')
    # Create an empty lock file.
    with open(file_path, 'w'):
      pass
    scm.update(options, (), [])
    self.assertRegexpMatches(sys.stdout.getvalue(),
                             "breaking lock.*\.git/index\.lock")
    self.assertFalse(os.path.exists(file_path))
    sys.stdout.close()

  def testUpdateConflict(self):
    """A conflicting local commit makes the rebase, then the retry, fail."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_path = join(self.base_path, 'b')
    with open(file_path, 'w') as f:
      f.write('conflict\n')
    scm._Run(['commit', '-am', 'test'], options)
    scm._AskForData = self._GetAskForDataCallback(
        'Cannot fast-forward merge, attempt to rebase? '
        '(y)es / (q)uit / (s)kip : ', 'y')
    exception = ('Conflict while rebasing this branch.\n'
                 'Fix the conflict and run gclient again.\n'
                 'See \'man git-rebase\' for details.\n')
    self.assertRaisesError(exception, scm.update, options, (), [])
    # A second update() is expected to raise a different error.
    exception = ('\n____ . at refs/remotes/origin/master\n'
                 '\tYou have unstaged changes.\n'
                 '\tPlease commit, stash, or reset.\n')
    self.assertRaisesError(exception, scm.update, options, (), [])
    sys.stdout.close()

  def testRevinfo(self):
    """revinfo() returns the hash of the checked-out commit."""
    if not self.enabled:
      return
    options = self.Options()
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    rev_info = scm.revinfo(options, (), None)
    self.assertEquals(rev_info, '069c602044c5388d2d15c3f875b057c852003458')
class ManagedGitWrapperTestCaseMox(BaseTestCase):
  """GitWrapper tests in which git and filesystem calls are mox mocks.

  Each test first records the calls it expects, then calls ReplayAll() and
  exercises the wrapper. The order of the recorded calls is significant.
  """

  class OptionsObject(object):
    """Plain attribute bag standing in for the gclient options object."""

    def __init__(self, verbose=False, revision=None, force=False):
      self.verbose = verbose
      self.revision = revision
      self.deps_os = None
      self.force = force
      self.reset = False
      self.nohooks = False
      self.break_repo_locks = False
      # TODO(maruel): Test --jobs > 1.
      self.jobs = 1

  def Options(self, *args, **kwargs):
    """Returns a new OptionsObject built from the given arguments."""
    return self.OptionsObject(*args, **kwargs)

  def checkstdout(self, expected):
    """Closes sys.stdout and asserts its timestamp-free content is expected."""
    value = sys.stdout.getvalue()
    sys.stdout.close()
    # pylint: disable=E1101
    self.assertEquals(expected, strip_timestamps(value))

  def setUp(self):
    """Sets up the fake hashes, URL and checkout paths used by the tests."""
    BaseTestCase.setUp(self)
    self.fake_hash_1 = 't0ta11yf4k3'
    self.fake_hash_2 = '3v3nf4k3r'
    self.url = 'git://foo'
    self.root_dir = '/tmp' if sys.platform != 'win32' else 't:\\tmp'
    self.relpath = 'fake'
    self.base_path = os.path.join(self.root_dir, self.relpath)
    self.backup_base_path = os.path.join(self.root_dir,
                                         'old_%s.git' % self.relpath)

  def tearDown(self):
    BaseTestCase.tearDown(self)

  def testGetUsableRevGit(self):
    """GetUsableRev() on a pure git checkout: sha1 accepted, SVN rev rejected."""
    # pylint: disable=E1101
    options = self.Options(verbose=True)

    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'IsValidRevision', True)
    gclient_scm.scm.GIT.IsValidRevision(cwd=self.base_path, rev=self.fake_hash_1
        ).AndReturn(True)

    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'IsGitSvn', True)
    gclient_scm.scm.GIT.IsGitSvn(cwd=self.base_path).MultipleTimes(
        ).AndReturn(False)

    # NOTE(review): os.path.isdir is not stubbed in this file; presumably
    # SuperMoxTestBase mocks it -- confirm.
    gclient_scm.scm.os.path.isdir(self.base_path).AndReturn(True)
    gclient_scm.os.path.isdir(self.base_path).AndReturn(True)

    self.mox.ReplayAll()

    git_scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                    relpath=self.relpath)
    # A [fake] git sha1 with a git repo should work (this is in the case that
    # the LKGR gets flipped to git sha1's some day).
    self.assertEquals(git_scm.GetUsableRev(self.fake_hash_1, options),
                      self.fake_hash_1)
    # An SVN rev with an existing purely git repo should raise an exception.
    self.assertRaises(gclient_scm.gclient_utils.Error,
                      git_scm.GetUsableRev, '1', options)

  def testGetUsableRevGitSvn(self):
    """GetUsableRev() on a git-svn checkout maps SVN revisions to sha1s."""
    # pylint: disable=E1101
    options = self.Options()
    # str(1e7) is the string '10000000.0'.
    too_big = str(1e7)

    # Pretend like the git-svn repo's HEAD is at r2.
    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'GetGitSvnHeadRev', True)
    gclient_scm.scm.GIT.GetGitSvnHeadRev(cwd=self.base_path).MultipleTimes(
        ).AndReturn(2)

    self.mox.StubOutWithMock(
        gclient_scm.scm.GIT, 'GetBlessedSha1ForSvnRev', True)
    # r1 -> first fake hash, r3 -> second fake hash.
    gclient_scm.scm.GIT.GetBlessedSha1ForSvnRev(cwd=self.base_path, rev='1'
        ).AndReturn(self.fake_hash_1)
    gclient_scm.scm.GIT.GetBlessedSha1ForSvnRev(cwd=self.base_path, rev='3'
        ).MultipleTimes().AndReturn(self.fake_hash_2)

    # Ensure that we call git svn fetch if our LKGR is > the git-svn HEAD rev.
    self.mox.StubOutWithMock(gclient_scm.GitWrapper, '_Fetch', True)
    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'Capture', True)
    gclient_scm.scm.GIT.Capture(['config', '--get', 'svn-remote.svn.fetch'],
                                cwd=self.base_path).AndReturn('blah')
    # pylint: disable=E1120
    gclient_scm.scm.GIT.Capture(['svn', 'fetch'], cwd=self.base_path)
    # The second config lookup is recorded as failing.
    error = subprocess2.CalledProcessError(1, 'cmd', '/cwd', 'stdout', 'stderr')
    gclient_scm.scm.GIT.Capture(['config', '--get', 'svn-remote.svn.fetch'],
                                cwd=self.base_path).AndRaise(error)
    gclient_scm.GitWrapper._Fetch(options)
    gclient_scm.scm.GIT.Capture(['svn', 'fetch'], cwd=self.base_path)
    gclient_scm.GitWrapper._Fetch(options)

    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'IsGitSvn', True)
    gclient_scm.scm.GIT.IsGitSvn(cwd=self.base_path).MultipleTimes(
        ).AndReturn(True)

    self.mox.StubOutWithMock(gclient_scm.scm.GIT, 'IsValidRevision', True)
    gclient_scm.scm.GIT.IsValidRevision(cwd=self.base_path, rev=self.fake_hash_1
        ).AndReturn(True)
    gclient_scm.scm.GIT.IsValidRevision(cwd=self.base_path, rev=too_big
        ).MultipleTimes(2).AndReturn(False)

    # The first isdir() answer is False (no checkout yet); all later ones are
    # True.
    gclient_scm.os.path.isdir(self.base_path).AndReturn(False)
    gclient_scm.os.path.isdir(self.base_path).MultipleTimes().AndReturn(True)

    self.mox.ReplayAll()

    git_svn_scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir,
                                    relpath=self.relpath)
    # Without an existing checkout, this should fail.
    # TODO(dbeam) Fix this. http://crbug.com/109184
    self.assertRaises(gclient_scm.gclient_utils.Error,
                      git_svn_scm.GetUsableRev, '1', options)
    # Given an SVN revision with a git-svn checkout, it should be translated to
    # a git sha1 and be usable.
    self.assertEquals(git_svn_scm.GetUsableRev('1', options),
                      self.fake_hash_1)
    # Our fake HEAD rev is r2, so this should call git fetch and git svn fetch
    # to get more revs (pymox will complain if this doesn't happen). We mock an
    # optimized checkout the first time, so this run should call git fetch.
    self.assertEquals(git_svn_scm.GetUsableRev('3', options),
                      self.fake_hash_2)
    # The time we pretend we're not optimized, so no git fetch should fire.
    self.assertEquals(git_svn_scm.GetUsableRev('3', options),
                      self.fake_hash_2)
    # Given a git sha1 with a git-svn checkout, it should be used as is.
    self.assertEquals(git_svn_scm.GetUsableRev(self.fake_hash_1, options),
                      self.fake_hash_1)
    # We currently check for seemingly valid SVN revisions by assuming 6 digit
    # numbers, so assure that numeric revs >= 1000000 don't work.
    self.assertRaises(gclient_scm.gclient_utils.Error,
                      git_svn_scm.GetUsableRev, too_big, options)

  def testUpdateNoDotGit(self):
    """update() clones when the directory exists but has no .git inside."""
    options = self.Options()

    # Recorded filesystem answers: no hooks dir, no backup dir, the checkout
    # dir exists and is a directory, but it has no .git entry.
    gclient_scm.os.path.isdir(
        os.path.join(self.base_path, '.git', 'hooks')).AndReturn(False)
    gclient_scm.os.path.exists(self.backup_base_path).AndReturn(False)
    gclient_scm.os.path.exists(self.base_path).AndReturn(True)
    gclient_scm.os.path.isdir(self.base_path).AndReturn(True)
    gclient_scm.os.path.exists(os.path.join(self.base_path, '.git')
                               ).AndReturn(False)
    self.mox.StubOutWithMock(gclient_scm.GitWrapper, '_Clone', True)
    # pylint: disable=E1120
    gclient_scm.GitWrapper._Clone('refs/remotes/origin/master', self.url,
                                  options)
    # Both git commands are recorded as returning empty output.
    self.mox.StubOutWithMock(gclient_scm.subprocess2, 'check_output', True)
    gclient_scm.subprocess2.check_output(
        ['git', 'ls-files'], cwd=self.base_path,
        env=gclient_scm.scm.GIT.ApplyEnvVars({}), stderr=-1,).AndReturn('')
    gclient_scm.subprocess2.check_output(
        ['git', 'rev-parse', '--verify', 'HEAD'],
        cwd=self.base_path,
        env=gclient_scm.scm.GIT.ApplyEnvVars({}),
        stderr=-1,
        ).AndReturn('')

    self.mox.ReplayAll()
    scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir,
                            relpath=self.relpath)
    scm.update(options, None, [])
    self.checkstdout('\n')

  def testUpdateConflict(self):
    """update() retries the clone after the first attempt fails."""
    options = self.Options()

    gclient_scm.os.path.isdir(
        os.path.join(self.base_path, '.git', 'hooks')).AndReturn(False)
    gclient_scm.os.path.exists(self.backup_base_path).AndReturn(False)
    gclient_scm.os.path.exists(self.base_path).AndReturn(True)
    gclient_scm.os.path.isdir(self.base_path).AndReturn(True)
    gclient_scm.os.path.exists(os.path.join(self.base_path, '.git')
                               ).AndReturn(False)
    self.mox.StubOutWithMock(gclient_scm.GitWrapper, '_Clone', True)
    # The first clone attempt is recorded as raising CalledProcessError.
    # pylint: disable=E1120
    gclient_scm.GitWrapper._Clone(
        'refs/remotes/origin/master', self.url, options
    ).AndRaise(gclient_scm.subprocess2.CalledProcessError(None, None, None,
                                                          None, None))
    # After the failure, _DeleteOrMove(False) and a second clone are expected.
    self.mox.StubOutWithMock(gclient_scm.GitWrapper, '_DeleteOrMove', True)
    gclient_scm.GitWrapper._DeleteOrMove(False)
    gclient_scm.GitWrapper._Clone('refs/remotes/origin/master', self.url,
                                  options)
    self.mox.StubOutWithMock(gclient_scm.subprocess2, 'check_output', True)
    gclient_scm.subprocess2.check_output(
        ['git', 'ls-files'], cwd=self.base_path,
        env=gclient_scm.scm.GIT.ApplyEnvVars({}), stderr=-1,).AndReturn('')
    gclient_scm.subprocess2.check_output(
        ['git', 'rev-parse', '--verify', 'HEAD'],
        cwd=self.base_path,
        env=gclient_scm.scm.GIT.ApplyEnvVars({}),
        stderr=-1,
        ).AndReturn('')

    self.mox.ReplayAll()
    scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir,
                            relpath=self.relpath)
    scm.update(options, None, [])
    self.checkstdout('\n')
class UnmanagedGitWrapperTestCase(BaseGitWrapperTestCase):
  """Tests for GitWrapper.update() with options.revision set to 'unmanaged'.

  The clone tests use the repository created by setUp() as the remote and
  check out into a second, initially empty temporary directory.
  """

  def checkInStdout(self, expected):
    """Closes sys.stdout and asserts that |expected| was in its content."""
    value = sys.stdout.getvalue()
    sys.stdout.close()
    # pylint: disable=E1101
    self.assertIn(expected, value)

  def checkNotInStdout(self, expected):
    """Closes sys.stdout and asserts that |expected| was not in its content."""
    value = sys.stdout.getvalue()
    sys.stdout.close()
    # pylint: disable=E1101
    self.assertNotIn(expected, value)

  def getCurrentBranch(self):
    """Returns the current branch name, or None for a detached HEAD."""
    branch = gclient_scm.scm.GIT.Capture(['rev-parse', '--abbrev-ref', 'HEAD'],
                                         cwd=self.base_path)
    # git prints the literal 'HEAD' when no branch is checked out.
    if branch == 'HEAD':
      return None
    return branch

  def _UseFreshCheckoutDir(self):
    """Redirects the test to a new empty directory.

    Returns:
      The path of the repository created by setUp(), to be used as the clone
      source.
    """
    origin_root_dir = self.root_dir
    self.root_dir = tempfile.mkdtemp()
    # tearDown() only removes self.root_dir, which now points at the new
    # directory. Register the original for removal here so it is deleted even
    # when an assertion fails part-way through the test.
    self.addCleanup(rmtree, origin_root_dir)
    self.relpath = '.'
    self.base_path = join(self.root_dir, self.relpath)
    return origin_root_dir

  def testUpdateClone(self):
    """Cloning without a ref ends on a detached origin/master."""
    if not self.enabled:
      return
    options = self.Options()

    origin_root_dir = self._UseFreshCheckoutDir()

    scm = gclient_scm.CreateSCM(url=origin_root_dir,
                                root_dir=self.root_dir,
                                relpath=self.relpath)

    expected_file_list = [join(self.base_path, "a"),
                          join(self.base_path, "b")]
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)

    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      '069c602044c5388d2d15c3f875b057c852003458')
    # indicates detached HEAD
    self.assertEquals(self.getCurrentBranch(), None)
    self.checkInStdout(
        'Checked out refs/remotes/origin/master to a detached HEAD')

  def testUpdateCloneOnCommit(self):
    """Cloning url@<sha1> ends on that commit with a detached HEAD."""
    if not self.enabled:
      return
    options = self.Options()

    origin_root_dir = self._UseFreshCheckoutDir()
    url_with_commit_ref = origin_root_dir +\
                          '@a7142dc9f0009350b96a11f372b6ea658592aa95'

    scm = gclient_scm.CreateSCM(url=url_with_commit_ref,
                                root_dir=self.root_dir,
                                relpath=self.relpath)

    expected_file_list = [join(self.base_path, "a"),
                          join(self.base_path, "b")]
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)

    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      'a7142dc9f0009350b96a11f372b6ea658592aa95')
    # indicates detached HEAD
    self.assertEquals(self.getCurrentBranch(), None)
    self.checkInStdout(
        'Checked out a7142dc9f0009350b96a11f372b6ea658592aa95 to a detached HEAD')

  def testUpdateCloneOnBranch(self):
    """Cloning url@feature ends on the local branch 'feature'."""
    if not self.enabled:
      return
    options = self.Options()

    origin_root_dir = self._UseFreshCheckoutDir()
    url_with_branch_ref = origin_root_dir + '@feature'

    scm = gclient_scm.CreateSCM(url=url_with_branch_ref,
                                root_dir=self.root_dir,
                                relpath=self.relpath)

    expected_file_list = [join(self.base_path, "a"),
                          join(self.base_path, "b"),
                          join(self.base_path, "c")]
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)

    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      '9a51244740b25fa2ded5252ca00a3178d3f665a9')
    self.assertEquals(self.getCurrentBranch(), 'feature')
    self.checkNotInStdout('Checked out feature to a detached HEAD')

  def testUpdateCloneOnFetchedRemoteBranch(self):
    """Cloning url@refs/remotes/origin/feature ends on a detached HEAD."""
    if not self.enabled:
      return
    options = self.Options()

    origin_root_dir = self._UseFreshCheckoutDir()
    url_with_branch_ref = origin_root_dir + '@refs/remotes/origin/feature'

    scm = gclient_scm.CreateSCM(url=url_with_branch_ref,
                                root_dir=self.root_dir,
                                relpath=self.relpath)

    expected_file_list = [join(self.base_path, "a"),
                          join(self.base_path, "b"),
                          join(self.base_path, "c")]
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)

    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      '9a51244740b25fa2ded5252ca00a3178d3f665a9')
    # indicates detached HEAD
    self.assertEquals(self.getCurrentBranch(), None)
    self.checkInStdout(
        'Checked out refs/remotes/origin/feature to a detached HEAD')

  def testUpdateCloneOnTrueRemoteBranch(self):
    """Cloning url@refs/heads/feature is treated as origin/feature."""
    if not self.enabled:
      return
    options = self.Options()

    origin_root_dir = self._UseFreshCheckoutDir()
    url_with_branch_ref = origin_root_dir + '@refs/heads/feature'

    scm = gclient_scm.CreateSCM(url=url_with_branch_ref,
                                root_dir=self.root_dir,
                                relpath=self.relpath)

    expected_file_list = [join(self.base_path, "a"),
                          join(self.base_path, "b"),
                          join(self.base_path, "c")]
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)

    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      '9a51244740b25fa2ded5252ca00a3178d3f665a9')
    # @refs/heads/feature is AKA @refs/remotes/origin/feature in the clone, so
    # should be treated as such by gclient.
    # TODO(mmoss): Though really, we should only allow DEPS to specify branches
    # as they are known in the upstream repo, since the mapping into the local
    # repo can be modified by users (or we might even want to change the gclient
    # defaults at some point). But that will take more work to stop using
    # refs/remotes/ everywhere that we do (and to stop assuming a DEPS ref will
    # always resolve locally, like when passing them to show-ref or rev-list).
    self.assertEquals(self.getCurrentBranch(), None)
    self.checkInStdout(
        'Checked out refs/remotes/origin/feature to a detached HEAD')

  def testUpdateUpdate(self):
    """update() on an existing unmanaged checkout is skipped."""
    if not self.enabled:
      return
    options = self.Options()
    expected_file_list = []
    scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                relpath=self.relpath)
    file_list = []
    options.revision = 'unmanaged'
    scm.update(options, (), file_list)
    self.assertEquals(file_list, expected_file_list)
    self.assertEquals(scm.revinfo(options, (), None),
                      '069c602044c5388d2d15c3f875b057c852003458')
    self.checkstdout('________ unmanaged solution; skipping .\n')
class GitHungTest(BaseGitWrapperTestCase):
  """Tests for GitWrapper._Fetch() with GCLIENT_KILL_GIT_FETCH_AFTER set.

  The tests monkey-patch gclient_utils.CheckCallAndFilter or subprocess2.Popen;
  tearDown() puts the originals back.
  """

  def setUp(self):
    """Saves the functions the tests replace and sets the kill timeout."""
    super(GitHungTest, self).setUp()
    # Originals, restored in tearDown().
    self.old = gclient_scm.gclient_utils.CheckCallAndFilter
    self.old2 = gclient_scm.gclient_utils.subprocess2.Popen
    self.options = self.Options()
    self.options.verbose = False
    self.scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
                                     relpath=self.relpath)
    # testGitFetchOk asserts that this value reaches CheckCallAndFilter as
    # kill_timeout=1.0.
    os.environ['GCLIENT_KILL_GIT_FETCH_AFTER'] = '1.0'

  def tearDown(self):
    """Removes the environment variable and restores the patched functions."""
    os.environ.pop('GCLIENT_KILL_GIT_FETCH_AFTER')
    gclient_scm.gclient_utils.CheckCallAndFilter = self.old
    gclient_scm.gclient_utils.subprocess2.Popen = self.old2
    super(GitHungTest, self).tearDown()

  def testGitFetchOk(self):
    """_Fetch() passes the timeout on and prints the filtered output line."""
    def subprocess_git_fetch_run(_, filter_fn, kill_timeout, **__):
      # Stand-in for CheckCallAndFilter: checks the timeout and feeds a single
      # output line to the filter.
      self.assertEqual(kill_timeout, 1.0)
      filter_fn('remote: something')

    gclient_scm.gclient_utils.CheckCallAndFilter = subprocess_git_fetch_run
    self.scm._Fetch(self.options)
    self.checkstdout('remote: something\n')

  def testGitFetchHungAndRetry(self):
    """_Fetch() run against a fake process object instead of a real git."""
    class Process(object):
      # First process will hang, second process will exit with 0 quickly.
      # NOTE(review): `self.count += 1` in __init__ reads the class attribute
      # (-1) and stores 0 on the instance, so Process.count never changes and
      # every instance gets count == 0 and the 'hung' data. Confirm whether
      # `Process.count += 1` was intended.
      cv = threading.Condition()
      count = -1
      # Class-level list, shared by all instances.
      killed = []

      def __init__(self):
        self.count += 1
        # The object acts as its own stdout: read() below is called on it.
        self.stdout = self
        # Reversed so that pop() in read() yields the characters in order.
        self.data = list('retry' if self.count > 0 else 'hung')
        self.data.reverse()
        self.this_killed = False

      def read(self, _):
        # Returns one character per call until the data is exhausted, then
        # the empty string.
        if self.data:
          return self.data.pop()
        if self.count == 0:
          # Simulate hung process.
          # NOTE(review): with timeout=0 this wait returns immediately.
          with self.cv:
            self.cv.wait(timeout=0)
        return ''

      def kill(self):
        self.this_killed = True
        self.killed.append(self.count)
        with self.cv:
          self.cv.notify()

      def wait(self):
        # Exit code: 1 once kill() has been called, 0 otherwise.
        if self.this_killed:
          return 1
        return 0

    gclient_scm.gclient_utils.subprocess2.Popen = lambda *_, **__: Process()
    self.scm._Capture = lambda *_, **__: None
    self.scm._Fetch(self.options)
    self.checkstdout('hung\n')
if __name__ == '__main__':
  # Passing -v on the command line enables debug logging; otherwise only
  # fatal messages are logged.
  if '-v' in sys.argv:
    level = logging.DEBUG
  else:
    level = logging.FATAL
  logging.basicConfig(
      level=level,
      format='%(asctime).19s %(levelname)s %(filename)s:'
             '%(lineno)s %(message)s')
  unittest.main()
# vim: ts=2:sw=2:tw=80:et: