depot_tools: Remove unused files.

- appengine_mapper.py
- checkout.py
- patch.py
- testing_support/gerrit_test_case.py
- testing_support/patches_data.py
- tests/checkout_test.py
- tests/patch_test.py

Bug: 984182
Change-Id: I2d1ccb1dc41d7034f63043aa87bca3bca4e18294
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/1727401
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
Commit-Queue: Edward Lesmes <ehmaldonado@chromium.org>
changes/01/1727401/6
Edward Lemur 6 years ago committed by Commit Bot
parent c0758331ea
commit 364640e249

@ -1,23 +0,0 @@
# Copyright (c) 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Ensures that all depot_tools talks directly to appengine to avoid SNI."""
import urlparse
# Maps SNI-incompatible vanity hostnames to the appspot.com hostnames that
# AppEngine serves directly; consumed by MapUrl() below.
mapping = {
  'codereview.chromium.org': 'chromiumcodereview.appspot.com',
  'crashpad.chromium.org': 'crashpad-home.appspot.com',
  'bugs.chromium.org': 'monorail-prod.appspot.com',
  'bugs-staging.chromium.org': 'monorail-staging.appspot.com',
}
def MapUrl(url):
  """Rewrites |url| so its host talks directly to AppEngine, if mapped.

  URLs whose network location is not listed in |mapping| are returned
  unchanged.
  """
  scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
  mapped = mapping.get(netloc)
  if mapped:
    netloc = mapped
  return urlparse.urlunsplit((scheme, netloc, path, query, fragment))

@ -1,433 +0,0 @@
# coding=utf-8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Manages a project checkout.
Includes support only for git.
"""
from __future__ import print_function
import fnmatch
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
# The configparser module was renamed in Python 3.
try:
import configparser
except ImportError:
import ConfigParser as configparser
import patch
import scm
import subprocess2
# Timeouts (in seconds) applied to the git subprocess calls below.
if sys.platform in ('cygwin', 'win32'):
  # Disable timeouts on Windows since we can't have shells with timeouts.
  GLOBAL_TIMEOUT = None
  FETCH_TIMEOUT = None
else:
  # Default timeout of 15 minutes.
  GLOBAL_TIMEOUT = 15*60
  # Use a larger timeout for checkout since it can be a genuinely slower
  # operation.
  FETCH_TIMEOUT = 30*60
def get_code_review_setting(path, key,
    codereview_settings_file='codereview.settings'):
  """Parses codereview.settings and return the value for the key if present.

  Don't cache the values in case the file is changed.

  Args:
    path: Directory expected to contain |codereview_settings_file|.
    key: Setting name to look up, e.g. 'CODE_REVIEW_SERVER'.
    codereview_settings_file: File name to read inside |path|.

  Returns:
    The stripped value for |key|, or None if the file is missing, malformed,
    or does not define the key.
  """
  # TODO(maruel): Do not duplicate code.
  settings = {}
  try:
    # 'with' releases the file handle even if parsing bails out early.
    with open(os.path.join(path, codereview_settings_file), 'r') as f:
      for line in f:
        line = line.strip()
        # Skip blank lines and comments. Previously a blank line ('\n') was
        # truthy, contained no ':', and made the whole file parse as invalid.
        if not line or line.startswith('#'):
          continue
        if ':' not in line:
          # Invalid file.
          return None
        k, v = line.split(':', 1)
        settings[k.strip()] = v.strip()
  except IOError:
    return None
  return settings.get(key, None)
def align_stdout(stdout):
  """Returns the aligned output of multiple stdouts.

  Each non-empty entry is stripped, split into lines, and every line is
  indented by two spaces.
  """
  chunks = []
  for raw in stdout:
    text = raw.strip()
    if not text:
      continue
    chunks.extend('  %s\n' % l for l in text.splitlines())
  return ''.join(chunks)
class PatchApplicationFailed(Exception):
  """Patch failed to be applied."""

  def __init__(self, errors, verbose):
    """
    Args:
      errors: list of (patch, status_message) pairs; patch may be None.
      verbose: when True, __str__ includes each patch's full dump.
    """
    super(PatchApplicationFailed, self).__init__(errors, verbose)
    self.errors = errors
    self.verbose = verbose

  def __str__(self):
    lines = []
    for patch_obj, status in self.errors:
      if patch_obj and patch_obj.filename:
        lines.append('Failed to apply patch for %s:' % patch_obj.filename)
      if status:
        lines.append(status)
      if patch_obj and self.verbose:
        lines.append('Patch: %s' % patch_obj.dump())
    return '\n'.join(lines)
class CheckoutBase(object):
  """Abstract base for a project checkout; subclasses supply the SCM logic."""
  # Set to None to have verbose output.
  VOID = subprocess2.VOID

  def __init__(self, root_dir, project_name, post_processors):
    """
    Args:
      post_processor: list of lambda(checkout, patches) to call on each of the
                      modified files.
    """
    super(CheckoutBase, self).__init__()
    self.root_dir = root_dir
    self.project_name = project_name
    self.project_path = (
        self.root_dir if self.project_name is None
        else os.path.join(self.root_dir, self.project_name))
    # Only used for logging purposes.
    self._last_seen_revision = None
    self.post_processors = post_processors
    assert self.root_dir
    assert self.project_path
    assert os.path.isabs(self.project_path)

  def get_settings(self, key):
    """Looks up |key| in the project's codereview.settings file."""
    return get_code_review_setting(self.project_path, key)

  def prepare(self, revision):
    """Checks out a clean copy of the tree and removes any local modification.

    This function shouldn't throw unless the remote repository is inaccessible,
    there is no free disk space or hard issues like that.

    Args:
      revision: The revision it should sync to, SCM specific.
    """
    raise NotImplementedError()

  def apply_patch(self, patches, post_processors=None, verbose=False):
    """Applies a patch and returns the list of modified files.

    This function should throw patch.UnsupportedPatchFormat or
    PatchApplicationFailed when relevant.

    Args:
      patches: patch.PatchSet object.
    """
    raise NotImplementedError()

  def commit(self, commit_message, user):
    """Commits the patch upstream, while impersonating 'user'."""
    raise NotImplementedError()

  def revisions(self, rev1, rev2):
    """Returns the count of revisions from rev1 to rev2, e.g. len(]rev1, rev2]).

    If rev2 is None, it means 'HEAD'.
    Returns None if there is no link between the two.
    """
    raise NotImplementedError()
class GitCheckout(CheckoutBase):
  """Manages a git checkout."""

  def __init__(self, root_dir, project_name, remote_branch, git_url,
      commit_user, post_processors=None):
    # git_url: remote to clone from. commit_user: identity expected by
    # commit() when pushing upstream.
    super(GitCheckout, self).__init__(root_dir, project_name, post_processors)
    self.git_url = git_url
    self.commit_user = commit_user
    self.remote_branch = remote_branch
    # The working branch where patches will be applied. It will track the
    # remote branch.
    self.working_branch = 'working_branch'
    # There is no reason to not hardcode origin.
    self.remote = 'origin'
    # There is no reason to not hardcode master.
    self.master_branch = 'master'

  def prepare(self, revision):
    """Resets the git repository in a clean state.

    Checks it out if not present and deletes the working branch.

    Args:
      revision: commit-ish to check out; when falsy, syncs master instead.

    Returns:
      The HEAD commit hash after the reset (see _get_head_commit_hash).
    """
    assert self.remote_branch
    assert self.git_url
    if not os.path.isdir(self.project_path):
      # Clone the repo if the directory is not present.
      logging.info(
          'Checking out %s in %s', self.project_name, self.project_path)
      self._check_call_git(
          ['clone', self.git_url, '-b', self.remote_branch, self.project_path],
          cwd=None, timeout=FETCH_TIMEOUT)
    else:
      # Throw away all uncommitted changes in the existing checkout.
      self._check_call_git(['checkout', self.remote_branch])
      self._check_call_git(
          ['reset', '--hard', '--quiet',
           '%s/%s' % (self.remote, self.remote_branch)])

    if revision:
      try:
        # Look if the commit hash already exist. If so, we can skip a
        # 'git fetch' call.
        revision = self._check_output_git(['rev-parse', revision]).rstrip()
      except subprocess.CalledProcessError:
        self._check_call_git(
            ['fetch', self.remote, self.remote_branch, '--quiet'])
        revision = self._check_output_git(['rev-parse', revision]).rstrip()
      self._check_call_git(['checkout', '--force', '--quiet', revision])
    else:
      branches, active = self._branches()
      if active != self.master_branch:
        self._check_call_git(
            ['checkout', '--force', '--quiet', self.master_branch])
      self._sync_remote_branch()

      # Drop any stale working branch left over from a previous run.
      if self.working_branch in branches:
        self._call_git(['branch', '-D', self.working_branch])
    return self._get_head_commit_hash()

  def _sync_remote_branch(self):
    """Syncs the remote branch."""
    # We do a 'git pull origin master:refs/remotes/origin/master' instead of
    # 'git pull origin master' because from the manpage for git-pull:
    #   A parameter <ref> without a colon is equivalent to <ref>: when
    #   pulling/fetching, so it merges <ref> into the current branch without
    #   storing the remote branch anywhere locally.
    remote_tracked_path = 'refs/remotes/%s/%s' % (
        self.remote, self.remote_branch)
    self._check_call_git(
        ['pull', self.remote,
         '%s:%s' % (self.remote_branch, remote_tracked_path),
         '--quiet'])

  def _get_head_commit_hash(self):
    """Gets the current revision (in unicode) from the local branch."""
    # NOTE(review): 'unicode' makes this Python 2 only.
    return unicode(self._check_output_git(['rev-parse', 'HEAD']).strip())

  def apply_patch(self, patches, post_processors=None, verbose=False):
    """Applies a patch on 'working_branch' and switches to it.

    The changes remain staged on the current branch.

    Raises PatchApplicationFailed if any individual patch fails to apply.
    """
    post_processors = post_processors or self.post_processors or []
    # If this throws, the checkout is corrupted. Maybe worth deleting it and
    # trying again?
    if self.remote_branch:
      self._check_call_git(
          ['checkout', '-b', self.working_branch, '-t', self.remote_branch,
           '--quiet'])

    errors = []
    for index, p in enumerate(patches):
      # Accumulate per-patch subprocess output for error reporting.
      stdout = []
      try:
        filepath = os.path.join(self.project_path, p.filename)
        if p.is_delete:
          if (not os.path.exists(filepath) and
              any(p1.source_filename == p.filename for p1 in patches[0:index])):
            # The file was already deleted if a prior patch with file rename
            # was already processed because 'git apply' did it for us.
            pass
          else:
            stdout.append(self._check_output_git(['rm', p.filename]))
            assert(not os.path.exists(filepath))
            stdout.append('Deleted.')
        else:
          dirname = os.path.dirname(p.filename)
          full_dir = os.path.join(self.project_path, dirname)
          if dirname and not os.path.isdir(full_dir):
            os.makedirs(full_dir)
            stdout.append('Created missing directory %s.' % dirname)
          if p.is_binary:
            content = p.get()
            with open(filepath, 'wb') as f:
              f.write(content)
            stdout.append('Added binary file %d bytes' % len(content))
            cmd = ['add', p.filename]
            if verbose:
              cmd.append('--verbose')
            stdout.append(self._check_output_git(cmd))
          else:
            # No need to do anything special with p.is_new or if not
            # p.diff_hunks. git apply manages all that already.
            cmd = ['apply', '--index', '-3', '-p%s' % p.patchlevel]
            if verbose:
              cmd.append('--verbose')
            stdout.append(self._check_output_git(cmd, stdin=p.get(True)))
        for post in post_processors:
          post(self, p)
        if verbose:
          print(p.filename)
          print(align_stdout(stdout))
      except OSError as e:
        errors.append((p, '%s%s' % (align_stdout(stdout), e)))
      except subprocess.CalledProcessError as e:
        errors.append((p,
            'While running %s;\n%s%s' % (
              ' '.join(e.cmd),
              align_stdout(stdout),
              align_stdout([getattr(e, 'stdout', '')]))))
    if errors:
      raise PatchApplicationFailed(errors, verbose)

    # Sanity check: the staged files should match exactly the patch set.
    found_files = self._check_output_git(
        ['-c', 'core.quotePath=false', 'diff', '--ignore-submodules',
         '--name-only', '--staged']).splitlines(False)
    if sorted(patches.filenames) != sorted(found_files):
      extra_files = sorted(set(found_files) - set(patches.filenames))
      unpatched_files = sorted(set(patches.filenames) - set(found_files))
      if extra_files:
        print('Found extra files: %r' % extra_files)
      if unpatched_files:
        print('Found unpatched files: %r' % unpatched_files)

  def commit(self, commit_message, user):
    """Commits, updates the commit message and pushes."""
    # TODO(hinoka): CQ no longer uses this, I think its deprecated.
    # Delete this.
    assert self.commit_user
    # NOTE(review): 'unicode' makes this Python 2 only.
    assert isinstance(commit_message, unicode)
    current_branch = self._check_output_git(
        ['rev-parse', '--abbrev-ref', 'HEAD']).strip()
    assert current_branch == self.working_branch

    commit_cmd = ['commit', '-m', commit_message]
    if user and user != self.commit_user:
      # We do not have the first or last name of the user, grab the username
      # from the email and call it the original author's name.
      # TODO(rmistry): Do not need the below if user is already in
      #                "Name <email>" format.
      name = user.split('@')[0]
      commit_cmd.extend(['--author', '%s <%s>' % (name, user)])
    self._check_call_git(commit_cmd)

    # Push to the remote repository.
    self._check_call_git(
        ['push', 'origin', '%s:%s' % (self.working_branch, self.remote_branch),
         '--quiet'])
    # Get the revision after the push.
    revision = self._get_head_commit_hash()
    # Switch back to the remote_branch and sync it.
    self._check_call_git(['checkout', self.remote_branch])
    self._sync_remote_branch()
    # Delete the working branch since we are done with it.
    self._check_call_git(['branch', '-D', self.working_branch])
    return revision

  def _check_call_git(self, args, **kwargs):
    # Runs 'git <args>' in the checkout, raising on failure; stdout is
    # discarded by default.
    kwargs.setdefault('cwd', self.project_path)
    kwargs.setdefault('stdout', self.VOID)
    kwargs.setdefault('timeout', GLOBAL_TIMEOUT)
    return subprocess2.check_call_out(['git'] + args, **kwargs)

  def _call_git(self, args, **kwargs):
    """Like check_call but doesn't throw on failure."""
    kwargs.setdefault('cwd', self.project_path)
    kwargs.setdefault('stdout', self.VOID)
    kwargs.setdefault('timeout', GLOBAL_TIMEOUT)
    return subprocess2.call(['git'] + args, **kwargs)

  def _check_output_git(self, args, **kwargs):
    # Runs 'git <args>' and returns its output; stderr is folded into stdout.
    kwargs.setdefault('cwd', self.project_path)
    kwargs.setdefault('timeout', GLOBAL_TIMEOUT)
    return subprocess2.check_output(
        ['git'] + args, stderr=subprocess2.STDOUT, **kwargs)

  def _branches(self):
    """Returns the list of branches and the active one."""
    out = self._check_output_git(['branch']).splitlines(False)
    # 'git branch' prefixes every name with two characters ('* ' or '  ').
    branches = [l[2:] for l in out]
    active = None
    for l in out:
      if l.startswith('*'):
        active = l[2:]
        break
    return branches, active

  def revisions(self, rev1, rev2):
    """Returns the number of actual commits between both hash."""
    self._fetch_remote()
    rev2 = rev2 or '%s/%s' % (self.remote, self.remote_branch)
    # Revision range is ]rev1, rev2] and ordering matters.
    try:
      out = self._check_output_git(
          ['log', '--format="%H"', '%s..%s' % (rev1, rev2)])
    except subprocess.CalledProcessError:
      return None
    return len(out.splitlines())

  def _fetch_remote(self):
    """Fetches the remote without rebasing."""
    # git fetch is always verbose even with -q, so redirect its output.
    self._check_output_git(['fetch', self.remote, self.remote_branch],
                           timeout=FETCH_TIMEOUT)
class ReadOnlyCheckout(object):
  """Converts a checkout into a read-only one.

  All operations delegate to the wrapped checkout except commit(), which only
  logs what would have happened.
  """

  def __init__(self, checkout, post_processors=None):
    super(ReadOnlyCheckout, self).__init__()
    self.checkout = checkout
    # Merge the extra post-processors with the wrapped checkout's own.
    combined = list(post_processors or [])
    combined.extend(self.checkout.post_processors or [])
    self.post_processors = combined

  def prepare(self, revision):
    """Delegates to the wrapped checkout."""
    return self.checkout.prepare(revision)

  def get_settings(self, key):
    """Delegates to the wrapped checkout."""
    return self.checkout.get_settings(key)

  def apply_patch(self, patches, post_processors=None, verbose=False):
    """Delegates to the wrapped checkout, using the merged post-processors."""
    effective = post_processors or self.post_processors
    return self.checkout.apply_patch(patches, effective, verbose)

  def commit(self, message, user):  # pylint: disable=no-self-use
    """Logs the commit that would have happened; never touches the remote."""
    logging.info('Would have committed for %s with message: %s' % (
        user, message))
    return 'FAKE'

  def revisions(self, rev1, rev2):
    """Delegates to the wrapped checkout."""
    return self.checkout.revisions(rev1, rev2)

  @property
  def project_name(self):
    return self.checkout.project_name

  @property
  def project_path(self):
    return self.checkout.project_path

@ -40,7 +40,6 @@ import zlib
from third_party import colorama
from third_party import httplib2
import auth
import checkout
import clang_format
import dart_format
import setup_color

@ -1,548 +0,0 @@
# coding=utf-8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility functions to handle patches."""
import posixpath
import os
import re
class UnsupportedPatchFormat(Exception):
  """Raised when a patch for a file cannot be parsed or processed."""

  def __init__(self, filename, status):
    """
    Args:
      filename: the file whose patch is unsupported.
      status: optional detail message, may be None.
    """
    super(UnsupportedPatchFormat, self).__init__(filename, status)
    self.filename = filename
    self.status = status

  def __str__(self):
    parts = ['Can\'t process patch for file %s.' % self.filename]
    if self.status:
      parts.append(self.status)
    return '\n'.join(parts)
class FilePatchBase(object):
  """Defines a single file being modified.

  '/' is always used instead of os.sep for consistency.
  """
  is_delete = False
  is_binary = False
  is_new = False

  def __init__(self, filename):
    # This class is abstract; only subclasses may be instantiated.
    assert self.__class__ is not FilePatchBase
    self.filename = self._process_filename(filename)
    # Set when the file is copied or moved.
    self.source_filename = None

  @property
  def filename_utf8(self):
    return self.filename.encode('utf-8')

  @property
  def source_filename_utf8(self):
    if self.source_filename is not None:
      return self.source_filename.encode('utf-8')

  @staticmethod
  def _process_filename(filename):
    """Normalizes path separators to '/' and rejects unsupported names."""
    filename = filename.replace('\\', '/')
    # Blacklist a few characters for simplicity.
    for banned in ('$', '..', '\'', '"', '<', '>', ':', '|', '?', '*'):
      if banned in filename:
        raise UnsupportedPatchFormat(
            filename, 'Can\'t use \'%s\' in filename.' % banned)
    if filename.startswith('/'):
      raise UnsupportedPatchFormat(
          filename, 'Filename can\'t start with \'/\'.')
    if filename == 'CON':
      raise UnsupportedPatchFormat(
          filename, 'Filename can\'t be \'CON\'.')
    if re.match(r'COM\d', filename):
      raise UnsupportedPatchFormat(
          filename, 'Filename can\'t be \'%s\'.' % filename)
    return filename

  def set_relpath(self, relpath):
    """Rebases the patch (and its source file, if any) under |relpath|."""
    if not relpath:
      return
    relpath = relpath.replace('\\', '/')
    if relpath[0] == '/':
      self._fail('Relative path starts with %s' % relpath[0])
    self.filename = self._process_filename(
        posixpath.join(relpath, self.filename))
    if self.source_filename:
      self.source_filename = self._process_filename(
          posixpath.join(relpath, self.source_filename))

  def _fail(self, msg):
    """Shortcut function to raise UnsupportedPatchFormat."""
    raise UnsupportedPatchFormat(self.filename, msg)

  def __str__(self):
    # Use a status-like board: Binary / Delete / New / Rename flags.
    out = ''.join([
        'B' if self.is_binary else ' ',
        'D' if self.is_delete else ' ',
        'N' if self.is_new else ' ',
        'R' if self.source_filename else ' ',
        ' ',
    ])
    if self.source_filename:
      out += '%s->' % self.source_filename_utf8
    return out + self.filename_utf8

  def dump(self):
    """Dumps itself in a verbose way to help diagnosing."""
    return str(self)
class FilePatchDelete(FilePatchBase):
  """Deletes a file."""
  is_delete = True

  def __init__(self, filename, is_binary):
    """Records the deletion of |filename|; |is_binary| tags binary files."""
    super(FilePatchDelete, self).__init__(filename)
    self.is_binary = is_binary
class FilePatchBinary(FilePatchBase):
  """Content of a new binary file."""
  is_binary = True

  def __init__(self, filename, data, svn_properties, is_new):
    """
    Args:
      data: raw bytes of the file content.
      svn_properties: optional list of (name, value) svn property pairs.
      is_new: True when the file did not previously exist.
    """
    super(FilePatchBinary, self).__init__(filename)
    self.data = data
    self.svn_properties = svn_properties or []
    self.is_new = is_new

  def get(self):
    """Returns the raw file content."""
    return self.data

  def __str__(self):
    return '%s %d bytes' % (str(super(FilePatchBinary, self)), len(self.data))
class Hunk(object):
  """Parsed hunk data container."""

  def __init__(self, start_src, lines_src, start_dst, lines_dst):
    """Stores the source/destination line ranges of one '@@' hunk."""
    self.start_src = start_src
    self.lines_src = lines_src
    self.start_dst = start_dst
    self.lines_dst = lines_dst
    # Net line-count change introduced by this hunk.
    self.variation = lines_dst - lines_src
    # Raw hunk body lines, appended by the parser.
    self.text = []

  def __repr__(self):
    return '%s<(%d, %d) to (%d, %d)>' % (
        self.__class__.__name__,
        self.start_src, self.lines_src, self.start_dst, self.lines_dst)
class FilePatchDiff(FilePatchBase):
  """Patch for a single file."""

  def __init__(self, filename, diff, svn_properties):
    super(FilePatchDiff, self).__init__(filename)
    if not diff:
      self._fail('File doesn\'t have a diff.')
    # Separate the textual header from the '@@' hunk section.
    self.diff_header, self.diff_hunks = self._split_header(diff)
    self.svn_properties = svn_properties or []
    self.is_git_diff = self._is_git_diff_header(self.diff_header)
    # 0 for svn-style diffs; 1 when git's a/ b/ prefixes are present.
    self.patchlevel = 0
    if self.is_git_diff:
      self._verify_git_header()
    else:
      self._verify_svn_header()
    self.hunks = self._split_hunks()
    if self.source_filename and not self.is_new:
      self._fail('If source_filename is set, is_new must be also be set')

  def get(self, for_git):
    # Returns the diff text, rewritten for 'patch' consumption when needed.
    if for_git or not self.source_filename:
      return self.diff_header + self.diff_hunks
    else:
      # patch is stupid. It patches the source_filename instead so get rid of
      # any source_filename reference if needed.
      return (
          self.diff_header.replace(
              self.source_filename_utf8, self.filename_utf8) +
          self.diff_hunks)

  def set_relpath(self, relpath):
    old_filename = self.filename_utf8
    old_source_filename = self.source_filename_utf8 or self.filename_utf8
    super(FilePatchDiff, self).set_relpath(relpath)
    # Update the header too.
    filename = self.filename_utf8
    source_filename = self.source_filename_utf8 or self.filename_utf8
    lines = self.diff_header.splitlines(True)
    for i, line in enumerate(lines):
      if line.startswith('diff --git'):
        lines[i] = line.replace(
            'a/' + old_source_filename, source_filename).replace(
                'b/' + old_filename, filename)
      elif re.match(r'^\w+ from .+$', line) or line.startswith('---'):
        lines[i] = line.replace(old_source_filename, source_filename)
      elif re.match(r'^\w+ to .+$', line) or line.startswith('+++'):
        lines[i] = line.replace(old_filename, filename)
    self.diff_header = ''.join(lines)

  def _split_header(self, diff):
    """Splits a diff in two: the header and the hunks."""
    header = []
    hunks = diff.splitlines(True)
    while hunks:
      header.append(hunks.pop(0))
      if header[-1].startswith('--- '):
        break
    else:
      # Some diff may not have a ---/+++ set like a git rename with no change
      # or a svn diff with only property change.
      pass

    if hunks:
      if not hunks[0].startswith('+++ '):
        self._fail('Inconsistent header')
      header.append(hunks.pop(0))
      if hunks:
        if not hunks[0].startswith('@@ '):
          self._fail('Inconsistent hunk header')

    # Mangle any \\ in the header to /.
    header_lines = ('Index:', 'diff', 'copy', 'rename', '+++', '---')
    basename = os.path.basename(self.filename_utf8)
    # NOTE(review): xrange makes this Python 2 only.
    for i in xrange(len(header)):
      if (header[i].split(' ', 1)[0] in header_lines or
          header[i].endswith(basename)):
        header[i] = header[i].replace('\\', '/')
    return ''.join(header), ''.join(hunks)

  @staticmethod
  def _is_git_diff_header(diff_header):
    """Returns True if the diff for a single files was generated with git."""
    # Delete: http://codereview.chromium.org/download/issue6368055_22_29.diff
    # Rename partial change:
    # http://codereview.chromium.org/download/issue6250123_3013_6010.diff
    # Rename no change:
    # http://codereview.chromium.org/download/issue6287022_3001_4010.diff
    return any(l.startswith('diff --git') for l in diff_header.splitlines())

  def _split_hunks(self):
    """Splits the hunks and does verification."""
    hunks = []
    for line in self.diff_hunks.splitlines(True):
      if line.startswith('@@'):
        match = re.match(r'^@@ -([\d,]+) \+([\d,]+) @@.*$', line)
        # File add will result in "-0,0 +1" but file deletion will result in
        # "-1,N +0,0" where N is the number of lines deleted. That's from diff
        # and svn diff. git diff doesn't exhibit this behavior.
        # svn diff for a single line file rewrite "@@ -1 +1 @@". Fun.
        # "@@ -1 +1,N @@" is also valid where N is the length of the new file.
        if not match:
          self._fail('Hunk header is unparsable')
        count = match.group(1).count(',')
        if not count:
          start_src = int(match.group(1))
          lines_src = 1
        elif count == 1:
          start_src, lines_src = map(int, match.group(1).split(',', 1))
        else:
          self._fail('Hunk header is malformed')
        count = match.group(2).count(',')
        if not count:
          start_dst = int(match.group(2))
          lines_dst = 1
        elif count == 1:
          start_dst, lines_dst = map(int, match.group(2).split(',', 1))
        else:
          self._fail('Hunk header is malformed')
        new_hunk = Hunk(start_src, lines_src, start_dst, lines_dst)
        if hunks:
          if new_hunk.start_src <= hunks[-1].start_src:
            self._fail('Hunks source lines are not ordered')
          if new_hunk.start_dst <= hunks[-1].start_dst:
            self._fail('Hunks destination lines are not ordered')
        hunks.append(new_hunk)
        continue
      # Non-'@@' lines belong to the body of the most recent hunk.
      hunks[-1].text.append(line)

    # A single all-zero range flags file creation or deletion.
    if len(hunks) == 1:
      if hunks[0].start_src == 0 and hunks[0].lines_src == 0:
        self.is_new = True
      if hunks[0].start_dst == 0 and hunks[0].lines_dst == 0:
        self.is_delete = True
    if self.is_new and self.is_delete:
      self._fail('Hunk header is all 0')

    if not self.is_new and not self.is_delete:
      for hunk in hunks:
        # Cross-check the declared line delta against the actual +/- lines.
        variation = (
            len([1 for i in hunk.text if i.startswith('+')]) -
            len([1 for i in hunk.text if i.startswith('-')]))
        if variation != hunk.variation:
          self._fail(
              'Hunk header is incorrect: %d vs %d; %r' % (
                variation, hunk.variation, hunk))
        if not hunk.start_src:
          self._fail(
              'Hunk header start line is incorrect: %d' % hunk.start_src)
        if not hunk.start_dst:
          self._fail(
              'Hunk header start line is incorrect: %d' % hunk.start_dst)
        # Convert from 1-based diff line numbers to 0-based offsets.
        hunk.start_src -= 1
        hunk.start_dst -= 1
    if self.is_new and hunks:
      hunks[0].start_dst -= 1
    if self.is_delete and hunks:
      hunks[0].start_src -= 1
    return hunks

  def mangle(self, string):
    """Mangle a file path."""
    # Strips the first |patchlevel| path components, like 'patch -pN'.
    return '/'.join(string.replace('\\', '/').split('/')[self.patchlevel:])

  def _verify_git_header(self):
    """Sanity checks the header.

    Expects the following format:

    <garbage>
    diff --git (|a/)<filename> (|b/)<filename>
    <similarity>
    <filemode changes>
    <index>
    <copy|rename from>
    <copy|rename to>
    --- <filename>
    +++ <filename>

    Everything is optional except the diff --git line.
    """
    lines = self.diff_header.splitlines()

    # Verify the diff --git line.
    old = None
    new = None
    while lines:
      match = re.match(r'^diff \-\-git (.*?) (.*)$', lines.pop(0))
      if not match:
        continue
      if match.group(1).startswith('a/') and match.group(2).startswith('b/'):
        self.patchlevel = 1
      old = self.mangle(match.group(1))
      new = self.mangle(match.group(2))

      # The rename is about the new file so the old file can be anything.
      if new not in (self.filename_utf8, 'dev/null'):
        self._fail('Unexpected git diff output name %s.' % new)
      if old == 'dev/null' and new == 'dev/null':
        self._fail('Unexpected /dev/null git diff.')
      break

    if not old or not new:
      self._fail('Unexpected git diff; couldn\'t find git header.')

    if old not in (self.filename_utf8, 'dev/null'):
      # Copy or rename.
      # NOTE(review): .decode() on a str is Python 2 only.
      self.source_filename = old.decode('utf-8')
      self.is_new = True

    last_line = ''
    while lines:
      line = lines.pop(0)
      self._verify_git_header_process_line(lines, line, last_line)
      last_line = line

    # Cheap check to make sure the file name is at least mentioned in the
    # 'diff' header. That's the only remaining invariant.
    if not self.filename_utf8 in self.diff_header:
      self._fail('Diff seems corrupted.')

  def _verify_git_header_process_line(self, lines, line, last_line):
    """Processes a single line of the header.

    Returns True if it should continue looping.

    Format is described to
    http://www.kernel.org/pub/software/scm/git/docs/git-diff.html
    """
    match = re.match(r'^(rename|copy) from (.+)$', line)
    old = self.source_filename_utf8 or self.filename_utf8
    if match:
      if old != match.group(2):
        self._fail('Unexpected git diff input name for line %s.' % line)
      if not lines or not lines[0].startswith('%s to ' % match.group(1)):
        self._fail(
            'Confused %s from/to git diff for line %s.' %
                (match.group(1), line))
      return

    match = re.match(r'^(rename|copy) to (.+)$', line)
    if match:
      if self.filename_utf8 != match.group(2):
        self._fail('Unexpected git diff output name for line %s.' % line)
      if not last_line.startswith('%s from ' % match.group(1)):
        self._fail(
            'Confused %s from/to git diff for line %s.' %
                (match.group(1), line))
      return

    match = re.match(r'^deleted file mode (\d{6})$', line)
    if match:
      # It is necessary to parse it because there may be no hunk, like when the
      # file was empty.
      self.is_delete = True
      return

    match = re.match(r'^new(| file) mode (\d{6})$', line)
    if match:
      mode = match.group(2)
      # Only look at owner ACL for executable.
      if bool(int(mode[4]) & 1):
        self.svn_properties.append(('svn:executable', '.'))
      elif not self.source_filename and self.is_new:
        # It's a new file, not from a rename/copy, then there's no property to
        # delete.
        self.svn_properties.append(('svn:executable', None))
      return

    match = re.match(r'^--- (.*)$', line)
    if match:
      if last_line[:3] in ('---', '+++'):
        self._fail('--- and +++ are reversed')
      if match.group(1) == '/dev/null':
        self.is_new = True
      elif self.mangle(match.group(1)) != old:
        # git patches are always well formatted, do not allow random filenames.
        self._fail('Unexpected git diff: %s != %s.' % (old, match.group(1)))
      if not lines or not lines[0].startswith('+++'):
        self._fail('Missing git diff output name.')
      return

    match = re.match(r'^\+\+\+ (.*)$', line)
    if match:
      if not last_line.startswith('---'):
        self._fail('Unexpected git diff: --- not following +++.')
      if '/dev/null' == match.group(1):
        self.is_delete = True
      elif self.filename_utf8 != self.mangle(match.group(1)):
        self._fail(
            'Unexpected git diff: %s != %s.' % (self.filename, match.group(1)))
      if lines:
        self._fail('Crap after +++')
      # We're done.
      return

  def _verify_svn_header(self):
    """Sanity checks the header.

    A svn diff can contain only property changes, in that case there will be no
    proper header. To make things worse, this property change header is
    localized.
    """
    lines = self.diff_header.splitlines()
    last_line = ''

    while lines:
      line = lines.pop(0)
      self._verify_svn_header_process_line(lines, line, last_line)
      last_line = line

    # Cheap check to make sure the file name is at least mentioned in the
    # 'diff' header. That's the only remaining invariant.
    if not self.filename_utf8 in self.diff_header:
      self._fail('Diff seems corrupted.')

  def _verify_svn_header_process_line(self, lines, line, last_line):
    """Processes a single line of the header.

    Returns True if it should continue looping.
    """
    match = re.match(r'^--- ([^\t]+).*$', line)
    if match:
      if last_line[:3] in ('---', '+++'):
        self._fail('--- and +++ are reversed')
      if match.group(1) == '/dev/null':
        self.is_new = True
      elif self.mangle(match.group(1)) != self.filename_utf8:
        # guess the source filename.
        # NOTE(review): .decode() on a str is Python 2 only.
        self.source_filename = match.group(1).decode('utf-8')
        self.is_new = True
      if not lines or not lines[0].startswith('+++'):
        self._fail('Nothing after header.')
      return

    match = re.match(r'^\+\+\+ ([^\t]+).*$', line)
    if match:
      if not last_line.startswith('---'):
        self._fail('Unexpected diff: --- not following +++.')
      if match.group(1) == '/dev/null':
        self.is_delete = True
      elif self.mangle(match.group(1)) != self.filename_utf8:
        self._fail('Unexpected diff: %s.' % match.group(1))
      if lines:
        self._fail('Crap after +++')
      # We're done.
      return

  def dump(self):
    """Dumps itself in a verbose way to help diagnosing."""
    return str(self) + '\n' + self.get(True)
class PatchSet(object):
  """A list of FilePatch* objects."""

  def __init__(self, patches):
    for item in patches:
      assert isinstance(item, FilePatchBase)

    def sort_key(item):
      """Sort by ordering of application.

      File move are first.
      Deletes are last.
      """
      # The bool is necessary because None < 'string' but the reverse is
      # needed.
      return (
          item.is_delete,
          # False is before True, so files *with* a source file will be first.
          not bool(item.source_filename),
          item.source_filename_utf8,
          item.filename_utf8)

    self.patches = sorted(patches, key=sort_key)

  def set_relpath(self, relpath):
    """Used to offset the patch into a subdirectory."""
    for patch in self.patches:
      patch.set_relpath(relpath)

  def __iter__(self):
    return iter(self.patches)

  def __getitem__(self, key):
    return self.patches[key]

  @property
  def filenames(self):
    return [p.filename for p in self.patches]

@ -1,480 +0,0 @@
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test framework for code that interacts with gerrit.
class GerritTestCase
--------------------------------------------------------------------------------
This class initializes and runs a gerrit instance on localhost. To use the
framework, define a class that extends GerritTestCase, and then do standard
python unittest development as described here:
http://docs.python.org/2.7/library/unittest.html#basic-example
When your test code runs, the framework will:
- Download the latest stable(-ish) binary release of the gerrit code.
- Start up a live gerrit instance running in a temp directory on the localhost.
- Set up a single gerrit user account with admin privileges.
- Supply credential helpers for interacting with the gerrit instance via http
or ssh.
Refer to depot_tools/testing_support/gerrit-init.sh for details about how the
gerrit instance is set up, and refer to helper methods defined below
(createProject, cloneProject, uploadChange, etc.) for ways to interact with the
gerrit instance from your test methods.
class RepoTestCase
--------------------------------------------------------------------------------
This class extends GerritTestCase, and creates a set of project repositories
and a manifest repository that can be used in conjunction with the 'repo' tool.
Each test method will initialize and sync a brand-new repo working directory.
The 'repo' command may be invoked in a subprocess as part of your tests.
One gotcha: 'repo upload' will always attempt to use the ssh interface to talk
to gerrit.
"""
from __future__ import print_function
import collections
import errno
import netrc
import os
import re
import shutil
import signal
import socket
import stat
import subprocess
import sys
import tempfile
import unittest
import urllib
import gerrit_util
# Absolute path to the depot_tools checkout containing this file
# (this file lives in depot_tools/testing_support/).
DEPOT_TOOLS_DIR = os.path.normpath(os.path.join(
    os.path.realpath(__file__), '..', '..'))

# When debugging test code, it's sometimes helpful to leave the test gerrit
# instance intact and running after the test code exits.  Setting TEARDOWN
# to False will do that; the temp directories are then never deleted.
TEARDOWN = True
class GerritTestCase(unittest.TestCase):
  """Test class for tests that interact with a gerrit server.

  The class setup creates and launches a stand-alone gerrit instance running on
  localhost, for test methods to interact with.  Class teardown stops and
  deletes the gerrit instance.

  Note that there is a single gerrit instance for ALL test methods in a
  GerritTestCase sub-class.
  """

  COMMIT_RE = re.compile(r'^commit ([0-9a-fA-F]{40})$')
  CHANGEID_RE = re.compile(r'^\s+Change-Id:\s*(\S+)$')
  # Shared sink for subprocess output; deliberately kept open for the life
  # of the test process.
  DEVNULL = open(os.devnull, 'w')
  TEST_USERNAME = 'test-username'
  TEST_EMAIL = 'test-username@test.org'

  # Bundle describing a running test gerrit instance: filesystem locations,
  # ports, urls and the server pid.
  GerritInstance = collections.namedtuple('GerritInstance', [
      'credential_file',
      'gerrit_dir',
      'gerrit_exe',
      'gerrit_host',
      'gerrit_pid',
      'gerrit_url',
      'git_dir',
      'git_host',
      'git_url',
      'http_port',
      'netrc_file',
      'ssh_ident',
      'ssh_port',
  ])

  @classmethod
  def check_call(cls, *args, **kwargs):
    """subprocess.check_call wrapper that discards output by default."""
    kwargs.setdefault('stdout', cls.DEVNULL)
    kwargs.setdefault('stderr', cls.DEVNULL)
    subprocess.check_call(*args, **kwargs)

  @classmethod
  def check_output(cls, *args, **kwargs):
    """subprocess.check_output wrapper that discards stderr by default."""
    kwargs.setdefault('stderr', cls.DEVNULL)
    return subprocess.check_output(*args, **kwargs)

  @classmethod
  def _create_gerrit_instance(cls, gerrit_dir):
    """Installs and launches a gerrit server rooted at |gerrit_dir|.

    Returns:
      A GerritInstance namedtuple describing the running server.
    """
    gerrit_init_script = os.path.join(
        DEPOT_TOOLS_DIR, 'testing_support', 'gerrit-init.sh')
    # Grab two free ports from the OS for gerrit's http and ssh endpoints.
    http_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    http_sock.bind(('', 0))
    http_port = str(http_sock.getsockname()[1])
    ssh_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    ssh_sock.bind(('', 0))
    ssh_port = str(ssh_sock.getsockname()[1])

    # NOTE: this is not completely safe.  These port numbers could be
    # re-assigned by the OS between the calls to socket.close() and gerrit
    # starting up.  The only safe way to do this would be to pass file
    # descriptors down to the gerrit process, which is not even remotely
    # supported.  Alas.
    http_sock.close()
    ssh_sock.close()

    cls.check_call(['bash', gerrit_init_script, '--http-port', http_port,
                    '--ssh-port', ssh_port, gerrit_dir])

    gerrit_exe = os.path.join(gerrit_dir, 'bin', 'gerrit.sh')
    cls.check_call(['bash', gerrit_exe, 'start'])
    with open(os.path.join(gerrit_dir, 'logs', 'gerrit.pid')) as fh:
      gerrit_pid = int(fh.read().rstrip())

    return cls.GerritInstance(
        credential_file=os.path.join(gerrit_dir, 'tmp', '.git-credentials'),
        gerrit_dir=gerrit_dir,
        gerrit_exe=gerrit_exe,
        gerrit_host='localhost:%s' % http_port,
        gerrit_pid=gerrit_pid,
        gerrit_url='http://localhost:%s' % http_port,
        git_dir=os.path.join(gerrit_dir, 'git'),
        git_host='%s/git' % gerrit_dir,
        git_url='file://%s/git' % gerrit_dir,
        http_port=http_port,
        netrc_file=os.path.join(gerrit_dir, 'tmp', '.netrc'),
        ssh_ident=os.path.join(gerrit_dir, 'tmp', 'id_rsa'),
        ssh_port=ssh_port,)

  @classmethod
  def setUpClass(cls):
    """Sets up the gerrit instances in a class-specific temp dir."""
    # Create gerrit instance.
    gerrit_dir = tempfile.mkdtemp()
    os.chmod(gerrit_dir, 0o700)
    gi = cls.gerrit_instance = cls._create_gerrit_instance(gerrit_dir)

    # Set netrc file for http authentication.  Original values are saved so
    # tearDownClass can restore the module state.
    cls.gerrit_util_netrc_orig = gerrit_util.NETRC
    gerrit_util.NETRC = netrc.netrc(gi.netrc_file)

    # gerrit_util.py defaults to using https, but for testing, it's much
    # simpler to use http connections.
    cls.gerrit_util_protocol_orig = gerrit_util.GERRIT_PROTOCOL
    gerrit_util.GERRIT_PROTOCOL = 'http'

    # Because we communicate with the test server via http, rather than https,
    # libcurl won't add authentication headers to raw git requests unless the
    # gerrit server returns 401.  That works for pushes, but for read
    # operations (like git-ls-remote), gerrit will simply omit any ref that
    # requires authentication.  By default gerrit doesn't permit anonymous
    # read access to refs/meta/config.  Override that behavior so tests can
    # access refs/meta/config if necessary.
    clone_path = os.path.join(gi.gerrit_dir, 'tmp', 'All-Projects')
    cls._CloneProject('All-Projects', clone_path)
    project_config = os.path.join(clone_path, 'project.config')
    cls.check_call(['git', 'config', '--file', project_config, '--add',
                    'access.refs/meta/config.read', 'group Anonymous Users'])
    cls.check_call(['git', 'add', project_config], cwd=clone_path)
    cls.check_call(
        ['git', 'commit', '-m', 'Anonyous read for refs/meta/config'],
        cwd=clone_path)
    cls.check_call(['git', 'push', 'origin', 'HEAD:refs/meta/config'],
                   cwd=clone_path)

  def setUp(self):
    # Per-test scratch directory; removed in tearDown unless TEARDOWN is off.
    self.tempdir = tempfile.mkdtemp()
    os.chmod(self.tempdir, 0o700)

  def tearDown(self):
    if TEARDOWN:
      shutil.rmtree(self.tempdir)

  @classmethod
  def createProject(cls, name, description='Test project', owners=None,
                    submit_type='CHERRY_PICK'):
    """Create a project on the test gerrit server via its REST API."""
    if owners is None:
      owners = ['Administrators']
    body = {
        'description': description,
        'submit_type': submit_type,
        'owners': owners,
    }
    path = 'projects/%s' % urllib.quote(name, '')
    conn = gerrit_util.CreateHttpConn(
        cls.gerrit_instance.gerrit_host, path, reqtype='PUT', body=body)
    jmsg = gerrit_util.ReadHttpJsonResponse(conn, accept_statuses=[200, 201])
    assert jmsg['name'] == name

  @classmethod
  def _post_clone_bookkeeping(cls, clone_path):
    """Points a fresh clone at the test identity and credential store."""
    config_path = os.path.join(clone_path, '.git', 'config')
    cls.check_call(
        ['git', 'config', '--file', config_path, 'user.email', cls.TEST_EMAIL])
    cls.check_call(
        ['git', 'config', '--file', config_path, 'credential.helper',
         'store --file=%s' % cls.gerrit_instance.credential_file])

  @classmethod
  def _CloneProject(cls, name, path):
    """Clone a project from the test gerrit server into |path|."""
    gi = cls.gerrit_instance
    parent_dir = os.path.dirname(path)
    if not os.path.exists(parent_dir):
      os.makedirs(parent_dir)
    url = '/'.join((gi.gerrit_url, name))
    cls.check_call(['git', 'clone', url, path])
    cls._post_clone_bookkeeping(path)
    # Install commit-msg hook to add Change-Id lines.
    hook_path = os.path.join(path, '.git', 'hooks', 'commit-msg')
    cls.check_call(['curl', '-o', hook_path,
                    '/'.join((gi.gerrit_url, 'tools/hooks/commit-msg'))])
    os.chmod(hook_path, stat.S_IRWXU)
    return path

  def cloneProject(self, name, path=None):
    """Clone a project from the test gerrit server into the test tempdir."""
    if path is None:
      path = os.path.basename(name)
      if path.endswith('.git'):
        path = path[:-4]
    path = os.path.join(self.tempdir, path)
    return self._CloneProject(name, path)

  @classmethod
  def _CreateCommit(cls, clone_path, fn=None, msg=None, text=None):
    """Create a commit in the given git checkout.

    Returns:
      A (sha1, change_id) tuple for the new commit.
    """
    if not fn:
      fn = 'test-file.txt'
    if not msg:
      msg = 'Test Message'
    if not text:
      text = 'Another day, another dollar.'
    fpath = os.path.join(clone_path, fn)
    with open(fpath, 'a') as fh:
      fh.write('%s\n' % text)
    cls.check_call(['git', 'add', fn], cwd=clone_path)
    cls.check_call(['git', 'commit', '-m', msg], cwd=clone_path)
    return cls._GetCommit(clone_path)

  def createCommit(self, clone_path, fn=None, msg=None, text=None):
    """Create a commit in a checkout relative to the test tempdir."""
    clone_path = os.path.join(self.tempdir, clone_path)
    return self._CreateCommit(clone_path, fn, msg, text)

  @classmethod
  def _GetCommit(cls, clone_path, ref='HEAD'):
    """Get the sha1 and change-id for a ref in the git checkout."""
    log_proc = cls.check_output(['git', 'log', '-n', '1', ref], cwd=clone_path)
    sha1 = None
    change_id = None
    for line in log_proc.splitlines():
      match = cls.COMMIT_RE.match(line)
      if match:
        sha1 = match.group(1)
        continue
      match = cls.CHANGEID_RE.match(line)
      if match:
        change_id = match.group(1)
        continue
    assert sha1
    assert change_id
    return (sha1, change_id)

  def getCommit(self, clone_path, ref='HEAD'):
    """Get the sha1 and change-id for a ref, relative to the test tempdir."""
    clone_path = os.path.join(self.tempdir, clone_path)
    return self._GetCommit(clone_path, ref)

  @classmethod
  def _UploadChange(cls, clone_path, branch='master', remote='origin'):
    """Create a gerrit CL from the HEAD of a git checkout."""
    cls.check_call(
        ['git', 'push', remote, 'HEAD:refs/for/%s' % branch], cwd=clone_path)

  def uploadChange(self, clone_path, branch='master', remote='origin'):
    """Create a gerrit CL from a checkout relative to the test tempdir."""
    clone_path = os.path.join(self.tempdir, clone_path)
    self._UploadChange(clone_path, branch, remote)

  @classmethod
  def _PushBranch(cls, clone_path, branch='master'):
    """Push a branch directly to gerrit, bypassing code review."""
    cls.check_call(
        ['git', 'push', 'origin', 'HEAD:refs/heads/%s' % branch],
        cwd=clone_path)

  def pushBranch(self, clone_path, branch='master'):
    """Push a branch directly to gerrit, bypassing code review."""
    clone_path = os.path.join(self.tempdir, clone_path)
    self._PushBranch(clone_path, branch)

  @classmethod
  def createAccount(cls, name='Test User', email='test-user@test.org',
                    password=None, groups=None):
    """Create a new user account on gerrit via its ssh admin interface."""
    username = email.partition('@')[0]
    gerrit_cmd = 'gerrit create-account %s --full-name "%s" --email %s' % (
        username, name, email)
    if password:
      gerrit_cmd += ' --http-password "%s"' % password
    if groups:
      # FIX: a separating space is required before the first --group flag;
      # the original concatenated it directly onto the preceding argument.
      gerrit_cmd += ' ' + ' '.join(['--group %s' % x for x in groups])
    ssh_cmd = ['ssh', '-p', cls.gerrit_instance.ssh_port,
               '-i', cls.gerrit_instance.ssh_ident,
               '-o', 'NoHostAuthenticationForLocalhost=yes',
               '-o', 'StrictHostKeyChecking=no',
               '%s@localhost' % cls.TEST_USERNAME, gerrit_cmd]
    cls.check_call(ssh_cmd)

  @classmethod
  def _stop_gerrit(cls, gerrit_instance):
    """Stops the running gerrit instance and deletes it."""
    try:
      # This should terminate the gerrit process.
      cls.check_call(['bash', gerrit_instance.gerrit_exe, 'stop'])
    finally:
      try:
        # cls.gerrit_pid should have already terminated.  If it did, then
        # os.waitpid will raise OSError.
        os.waitpid(gerrit_instance.gerrit_pid, os.WNOHANG)
      except OSError as e:
        if e.errno == errno.ECHILD:
          # If gerrit shut down cleanly, os.waitpid will land here.
          # pylint: disable=lost-exception
          return

      # If we get here, the gerrit process is still alive.  Send the process
      # SIGTERM for good measure.
      try:
        os.kill(gerrit_instance.gerrit_pid, signal.SIGTERM)
      except OSError as e:
        # FIX: the original wrote 'except OSError:' and then read 'e', which
        # is unbound in this handler; bind the exception explicitly.
        if e.errno == errno.ESRCH:
          # os.kill raised an error because the process doesn't exist.  Maybe
          # gerrit shut down cleanly after all.
          # pylint: disable=lost-exception
          return

      # Announce that gerrit didn't shut down cleanly.
      msg = 'Test gerrit server (pid=%d) did not shut down cleanly.' % (
          gerrit_instance.gerrit_pid)
      print(msg, file=sys.stderr)

  @classmethod
  def tearDownClass(cls):
    # Restore the gerrit_util module state mutated by setUpClass.
    gerrit_util.NETRC = cls.gerrit_util_netrc_orig
    gerrit_util.GERRIT_PROTOCOL = cls.gerrit_util_protocol_orig
    if TEARDOWN:
      cls._stop_gerrit(cls.gerrit_instance)
      shutil.rmtree(cls.gerrit_instance.gerrit_dir)
class RepoTestCase(GerritTestCase):
  """Test class which runs in a repo checkout."""

  REPO_URL = 'https://chromium.googlesource.com/external/repo'
  MANIFEST_PROJECT = 'remotepath/manifest'
  # Filled in with the GerritInstance fields (gerrit_url, gerrit_host).
  MANIFEST_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<manifest>
  <remote name="remote1"
          fetch="%(gerrit_url)s"
          review="%(gerrit_host)s" />
  <remote name="remote2"
          fetch="%(gerrit_url)s"
          review="%(gerrit_host)s" />
  <default revision="refs/heads/master" remote="remote1" sync-j="1" />
  <project remote="remote1" path="localpath/testproj1" name="remotepath/testproj1" />
  <project remote="remote1" path="localpath/testproj2" name="remotepath/testproj2" />
  <project remote="remote2" path="localpath/testproj3" name="remotepath/testproj3" />
  <project remote="remote2" path="localpath/testproj4" name="remotepath/testproj4" />
</manifest>
"""

  @classmethod
  def setUpClass(cls):
    """Starts gerrit, then seeds it with a manifest and four test projects."""
    GerritTestCase.setUpClass()
    gi = cls.gerrit_instance

    # Create local mirror of repo tool repository.
    repo_mirror_path = os.path.join(gi.git_dir, 'repo.git')
    cls.check_call(
        ['git', 'clone', '--mirror', cls.REPO_URL, repo_mirror_path])

    # Check out the top-level repo script; it will be used for invocation.
    repo_clone_path = os.path.join(gi.gerrit_dir, 'tmp', 'repo')
    cls.check_call(['git', 'clone', '-n', repo_mirror_path, repo_clone_path])
    cls.check_call(
        ['git', 'checkout', 'origin/stable', 'repo'], cwd=repo_clone_path)
    shutil.rmtree(os.path.join(repo_clone_path, '.git'))
    cls.repo_exe = os.path.join(repo_clone_path, 'repo')

    # Create manifest repository.
    cls.createProject(cls.MANIFEST_PROJECT)
    clone_path = os.path.join(gi.gerrit_dir, 'tmp', 'manifest')
    cls._CloneProject(cls.MANIFEST_PROJECT, clone_path)
    manifest_path = os.path.join(clone_path, 'default.xml')
    with open(manifest_path, 'w') as fh:
      # namedtuple.__dict__ maps field names to values, so %-interpolation
      # picks up gerrit_url and gerrit_host.
      fh.write(cls.MANIFEST_TEMPLATE % gi.__dict__)
    cls.check_call(['git', 'add', 'default.xml'], cwd=clone_path)
    cls.check_call(['git', 'commit', '-m', 'Test manifest.'], cwd=clone_path)
    cls._PushBranch(clone_path)

    # Create project repositories.
    for i in xrange(1, 5):
      proj = 'testproj%d' % i
      cls.createProject('remotepath/%s' % proj)
      clone_path = os.path.join(gi.gerrit_dir, 'tmp', proj)
      cls._CloneProject('remotepath/%s' % proj, clone_path)
      cls._CreateCommit(clone_path)
      cls._PushBranch(clone_path, 'master')

  def setUp(self):
    """Initializes and syncs a brand-new repo working directory."""
    super(RepoTestCase, self).setUp()
    manifest_url = '/'.join((self.gerrit_instance.gerrit_url,
                             self.MANIFEST_PROJECT))
    repo_url = '/'.join((self.gerrit_instance.gerrit_url, 'repo'))
    self.check_call(
        [self.repo_exe, 'init', '-u', manifest_url, '--repo-url',
         repo_url, '--no-repo-verify'], cwd=self.tempdir)
    self.check_call([self.repo_exe, 'sync'], cwd=self.tempdir)
    for i in xrange(1, 5):
      clone_path = os.path.join(self.tempdir, 'localpath', 'testproj%d' % i)
      self._post_clone_bookkeeping(clone_path)
      # Tell 'repo upload' to upload this project without prompting.
      config_path = os.path.join(clone_path, '.git', 'config')
      self.check_call(
          ['git', 'config', '--file', config_path, 'review.%s.upload' %
           self.gerrit_instance.gerrit_host, 'true'])

  @classmethod
  def runRepo(cls, *args, **kwargs):
    """Runs the repo tool; args[0] is the argument list it is invoked with."""
    # Unfortunately, munging $HOME appears to be the only way to control the
    # netrc file used by repo.
    munged_home = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp')
    if 'env' not in kwargs:
      env = kwargs['env'] = os.environ.copy()
      env['HOME'] = munged_home
    else:
      # FIX: the original referenced an unbound local 'env' here, raising
      # NameError whenever a caller supplied its own env.
      kwargs['env'].setdefault('HOME', munged_home)
    args[0].insert(0, cls.repo_exe)
    cls.check_call(*args, **kwargs)

  def uploadChange(self, clone_path, branch='master', remote='origin'):
    """Uploads a CL the way 'repo upload' would, via the review remote."""
    review_host = self.check_output(
        ['git', 'config', 'remote.%s.review' % remote],
        cwd=clone_path).strip()
    assert(review_host)
    projectname = self.check_output(
        ['git', 'config', 'remote.%s.projectname' % remote],
        cwd=clone_path).strip()
    assert(projectname)
    GerritTestCase._UploadChange(
        clone_path, branch=branch, remote='%s://%s/%s' % (
            gerrit_util.GERRIT_PROTOCOL, review_host, projectname))

@ -1,351 +0,0 @@
# coding: utf-8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Samples patches to test patch.py."""
class RAW(object):
  """Sample patches in raw (svn-style) unified diff format."""

  # A simple one-hunk modification of an existing file.
  PATCH = (
      'Index: chrome/file.cc\n'
      '===================================================================\n'
      '--- chrome/file.cc\t(revision 74690)\n'
      '+++ chrome/file.cc\t(working copy)\n'
      '@@ -3,6 +3,7 @@ bb\n'
      ' ccc\n'
      ' dd\n'
      ' e\n'
      '+FOO!\n'
      ' ff\n'
      ' ggg\n'
      ' hh\n')

  # A new file created from /dev/null.
  NEW = (
      '--- /dev/null\n'
      '+++ foo\n'
      '@@ -0,0 +1 @@\n'
      '+bar\n')

  # A "new" file whose source side is not /dev/null.
  NEW_NOT_NULL = (
      '--- file_a\n'
      '+++ file_a\n'
      '@@ -0,0 +1 @@\n'
      '+foo\n')

  # Header-only diffs with no hunks at all.
  MINIMAL_NEW = (
      '--- /dev/null\t2\n'
      '+++ chrome/file.cc\tfoo\n')
  MINIMAL = (
      '--- file_a\n'
      '+++ file_a\n')
  MINIMAL_RENAME = (
      '--- file_a\n'
      '+++ file_b\n')

  # File deletions, with and without hunks.
  DELETE = (
      '--- tools/clang_check/README.chromium\n'
      '+++ /dev/null\n'
      '@@ -1,1 +0,0 @@\n'
      '-bar\n')
  MINIMAL_DELETE = (
      '--- chrome/file.cc\tbar\n'
      '+++ /dev/null\tfoo\n')
  DELETE2 = (
      'Index: browser/extensions/extension_sidebar_api.cc\n'
      '===================================================================\n'
      '--- browser/extensions/extension_sidebar_api.cc\t(revision 116830)\n'
      '+++ browser/extensions/extension_sidebar_api.cc\t(working copy)\n'
      '@@ -1,19 +0,0 @@\n'
      '-// Copyright (c) 2011 The Chromium Authors. All rights reserved.\n'
      '-// Use of this source code is governed by a BSD-style license that\n'
      '-// found in the LICENSE file.\n'
      '-\n'
      '-#include "base/command_line.h"\n'
      '-#include "chrome/browser/extensions/extension_apitest.h"\n'
      '-#include "chrome/common/chrome_switches.h"\n'
      '-\n'
      '-class SidebarApiTest : public ExtensionApiTest {\n'
      '- public:\n'
      '-  void SetUpCommandLine(CommandLine* command_line) {\n'
      '-    ExtensionApiTest::SetUpCommandLine(command_line);\n'
      '-    command_line->AppendSwitch(switches::Bleh);\n'
      '-  }\n'
      '-};\n'
      '-\n'
      '-IN_PROC_BROWSER_TEST_F(SidebarApiTest, Sidebar) {\n'
      '-  ASSERT_TRUE(RunExtensionTest("sidebar")) << message_;\n'
      '-}\n')

  # A diff containing only svn metadata lines, no actual content change.
  # http://codereview.chromium.org/api/7530007/5001
  # http://codereview.chromium.org/download/issue7530007_5001_4011.diff
  CRAP_ONLY = (
      'Index: scripts/master/factory/skia/__init__.py\n'
      '===================================================================\n')

  # A single file modified in two separate hunks.
  TWO_HUNKS = (
      'Index: chrome/app/generated_resources.grd\n'
      '===================================================================\n'
      '--- chrome/app/generated_resources.grd\t(revision 116830)\n'
      '+++ chrome/app/generated_resources.grd\t(working copy)\n'
      '@@ -4169,9 +4169,6 @@\n'
      ' <message name="IDS_EXTENSION_LOAD_OPTIONS_PAGE_FAILED" desc="">\n'
      ' Could not load options page \'<ph name="OPTIONS_PAGE">$1<ex....\n'
      ' </message>\n'
      '- <message name="IDS_EXTENSION_LOAD_SIDEBAR_PAGE_FAILED" desc="">\n'
      '- Could not load sidebar page \'<ph name="SIDEBAR_PAGE">$1<e...\n'
      '- </message>\n'
      ' <if expr="is_win">\n'
      ' <message name="IDS_EXTENSION_UNPACK_FAILED" desc="On wind...\n'
      ' Can not unpack extension. To safely unpack an extensio...\n'
      '@@ -5593,9 +5590,6 @@\n'
      ' <message name="IDS_ACCNAME_WEB_CONTENTS" desc="The acces...\n'
      ' Web Contents\n'
      ' </message>\n'
      '- <message name="IDS_ACCNAME_SIDE_BAR" desc="The acces...\n'
      '- Sidebar\n'
      '- </message>\n'
      ' \n'
      ' <!-- Browser Hung Plugin Detector -->\n'
      ' <message name="IDS_UNKNOWN_PLUGIN_NAME" ...\n')

  # A one-line change where old and new lines render identically (trailing
  # content truncated in the sample).
  # http://codereview.chromium.org/download/issue9091003_9005_8009.diff
  DIFFERENT = (
      'Index: master/unittests/data/processes-summary.dat\n'
      '===================================================================\n'
      '--- master/unittests/data/processes-summary.dat\t(revision 116240)\n'
      '+++ master/unittests/data/processes-summary.dat\t(working copy)\n'
      '@@ -1 +1 @@\n'
      '-{"traces": {"1t_proc": ["2.0", "0.0"], "1t_proc_ref": ["1.0", ...\n'
      '+{"traces": {"1t_proc": ["2.0", "0.0"], "1t_proc_ref": ["1.0", ...\n')

  # A rename whose paths and content contain non-ASCII characters, encoded
  # as UTF-8 bytes.
  RENAME_UTF8 = (
      u'--- file_à\n'
      u'+++ filé_b\n'
      u'@@ -3,6 +3,7 @@ bb\n'
      u' ccc\n'
      u' ddé\n'
      u' e\n'
      u'+FÔÒ!\n'
      u' ff\n'
      u' ggg\n'
      u' hh\n').encode('utf-8')
class GIT(object):
  """Sample patches generated by git diff."""

  # A simple one-hunk modification of an existing file.
  PATCH = (
      'diff --git a/chrome/file.cc b/chrome/file.cc\n'
      'index 0e4de76..8320059 100644\n'
      '--- a/chrome/file.cc\n'
      '+++ b/chrome/file.cc\n'
      '@@ -3,6 +3,7 @@ bb\n'
      ' ccc\n'
      ' dd\n'
      ' e\n'
      '+FOO!\n'
      ' ff\n'
      ' ggg\n'
      ' hh\n')

  # Hunk header with a single-line range (no ",count" part).
  # http://codereview.chromium.org/download/issue10868039_12001_10003.diff
  PATCH_SHORT_HUNK_HEADER = (
      'Index: chrome/browser/api/OWNERS\n'
      'diff --git a/chrome/browser/api/OWNERS b/chrome/browser/api/OWNERS\n'
      '--- a/chrome/browser/api/OWNERS\n'
      '+++ b/chrome/browser/api/OWNERS\n'
      '@@ -1 +1,2 @@\n'
      '+erikwright@chromium.org\n'
      ' joi@chromium.org\n')

  # A deleted file with content.
  # http://codereview.chromium.org/download/issue6368055_22_29.diff
  DELETE = (
      'Index: tools/clang_check/README.chromium\n'
      'diff --git a/tools/clang_check/README.chromium '
      'b/tools/clang_check/README.chromium\n'
      'deleted file mode 100644\n'
      'index fcaa7e0e94bb604a026c4f478fecb1c5796f5413..'
      '0000000000000000000000000000000000000000\n'
      '--- a/tools/clang_check/README.chromium\n'
      '+++ /dev/null\n'
      '@@ -1,9 +0,0 @@\n'
      '-These are terrible, terrible hacks.\n'
      '-\n'
      '-They are meant \n'
      '-AND doing the normal \n'
      '-run during normal \n'
      '-build system to do a syntax check.\n'
      '-\n'
      '-Also see\n'
      '\n')

  # Deleting an empty file produces no hunks at all.
  # http://codereview.chromium.org/download/issue8508015_6001_7001.diff
  DELETE_EMPTY = (
      'Index: tests/__init__.py\n'
      'diff --git a/tests/__init__.py b/tests/__init__.py\n'
      'deleted file mode 100644\n'
      'index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..'
      '0000000000000000000000000000000000000000\n')

  # A rename that also modifies the file (similarity < 100%).
  # http://codereview.chromium.org/download/issue6250123_3013_6010.diff
  RENAME_PARTIAL = (
      'Index: chromeos/views/webui_menu_widget.h\n'
      'diff --git a/chromeos/views/DOMui_menu_widget.h '
      'b/chromeos/views/webui_menu_widget.h\n'
      'similarity index 79%\n'
      'rename from chromeos/views/DOMui_menu_widget.h\n'
      'rename to chromeos/views/webui_menu_widget.h\n'
      'index 095d4c474fd9718f5aebfa41a1ccb2d951356d41..'
      '157925075434b590e8acaaf605a64f24978ba08b 100644\n'
      '--- a/chromeos/views/DOMui_menu_widget.h\n'
      '+++ b/chromeos/views/webui_menu_widget.h\n'
      '@@ -1,9 +1,9 @@\n'
      '-// Copyright (c) 2010\n'
      '+// Copyright (c) 2011\n'
      ' // Use of this source code\n'
      ' // found in the LICENSE file.\n'
      ' \n'
      '-#ifndef DOM\n'
      '-#define DOM\n'
      '+#ifndef WEB\n'
      '+#define WEB\n'
      ' #pragma once\n'
      ' \n'
      ' #include <string>\n')

  # A pure rename (similarity 100%, no hunks).
  # http://codereview.chromium.org/download/issue6287022_3001_4010.diff
  RENAME = (
      'Index: tools/run_local_server.sh\n'
      'diff --git a/tools/run_local_server.PY b/tools/run_local_server.sh\n'
      'similarity index 100%\n'
      'rename from tools/run_local_server.PY\n'
      'rename to tools/run_local_server.sh\n')

  # A pure copy and a copy-with-modification.
  COPY = (
      'diff --git a/PRESUBMIT.py b/pp\n'
      'similarity index 100%\n'
      'copy from PRESUBMIT.py\n'
      'copy to pp\n')
  COPY_PARTIAL = (
      'diff --git a/wtf b/wtf2\n'
      'similarity index 98%\n'
      'copy from wtf\n'
      'copy to wtf2\n'
      'index 79fbaf3..3560689 100755\n'
      '--- a/wtf\n'
      '+++ b/wtf2\n'
      '@@ -1,4 +1,4 @@\n'
      '-#!/usr/bin/env python\n'
      '+#!/usr/bin/env python1.3\n'
      ' # Copyright (c) 2010 The Chromium Authors. All rights reserved.\n'
      ' # blah blah blah as\n'
      ' # found in the LICENSE file.\n')

  # New files with varying modes.
  NEW = (
      'diff --git a/foo b/foo\n'
      'new file mode 100644\n'
      'index 0000000..5716ca5\n'
      '--- /dev/null\n'
      '+++ b/foo\n'
      '@@ -0,0 +1 @@\n'
      '+bar\n')
  NEW_EXE = (
      'diff --git a/natsort_test.py b/natsort_test.py\n'
      'new file mode 100755\n'
      '--- /dev/null\n'
      '+++ b/natsort_test.py\n'
      '@@ -0,0 +1,1 @@\n'
      '+#!/usr/bin/env python\n')

  # To make sure the subdirectory was created as needed.
  NEW_SUBDIR = (
      'diff --git a/new_dir/subdir/new_file b/new_dir/subdir/new_file\n'
      'new file mode 100644\n'
      '--- /dev/null\n'
      '+++ b/new_dir/subdir/new_file\n'
      '@@ -0,0 +1,2 @@\n'
      '+A new file\n'
      '+should exist.\n')
  NEW_MODE = (
      'diff --git a/natsort_test.py b/natsort_test.py\n'
      'new file mode 100644\n'
      '--- /dev/null\n'
      '+++ b/natsort_test.py\n'
      '@@ -0,0 +1,1 @@\n'
      '+#!/usr/bin/env python\n')

  # Mode-only changes (chmod +x), with and without a leading Index: line.
  MODE_EXE = (
      'diff --git a/git_cl/git-cl b/git_cl/git-cl\n'
      'old mode 100644\n'
      'new mode 100755\n')
  MODE_EXE_JUNK = (
      'Index: Junk\n'
      'diff --git a/git_cl/git-cl b/git_cl/git-cl\n'
      'old mode 100644\n'
      'new mode 100755\n')

  # A new, non-executable file with an abbreviated index line.
  NEW_NOT_EXECUTABLE = (
      'diff --git a/build/android/ant/create.js b/build/android/ant/create.js\n'
      'new file mode 100644\n'
      'index 0000000000000000000..542a89e978feada38dd\n'
      '--- /dev/null\n'
      '+++ b/build/android/ant/create.js\n'
      '@@ -0,0 +1,1 @@\n'
      '+// Copyright (c) 2012 The Chromium Authors. All rights reserved.\n'
      )

  # A single file modified in four separate hunks.
  FOUR_HUNKS = (
      'Index: presubmit_support.py\n'
      'diff --git a/presubmit_support.py b/presubmit_support.py\n'
      'index 52416d3f..d56512f2 100755\n'
      '--- a/presubmit_support.py\n'
      '+++ b/presubmit_support.py\n'
      '@@ -558,6 +558,7 @@ class SvnAffectedFile(AffectedFile):\n'
      ' AffectedFile.__init__(self, *args, **kwargs)\n'
      ' self._server_path = None\n'
      ' self._is_text_file = None\n'
      '+ self._diff = None\n'
      ' \n'
      ' def ServerPath(self):\n'
      ' if self._server_path is None:\n'
      '@@ -598,8 +599,10 @@ class SvnAffectedFile(AffectedFile):\n'
      ' return self._is_text_file\n'
      ' \n'
      ' def GenerateScmDiff(self):\n'
      '- return scm.SVN.GenerateDiff(\n'
      '- [self.LocalPath()], self._local_root, False, None)\n'
      '+ if self._diff is None:\n'
      '+ self._diff = scm.SVN.GenerateDiff(\n'
      '+ [self.LocalPath()], self._local_root, False, None)\n'
      '+ return self._diff\n'
      ' \n'
      ' \n'
      ' class GitAffectedFile(AffectedFile):\n'
      '@@ -611,6 +614,7 @@ class GitAffectedFile(AffectedFile):\n'
      ' AffectedFile.__init__(self, *args, **kwargs)\n'
      ' self._server_path = None\n'
      ' self._is_text_file = None\n'
      '+ self._diff = None\n'
      ' \n'
      ' def ServerPath(self):\n'
      ' if self._server_path is None:\n'
      '@@ -645,7 +649,10 @@ class GitAffectedFile(AffectedFile):\n'
      ' return self._is_text_file\n'
      ' \n'
      ' def GenerateScmDiff(self):\n'
      '- return scm.GIT.GenerateDiff(self._local_root, files=[self.Lo...\n'
      '+ if self._diff is None:\n'
      '+ self._diff = scm.GIT.GenerateDiff(\n'
      '+ self._local_root, files=[self.LocalPath(),])\n'
      '+ return self._diff\n'
      ' \n'
      ' \n'
      ' class Change(object):\n')

@ -1,337 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for checkout.py."""
from __future__ import print_function
import logging
import os
import shutil
import sys
import unittest
from xml.etree import ElementTree
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(ROOT_DIR))
from testing_support import fake_repos
from testing_support.patches_data import GIT, RAW
import checkout
import patch
import subprocess2
# Enables verbose subprocess output; pass -v to enable it.
DEBUGGING = False

# A patch that will fail to apply: GIT.PATCH with its ' e' context line
# removed, which corrupts the hunk.
BAD_PATCH = ''.join(
    [l for l in GIT.PATCH.splitlines(True) if l.strip() != 'e'])
class FakeRepos(fake_repos.FakeReposBase):
  """Fake repo set providing a single git repository for checkout tests."""

  TEST_GIT_REPO = 'repo_1'

  def populateGit(self):
    """Creates a few revisions of changes files."""
    self._commit_git(self.TEST_GIT_REPO, self._git_tree())
    # Fix for the remote rejected error.  For more details see:
    # http://stackoverflow.com/questions/2816369/git-push-error-remote
    subprocess2.check_output(
        ['git', '--git-dir',
         os.path.join(self.git_root, self.TEST_GIT_REPO, '.git'),
         'config', '--bool', 'core.bare', 'true'])

    assert os.path.isdir(
        os.path.join(self.git_root, self.TEST_GIT_REPO, '.git'))

  @staticmethod
  def _git_tree():
    """Returns the initial repo contents as a path -> file-content dict."""
    fs = {}
    fs['origin'] = 'git@1'
    fs['extra'] = 'dummy\n'  # new
    fs['codereview.settings'] = (
        '# Test data\n'
        'bar: pouet\n')
    # Matches the pre-patch side of patches_data RAW.PATCH / GIT.PATCH.
    fs['chrome/file.cc'] = (
        'a\n'
        'bb\n'
        'ccc\n'
        'dd\n'
        'e\n'
        'ff\n'
        'ggg\n'
        'hh\n'
        'i\n'
        'jj\n'
        'kkk\n'
        'll\n'
        'm\n'
        'nn\n'
        'ooo\n'
        'pp\n'
        'q\n')
    # Matches the pre-rename side of GIT.RENAME_PARTIAL.
    fs['chromeos/views/DOMui_menu_widget.h'] = (
        '// Copyright (c) 2010\n'
        '// Use of this source code\n'
        '// found in the LICENSE file.\n'
        '\n'
        '#ifndef DOM\n'
        '#define DOM\n'
        '#pragma once\n'
        '\n'
        '#include <string>\n'
        '#endif\n')
    return fs
# pylint: disable=no-self-use
class BaseTest(fake_repos.FakeReposTestBase):
  """Shared scaffolding for checkout tests; subclasses supply the checkout."""

  name = 'foo'
  FAKE_REPOS_CLASS = FakeRepos
  is_read_only = False

  def setUp(self):
    super(BaseTest, self).setUp()
    # Monkey-patch subprocess2.call to silence subprocess output unless
    # DEBUGGING is on; restored in tearDown.
    self._old_call = subprocess2.call
    def redirect_call(args, **kwargs):
      if not DEBUGGING:
        kwargs.setdefault('stdout', subprocess2.PIPE)
        kwargs.setdefault('stderr', subprocess2.STDOUT)
      return self._old_call(args, **kwargs)
    subprocess2.call = redirect_call
    self.usr, self.pwd = self.FAKE_REPOS.USERS[0]
    self.previous_log = None

  def tearDown(self):
    subprocess2.call = self._old_call
    super(BaseTest, self).tearDown()

  def get_patches(self):
    """Returns the standard PatchSet applied by most tests."""
    return patch.PatchSet([
        patch.FilePatchDiff('new_dir/subdir/new_file', GIT.NEW_SUBDIR, []),
        patch.FilePatchDiff('chrome/file.cc', GIT.PATCH, []),
        # TODO(maruel): Test with is_new == False.
        patch.FilePatchBinary('bin_file', '\x00', [], is_new=True),
        patch.FilePatchDelete('extra', False),
    ])

  def get_trunk(self, modified):
    # Subclass responsibility: expected tree before/after patching.
    raise NotImplementedError()

  def _check_base(self, co, root, expected):
    # Subclass responsibility: full prepare/apply/commit round-trip check.
    raise NotImplementedError()

  def _check_exception(self, co, err_msg):
    """Verifies that applying BAD_PATCH fails with the expected status."""
    co.prepare(None)
    try:
      co.apply_patch([patch.FilePatchDiff('chrome/file.cc', BAD_PATCH, [])])
      self.fail()
    except checkout.PatchApplicationFailed as e:
      self.assertEquals(e.filename, 'chrome/file.cc')
      self.assertEquals(e.status, err_msg)

  def _log(self):
    # Subclass responsibility: dict describing the tip-of-tree commit.
    raise NotImplementedError()

  def _test_process(self, co_lambda):
    """Makes sure the process lambda is called correctly."""
    # NOTE: the lambda closes over the local 'results', which is only
    # assigned below — this works because the lambda is not invoked until
    # co.apply_patch() runs, after 'results = []'.  Do not reorder.
    post_processors = [lambda *args: results.append(args)]
    co = co_lambda(post_processors)
    self.assertEquals(post_processors, co.post_processors)
    co.prepare(None)
    ps = self.get_patches()
    results = []
    co.apply_patch(ps)
    expected_co = getattr(co, 'checkout', co)
    # Because of ReadOnlyCheckout.
    expected = [(expected_co, p) for p in ps.patches]
    self.assertEquals(len(expected), len(results))
    self.assertEquals(expected, results)

  def _check_move(self, co):
    """Makes sure file moves are handled correctly."""
    co.prepare(None)
    patchset = patch.PatchSet([
        patch.FilePatchDelete('chromeos/views/DOMui_menu_widget.h', False),
        patch.FilePatchDiff(
            'chromeos/views/webui_menu_widget.h', GIT.RENAME_PARTIAL, []),
    ])
    co.apply_patch(patchset)
    # Make sure chromeos/views/DOMui_menu_widget.h is deleted and
    # chromeos/views/webui_menu_widget.h is correctly created.
    root = os.path.join(self.root_dir, self.name)
    tree = self.get_trunk(False)
    del tree['chromeos/views/DOMui_menu_widget.h']
    tree['chromeos/views/webui_menu_widget.h'] = (
        '// Copyright (c) 2011\n'
        '// Use of this source code\n'
        '// found in the LICENSE file.\n'
        '\n'
        '#ifndef WEB\n'
        '#define WEB\n'
        '#pragma once\n'
        '\n'
        '#include <string>\n'
        '#endif\n')
    self.assertTree(tree, root)
class GitBaseTest(BaseTest):
  """BaseTest specialization backed by a real local git repository."""

  def setUp(self):
    super(GitBaseTest, self).setUp()
    self.enabled = self.FAKE_REPOS.set_up_git()
    self.assertTrue(self.enabled)
    self.previous_log = self._log()

  # pylint: disable=arguments-differ
  def _log(self, log_from_local_repo=False):
    """Returns {'revision', 'author', 'msg'} for the tip commit.

    Reads the working-copy repo when log_from_local_repo is True, otherwise
    the fake origin repo.  Returns {'revision': 0} for an empty repo.
    """
    if log_from_local_repo:
      repo_root = os.path.join(self.root_dir, self.name)
    else:
      repo_root = os.path.join(self.FAKE_REPOS.git_root,
                               self.FAKE_REPOS.TEST_GIT_REPO)
    out = subprocess2.check_output(
        ['git',
         '--git-dir',
         os.path.join(repo_root, '.git'),
         'log', '--pretty=format:"%H%x09%ae%x09%ad%x09%s"',
         '--max-count=1']).strip('"')
    if out and len(out.split()) != 0:
      revision = out.split()[0]
    else:
      return {'revision': 0}

    return {
        'revision': revision,
        'author': out.split()[1],
        'msg': out.split()[-1],
    }

  def _check_base(self, co, root, expected):
    """Full prepare/apply/commit round-trip against the fake git repo."""
    read_only = isinstance(co, checkout.ReadOnlyCheckout)
    self.assertEquals(read_only, self.is_read_only)
    if not read_only:
      self.FAKE_REPOS.git_dirty = True

    self.assertEquals(root, co.project_path)
    git_rev = co.prepare(None)
    # py2: revisions are returned as unicode strings.
    self.assertEquals(unicode, type(git_rev))
    self.assertEquals(self.previous_log['revision'], git_rev)
    self.assertEquals('pouet', co.get_settings('bar'))
    self.assertTree(self.get_trunk(False), root)
    patches = self.get_patches()
    co.apply_patch(patches)
    self.assertEquals(
        ['bin_file', 'chrome/file.cc', 'new_dir/subdir/new_file', 'extra'],
        patches.filenames)

    # Hackish to verify _branches() internal function.
    # pylint: disable=protected-access
    self.assertEquals(
        (['master', 'working_branch'], 'working_branch'),
        co._branches())

    # Verify that the patch is applied even for read only checkout.
    self.assertTree(self.get_trunk(True), root)
    fake_author = self.FAKE_REPOS.USERS[1][0]
    revision = co.commit(u'msg', fake_author)
    # Nothing changed.
    self.assertTree(self.get_trunk(True), root)

    if read_only:
      self.assertEquals('FAKE', revision)
      self.assertEquals(self.previous_log['revision'], co.prepare(None))
      # Changes should be reverted now.
      self.assertTree(self.get_trunk(False), root)
      expected = self.previous_log
    else:
      self.assertEquals(self._log()['revision'], revision)
      self.assertEquals(self._log()['revision'], co.prepare(None))
      self.assertTree(self.get_trunk(True), root)
      expected = self._log()

    actual = self._log(log_from_local_repo=True)
    self.assertEquals(expected, actual)

  def get_trunk(self, modified):
    """Expected tree contents; with patches applied when modified is True."""
    tree = {}
    for k, v in self.FAKE_REPOS.git_hashes[
        self.FAKE_REPOS.TEST_GIT_REPO][1][1].iteritems():
      assert k not in tree
      tree[k] = v

    if modified:
      content_lines = tree['chrome/file.cc'].splitlines(True)
      tree['chrome/file.cc'] = ''.join(
          content_lines[0:5] + ['FOO!\n'] + content_lines[5:])
      tree['bin_file'] = '\x00'
      del tree['extra']
      tree['new_dir/subdir/new_file'] = 'A new file\nshould exist.\n'
    return tree

  def _test_prepare(self, co):
    print(co.prepare(None))
class GitCheckout(GitBaseTest):
  """Tests for checkout.GitCheckout."""

  def _get_co(self, post_processors):
    """Builds a GitCheckout pointed at the fake origin repository."""
    self.assertNotEqual(False, post_processors)
    return checkout.GitCheckout(
        root_dir=self.root_dir,
        project_name=self.name,
        remote_branch='master',
        git_url=os.path.join(self.FAKE_REPOS.git_root,
                             self.FAKE_REPOS.TEST_GIT_REPO),
        commit_user=self.usr,
        post_processors=post_processors)

  def testAll(self):
    root = os.path.join(self.root_dir, self.name)
    self._check_base(self._get_co(None), root, None)

  @unittest.skip('flaky')
  def testException(self):
    self._check_exception(
        self._get_co(None),
        'While running git apply --index -3 -p1;\n fatal: corrupt patch at '
        'line 12\n')

  def testProcess(self):
    self._test_process(self._get_co)

  def _testPrepare(self):
    self._test_prepare(self._get_co(None))

  def testMove(self):
    """A rename patch must stage exactly one delete and one add."""
    co = self._get_co(None)
    self._check_move(co)
    out = subprocess2.check_output(
        ['git', 'diff', '--staged', '--name-status', '--no-renames'],
        cwd=co.project_path)
    out = sorted(out.splitlines())
    expected = sorted(
        [
            'A\tchromeos/views/webui_menu_widget.h',
            'D\tchromeos/views/DOMui_menu_widget.h',
        ])
    self.assertEquals(expected, out)
if __name__ == '__main__':
  # A '-v' anywhere on the command line enables verbose debug logging.
  _LOG_FORMAT = '%(levelname)5s %(filename)15s(%(lineno)3d): %(message)s'
  if '-v' in sys.argv:
    DEBUGGING = True
    _level = logging.DEBUG
  else:
    _level = logging.ERROR
  logging.basicConfig(level=_level, format=_LOG_FORMAT)
  unittest.main()

@ -83,19 +83,6 @@ class PresubmitMock(object):
return True
class GitCheckoutMock(object):
  """Stand-in for checkout.GitCheckout used by the git_cl tests.

  The class-level `conflict` flag (set by reset()/tests) makes apply_patch()
  simulate a failed patch application.
  """

  def __init__(self, *args, **kwargs):
    # Accepts and ignores whatever the real constructor would take.
    pass

  @staticmethod
  def reset():
    """Clears the simulated-conflict flag before each test."""
    GitCheckoutMock.conflict = False

  def apply_patch(self, p):
    """No-op unless a conflict is being simulated, then raises."""
    if not GitCheckoutMock.conflict:
      return
    raise Exception('failed')
class WatchlistsMock(object):
  """No-op stand-in for watchlists.Watchlists."""

  def __init__(self, _):
    # The real class takes a repository root; the mock ignores it.
    pass
@ -673,8 +660,6 @@ class TestGitCl(TestCase):
self.mock(git_cl, 'write_json', lambda path, contents:
self._mocked_call('write_json', path, contents))
self.mock(git_cl.presubmit_support, 'DoPresubmitChecks', PresubmitMock)
self.mock(git_cl.checkout, 'GitCheckout', GitCheckoutMock)
GitCheckoutMock.reset()
self.mock(git_cl.watchlists, 'Watchlists', WatchlistsMock)
self.mock(git_cl.auth, 'get_authenticator_for_host', AuthenticatorMock)
self.mock(git_cl.gerrit_util, 'GetChangeDetail',

@ -1,556 +0,0 @@
#!/usr/bin/env python
# coding: utf-8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for patch.py."""
import logging
import os
import posixpath
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from testing_support.patches_data import GIT, RAW
import patch
class PatchTest(unittest.TestCase):
  """Tests parsing of well-formed svn and git diffs by patch.py.

  Fixture diffs come from testing_support.patches_data (RAW.* for svn-style,
  GIT.* for git-style patches).
  """

  def _check_patch(self,
                   p,
                   filename,
                   diff,
                   source_filename=None,
                   is_binary=False,
                   is_delete=False,
                   is_git_diff=False,
                   is_new=False,
                   patchlevel=0,
                   svn_properties=None,
                   nb_hunks=None):
    # Compares every interesting attribute of a parsed patch object against
    # the expected values.  Attributes that exist only on some patch classes
    # (is_git_diff, patchlevel, hunks, svn_properties) are checked
    # conditionally.
    self.assertEquals(p.filename, filename)
    self.assertEquals(p.source_filename, source_filename)
    self.assertEquals(p.is_binary, is_binary)
    self.assertEquals(p.is_delete, is_delete)
    if hasattr(p, 'is_git_diff'):
      self.assertEquals(p.is_git_diff, is_git_diff)
    self.assertEquals(p.is_new, is_new)
    if hasattr(p, 'patchlevel'):
      self.assertEquals(p.patchlevel, patchlevel)
    if diff:
      if is_binary:
        self.assertEquals(p.get(), diff)
      else:
        self.assertEquals(p.get(True), diff)
    if hasattr(p, 'hunks'):
      self.assertEquals(len(p.hunks), nb_hunks)
    else:
      # Patches without hunks (binary, delete) must not expect any.
      self.assertEquals(None, nb_hunks)
    if hasattr(p, 'svn_properties'):
      self.assertEquals(p.svn_properties, svn_properties or [])

  def testFilePatchDelete(self):
    p = patch.FilePatchDelete('foo', False)
    self._check_patch(p, 'foo', None, is_delete=True)

  def testFilePatchDeleteBin(self):
    p = patch.FilePatchDelete('foo', True)
    self._check_patch(p, 'foo', None, is_delete=True, is_binary=True)

  def testFilePatchBinary(self):
    p = patch.FilePatchBinary('foo', 'data', [], is_new=False)
    self._check_patch(p, 'foo', 'data', is_binary=True)

  def testFilePatchBinaryNew(self):
    p = patch.FilePatchBinary('foo', 'data', [], is_new=True)
    self._check_patch(p, 'foo', 'data', is_binary=True, is_new=True)

  def testFilePatchDiff(self):
    p = patch.FilePatchDiff('chrome/file.cc', RAW.PATCH, [])
    self._check_patch(p, 'chrome/file.cc', RAW.PATCH, nb_hunks=1)

  def testDifferent(self):
    name = 'master/unittests/data/processes-summary.dat'
    p = patch.FilePatchDiff(name, RAW.DIFFERENT, [])
    self._check_patch(p, name, RAW.DIFFERENT, nb_hunks=1)

  def testFilePatchDiffHeaderMode(self):
    p = patch.FilePatchDiff('git_cl/git-cl', GIT.MODE_EXE, [])
    self._check_patch(
        p, 'git_cl/git-cl', GIT.MODE_EXE, is_git_diff=True, patchlevel=1,
        svn_properties=[('svn:executable', '.')], nb_hunks=0)

  def testFilePatchDiffHeaderModeIndex(self):
    p = patch.FilePatchDiff('git_cl/git-cl', GIT.MODE_EXE_JUNK, [])
    self._check_patch(
        p, 'git_cl/git-cl', GIT.MODE_EXE_JUNK, is_git_diff=True, patchlevel=1,
        svn_properties=[('svn:executable', '.')], nb_hunks=0)

  def testFilePatchDiffHeaderNotExecutable(self):
    p = patch.FilePatchDiff(
        'build/android/ant/create.js', GIT.NEW_NOT_EXECUTABLE, [])
    self._check_patch(
        p, 'build/android/ant/create.js', GIT.NEW_NOT_EXECUTABLE,
        is_git_diff=True, patchlevel=1, is_new=True,
        nb_hunks=1)

  def testFilePatchDiffSvnNew(self):
    # The code path is different for git and svn.
    p = patch.FilePatchDiff('foo', RAW.NEW, [])
    self._check_patch(p, 'foo', RAW.NEW, is_new=True, nb_hunks=1)

  def testFilePatchDiffGitNew(self):
    # The code path is different for git and svn.
    p = patch.FilePatchDiff('foo', GIT.NEW, [])
    self._check_patch(
        p, 'foo', GIT.NEW, is_new=True, is_git_diff=True, patchlevel=1,
        nb_hunks=1)

  def testSvn(self):
    # Should not throw.
    p = patch.FilePatchDiff('chrome/file.cc', RAW.PATCH, [])
    # The first 4 lines are the header, the rest are the hunks.
    lines = RAW.PATCH.splitlines(True)
    header = ''.join(lines[:4])
    hunks = ''.join(lines[4:])
    self.assertEquals(header, p.diff_header)
    self.assertEquals(hunks, p.diff_hunks)
    self.assertEquals(RAW.PATCH, p.get(True))
    self.assertEquals(RAW.PATCH, p.get(False))

  def testSvnNew(self):
    p = patch.FilePatchDiff('chrome/file.cc', RAW.MINIMAL_NEW, [])
    self.assertEquals(RAW.MINIMAL_NEW, p.diff_header)
    self.assertEquals('', p.diff_hunks)
    self.assertEquals(RAW.MINIMAL_NEW, p.get(True))
    self.assertEquals(RAW.MINIMAL_NEW, p.get(False))

  def testSvnDelete(self):
    p = patch.FilePatchDiff('chrome/file.cc', RAW.MINIMAL_DELETE, [])
    self.assertEquals(RAW.MINIMAL_DELETE, p.diff_header)
    self.assertEquals('', p.diff_hunks)
    self.assertEquals(RAW.MINIMAL_DELETE, p.get(True))
    self.assertEquals(RAW.MINIMAL_DELETE, p.get(False))

  def testSvnRename(self):
    p = patch.FilePatchDiff('file_b', RAW.MINIMAL_RENAME, [])
    self.assertEquals(RAW.MINIMAL_RENAME, p.diff_header)
    self.assertEquals('', p.diff_hunks)
    self.assertEquals(RAW.MINIMAL_RENAME, p.get(True))
    # get(False) drops the rename header and keeps only the new name.
    self.assertEquals('--- file_b\n+++ file_b\n', p.get(False))

  def testRelPath(self):
    patches = patch.PatchSet([
        patch.FilePatchDiff('pp', GIT.COPY, []),
        patch.FilePatchDiff(
            'chromeos\\views/webui_menu_widget.h', GIT.RENAME_PARTIAL, []),
        patch.FilePatchDiff('tools/run_local_server.sh', GIT.RENAME, []),
        patch.FilePatchBinary('bar', 'data', [], is_new=False),
        patch.FilePatchDiff('chrome/file.cc', RAW.PATCH, []),
        patch.FilePatchDiff('foo', GIT.NEW, []),
        patch.FilePatchDelete('other/place/foo', True),
        patch.FilePatchDiff(
            'tools\\clang_check/README.chromium', GIT.DELETE, []),
    ])
    expected = [
        'pp',
        'chromeos/views/webui_menu_widget.h',
        'tools/run_local_server.sh',
        'bar',
        'chrome/file.cc',
        'foo',
        'other/place/foo',
        'tools/clang_check/README.chromium',
    ]
    self.assertEquals(expected, patches.filenames)
    # Test patch #4.
    orig_name = patches.patches[4].filename
    orig_source_name = patches.patches[4].source_filename or orig_name
    patches.set_relpath(os.path.join('a', 'bb'))
    # Expect posixpath all the time.
    expected = [posixpath.join('a', 'bb', x) for x in expected]
    self.assertEquals(expected, patches.filenames)
    # Make sure each header is updated accordingly.
    header = []
    new_name = posixpath.join('a', 'bb', orig_name)
    new_source_name = posixpath.join('a', 'bb', orig_source_name)
    for line in RAW.PATCH.splitlines(True):
      if line.startswith('@@'):
        break
      if line[:3] == '---':
        line = line.replace(orig_source_name, new_source_name)
      if line[:3] == '+++':
        line = line.replace(orig_name, new_name)
      header.append(line)
    header = ''.join(header)
    self.assertEquals(header, patches.patches[4].diff_header)

  def testRelPathEmpty(self):
    patches = patch.PatchSet([
        patch.FilePatchDiff('chrome\\file.cc', RAW.PATCH, []),
        patch.FilePatchDelete('other\\place\\foo', True),
    ])
    # An empty relpath still normalizes backslashes to forward slashes.
    patches.set_relpath('')
    self.assertEquals(
        ['chrome/file.cc', 'other/place/foo'],
        [f.filename for f in patches])
    self.assertEquals([None, None], [f.source_filename for f in patches])

  def testBackSlash(self):
    mangled_patch = RAW.PATCH.replace('chrome/', 'chrome\\')
    patches = patch.PatchSet([
        patch.FilePatchDiff('chrome\\file.cc', mangled_patch, []),
        patch.FilePatchDelete('other\\place\\foo', True),
    ])
    expected = ['chrome/file.cc', 'other/place/foo']
    self.assertEquals(expected, patches.filenames)
    self.assertEquals(RAW.PATCH, patches.patches[0].get(True))
    self.assertEquals(RAW.PATCH, patches.patches[0].get(False))

  def testTwoHunks(self):
    name = 'chrome/app/generated_resources.grd'
    p = patch.FilePatchDiff(name, RAW.TWO_HUNKS, [])
    self._check_patch(p, name, RAW.TWO_HUNKS, nb_hunks=2)

  def testGitThreeHunks(self):
    p = patch.FilePatchDiff('presubmit_support.py', GIT.FOUR_HUNKS, [])
    self._check_patch(
        p, 'presubmit_support.py', GIT.FOUR_HUNKS, is_git_diff=True,
        patchlevel=1,
        nb_hunks=4)

  def testDelete(self):
    p = patch.FilePatchDiff('tools/clang_check/README.chromium', RAW.DELETE, [])
    self._check_patch(
        p, 'tools/clang_check/README.chromium', RAW.DELETE, is_delete=True,
        nb_hunks=1)

  def testDelete2(self):
    name = 'browser/extensions/extension_sidebar_api.cc'
    p = patch.FilePatchDiff(name, RAW.DELETE2, [])
    self._check_patch(p, name, RAW.DELETE2, is_delete=True, nb_hunks=1)

  def testGitDelete(self):
    p = patch.FilePatchDiff('tools/clang_check/README.chromium', GIT.DELETE, [])
    self._check_patch(
        p, 'tools/clang_check/README.chromium', GIT.DELETE, is_delete=True,
        is_git_diff=True, patchlevel=1, nb_hunks=1)

  def testGitRename(self):
    p = patch.FilePatchDiff('tools/run_local_server.sh', GIT.RENAME, [])
    self._check_patch(
        p,
        'tools/run_local_server.sh',
        GIT.RENAME,
        is_git_diff=True,
        patchlevel=1,
        source_filename='tools/run_local_server.PY',
        is_new=True,
        nb_hunks=0)

  def testGitRenamePartial(self):
    p = patch.FilePatchDiff(
        'chromeos/views/webui_menu_widget.h', GIT.RENAME_PARTIAL, [])
    self._check_patch(
        p,
        'chromeos/views/webui_menu_widget.h',
        GIT.RENAME_PARTIAL,
        source_filename='chromeos/views/DOMui_menu_widget.h',
        is_git_diff=True,
        patchlevel=1,
        is_new=True,
        nb_hunks=1)

  def testGitCopy(self):
    p = patch.FilePatchDiff('pp', GIT.COPY, [])
    self._check_patch(
        p, 'pp', GIT.COPY, is_git_diff=True, patchlevel=1,
        source_filename='PRESUBMIT.py', is_new=True, nb_hunks=0)

  def testOnlyHeader(self):
    p = patch.FilePatchDiff('file_a', RAW.MINIMAL, [])
    self._check_patch(p, 'file_a', RAW.MINIMAL, nb_hunks=0)

  def testSmallest(self):
    p = patch.FilePatchDiff('file_a', RAW.NEW_NOT_NULL, [])
    self._check_patch(p, 'file_a', RAW.NEW_NOT_NULL, is_new=True, nb_hunks=1)

  def testRenameOnlyHeader(self):
    p = patch.FilePatchDiff('file_b', RAW.MINIMAL_RENAME, [])
    self._check_patch(
        p, 'file_b', RAW.MINIMAL_RENAME, source_filename='file_a', is_new=True,
        nb_hunks=0)

  def testUnicodeFilenameGet(self):
    # Non-ASCII filenames: the *_utf8 accessors return encoded bytes, the
    # plain accessors return unicode.
    p = patch.FilePatchDiff(u'filé_b', RAW.RENAME_UTF8, [])
    self._check_patch(
        p, u'filé_b', RAW.RENAME_UTF8, source_filename=u'file_à', is_new=True,
        nb_hunks=1)
    self.assertTrue(isinstance(p.get(False), str))
    p.set_relpath('foo')
    self.assertTrue(isinstance(p.get(False), str))
    self.assertEquals(u'foo/file_à'.encode('utf-8'), p.source_filename_utf8)
    self.assertEquals(u'foo/file_à', p.source_filename)
    self.assertEquals(u'foo/filé_b'.encode('utf-8'), p.filename_utf8)
    self.assertEquals(u'foo/filé_b', p.filename)

  def testGitCopyPartial(self):
    p = patch.FilePatchDiff('wtf2', GIT.COPY_PARTIAL, [])
    self._check_patch(
        p, 'wtf2', GIT.COPY_PARTIAL, source_filename='wtf', is_git_diff=True,
        patchlevel=1, is_new=True, nb_hunks=1)

  def testGitCopyPartialAsSvn(self):
    p = patch.FilePatchDiff('wtf2', GIT.COPY_PARTIAL, [])
    # TODO(maruel): Improve processing.
    diff = (
        'diff --git a/wtf2 b/wtf22\n'
        'similarity index 98%\n'
        'copy from wtf2\n'
        'copy to wtf22\n'
        'index 79fbaf3..3560689 100755\n'
        '--- a/wtf2\n'
        '+++ b/wtf22\n'
        '@@ -1,4 +1,4 @@\n'
        '-#!/usr/bin/env python\n'
        '+#!/usr/bin/env python1.3\n'
        ' # Copyright (c) 2010 The Chromium Authors. All rights reserved.\n'
        ' # blah blah blah as\n'
        ' # found in the LICENSE file.\n')
    self.assertEquals(diff, p.get(False))

  def testGitNewExe(self):
    p = patch.FilePatchDiff('natsort_test.py', GIT.NEW_EXE, [])
    self._check_patch(
        p,
        'natsort_test.py',
        GIT.NEW_EXE,
        is_new=True,
        is_git_diff=True,
        patchlevel=1,
        svn_properties=[('svn:executable', '.')],
        nb_hunks=1)

  def testGitNewMode(self):
    p = patch.FilePatchDiff('natsort_test.py', GIT.NEW_MODE, [])
    self._check_patch(
        p, 'natsort_test.py', GIT.NEW_MODE, is_new=True, is_git_diff=True,
        patchlevel=1, nb_hunks=1)

  def testPatchsetOrder(self):
    # Deletes must be last.
    # File renames/move/copy must be first.
    patches = [
        patch.FilePatchDiff('chrome/file.cc', RAW.PATCH, []),
        patch.FilePatchDiff(
            'tools\\clang_check/README.chromium', GIT.DELETE, []),
        patch.FilePatchDiff('tools/run_local_server.sh', GIT.RENAME, []),
        patch.FilePatchDiff(
            'chromeos\\views/webui_menu_widget.h', GIT.RENAME_PARTIAL, []),
        patch.FilePatchDiff('pp', GIT.COPY, []),
        patch.FilePatchDiff('foo', GIT.NEW, []),
        patch.FilePatchDelete('other/place/foo', True),
        patch.FilePatchBinary('bar', 'data', [], is_new=False),
    ]
    expected = [
        'pp',
        'chromeos/views/webui_menu_widget.h',
        'tools/run_local_server.sh',
        'bar',
        'chrome/file.cc',
        'foo',
        'other/place/foo',
        'tools/clang_check/README.chromium',
    ]
    patchset = patch.PatchSet(patches)
    self.assertEquals(expected, patchset.filenames)

  def testGitPatch(self):
    p = patch.FilePatchDiff('chrome/file.cc', GIT.PATCH, [])
    self._check_patch(
        p, 'chrome/file.cc', GIT.PATCH, is_git_diff=True, patchlevel=1,
        nb_hunks=1)

  def testGitPatchShortHunkHeader(self):
    p = patch.FilePatchDiff(
        'chrome/browser/api/OWNERS', GIT.PATCH_SHORT_HUNK_HEADER, [])
    self._check_patch(
        p, 'chrome/browser/api/OWNERS', GIT.PATCH_SHORT_HUNK_HEADER,
        is_git_diff=True, patchlevel=1, nb_hunks=1)
class PatchTestFail(unittest.TestCase):
  """All patches that should throw patch.UnsupportedPatchFormat.

  Rewritten to use assertRaises context managers instead of the dated
  try/self.fail()/except idiom; test names and semantics are unchanged.
  """

  def _assert_unsupported(self, filename, diff):
    """Asserts FilePatchDiff(filename, diff, []) raises; returns the error."""
    with self.assertRaises(patch.UnsupportedPatchFormat) as cm:
      patch.FilePatchDiff(filename, diff, [])
    return cm.exception

  def testFilePatchDelete(self):
    # Deletes expose no get(): there is no content to fetch.
    self.assertFalse(hasattr(patch.FilePatchDelete('foo', False), 'get'))

  def testFilePatchDeleteBin(self):
    self.assertFalse(hasattr(patch.FilePatchDelete('foo', True), 'get'))

  def testFilePatchDiffBad(self):
    self._assert_unsupported('foo', 'data')

  def testFilePatchDiffEmpty(self):
    self._assert_unsupported('foo', '')

  def testFilePatchDiffNone(self):
    self._assert_unsupported('foo', None)

  def testFilePatchBadDiffName(self):
    # The diff is valid but targets chrome/file.cc, not 'foo'.
    e = self._assert_unsupported('foo', RAW.PATCH)
    self.assertEquals(
        "Can't process patch for file foo.\nUnexpected diff: chrome/file.cc.",
        str(e))

  def testFilePatchDiffBadHeader(self):
    # Missing the '---' source line.
    diff = (
        '+++ b/foo\n'
        '@@ -0,0 +1 @@\n'
        '+bar\n')
    self._assert_unsupported('foo', diff)

  def testFilePatchDiffBadGitHeader(self):
    diff = (
        'diff --git a/foo b/foo\n'
        '+++ b/foo\n'
        '@@ -0,0 +1 @@\n'
        '+bar\n')
    self._assert_unsupported('foo', diff)

  def testFilePatchDiffBadHeaderReversed(self):
    # '+++' before '---' is an inverted header.
    diff = (
        '+++ b/foo\n'
        '--- b/foo\n'
        '@@ -0,0 +1 @@\n'
        '+bar\n')
    self._assert_unsupported('foo', diff)

  def testFilePatchDiffGitBadHeaderReversed(self):
    diff = (
        'diff --git a/foo b/foo\n'
        '+++ b/foo\n'
        '--- b/foo\n'
        '@@ -0,0 +1 @@\n'
        '+bar\n')
    self._assert_unsupported('foo', diff)

  def testFilePatchDiffInvalidGit(self):
    # The declared filename does not match the paths inside the git diff.
    self._assert_unsupported('svn_utils_test.txt', (
        'diff --git a/tests/svn_utils_test_data/svn_utils_test.txt '
        'b/tests/svn_utils_test_data/svn_utils_test.txt\n'
        'index 0e4de76..8320059 100644\n'
        '--- a/svn_utils_test.txt\n'
        '+++ b/svn_utils_test.txt\n'
        '@@ -3,6 +3,7 @@ bb\n'
        'ccc\n'
        'dd\n'
        'e\n'
        '+FOO!\n'
        'ff\n'
        'ggg\n'
        'hh\n'))
    self._assert_unsupported('svn_utils_test2.txt', (
        'diff --git a/svn_utils_test_data/svn_utils_test.txt '
        'b/svn_utils_test.txt\n'
        'index 0e4de76..8320059 100644\n'
        '--- a/svn_utils_test.txt\n'
        '+++ b/svn_utils_test.txt\n'
        '@@ -3,6 +3,7 @@ bb\n'
        'ccc\n'
        'dd\n'
        'e\n'
        '+FOO!\n'
        'ff\n'
        'ggg\n'
        'hh\n'))

  def testRelPathBad(self):
    patches = patch.PatchSet([
        patch.FilePatchDiff('chrome\\file.cc', RAW.PATCH, []),
        patch.FilePatchDelete('other\\place\\foo', True),
    ])
    # '..' would escape the checkout root.
    with self.assertRaises(patch.UnsupportedPatchFormat):
      patches.set_relpath('..')

  def testInverted(self):
    self._assert_unsupported(
        'file_a', '+++ file_a\n--- file_a\n@@ -0,0 +1 @@\n+foo\n')

  def testInvertedOnlyHeader(self):
    self._assert_unsupported('file_a', '+++ file_a\n--- file_a\n')

  def testBadHunkCommas(self):
    # '-0,,0' is a malformed hunk range.
    self._assert_unsupported(
        'file_a',
        '--- file_a\n'
        '+++ file_a\n'
        '@@ -0,,0 +1 @@\n'
        '+foo\n')
if __name__ == '__main__':
  # Each '-v' on the command line bumps verbosity one level, capped at DEBUG.
  _levels = [logging.WARNING, logging.INFO, logging.DEBUG]
  _verbosity = min(2, sys.argv.count('-v'))
  logging.basicConfig(level=_levels[_verbosity])
  unittest.main()
Loading…
Cancel
Save