#!/usr/bin/python # Copyright (c) 2010 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Generate fake repositories for testing.""" import atexit import logging import os import pprint import re import shutil import subprocess import sys import unittest ## Utility functions def addKill(): """Add kill() method to subprocess.Popen for python <2.6""" if getattr(subprocess.Popen, 'kill', None): return if sys.platform.startswith('win'): def kill_win(process): import win32process return win32process.TerminateProcess(process._handle, -1) subprocess.kill = kill_win else: def kill_nix(process): import signal return os.kill(process.pid, signal.SIGKILL) subprocess.kill = kill_nix def rmtree(path): """Delete a directory.""" if os.path.exists(path): shutil.rmtree(path) def write(path, content): f = open(path, 'wb') f.write(content) f.close() join = os.path.join def check_call(*args, **kwargs): logging.debug(args[0]) subprocess.check_call(*args, **kwargs) def Popen(*args, **kwargs): kwargs.setdefault('stdout', subprocess.PIPE) kwargs.setdefault('stderr', subprocess.STDOUT) logging.debug(args[0]) return subprocess.Popen(*args, **kwargs) def read_tree(tree_root): """Returns a dict of all the files in a tree. Defaults to self.root_dir.""" tree = {} for root, dirs, files in os.walk(tree_root): for d in filter(lambda x: x.startswith('.'), dirs): dirs.remove(d) for f in [join(root, f) for f in files if not f.startswith('.')]: tree[f[len(tree_root) + 1:]] = open(join(root, f), 'rb').read() return tree def dict_diff(dict1, dict2): diff = {} for k, v in dict1.iteritems(): if k not in dict2: diff[k] = v elif v != dict2[k]: diff[k] = (v, dict2[k]) for k, v in dict2.iteritems(): if k not in dict1: diff[k] = v return diff def mangle_svn_tree(*args): result = {} for old_root, new_root, tree in args: for k, v in tree.iteritems(): if not k.startswith(old_root): continue result[join(new_root, k[len(old_root) + 1:])] = v return result def mangle_git_tree(*args): result = {} for new_root, tree in args: for k, v in tree.iteritems(): result[join(new_root, k)] = v return result def commit_svn(repo): """Commits the changes and returns the new revision number.""" # Basic parsing. to_add = [] to_remove = [] for item in Popen(['svn', 'status'], cwd=repo).communicate()[0].splitlines(False): if item[0] == '?': to_add.append(item[8:]) elif item[0] == '!': to_remove.append(item[8:]) if to_add: check_call(['svn', 'add', '--no-auto-props', '-q'] + to_add, cwd=repo) if to_remove: check_call(['svn', 'remove', '-q'] + to_remove, cwd=repo) out = Popen(['svn', 'commit', repo, '-m', 'foo', '--non-interactive', '--no-auth-cache', '--username', 'user1', '--password', 'foo'], cwd=repo).communicate()[0] rev = re.search(r'revision (\d+).', out).group(1) st = Popen(['svn', 'status'], cwd=repo).communicate()[0] assert len(st) == 0, st logging.debug('At revision %s' % rev) return rev def commit_git(repo): """Commits the changes and returns the new hash.""" check_call(['git', 'add', '-A', '-f'], cwd=repo) check_call(['git', 'commit', '-q', '--message', 'foo'], cwd=repo) rev = Popen(['git', 'show-ref', '--head', 'HEAD'], cwd=repo).communicate()[0].split(' ', 1)[0] logging.debug('At revision %s' % rev) return rev _FAKE_LOADED = False class FakeRepos(object): """Generate both svn and git repositories to test gclient functionality. Many DEPS functionalities need to be tested: Var, File, From, deps_os, hooks, use_relative_paths. And types of dependencies: Relative urls, Full urls, both svn and git.""" # Should leak the repositories. SHOULD_LEAK = False # Override if unhappy. TRIAL_DIR = None # Hostname HOST = '127.0.0.1' def __init__(self, trial_dir=None, leak=None, host=None): global _FAKE_LOADED if _FAKE_LOADED: raise Exception('You can only start one FakeRepos at a time.') _FAKE_LOADED = True # Quick hack. if '-v' in sys.argv: logging.basicConfig(level=logging.DEBUG) if '-l' in sys.argv: self.SHOULD_LEAK = True sys.argv.remove('-l') elif leak is not None: self.SHOULD_LEAK = leak if host: self.HOST = host if trial_dir: self.TRIAL_DIR = trial_dir # Format is [ None, tree, tree, ...] self.svn_revs = [None] # Format is { repo: [ (hash, tree), (hash, tree), ... ], ... } self.git_hashes = {} self.svnserve = None self.gitdaemon = None self.common_init = False def trial_dir(self): if not self.TRIAL_DIR: self.TRIAL_DIR = os.path.join( os.path.dirname(os.path.abspath(__file__)), '_trial') return self.TRIAL_DIR def setUp(self): """All late initialization comes here. Note that it deletes all trial_dir() and not only repos_dir.""" if not self.common_init: self.common_init = True self.repos_dir = os.path.join(self.trial_dir(), 'repos') self.git_root = join(self.repos_dir, 'git') self.svn_root = join(self.repos_dir, 'svn_checkout') addKill() rmtree(self.trial_dir()) os.makedirs(self.repos_dir) atexit.register(self.tearDown) def tearDown(self): if self.svnserve: logging.debug('Killing svnserve pid %s' % self.svnserve.pid) self.svnserve.kill() self.svnserve = None if self.gitdaemon: logging.debug('Killing git-daemon pid %s' % self.gitdaemon.pid) self.gitdaemon.kill() self.gitdaemon = None if not self.SHOULD_LEAK: logging.debug('Removing %s' % self.trial_dir()) rmtree(self.trial_dir()) def _genTree(self, root, tree_dict): """For a dictionary of file contents, generate a filesystem.""" if not os.path.isdir(root): os.makedirs(root) for (k, v) in tree_dict.iteritems(): k_os = k.replace('/', os.sep) k_arr = k_os.split(os.sep) if len(k_arr) > 1: p = os.sep.join([root] + k_arr[:-1]) if not os.path.isdir(p): os.makedirs(p) if v is None: os.remove(join(root, k)) else: write(join(root, k), v) def setUpSVN(self): """Creates subversion repositories and start the servers.""" if self.svnserve: return self.setUp() root = join(self.repos_dir, 'svn') check_call(['svnadmin', 'create', root]) write(join(root, 'conf', 'svnserve.conf'), '[general]\n' 'anon-access = read\n' 'auth-access = write\n' 'password-db = passwd\n') write(join(root, 'conf', 'passwd'), '[users]\n' 'user1 = foo\n' 'user2 = bar\n') # Start the daemon. cmd = ['svnserve', '-d', '--foreground', '-r', self.repos_dir] if self.HOST == '127.0.0.1': cmd.append('--listen-host=127.0.0.1') logging.debug(cmd) self.svnserve = Popen(cmd, cwd=root) self.populateSvn() def populateSvn(self): """Creates a few revisions of changes including DEPS files.""" # Repos check_call(['svn', 'checkout', 'svn://127.0.0.1/svn', self.svn_root, '-q', '--non-interactive', '--no-auth-cache', '--username', 'user1', '--password', 'foo']) assert os.path.isdir(join(self.svn_root, '.svn')) def file_system(rev, DEPS): fs = { 'origin': 'svn@%(rev)d\n', 'trunk/origin': 'svn/trunk@%(rev)d\n', 'trunk/src/origin': 'svn/trunk/src@%(rev)d\n', 'trunk/src/third_party/origin': 'svn/trunk/src/third_party@%(rev)d\n', 'trunk/other/origin': 'src/trunk/other@%(rev)d\n', 'trunk/third_party/origin': 'svn/trunk/third_party@%(rev)d\n', 'trunk/third_party/foo/origin': 'svn/trunk/third_party/foo@%(rev)d\n', 'trunk/third_party/prout/origin': 'svn/trunk/third_party/foo@%(rev)d\n', } for k in fs.iterkeys(): fs[k] = fs[k] % { 'rev': rev } fs['trunk/src/DEPS'] = DEPS return fs # Testing: # - dependency disapear # - dependency renamed # - versioned and unversioned reference # - relative and full reference # - deps_os # - var # - hooks # TODO(maruel): # - File # - $matching_files # - use_relative_paths self._commit_svn(file_system(1, """ vars = { 'DummyVariable': 'third_party', } deps = { 'src/other': 'svn://%(host)s/svn/trunk/other', 'src/third_party/fpp': '/trunk/' + Var('DummyVariable') + '/foo', } deps_os = { 'mac': { 'src/third_party/prout': '/trunk/third_party/prout', }, }""" % { 'host': self.HOST })) self._commit_svn(file_system(2, """ deps = { 'src/other': 'svn://%(host)s/svn/trunk/other', 'src/third_party/foo': '/trunk/third_party/foo@1', } # I think this is wrong to have the hooks run from the base of the gclient # checkout. It's maybe a bit too late to change that behavior. hooks = [ { 'pattern': '.', 'action': ['python', '-c', 'open(\\'src/svn_hooked1\\', \\'w\\').write(\\'svn_hooked1\\')'], }, { # Should not be run. 'pattern': 'nonexistent', 'action': ['python', '-c', 'open(\\'src/svn_hooked2\\', \\'w\\').write(\\'svn_hooked2\\')'], }, ] """ % { 'host': self.HOST })) def setUpGIT(self): """Creates git repositories and start the servers.""" if self.gitdaemon: return self.setUp() for repo in ['repo_%d' % r for r in range(1, 5)]: check_call(['git', 'init', '-q', join(self.git_root, repo)]) self.git_hashes[repo] = [] # Testing: # - dependency disapear # - dependency renamed # - versioned and unversioned reference # - relative and full reference # - deps_os # - var # - hooks # TODO(maruel): # - File # - $matching_files # - use_relative_paths self._commit_git('repo_1', { 'DEPS': """ vars = { 'DummyVariable': 'repo', } deps = { 'src/repo2': 'git://%(host)s/git/repo_2', 'src/repo2/repo3': '/' + Var('DummyVariable') + '_3', } deps_os = { 'mac': { 'src/repo4': '/repo_4', }, }""" % { 'host': self.HOST }, 'origin': 'git/repo_1@1\n', }) self._commit_git('repo_2', { 'origin': "git/repo_2@1\n" }) self._commit_git('repo_2', { 'origin': "git/repo_2@2\n" }) self._commit_git('repo_3', { 'origin': "git/repo_3@1\n" }) self._commit_git('repo_3', { 'origin': "git/repo_3@2\n" }) self._commit_git('repo_4', { 'origin': "git/repo_4@1\n" }) self._commit_git('repo_4', { 'origin': "git/repo_4@2\n" }) self._commit_git('repo_1', { 'DEPS': """ deps = { 'src/repo2': 'git://%(host)s/git/repo_2@%(hash)s', 'src/repo2/repo_renamed': '/repo_3', } # I think this is wrong to have the hooks run from the base of the gclient # checkout. It's maybe a bit too late to change that behavior. hooks = [ { 'pattern': '.', 'action': ['python', '-c', 'open(\\'src/git_hooked1\\', \\'w\\').write(\\'git_hooked1\\')'], }, { # Should not be run. 'pattern': 'nonexistent', 'action': ['python', '-c', 'open(\\'src/git_hooked2\\', \\'w\\').write(\\'git_hooked2\\')'], }, ] """ % { # TODO(maruel): http://crosbug.com/3591 We need to strip the hash.. duh. 'host': self.HOST, 'hash': self.git_hashes['repo_2'][0][0][:7] }, 'origin': "git/repo_1@2\n" }) # Start the daemon. cmd = ['git', 'daemon', '--export-all', '--base-path=' + self.repos_dir] if self.HOST == '127.0.0.1': cmd.append('--listen=127.0.0.1') logging.debug(cmd) self.gitdaemon = Popen(cmd, cwd=self.repos_dir) def _commit_svn(self, tree): self._genTree(self.svn_root, tree) commit_svn(self.svn_root) if self.svn_revs and self.svn_revs[-1]: new_tree = self.svn_revs[-1].copy() new_tree.update(tree) else: new_tree = tree.copy() self.svn_revs.append(new_tree) def _commit_git(self, repo, tree): repo_root = join(self.git_root, repo) self._genTree(repo_root, tree) hash = commit_git(repo_root) if self.git_hashes[repo]: new_tree = self.git_hashes[repo][-1][1].copy() new_tree.update(tree) else: new_tree = tree.copy() self.git_hashes[repo].append((hash, new_tree)) class FakeReposTestBase(unittest.TestCase): """This is vaguely inspired by twisted.""" # Replace this in your subclass. CLASS_ROOT_DIR = None # static FakeRepos instance. FAKE_REPOS = FakeRepos() def setUp(self): unittest.TestCase.setUp(self) self.FAKE_REPOS.setUp() # Remove left overs and start fresh. if not self.CLASS_ROOT_DIR: self.CLASS_ROOT_DIR = join(self.FAKE_REPOS.trial_dir(), 'smoke') self.root_dir = join(self.CLASS_ROOT_DIR, self.id()) rmtree(self.root_dir) os.makedirs(self.root_dir) self.svn_base = 'svn://%s/svn/' % self.FAKE_REPOS.HOST self.git_base = 'git://%s/git/' % self.FAKE_REPOS.HOST def tearDown(self): if not self.FAKE_REPOS.SHOULD_LEAK: rmtree(self.root_dir) def checkString(self, expected, result): """Prints the diffs to ease debugging.""" if expected != result: # Strip the begining while expected and result and expected[0] == result[0]: expected = expected[1:] result = result[1:] # The exception trace makes it hard to read so dump it too. if '\n' in result: print result self.assertEquals(expected, result) def check(self, expected, results): """Checks stdout, stderr, retcode.""" self.checkString(expected[0], results[0]) self.checkString(expected[1], results[1]) self.assertEquals(expected[2], results[2]) def assertTree(self, tree, tree_root=None): """Diff the checkout tree with a dict.""" if not tree_root: tree_root = self.root_dir actual = read_tree(tree_root) diff = dict_diff(tree, actual) if diff: logging.debug('Actual %s\n%s' % (tree_root, pprint.pformat(actual))) logging.debug('Expected\n%s' % pprint.pformat(tree)) logging.debug('Diff\n%s' % pprint.pformat(diff)) self.assertEquals(diff, []) def main(argv): fake = FakeRepos() print 'Using %s' % fake.trial_dir() try: fake.setUp() print('Fake setup, press enter to quit or Ctrl-C to keep the checkouts.') sys.stdin.readline() except KeyboardInterrupt: fake.SHOULD_LEAK = True return 0 if __name__ == '__main__': sys.exit(main(sys.argv))