You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/split_cl.py

725 lines
28 KiB
Python

#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Splits a branch into smaller branches and uploads CLs."""
import collections
import dataclasses
import os
import re
import subprocess2
import sys
import tempfile
from typing import List, Set, Tuple, Dict, Any
import gclient_utils
import git_footers
import scm
import git_common as git
# If a call to `git cl split` will generate more than this number of CLs, the
# command will prompt the user to make sure they know what they're doing. Large
# numbers of CLs generated by `git cl split` have caused infrastructure issues
# in the past.
CL_SPLIT_FORCE_LIMIT = 10
# The maximum number of top reviewers to list. `git cl split` may send many CLs
# to a single reviewer, so the top reviewers with the most CLs sent to them
# will be listed.
CL_SPLIT_TOP_REVIEWERS = 5
def EmitWarning(msg: str):
print("Warning: ", msg)
def HashList(lst: List[Any]) -> int:
"""
Hash a list, returning a positive integer. Lists with identical elements
should have the same hash, regardless of order.
"""
# Python refuses to hash lists directly because they're mutable
tup = tuple(sorted(lst))
return abs(hash(tup))
FilesAndOwnersDirectory = collections.namedtuple("FilesAndOwnersDirectory",
"files owners_directories")
@dataclasses.dataclass
class CLInfo:
"""
Data structure representing a single CL. The script will split the large CL
into a list of these.
Fields:
- reviewers: the reviewers the CL will be sent to.
- files: a list of <action>, <file> pairs in the CL.
Has the same format as `git status`.
- directories: a string representing the directories containing the files
in this CL. This is only used for replacing $directory in
the user-provided CL description.
"""
# Have to use default_factory because lists are mutable
reviewers: Set[str] = dataclasses.field(default_factory=set)
files: List[Tuple[str, str]] = dataclasses.field(default_factory=list)
# This is only used for formatting in the CL description, so it just
# has to be convertible to string.
directories: Any = ""
def FormatForPrinting(self) -> str:
"""
Format the CLInfo for printing to a file in a human-readable format.
"""
# Don't quote the reviewer emails in the output
reviewers_str = ", ".join(self.reviewers)
lines = [
f"Reviewers: [{reviewers_str}]", f"Description: {self.directories}"
] + [f"{action}, {file}" for (action, file) in self.files]
return "\n".join(lines)
def CLInfoFromFilesAndOwnersDirectoriesDict(
d: Dict[Tuple[str], FilesAndOwnersDirectory]) -> List[CLInfo]:
"""
Transform a dictionary mapping reviewer tuples to FilesAndOwnersDirectories
into a list of CLInfo
"""
cl_infos = []
for (reviewers, fod) in d.items():
cl_infos.append(
CLInfo(set(reviewers), fod.files, fod.owners_directories))
return cl_infos
def EnsureInGitRepository():
"""Throws an exception if the current directory is not a git repository."""
git.run('rev-parse')
def CreateBranchName(prefix: str, files: List[Tuple[str, str]]) -> str:
"""
Given a sub-CL as a list of (action, file) pairs, create a unique and
deterministic branch name for it.
The name has the format <prefix>_<dirname>_<hash(files)>_split.
"""
file_names = [file for _, file in files]
if len(file_names) == 1:
# Only one file, just use its directory as the common path
common_path = os.path.dirname(file_names[0])
else:
common_path = os.path.commonpath(file_names)
if not common_path:
# Files have nothing in common at all. Unlikely but possible.
common_path = "None"
# Replace path delimiter with underscore in common_path.
common_path = common_path.replace(os.path.sep, '_')
return f"{prefix}_{HashList(files):020}_{common_path}_split"
def CreateBranchForOneCL(prefix: str, files: List[Tuple[str, str]],
upstream: str) -> bool:
"""Creates a branch named |prefix| + "_" + |hash(files)| + "_split".
Return false if the branch already exists. |upstream| is used as upstream
for the created branch.
"""
existing_branches = set(git.branches(use_limit=False))
branch_name = CreateBranchName(prefix, files)
if branch_name in existing_branches:
return False
git.run('checkout', '-t', upstream, '-b', branch_name)
return True
def FormatDirectoriesForPrinting(directories, prefix=None):
"""Formats directory list for printing
Uses dedicated format for single-item list."""
prefixed = directories
if prefix:
prefixed = [(prefix + d) for d in directories]
return str(prefixed) if len(prefixed) > 1 else str(prefixed[0])
def FormatDescriptionOrComment(txt, directories):
"""Replaces $directory with |directories| in |txt|."""
to_insert = FormatDirectoriesForPrinting(directories)
return txt.replace('$directory', to_insert)
def AddUploadedByGitClSplitToDescription(description):
"""Adds a 'This CL was uploaded by git cl split.' line to |description|.
The line is added before footers, or at the end of |description| if it has
no footers.
"""
split_footers = git_footers.split_footers(description)
lines = split_footers[0]
if lines[-1] and not lines[-1].isspace():
lines = lines + ['']
lines = lines + ['This CL was uploaded by git cl split.']
if split_footers[1]:
lines += [''] + split_footers[1]
return '\n'.join(lines)
def UploadCl(refactor_branch, refactor_branch_upstream, directories, files,
description, comment, reviewers, changelist, cmd_upload,
cq_dry_run, enable_auto_submit, topic, repository_root):
"""Uploads a CL with all changes to |files| in |refactor_branch|.
Args:
refactor_branch: Name of the branch that contains the changes to upload.
refactor_branch_upstream: Name of the upstream of |refactor_branch|.
directories: Paths to the directories that contain the OWNERS files for
which to upload a CL.
files: List of AffectedFile instances to include in the uploaded CL.
description: Description of the uploaded CL.
comment: Comment to post on the uploaded CL.
reviewers: A set of reviewers for the CL.
changelist: The Changelist class.
cmd_upload: The function associated with the git cl upload command.
cq_dry_run: If CL uploads should also do a cq dry run.
enable_auto_submit: If CL uploads should also enable auto submit.
topic: Topic to associate with uploaded CLs.
"""
# Create a branch.
if not CreateBranchForOneCL(refactor_branch, files,
refactor_branch_upstream):
print('Skipping ' + FormatDirectoriesForPrinting(directories) +
' for which a branch already exists.')
return
# Checkout all changes to files in |files|.
deleted_files = []
modified_files = []
for action, f in files:
abspath = os.path.abspath(os.path.join(repository_root, f))
if action == 'D':
deleted_files.append(abspath)
else:
modified_files.append(abspath)
if deleted_files:
git.run(*['rm'] + deleted_files)
if modified_files:
git.run(*['checkout', refactor_branch, '--'] + modified_files)
# Commit changes. The temporary file is created with delete=False so that it
# can be deleted manually after git has read it rather than automatically
# when it is closed.
with gclient_utils.temporary_file() as tmp_file:
gclient_utils.FileWrite(
tmp_file, FormatDescriptionOrComment(description, directories))
git.run('commit', '-F', tmp_file)
# Upload a CL.
upload_args = ['-f']
if reviewers:
upload_args.extend(['-r', ','.join(sorted(reviewers))])
if cq_dry_run:
upload_args.append('--cq-dry-run')
if not comment:
upload_args.append('--send-mail')
if enable_auto_submit:
upload_args.append('--enable-auto-submit')
if topic:
upload_args.append('--topic={}'.format(topic))
print('Uploading CL for ' + FormatDirectoriesForPrinting(directories) +
'...')
ret = cmd_upload(upload_args)
if ret != 0:
print('Uploading failed.')
print('Note: git cl split has built-in resume capabilities.')
print('Delete ' + git.current_branch() +
' then run git cl split again to resume uploading.')
if comment:
changelist().AddComment(FormatDescriptionOrComment(
comment, directories),
publish=True)
def GetFilesSplitByOwners(files, max_depth):
"""Returns a map of files split by OWNERS file.
Returns:
A map where keys are paths to directories containing an OWNERS file and
values are lists of files sharing an OWNERS file.
"""
files_split_by_owners = {}
for action, path in files:
# normpath() is important to normalize separators here, in prepration
# for str.split() before. It would be nicer to use something like
# pathlib here but alas...
dir_with_owners = os.path.normpath(os.path.dirname(path))
if max_depth >= 1:
dir_with_owners = os.path.join(
*dir_with_owners.split(os.path.sep)[:max_depth])
# Find the closest parent directory with an OWNERS file.
while (dir_with_owners not in files_split_by_owners
and not os.path.isfile(os.path.join(dir_with_owners, 'OWNERS'))):
dir_with_owners = os.path.dirname(dir_with_owners)
files_split_by_owners.setdefault(dir_with_owners, []).append(
(action, path))
return files_split_by_owners
def PrintClInfo(cl_index, num_cls, directories, file_paths, description,
reviewers, cq_dry_run, enable_auto_submit, topic):
"""Prints info about a CL.
Args:
cl_index: The index of this CL in the list of CLs to upload.
num_cls: The total number of CLs that will be uploaded.
directories: Paths to directories that contains the OWNERS files for
which to upload a CL.
file_paths: A list of files in this CL.
description: The CL description.
reviewers: A set of reviewers for this CL.
cq_dry_run: If the CL should also be sent to CQ dry run.
enable_auto_submit: If the CL should also have auto submit enabled.
topic: Topic to set for this CL.
"""
description_lines = FormatDescriptionOrComment(description,
directories).splitlines()
indented_description = '\n'.join([' ' + l for l in description_lines])
print('CL {}/{}'.format(cl_index, num_cls))
print('Paths: {}'.format(FormatDirectoriesForPrinting(directories)))
print('Reviewers: {}'.format(', '.join(reviewers)))
print('Auto-Submit: {}'.format(enable_auto_submit))
print('CQ Dry Run: {}'.format(cq_dry_run))
print('Topic: {}'.format(topic))
print('\n' + indented_description + '\n')
print('\n'.join(file_paths))
print()
def LoadDescription(description_file, dry_run):
if not description_file:
if not dry_run:
# Parser checks this as well, so should be impossible
raise ValueError(
"Must provide a description file except during dry runs")
return ('Dummy description for dry run.\n'
'directory = $directory')
return gclient_utils.FileRead(description_file)
def PrintSummary(cl_infos, refactor_branch):
"""Print a brief summary of the splitting so the user
can review it before uploading.
Args:
files_split_by_reviewers: A dictionary mapping reviewer tuples
to the files and directories assigned to them.
"""
for info in cl_infos:
print(f'Reviewers: {info.reviewers}, files: {len(info.files)}, ',
f'directories: {info.directories}')
num_cls = len(cl_infos)
print(f'\nWill split branch {refactor_branch} into {num_cls} CLs. '
'Please quickly review them before proceeding.\n')
if (num_cls > CL_SPLIT_FORCE_LIMIT):
EmitWarning(
'Uploading this many CLs may potentially '
'reach the limit of concurrent runs, imposed on you by the '
'build infrastructure. Your runs may be throttled as a '
'result.\n\nPlease email infra-dev@chromium.org if you '
'have any questions. '
'The infra team reserves the right to cancel '
'your jobs if they are overloading the CQ.\n\n'
'(Alternatively, you can reduce the number of CLs created by '
'using the --max-depth option. Pass --dry-run to examine the '
'CLs which will be created until you are happy with the '
'results.)')
def SplitCl(description_file, comment_file, changelist, cmd_upload, dry_run,
cq_dry_run, enable_auto_submit, max_depth, topic, from_file,
repository_root):
""""Splits a branch into smaller branches and uploads CLs.
Args:
description_file: File containing the description of uploaded CLs.
comment_file: File containing the comment of uploaded CLs.
changelist: The Changelist class.
cmd_upload: The function associated with the git cl upload command.
dry_run: Whether this is a dry run (no branches or CLs created).
cq_dry_run: If CL uploads should also do a cq dry run.
enable_auto_submit: If CL uploads should also enable auto submit.
max_depth: The maximum directory depth to search for OWNERS files. A
value less than 1 means no limit.
topic: Topic to associate with split CLs.
Returns:
0 in case of success. 1 in case of error.
"""
description = LoadDescription(description_file, dry_run)
description = AddUploadedByGitClSplitToDescription(description)
comment = gclient_utils.FileRead(comment_file) if comment_file else None
try:
EnsureInGitRepository()
cl = changelist()
upstream = cl.GetCommonAncestorWithUpstream()
files = [
(action.strip(), f)
for action, f in scm.GIT.CaptureStatus(repository_root, upstream)
]
if not files:
print('Cannot split an empty CL.')
return 1
author = git.run('config', 'user.email').strip() or None
refactor_branch = git.current_branch()
assert refactor_branch, "Can't run from detached branch."
refactor_branch_upstream = git.upstream(refactor_branch)
assert refactor_branch_upstream, \
"Branch %s must have an upstream." % refactor_branch
if not dry_run and not CheckDescriptionBugLink(description):
return 0
if from_file:
cl_infos = LoadSplittingFromFile(from_file, files_on_disk=files)
else:
files_split_by_reviewers = SelectReviewersForFiles(
cl, author, files, max_depth)
cl_infos = CLInfoFromFilesAndOwnersDirectoriesDict(
files_split_by_reviewers)
if not dry_run:
PrintSummary(cl_infos, refactor_branch)
answer = gclient_utils.AskForData(
'Proceed? (y/N, or i to edit interactively): ')
if answer.lower() == 'i':
cl_infos = EditSplittingInteractively(cl_infos,
files_on_disk=files)
else:
# Save even if we're continuing, so the user can safely resume an
# aborted upload with the same splitting
SaveSplittingToTempFile(cl_infos)
if answer.lower() != 'y':
return 0
cls_per_reviewer = collections.defaultdict(int)
for cl_index, cl_info in enumerate(cl_infos, 1):
# Convert reviewers from tuple to set.
if dry_run:
file_paths = [f for _, f in cl_info.files]
PrintClInfo(cl_index, len(cl_infos), cl_info.directories,
file_paths, description, cl_info.reviewers,
cq_dry_run, enable_auto_submit, topic)
else:
UploadCl(refactor_branch, refactor_branch_upstream,
cl_info.directories, cl_info.files, description,
comment, cl_info.reviewers, changelist, cmd_upload,
cq_dry_run, enable_auto_submit, topic, repository_root)
for reviewer in cl_info.reviewers:
cls_per_reviewer[reviewer] += 1
# List the top reviewers that will be sent the most CLs as a result of
# the split.
reviewer_rankings = sorted(cls_per_reviewer.items(),
key=lambda item: item[1],
reverse=True)
print('The top reviewers are:')
for reviewer, count in reviewer_rankings[:CL_SPLIT_TOP_REVIEWERS]:
print(f' {reviewer}: {count} CLs')
if dry_run:
# Wait until now to save the splitting so the file name doesn't get
# washed away by the flood of dry-run printing.
SaveSplittingToTempFile(cl_infos)
# Go back to the original branch.
git.run('checkout', refactor_branch)
except subprocess2.CalledProcessError as cpe:
sys.stderr.write(cpe.stderr)
return 1
return 0
def CheckDescriptionBugLink(description):
"""Verifies that the description contains a bug link.
Examples:
Bug: 123
Bug: chromium:456
Prompts user if the description does not contain a bug link.
"""
bug_pattern = re.compile(r"^Bug:\s*(?:[a-zA-Z]+:)?[0-9]+", re.MULTILINE)
matches = re.findall(bug_pattern, description)
answer = 'y'
if not matches:
answer = gclient_utils.AskForData(
'Description does not include a bug link. Proceed? (y/N):')
return answer.lower() == 'y'
def SelectReviewersForFiles(cl, author, files, max_depth):
"""Selects reviewers for passed-in files
Args:
cl: Changelist class instance
author: Email of person running 'git cl split'
files: List of files
max_depth: The maximum directory depth to search for OWNERS files.
A value less than 1 means no limit.
"""
info_split_by_owners = GetFilesSplitByOwners(files, max_depth)
info_split_by_reviewers = {}
for (directory, split_files) in info_split_by_owners.items():
# Use '/' as a path separator in the branch name and the CL description
# and comment.
directory = directory.replace(os.path.sep, '/')
file_paths = [f for _, f in split_files]
# Convert reviewers list to tuple in order to use reviewers as key to
# dictionary.
reviewers = tuple(
cl.owners_client.SuggestOwners(
file_paths, exclude=[author, cl.owners_client.EVERYONE]))
if not reviewers in info_split_by_reviewers:
info_split_by_reviewers[reviewers] = FilesAndOwnersDirectory([], [])
info_split_by_reviewers[reviewers].files.extend(split_files)
info_split_by_reviewers[reviewers].owners_directories.append(directory)
return info_split_by_reviewers
def SaveSplittingToFile(cl_infos: List[CLInfo], filename: str, silent=False):
"""
Writes the listed CLs to the designated file, in a human-readable and
editable format. Include an explanation of the file format at the top,
as well as instructions for how to use it.
"""
preamble = (
"# CLs in this file must have the following format:\n"
"# A 'Reviewers: [...]' line, where '...' is a (possibly empty) list "
"of reviewer emails.\n"
"# A 'Description: ...' line, where '...' is any string (by default, "
"the list of directories the files have been pulled from).\n"
"# One or more file lines, consisting of an <action>, <file> pair, in "
"the format output by `git status`.\n\n"
"# Each 'Reviewers' line begins a new CL.\n"
"# To use the splitting in this file, use the --from-file option.\n\n")
cl_string = "\n\n".join([info.FormatForPrinting() for info in cl_infos])
gclient_utils.FileWrite(filename, preamble + cl_string)
if not silent:
print(f"Saved splitting to {filename}")
def SaveSplittingToTempFile(cl_infos: List[CLInfo], silent=False):
"""
Create a file in the user's temp directory, and save the splitting there.
"""
# We can't use gclient_utils.temporary_file because it will be removed
temp_file, temp_name = tempfile.mkstemp(prefix="split_cl_")
os.close(temp_file) # Necessary for windows
SaveSplittingToFile(cl_infos, temp_name, silent)
return temp_name
class ClSplitParseError(Exception):
pass
# Matches 'Reviewers: [...]', extracts the ...
reviewers_re = re.compile(r'Reviewers:\s*\[([^\]]*)\]')
# Matches 'Description: ...', extracts the ...
description_re = re.compile(r'Description:\s*(.+)')
# Matches '<action>, <file>', and extracts both
# <action> must be a valid code (either 1 or 2 letters)
file_re = re.compile(r'([MTADRC]{1,2}),\s*(.+)')
# TODO(crbug.com/389069356): Replace the "Description" line with an optional
# "Description" line, and adjust the description variables accordingly, as well
# as all the places in the code that expect to get a directory list.
# We use regex parsing instead of e.g. json because it lets us use a much more
# human-readable format, similar to the summary printed in dry runs
def ParseSplittings(lines: List[str]) -> List[CLInfo]:
"""
Parse a splitting file. We expect to get a series of lines in the format
of CLInfo.FormatForPrinting. In the following order, we expect to see
- A 'Reviewers: ' line containing a list,
- A 'Description: ' line containing anything, and
- A list of <action>, <path> pairs, each on its own line
Note that this function only transforms the file into a list of CLInfo
(if possible). It does not validate the information; for that, see
ValidateSplitting.
"""
cl_infos = []
current_cl_info = None
for line in lines:
line = line.strip()
# Skip empty or commented lines
if not line or line.startswith('#'):
continue
# Start a new CL whenever we see a new Reviewers: line
m = re.fullmatch(reviewers_re, line)
if m:
reviewers_str = m.group(1)
reviewers = [r.strip() for r in reviewers_str.split(",")]
# Account for empty list or trailing comma
if not reviewers[-1]:
reviewers = reviewers[:-1]
if current_cl_info:
cl_infos.append(current_cl_info)
current_cl_info = CLInfo(reviewers=reviewers)
continue
if not current_cl_info:
# Make sure no nonempty lines appear before the first CL
raise ClSplitParseError(
f"Error: Line appears before the first 'Reviewers: ' line:\n{line}"
)
# Description is just used as a description, so any string is fine
m = re.fullmatch(description_re, line)
if m:
if current_cl_info.directories:
raise ClSplitParseError(
f"Error parsing line: CL already has a directories entry\n{line}"
)
current_cl_info.directories = m.group(1).strip()
continue
# Any other line is presumed to be an '<action>, <file>' pair
m = re.fullmatch(file_re, line)
if m:
action, path = m.groups()
current_cl_info.files.append((action, path))
continue
raise ClSplitParseError("Error parsing line: Does not look like\n"
"'Reviewers: [...]',\n"
"'Description: ...', or\n"
f"a pair of '<action>, <file>':\n{line}")
if (current_cl_info):
cl_infos.append(current_cl_info)
return cl_infos
def ValidateSplitting(cl_infos: List[CLInfo], filename: str,
files_on_disk: List[Tuple[str, str]]):
"""
Ensure that the provided list of CLs is a valid splitting.
Specifically, check that:
- Each file is in at most one CL
- Each file and action appear in the list of changed files reported by git
- Warn if some files don't appear in any CL
- Warn if a reviewer string looks wrong, or if a CL is empty
"""
# Validate the parsed information
if not cl_infos:
EmitWarning("No CLs listed in file. No action will be taken.")
return []
files_in_loaded_cls = set()
# Collect all files, ensuring no duplicates
# Warn on empty CLs or invalid reviewer strings
for info in cl_infos:
if not info.files:
EmitWarning("CL has no files, and will be skipped:\n",
info.FormatForPrinting())
for file_info in info.files:
if file_info in files_in_loaded_cls:
raise ClSplitParseError(
f"File appears in multiple CLs in {filename}:\n{file_info}")
files_in_loaded_cls.add(file_info)
for reviewer in info.reviewers:
if not (re.fullmatch(r"[^@]+@[^.]+\..+", reviewer)):
EmitWarning("reviewer does not look like an email address: ",
reviewer)
# Strip empty CLs
cl_infos = [info for info in cl_infos if info.files]
# Ensure the files in the user-provided CL splitting match the files
# that git reports.
# Warn if not all the files git reports appear.
# Fail if the user mentions a file that isn't reported by git
files_on_disk = set(files_on_disk)
if not files_in_loaded_cls.issubset(files_on_disk):
extra_files = files_in_loaded_cls.difference(files_on_disk)
extra_files_str = "\n".join(f"{action}, {file}"
for (action, file) in extra_files)
raise ClSplitParseError(
f"Some files are listed in {filename} but do not match any files "
f"listed by git:\n{extra_files_str}")
unmentioned_files = files_on_disk.difference(files_in_loaded_cls)
if (unmentioned_files):
EmitWarning(
"the following files are not included in any CL in {filename}. "
"They will not be uploaded:\n", unmentioned_files)
def LoadSplittingFromFile(filename: str,
files_on_disk: List[Tuple[str, str]]) -> List[CLInfo]:
"""
Given a file and the list of <action>, <file> pairs reported by git,
read the file and return the list of CLInfos it contains.
"""
lines = gclient_utils.FileRead(filename).splitlines()
cl_infos = ParseSplittings(lines)
ValidateSplitting(cl_infos, filename, files_on_disk)
return cl_infos
def EditSplittingInteractively(
cl_infos: List[CLInfo],
files_on_disk: List[Tuple[str, str]]) -> List[CLInfo]:
"""
Allow the user to edit the generated splitting using their default editor.
Make sure the edited splitting is saved so they can retrieve it if needed.
"""
tmp_file = SaveSplittingToTempFile(cl_infos, silent=True)
splitting = gclient_utils.RunEditor(gclient_utils.FileRead(tmp_file), False)
cl_infos = ParseSplittings(splitting.splitlines())
# Save the edited splitting before validation, so the user can go back
# and edit it if there are any typos
SaveSplittingToFile(cl_infos, tmp_file)
ValidateSplitting(cl_infos, "the provided splitting", files_on_disk)
return cl_infos