#!/usr/bin/env vpython3 # coding=utf-8 # Copyright 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Unit tests for git_common.py""" import binascii import collections import datetime import os import shutil import signal import sys import tempfile import time import unittest DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, DEPOT_TOOLS_ROOT) from testing_support import coverage_utils from testing_support import git_test_utils GitRepo = git_test_utils.GitRepo class GitCommonTestBase(unittest.TestCase): @classmethod def setUpClass(cls): super(GitCommonTestBase, cls).setUpClass() import git_common cls.gc = git_common cls.gc.TEST_MODE = True os.environ["GIT_EDITOR"] = ":" # Supress git editor during rebase. class Support(GitCommonTestBase): def _testMemoizeOneBody(self, threadsafe): calls = collections.defaultdict(int) def double_if_even(val): calls[val] += 1 return val * 2 if val % 2 == 0 else None # Use this explicitly as a wrapper fn instead of a decorator. Otherwise # pylint crashes (!!) double_if_even = self.gc.memoize_one( threadsafe=threadsafe)(double_if_even) self.assertEqual(4, double_if_even(2)) self.assertEqual(4, double_if_even(2)) self.assertEqual(None, double_if_even(1)) self.assertEqual(None, double_if_even(1)) self.assertDictEqual({1: 2, 2: 1}, calls) double_if_even.set(10, 20) self.assertEqual(20, double_if_even(10)) self.assertDictEqual({1: 2, 2: 1}, calls) double_if_even.clear() self.assertEqual(4, double_if_even(2)) self.assertEqual(4, double_if_even(2)) self.assertEqual(None, double_if_even(1)) self.assertEqual(None, double_if_even(1)) self.assertEqual(20, double_if_even(10)) self.assertDictEqual({1: 4, 2: 2, 10: 1}, calls) def testMemoizeOne(self): self._testMemoizeOneBody(threadsafe=False) def testMemoizeOneThreadsafe(self): self._testMemoizeOneBody(threadsafe=True) def testOnce(self): testlist = [] # This works around a bug in pylint once = self.gc.once @once def add_to_list(): testlist.append('dog') add_to_list() add_to_list() add_to_list() add_to_list() self.assertEqual(testlist, ['dog']) def slow_square(i): """Helper for ScopedPoolTest. Must be global because non top-level functions aren't pickleable. """ return i**2 class ScopedPoolTest(GitCommonTestBase): CTRL_C = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT def testThreads(self): result = [] with self.gc.ScopedPool(kind='threads') as pool: result = list(pool.imap(slow_square, range(10))) self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) def testThreadsCtrlC(self): result = [] with self.assertRaises(KeyboardInterrupt): with self.gc.ScopedPool(kind='threads') as pool: # Make sure this pool is interrupted in mid-swing for i in pool.imap(slow_square, range(20)): if i > 32: os.kill(os.getpid(), self.CTRL_C) result.append(i) self.assertEqual([0, 1, 4, 9, 16, 25], result) def testProcs(self): result = [] with self.gc.ScopedPool() as pool: result = list(pool.imap(slow_square, range(10))) self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) def testProcsCtrlC(self): result = [] with self.assertRaises(KeyboardInterrupt): with self.gc.ScopedPool() as pool: # Make sure this pool is interrupted in mid-swing for i in pool.imap(slow_square, range(20)): if i > 32: os.kill(os.getpid(), self.CTRL_C) result.append(i) self.assertEqual([0, 1, 4, 9, 16, 25], result) class ProgressPrinterTest(GitCommonTestBase): class FakeStream(object): def __init__(self): self.data = set() self.count = 0 def write(self, line): self.data.add(line) def flush(self): self.count += 1 def testBasic(self): """This test is probably racy, but I don't have a better alternative.""" fmt = '%(count)d/10' stream = self.FakeStream() pp = self.gc.ProgressPrinter(fmt, enabled=True, fout=stream, period=0.01) with pp as inc: for _ in range(10): time.sleep(0.02) inc() filtered = {x.strip() for x in stream.data} rslt = {fmt % {'count': i} for i in range(11)} self.assertSetEqual(filtered, rslt) self.assertGreaterEqual(stream.count, 10) class GitReadOnlyFunctionsTest(git_test_utils.GitRepoReadOnlyTestBase, GitCommonTestBase): REPO_SCHEMA = """ A B C D B E D """ COMMIT_A = { 'some/files/file1': { 'data': b'file1' }, 'some/files/file2': { 'data': b'file2' }, 'some/files/file3': { 'data': b'file3' }, 'some/other/file': { 'data': b'otherfile' }, } COMMIT_C = { 'some/files/file2': { 'mode': 0o755, 'data': b'file2 - vanilla\n' }, } COMMIT_E = { 'some/files/file2': { 'data': b'file2 - merged\n' }, } COMMIT_D = { 'some/files/file2': { 'data': b'file2 - vanilla\nfile2 - merged\n' }, } def testHashes(self): ret = self.repo.run( self.gc.hash_multi, *[ 'main', 'main~3', self.repo['E'] + '~', self.repo['D'] + '^2', 'tag_C^{}', ]) self.assertEqual([ self.repo['D'], self.repo['A'], self.repo['B'], self.repo['E'], self.repo['C'], ], ret) self.assertEqual(self.repo.run(self.gc.hash_one, 'branch_D'), self.repo['D']) self.assertTrue(self.repo['D'].startswith( self.repo.run(self.gc.hash_one, 'branch_D', short=True))) def testStream(self): items = set(self.repo.commit_map.values()) def testfn(): for line in self.gc.run_stream('log', '--format=%H').readlines(): line = line.strip().decode('utf-8') self.assertIn(line, items) items.remove(line) self.repo.run(testfn) def testStreamWithRetcode(self): items = set(self.repo.commit_map.values()) def testfn(): with self.gc.run_stream_with_retcode('log', '--format=%H') as stdout: for line in stdout.readlines(): line = line.strip().decode('utf-8') self.assertIn(line, items) items.remove(line) self.repo.run(testfn) def testStreamWithRetcodeException(self): import subprocess2 with self.assertRaises(subprocess2.CalledProcessError): with self.gc.run_stream_with_retcode('checkout', 'unknown-branch'): pass def testCurrentBranch(self): def cur_branch_out_of_git(): os.chdir('..') return self.gc.current_branch() self.assertIsNone(self.repo.run(cur_branch_out_of_git)) self.repo.git('checkout', 'branch_D') self.assertEqual(self.repo.run(self.gc.current_branch), 'branch_D') def testBranches(self): # This check fails with git 2.4 (see crbug.com/487172) self.assertEqual(self.repo.run(set, self.gc.branches()), {'main', 'branch_D', 'root_A'}) def testDiff(self): # Get the names of the blobs being compared (to avoid hard-coding). c_blob_short = self.repo.git('rev-parse', '--short', 'tag_C:some/files/file2').stdout.strip() d_blob_short = self.repo.git('rev-parse', '--short', 'tag_D:some/files/file2').stdout.strip() expected_output = [ 'diff --git a/some/files/file2 b/some/files/file2', 'index %s..%s 100755' % (c_blob_short, d_blob_short), '--- a/some/files/file2', '+++ b/some/files/file2', '@@ -1 +1,2 @@', ' file2 - vanilla', '+file2 - merged' ] self.assertEqual( expected_output, self.repo.run(self.gc.diff, 'tag_C', 'tag_D').split('\n')) def testDormant(self): self.assertFalse(self.repo.run(self.gc.is_dormant, 'main')) self.repo.git('config', 'branch.main.dormant', 'true') self.assertTrue(self.repo.run(self.gc.is_dormant, 'main')) def testBlame(self): def get_porcelain_for_commit(commit_name, lines): format_string = ( '%H {}\nauthor %an\nauthor-mail <%ae>\nauthor-time %at\n' 'author-tz +0000\ncommitter %cn\ncommitter-mail <%ce>\n' 'committer-time %ct\ncommitter-tz +0000\nsummary {}') format_string = format_string.format(lines, commit_name) info = self.repo.show_commit(commit_name, format_string=format_string) return info.split('\n') # Expect to blame line 1 on C, line 2 on E. ABBREV_LEN = 7 c_short = self.repo['C'][:1 + ABBREV_LEN] c_author = self.repo.show_commit('C', format_string='%an %ai') e_short = self.repo['E'][:1 + ABBREV_LEN] e_author = self.repo.show_commit('E', format_string='%an %ai') expected_output = [ '%s (%s 1) file2 - vanilla' % (c_short, c_author), '%s (%s 2) file2 - merged' % (e_short, e_author) ] self.assertEqual( expected_output, self.repo.run(self.gc.blame, 'some/files/file2', 'tag_D', abbrev=ABBREV_LEN).split('\n')) # Test porcelain. expected_output = [] expected_output.extend(get_porcelain_for_commit('C', '1 1 1')) expected_output.append('previous %s some/files/file2' % self.repo['B']) expected_output.append('filename some/files/file2') expected_output.append('\tfile2 - vanilla') expected_output.extend(get_porcelain_for_commit('E', '1 2 1')) expected_output.append('previous %s some/files/file2' % self.repo['B']) expected_output.append('filename some/files/file2') expected_output.append('\tfile2 - merged') self.assertEqual( expected_output, self.repo.run(self.gc.blame, 'some/files/file2', 'tag_D', porcelain=True).split('\n')) def testParseCommitrefs(self): ret = self.repo.run( self.gc.parse_commitrefs, *[ 'main', 'main~3', self.repo['E'] + '~', self.repo['D'] + '^2', 'tag_C^{}', ]) hashes = [ self.repo['D'], self.repo['A'], self.repo['B'], self.repo['E'], self.repo['C'], ] self.assertEqual(ret, [binascii.unhexlify(h) for h in hashes]) expected_re = r"one of \(u?'main', u?'bananas'\)" with self.assertRaisesRegexp(Exception, expected_re): self.repo.run(self.gc.parse_commitrefs, 'main', 'bananas') def testRepoRoot(self): def cd_and_repo_root(path): os.chdir(path) return self.gc.repo_root() self.assertEqual(self.repo.repo_path, self.repo.run(self.gc.repo_root)) # cd to a subdirectory; repo_root should still return the root dir. self.assertEqual(self.repo.repo_path, self.repo.run(cd_and_repo_root, 'some/files')) def testTags(self): self.assertEqual(set(self.repo.run(self.gc.tags)), {'tag_' + l for l in 'ABCDE'}) def testTree(self): tree = self.repo.run(self.gc.tree, 'main:some/files') file1 = self.COMMIT_A['some/files/file1']['data'] file2 = self.COMMIT_D['some/files/file2']['data'] file3 = self.COMMIT_A['some/files/file3']['data'] self.assertEqual( tree['file1'], ('100644', 'blob', git_test_utils.git_hash_data(file1))) self.assertEqual( tree['file2'], ('100755', 'blob', git_test_utils.git_hash_data(file2))) self.assertEqual( tree['file3'], ('100644', 'blob', git_test_utils.git_hash_data(file3))) tree = self.repo.run(self.gc.tree, 'main:some') self.assertEqual(len(tree), 2) # Don't check the tree hash because we're lazy :) self.assertEqual(tree['files'][:2], ('040000', 'tree')) tree = self.repo.run(self.gc.tree, 'main:wat') self.assertEqual(tree, None) def testTreeRecursive(self): tree = self.repo.run(self.gc.tree, 'main:some', recurse=True) file1 = self.COMMIT_A['some/files/file1']['data'] file2 = self.COMMIT_D['some/files/file2']['data'] file3 = self.COMMIT_A['some/files/file3']['data'] other = self.COMMIT_A['some/other/file']['data'] self.assertEqual( tree['files/file1'], ('100644', 'blob', git_test_utils.git_hash_data(file1))) self.assertEqual( tree['files/file2'], ('100755', 'blob', git_test_utils.git_hash_data(file2))) self.assertEqual( tree['files/file3'], ('100644', 'blob', git_test_utils.git_hash_data(file3))) self.assertEqual( tree['other/file'], ('100644', 'blob', git_test_utils.git_hash_data(other))) class GitMutableFunctionsTest(git_test_utils.GitRepoReadWriteTestBase, GitCommonTestBase): REPO_SCHEMA = '' def _intern_data(self, data): with tempfile.TemporaryFile('wb') as f: f.write(data.encode('utf-8')) f.seek(0) return self.repo.run(self.gc.intern_f, f) def testInternF(self): data = 'CoolBobcatsBro' data_hash = self._intern_data(data) self.assertEqual(git_test_utils.git_hash_data(data.encode()), data_hash) self.assertEqual(data, self.repo.git('cat-file', 'blob', data_hash).stdout) def testMkTree(self): tree = {} for i in 1, 2, 3: name = '✔ file%d' % i tree[name] = ('100644', 'blob', self._intern_data(name)) tree_hash = self.repo.run(self.gc.mktree, tree) self.assertEqual('b524c02ba0e1cf482f8eb08c3d63e97b8895c89c', tree_hash) def testConfig(self): self.repo.git('config', '--add', 'happy.derpies', 'food') self.assertEqual( self.repo.run(self.gc.get_config_list, 'happy.derpies'), ['food']) self.assertEqual(self.repo.run(self.gc.get_config_list, 'sad.derpies'), []) self.repo.git('config', '--add', 'happy.derpies', 'cat') self.assertEqual( self.repo.run(self.gc.get_config_list, 'happy.derpies'), ['food', 'cat']) self.assertEqual('cat', self.repo.run(self.gc.get_config, 'dude.bob', 'cat')) self.repo.run(self.gc.set_config, 'dude.bob', 'dog') self.assertEqual('dog', self.repo.run(self.gc.get_config, 'dude.bob', 'cat')) self.repo.run(self.gc.del_config, 'dude.bob') # This should work without raising an exception self.repo.run(self.gc.del_config, 'dude.bob') self.assertEqual('cat', self.repo.run(self.gc.get_config, 'dude.bob', 'cat')) self.assertEqual('origin/main', self.repo.run(self.gc.root)) self.repo.git('config', 'depot-tools.upstream', 'catfood') self.assertEqual('catfood', self.repo.run(self.gc.root)) self.repo.git('config', '--add', 'core.fsmonitor', 'true') self.assertEqual(True, self.repo.run(self.gc.is_fsmonitor_enabled)) self.repo.git('config', '--add', 'core.fsmonitor', 't') self.assertEqual(False, self.repo.run(self.gc.is_fsmonitor_enabled)) self.repo.git('config', '--add', 'core.fsmonitor', 'false') self.assertEqual(False, self.repo.run(self.gc.is_fsmonitor_enabled)) def testRoot(self): origin_schema = git_test_utils.GitRepoSchema( """ A B C B D """, self.getRepoContent) origin = origin_schema.reify() # Set the default branch to branch_D instead of main. origin.git('checkout', 'branch_D') self.repo.git('remote', 'add', 'origin', origin.repo_path) self.repo.git('fetch', 'origin') self.repo.git('remote', 'set-head', 'origin', '-a') self.assertEqual('origin/branch_D', self.repo.run(self.gc.root)) def testUpstream(self): self.repo.git('commit', '--allow-empty', '-am', 'foooooo') self.assertEqual(self.repo.run(self.gc.upstream, 'bobly'), None) self.assertEqual(self.repo.run(self.gc.upstream, 'main'), None) self.repo.git('checkout', '-t', '-b', 'happybranch', 'main') self.assertEqual(self.repo.run(self.gc.upstream, 'happybranch'), 'main') def testNormalizedVersion(self): self.assertTrue( all( isinstance(x, int) for x in self.repo.run(self.gc.get_git_version))) def testGetBranchesInfo(self): self.repo.git('commit', '--allow-empty', '-am', 'foooooo') self.repo.git('checkout', '-t', '-b', 'happybranch', 'main') self.repo.git('commit', '--allow-empty', '-am', 'foooooo') self.repo.git('checkout', '-t', '-b', 'child', 'happybranch') self.repo.git('checkout', '-t', '-b', 'to_delete', 'main') self.repo.git('checkout', '-t', '-b', 'parent_gone', 'to_delete') self.repo.git('branch', '-D', 'to_delete') supports_track = (self.repo.run(self.gc.get_git_version) >= self.gc.MIN_UPSTREAM_TRACK_GIT_VERSION) actual = self.repo.run(self.gc.get_branches_info, supports_track) expected = { 'happybranch': (self.repo.run(self.gc.hash_one, 'happybranch', short=True), 'main', 1 if supports_track else None, None), 'child': (self.repo.run(self.gc.hash_one, 'child', short=True), 'happybranch', None, None), 'main': (self.repo.run(self.gc.hash_one, 'main', short=True), '', None, None), '': None, 'parent_gone': (self.repo.run(self.gc.hash_one, 'parent_gone', short=True), 'to_delete', None, None), 'to_delete': None } self.assertEqual(expected, actual) def testGetBranchesInfoWithReset(self): self.repo.git('commit', '--allow-empty', '-am', 'foooooo') self.repo.git('checkout', '-t', '-b', 'foobarA', 'main') self.repo.git('config', 'branch.foobarA.base', self.repo.run(self.gc.hash_one, 'main')) self.repo.git('config', 'branch.foobarA.base-upstream', 'main') with self.repo.open('foobar1', 'w') as f: f.write('hello') self.repo.git('add', 'foobar1') self.repo.git_commit('commit1') with self.repo.open('foobar2', 'w') as f: f.write('goodbye') self.repo.git('add', 'foobar2') self.repo.git_commit('commit2') self.repo.git('checkout', '-t', '-b', 'foobarB', 'foobarA') self.repo.git('config', 'branch.foobarB.base', self.repo.run(self.gc.hash_one, 'foobarA')) self.repo.git('config', 'branch.foobarB.base-upstream', 'foobarA') self.repo.git('checkout', 'foobarA') self.repo.git('reset', '--hard', 'HEAD~') with self.repo.open('foobar', 'w') as f: f.write('world') self.repo.git('add', 'foobar') self.repo.git_commit('commit1.2') actual = self.repo.run(self.gc.get_branches_info, True) expected = { 'foobarA': (self.repo.run(self.gc.hash_one, 'foobarA', short=True), 'main', 2, None), 'foobarB': (self.repo.run(self.gc.hash_one, 'foobarB', short=True), 'foobarA', None, 1), 'main': (self.repo.run(self.gc.hash_one, 'main', short=True), '', None, None), '': None } self.assertEqual(expected, actual) class GitMutableStructuredTest(git_test_utils.GitRepoReadWriteTestBase, GitCommonTestBase): REPO_SCHEMA = """ A B C D E F G B H I J K J L X Y Z CAT DOG """ COMMIT_B = {'file': {'data': b'B'}} COMMIT_H = {'file': {'data': b'H'}} COMMIT_I = {'file': {'data': b'I'}} COMMIT_J = {'file': {'data': b'J'}} COMMIT_K = {'file': {'data': b'K'}} COMMIT_L = {'file': {'data': b'L'}} def setUp(self): super(GitMutableStructuredTest, self).setUp() self.repo.git('branch', '--set-upstream-to', 'root_X', 'branch_Z') self.repo.git('branch', '--set-upstream-to', 'branch_G', 'branch_K') self.repo.git('branch', '--set-upstream-to', 'branch_K', 'branch_L') self.repo.git('branch', '--set-upstream-to', 'root_A', 'branch_G') self.repo.git('branch', '--set-upstream-to', 'root_X', 'root_A') def testTooManyBranches(self): for i in range(30): self.repo.git('branch', 'a' * i) _, rslt = self.repo.capture_stdio(list, self.gc.branches()) self.assertIn('too many branches (39/20)', rslt) self.repo.git('config', 'depot-tools.branch-limit', 'cat') _, rslt = self.repo.capture_stdio(list, self.gc.branches()) self.assertIn('too many branches (39/20)', rslt) self.repo.git('config', 'depot-tools.branch-limit', '100') # should not raise # This check fails with git 2.4 (see crbug.com/487172) self.assertEqual(38, len(self.repo.run(list, self.gc.branches()))) def testMergeBase(self): self.repo.git('checkout', 'branch_K') self.assertEqual( self.repo['B'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_K', 'branch_G')) self.assertEqual( self.repo['J'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_L', 'branch_K')) self.assertEqual( self.repo['B'], self.repo.run(self.gc.get_config, 'branch.branch_K.base')) self.assertEqual( 'branch_G', self.repo.run(self.gc.get_config, 'branch.branch_K.base-upstream')) # deadbeef is a bad hash, so this will result in repo['B'] self.repo.run(self.gc.manual_merge_base, 'branch_K', 'deadbeef', 'branch_G') self.assertEqual( self.repo['B'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_K', 'branch_G')) # but if we pick a real ancestor, then it'll work self.repo.run(self.gc.manual_merge_base, 'branch_K', self.repo['I'], 'branch_G') self.assertEqual( self.repo['I'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_K', 'branch_G')) self.assertEqual( { 'branch_K': self.repo['I'], 'branch_L': self.repo['J'] }, self.repo.run(self.gc.branch_config_map, 'base')) self.repo.run(self.gc.remove_merge_base, 'branch_K') self.repo.run(self.gc.remove_merge_base, 'branch_L') self.assertEqual( None, self.repo.run(self.gc.get_config, 'branch.branch_K.base')) self.assertEqual({}, self.repo.run(self.gc.branch_config_map, 'base')) # if it's too old, then it caps at merge-base self.repo.run(self.gc.manual_merge_base, 'branch_K', self.repo['A'], 'branch_G') self.assertEqual( self.repo['B'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_K', 'branch_G')) # If the user does --set-upstream-to something else, then we discard the # base and recompute it. self.repo.run(self.gc.run, 'branch', '-u', 'root_A') self.assertEqual( self.repo['A'], self.repo.run(self.gc.get_or_create_merge_base, 'branch_K')) self.assertIsNone( self.repo.run(self.gc.get_or_create_merge_base, 'branch_DOG')) def testGetBranchTree(self): skipped, tree = self.repo.run(self.gc.get_branch_tree) # This check fails with git 2.4 (see crbug.com/487172) self.assertEqual(skipped, {'main', 'root_X', 'branch_DOG', 'root_CAT'}) self.assertEqual( tree, { 'branch_G': 'root_A', 'root_A': 'root_X', 'branch_K': 'branch_G', 'branch_L': 'branch_K', 'branch_Z': 'root_X' }) topdown = list(self.gc.topo_iter(tree)) bottomup = list(self.gc.topo_iter(tree, top_down=False)) self.assertEqual(topdown, [ ('branch_Z', 'root_X'), ('root_A', 'root_X'), ('branch_G', 'root_A'), ('branch_K', 'branch_G'), ('branch_L', 'branch_K'), ]) self.assertEqual(bottomup, [ ('branch_L', 'branch_K'), ('branch_Z', 'root_X'), ('branch_K', 'branch_G'), ('branch_G', 'root_A'), ('root_A', 'root_X'), ]) def testIsGitTreeDirty(self): retval = [] self.repo.capture_stdio(lambda: retval.append( self.repo.run(self.gc.is_dirty_git_tree, 'foo'))) self.assertEqual(False, retval[0]) self.repo.open('test.file', 'w').write('test data') self.repo.git('add', 'test.file') retval = [] self.repo.capture_stdio(lambda: retval.append( self.repo.run(self.gc.is_dirty_git_tree, 'foo'))) self.assertEqual(True, retval[0]) def testSquashBranch(self): self.repo.git('checkout', 'branch_K') self.assertEqual( True, self.repo.run(self.gc.squash_current_branch, '✔ cool message')) lines = ['✔ cool message', ''] for l in 'HIJK': lines.extend((self.repo[l], l, '')) lines.pop() msg = '\n'.join(lines) self.assertEqual( self.repo.run(self.gc.run, 'log', '-n1', '--format=%B'), msg) self.assertEqual( self.repo.git('cat-file', 'blob', 'branch_K:file').stdout, 'K') def testSquashBranchDefaultMessage(self): self.repo.git('checkout', 'branch_K') self.assertEqual(True, self.repo.run(self.gc.squash_current_branch)) self.assertEqual( self.repo.run(self.gc.run, 'log', '-n1', '--format=%s'), 'git squash commit for branch_K.') def testSquashBranchEmpty(self): self.repo.git('checkout', 'branch_K') self.repo.git('checkout', 'branch_G', '.') self.repo.git('commit', '-m', 'revert all changes no branch') # Should return False since the quash would result in an empty commit stdout = self.repo.capture_stdio(self.gc.squash_current_branch)[0] self.assertEqual(stdout, 'Nothing to commit; squashed branch is empty\n') def testRebase(self): self.assertSchema(""" A B C D E F G B H I J K J L X Y Z CAT DOG """) rslt = self.repo.run(self.gc.rebase, 'branch_G', 'branch_K~4', 'branch_K') self.assertTrue(rslt.success) self.assertSchema(""" A B C D E F G H I J K B H I J L X Y Z CAT DOG """) rslt = self.repo.run(self.gc.rebase, 'branch_K', 'branch_L~1', 'branch_L', abort=True) self.assertFalse(rslt.success) self.assertFalse(self.repo.run(self.gc.in_rebase)) rslt = self.repo.run(self.gc.rebase, 'branch_K', 'branch_L~1', 'branch_L', abort=False) self.assertFalse(rslt.success) self.assertTrue(self.repo.run(self.gc.in_rebase)) self.assertEqual( self.repo.git('status', '--porcelain').stdout, 'UU file\n') self.repo.git('checkout', '--theirs', 'file') self.repo.git('add', 'file') self.repo.git('rebase', '--continue') self.assertSchema(""" A B C D E F G H I J K L X Y Z CAT DOG """) def testStatus(self): def inner(): dictified_status = lambda: { k: dict(v._asdict()) # pylint: disable=protected-access for k, v in self.repo.run(self.gc.status) } self.repo.git('mv', 'file', 'cat') with open('COOL', 'w') as f: f.write('Super cool file!') self.assertDictEqual( dictified_status(), { 'cat': { 'lstat': 'R', 'rstat': ' ', 'src': 'file' }, 'COOL': { 'lstat': '?', 'rstat': '?', 'src': 'COOL' } }) self.repo.run(inner) class GitFreezeThaw(git_test_utils.GitRepoReadWriteTestBase): @classmethod def setUpClass(cls): super(GitFreezeThaw, cls).setUpClass() import git_common cls.gc = git_common cls.gc.TEST_MODE = True REPO_SCHEMA = """ A B C D B E D """ COMMIT_A = { 'some/files/file1': { 'data': b'file1' }, 'some/files/file2': { 'data': b'file2' }, 'some/files/file3': { 'data': b'file3' }, 'some/other/file': { 'data': b'otherfile' }, } COMMIT_C = { 'some/files/file2': { 'mode': 0o755, 'data': b'file2 - vanilla' }, } COMMIT_E = { 'some/files/file2': { 'data': b'file2 - merged' }, } COMMIT_D = { 'some/files/file2': { 'data': b'file2 - vanilla\nfile2 - merged' }, } def testNothing(self): self.assertIsNotNone(self.repo.run(self.gc.thaw)) # 'Nothing to thaw' self.assertIsNotNone(self.repo.run( self.gc.freeze)) # 'Nothing to freeze' def testAll(self): def inner(): with open('some/files/file2', 'a') as f2: print('cool appended line', file=f2) with open('some/files/file3', 'w') as f3: print('hello', file=f3) self.repo.git('add', 'some/files/file3') with open('some/files/file3', 'a') as f3: print('world', file=f3) os.mkdir('some/other_files') with open('some/other_files/subdir_file', 'w') as f3: print('new file!', file=f3) with open('some/files/file5', 'w') as f5: print('New file!1!one!', file=f5) with open('some/files/file6', 'w') as f6: print('hello', file=f6) self.repo.git('add', 'some/files/file6') with open('some/files/file6', 'w') as f6: print('world', file=f6) with open('some/files/file7', 'w') as f7: print('hello', file=f7) self.repo.git('add', 'some/files/file7') os.remove('some/files/file7') STATUS_1 = '\n'.join( (' M some/files/file2', 'MM some/files/file3', 'A some/files/file5', 'AM some/files/file6', 'AD some/files/file7', '?? some/other_files/')) + '\n' self.repo.git('add', 'some/files/file5') # Freeze group 1 self.assertEqual( self.repo.git('status', '--porcelain').stdout, STATUS_1) self.assertIsNone(self.gc.freeze()) self.assertEqual(self.repo.git('status', '--porcelain').stdout, '') # Freeze group 2 with open('some/files/file2', 'a') as f2: print('new! appended line!', file=f2) self.assertEqual( self.repo.git('status', '--porcelain').stdout, ' M some/files/file2\n') self.assertIsNone(self.gc.freeze()) self.assertEqual(self.repo.git('status', '--porcelain').stdout, '') # Thaw it out! self.assertIsNone(self.gc.thaw()) self.assertIsNotNone( self.gc.thaw()) # One thaw should thaw everything self.assertEqual( self.repo.git('status', '--porcelain').stdout, STATUS_1) self.repo.run(inner) def testTooBig(self): def inner(): self.repo.git('config', 'depot-tools.freeze-size-limit', '1') with open('bigfile', 'w') as f: chunk = 'NERDFACE' * 1024 for _ in range(128 * 2 + 1): # Just over 2 mb f.write(chunk) _, err = self.repo.capture_stdio(self.gc.freeze) self.assertIn('too much untracked+unignored', err) self.repo.run(inner) def testTooBigMultipleFiles(self): def inner(): self.repo.git('config', 'depot-tools.freeze-size-limit', '1') for i in range(3): with open('file%d' % i, 'w') as f: chunk = 'NERDFACE' * 1024 for _ in range(50): # About 400k f.write(chunk) _, err = self.repo.capture_stdio(self.gc.freeze) self.assertIn('too much untracked+unignored', err) self.repo.run(inner) def testMerge(self): def inner(): self.repo.git('checkout', '-b', 'bad_merge_branch') with open('bad_merge', 'w') as f: f.write('bad_merge_left') self.repo.git('add', 'bad_merge') self.repo.git('commit', '-m', 'bad_merge') self.repo.git('checkout', 'branch_D') with open('bad_merge', 'w') as f: f.write('bad_merge_right') self.repo.git('add', 'bad_merge') self.repo.git('commit', '-m', 'bad_merge_d') self.repo.git('merge', 'bad_merge_branch') _, err = self.repo.capture_stdio(self.gc.freeze) self.assertIn('Cannot freeze unmerged changes', err) self.repo.run(inner) def testAddError(self): def inner(): self.repo.git('checkout', '-b', 'unreadable_file_branch') with open('bad_file', 'w') as f: f.write('some text') os.chmod('bad_file', 0o0111) ret = self.repo.run(self.gc.freeze) self.assertIn('Failed to index some unindexed files.', ret) self.repo.run(inner) class GitMakeWorkdir(git_test_utils.GitRepoReadOnlyTestBase, GitCommonTestBase): def setUp(self): self._tempdir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self._tempdir) REPO_SCHEMA = """ A """ @unittest.skipIf(not hasattr(os, 'symlink'), "OS doesn't support symlink") def testMakeWorkdir(self): workdir = os.path.join(self._tempdir, 'workdir') self.gc.make_workdir(os.path.join(self.repo.repo_path, '.git'), os.path.join(workdir, '.git')) EXPECTED_LINKS = [ 'config', 'info', 'hooks', 'logs/refs', 'objects', 'refs', ] for path in EXPECTED_LINKS: self.assertTrue(os.path.islink(os.path.join(workdir, '.git', path))) self.assertEqual( os.path.realpath(os.path.join(workdir, '.git', path)), os.path.join(self.repo.repo_path, '.git', path)) self.assertFalse(os.path.islink(os.path.join(workdir, '.git', 'HEAD'))) class GitTestUtilsTest(git_test_utils.GitRepoReadOnlyTestBase): REPO_SCHEMA = """ A B C """ COMMIT_A = { 'file1': { 'data': b'file1' }, } COMMIT_B = { 'file1': { 'data': b'file1 changed' }, } # Test special keys (custom commit data). COMMIT_C = { GitRepo.AUTHOR_NAME: 'Custom Author', GitRepo.AUTHOR_EMAIL: 'author@example.com', GitRepo.AUTHOR_DATE: datetime.datetime(1980, 9, 8, 7, 6, 5, tzinfo=git_test_utils.UTC), GitRepo.COMMITTER_NAME: 'Custom Committer', GitRepo.COMMITTER_EMAIL: 'committer@example.com', GitRepo.COMMITTER_DATE: datetime.datetime(1990, 4, 5, 6, 7, 8, tzinfo=git_test_utils.UTC), 'file1': { 'data': b'file1 changed again' }, } def testAutomaticCommitDates(self): # The dates should start from 1970-01-01 and automatically increment. # They must be in UTC (otherwise the tests are system-dependent, and if # your local timezone is positive, timestamps will be <0 which causes # bizarre behaviour in Git; http://crbug.com/581895). self.assertEqual('Author McAuthorly 1970-01-01 00:00:00 +0000', self.repo.show_commit('A', format_string='%an %ai')) self.assertEqual('Charles Committish 1970-01-02 00:00:00 +0000', self.repo.show_commit('A', format_string='%cn %ci')) self.assertEqual('Author McAuthorly 1970-01-03 00:00:00 +0000', self.repo.show_commit('B', format_string='%an %ai')) self.assertEqual('Charles Committish 1970-01-04 00:00:00 +0000', self.repo.show_commit('B', format_string='%cn %ci')) def testCustomCommitData(self): self.assertEqual( 'Custom Author author@example.com ' '1980-09-08 07:06:05 +0000', self.repo.show_commit('C', format_string='%an %ae %ai')) self.assertEqual( 'Custom Committer committer@example.com ' '1990-04-05 06:07:08 +0000', self.repo.show_commit('C', format_string='%cn %ce %ci')) if __name__ == '__main__': sys.exit( coverage_utils.covered_main( os.path.join(DEPOT_TOOLS_ROOT, 'git_common.py')))