You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/split_cl.py

1080 lines
42 KiB
Python

#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Splits a branch into smaller branches and uploads CLs."""
import collections
import dataclasses
import hashlib
import math
import os
import re
import tempfile
from typing import List, Set, Tuple, Dict, Any
import gclient_utils
import git_footers
import scm
import git_common as git
# If a call to `git cl split` will generate more than this number of CLs, the
# command will prompt the user to make sure they know what they're doing. Large
# numbers of CLs generated by `git cl split` have caused infrastructure issues
# in the past.
CL_SPLIT_FORCE_LIMIT: int = 10
# The maximum number of top reviewers to list. `git cl split` may send many CLs
# to a single reviewer, so the top reviewers with the most CLs sent to them
# will be listed.
CL_SPLIT_TOP_REVIEWERS: int = 5
def Emit(*msg: str):
    """Print wrapper, kept separate so tests can mock it out."""
    print(" ".join(msg))
def EmitWarning(*msg: str):
    """Print a message prefixed with 'Warning: ' (mockable like Emit)."""
    print(" ".join(("Warning: ",) + msg))
def HashList(lst: List[Any]) -> str:
    """
    Deterministically hash a list of (action, file) string pairs, returning
    the first 10 hex characters of the digest. Lists with identical elements
    produce the same hash regardless of order (we sort before hashing).
    """
    # Concatenate the sorted pairs into one string, then encode once;
    # identical bytes to encoding each pair separately and joining.
    joined = "".join(action + file for action, file in sorted(lst))
    # No security implication: we just need a deterministic output.
    return hashlib.sha1(joined.encode()).hexdigest()[:10]
# Pairs the (action, file) tuples assigned to one reviewer group with the
# OWNERS directories those files were drawn from; values of the mapping
# built by SelectReviewersForFiles.
FilesAndOwnersDirectory = collections.namedtuple("FilesAndOwnersDirectory",
                                                 "files owners_directories")
@dataclasses.dataclass
class CLInfo:
    """
    Data structure representing a single CL. The script will split the large CL
    into a list of these.

    Fields:
    - reviewers: the reviewers the CL will be sent to.
    - files: a list of <action>, <file> pairs in the CL.
             Has the same format as `git status`.
    - description: a string describing the CL. Typically the list of affected
                   directories. Only used for replacing $description in
                   the user-provided CL description.
    """
    # Have to use default_factory because lists are mutable
    reviewers: Set[str] = dataclasses.field(default_factory=set)
    files: List[Tuple[str, str]] = dataclasses.field(default_factory=list)
    # This is only used for formatting in the CL description, so it just
    # has to be convertible to string.
    description: Any = ""

    def FormatForPrinting(self) -> str:
        """
        Render this CLInfo in the human-readable format used by the
        splitting files (see SaveSplittingToFile / ParseSplittings).
        """
        # Reviewer emails are deliberately not quoted in the output.
        header = [
            f"Reviewers: [{', '.join(self.reviewers)}]",
            f"Description: {self.description}",
        ]
        body = [f"{action}, {file}" for action, file in self.files]
        return "\n".join(header + body)
def CLInfoFromFilesAndOwnersDirectoriesDict(
        d: Dict[Tuple[str], FilesAndOwnersDirectory]) -> List[CLInfo]:
    """
    Flatten a dictionary mapping reviewer tuples to FilesAndOwnersDirectory
    entries into a list of CLInfo, one per reviewer tuple.
    """
    return [
        CLInfo(set(reviewers), fod.files,
               FormatDirectoriesForPrinting(fod.owners_directories))
        for reviewers, fod in d.items()
    ]
def EnsureInGitRepository():
    """Throws an exception if the current directory is not a git repository.

    `git rev-parse` exits non-zero outside a work tree, making git.run raise.
    """
    git.run('rev-parse')
def CreateBranchName(prefix: str, files: List[Tuple[str, str]]) -> str:
    """
    Given a sub-CL as a list of (action, file) pairs, create a unique and
    deterministic branch name for it.

    The name has the format <prefix>_<hash(files)>_<dirname>_split, where
    <dirname> is the deepest directory common to all files with path
    separators replaced by underscores, or "None" if the files share no
    common path component.
    """
    # Fixed docstring: it previously claimed <prefix>_<dirname>_<hash>_split,
    # but the hash comes before the directory (see the return statement).
    file_names = [file for _, file in files]
    if len(file_names) == 1:
        # Only one file, just use its directory as the common path
        common_path = os.path.dirname(file_names[0])
    else:
        common_path = os.path.commonpath(file_names)
    if not common_path:
        # Files have nothing in common at all. Unlikely but possible.
        common_path = "None"
    # Replace path delimiter with underscore in common_path.
    common_path = common_path.replace(os.path.sep, '_')
    return f"{prefix}_{HashList(files)}_{common_path}_split"
def CreateBranchForOneCL(prefix: str, files: List[Tuple[str, str]],
                         upstream: str) -> bool:
    """Create and check out a tracking branch for one sub-CL.

    The branch is named by CreateBranchName(prefix, files) and tracks
    |upstream|. Returns False (leaving git state untouched) when a branch
    with that name already exists, True after creating it otherwise.
    """
    branch_name = CreateBranchName(prefix, files)
    existing = set(git.branches(use_limit=False))
    if branch_name not in existing:
        git.run('checkout', '-t', upstream, '-b', branch_name)
        return True
    return False
def ValidateExistingBranches(prefix: str, cl_infos: List[CLInfo]) -> bool:
    """
    Check for splitting branches left over from a previous run.

    Existing branches are only acceptable when resuming a previous upload,
    in which case every leftover split branch must be one this run would
    (re)generate. Returns True when it is safe to proceed.
    """
    on_disk = {
        b
        for b in git.branches(use_limit=False)
        if b.startswith(prefix + "_") and b.endswith("_split")
    }
    planned = {CreateBranchName(prefix, info.files) for info in cl_infos}
    if on_disk.issubset(planned):
        return True
    Emit("It seems like you've already run `git cl split` on this branch.\n"
         "If you're resuming a previous upload, you must pass in the "
         "same splitting as before, using the --from-file option.\n"
         "If you're starting a new upload, please clean up existing split "
         f"branches (starting with '{prefix}_' and ending with '_split'), "
         "and re-run the tool.")
    Emit("The following branches need to be cleaned up:\n")
    for branch in on_disk - planned:
        Emit(branch)
    return False
def FormatDirectoriesForPrinting(directories: List[str],
                                 prefix: str = None) -> str:
    """Format a directory list for printing.

    A single-element list is rendered as just that element; longer lists use
    normal list formatting. |prefix|, when given, is prepended to each entry.
    """
    if prefix:
        items = [prefix + d for d in directories]
    else:
        items = directories
    if len(items) == 1:
        return str(items[0])
    return str(items)
def FormatDescriptionOrComment(txt, desc):
    """Replaces $description (and the deprecated $directory) with |desc| in |txt|."""
    # TODO(389069356): Remove support for $directory entirely once it's been
    # deprecated for a while.
    result = txt.replace('$directory', desc)
    if result != txt:
        EmitWarning('Usage of $directory is deprecated and will be removed '
                    'in a future update. Please use $description instead, '
                    'which has the same behavior by default.\n\n')
    return result.replace('$description', desc)
def AddUploadedByGitClSplitToDescription(description, is_experimental=False):
    """Adds a 'This CL was uploaded by git cl split.' line to |description|.

    The attribution is inserted after the description body but before any
    footers, or at the end of |description| when it has no footers.
    """
    if is_experimental:
        attribution = [
            'This CL was uploaded by an experimental version of git cl split',
            '(https://crbug.com/389069356).'
        ]
    else:
        attribution = ['This CL was uploaded by git cl split.']
    parsed = git_footers.split_footers(description)
    lines = parsed[0]
    # Separate the attribution from the body with a blank line if needed.
    if lines[-1] and not lines[-1].isspace():
        lines = lines + ['']
    lines = lines + attribution
    if parsed[1]:
        lines += [''] + parsed[1]
    return '\n'.join(lines)
def UploadCl(refactor_branch, refactor_branch_upstream, cl_description, files,
             user_description, saved_splitting_file, comment, reviewers,
             changelist, cmd_upload, cq_dry_run, enable_auto_submit, topic,
             repository_root):
    """Uploads a CL with all changes to |files| in |refactor_branch|.

    Args:
        refactor_branch: Name of the branch that contains the changes to upload.
        refactor_branch_upstream: Name of the upstream of |refactor_branch|.
        cl_description: Description of this specific CL, e.g. the list of
            affected directories.
        files: List of (action, path) pairs to include in the uploaded CL.
        user_description: Description provided by user.
        saved_splitting_file: Path the splitting was saved to; referenced in
            the resume instructions printed when the upload fails.
        comment: Comment to post on the uploaded CL.
        reviewers: A set of reviewers for the CL.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        topic: Topic to associate with uploaded CLs.
        repository_root: Root directory the file paths are relative to.
    """
    # Create a branch; skip this CL entirely if it was already uploaded in a
    # previous (resumed) run.
    if not CreateBranchForOneCL(refactor_branch, files,
                                refactor_branch_upstream):
        Emit(
            f'Skipping existing branch for CL with description: {cl_description}'
        )
        return

    # Checkout all changes to files in |files|.
    deleted_files = []
    modified_files = []
    for action, f in files:
        abspath = os.path.abspath(os.path.join(repository_root, f))
        if action == 'D':
            # Deletions must be staged with `git rm`; everything else can be
            # checked out from the refactor branch.
            deleted_files.append(abspath)
        else:
            modified_files.append(abspath)

    if deleted_files:
        git.run(*['rm'] + deleted_files)
    if modified_files:
        git.run(*['checkout', refactor_branch, '--'] + modified_files)

    # Commit changes. The temporary file is created with delete=False so that it
    # can be deleted manually after git has read it rather than automatically
    # when it is closed.
    with gclient_utils.temporary_file() as tmp_file:
        gclient_utils.FileWrite(
            tmp_file,
            FormatDescriptionOrComment(user_description, cl_description))
        git.run('commit', '-F', tmp_file)

    # Upload a CL.
    upload_args = ['-f']
    if reviewers:
        # Sort for a deterministic reviewer order on the uploaded CL.
        upload_args.extend(['-r', ','.join(sorted(reviewers))])
    if cq_dry_run:
        upload_args.append('--cq-dry-run')
    if not comment:
        # No comment to post, so notify reviewers at upload time instead.
        upload_args.append('--send-mail')
    if enable_auto_submit:
        upload_args.append('--enable-auto-submit')
    if topic:
        upload_args.append('--topic={}'.format(topic))
    Emit(f'Uploading CL with description: {cl_description} ...')
    ret = cmd_upload(upload_args)
    if ret != 0:
        # Point the user at the saved splitting so they can resume.
        Emit('Uploading failed.')
        Emit('Note: git cl split has built-in resume capabilities.')
        Emit(f'Delete {git.current_branch()} then run\n'
             f'git cl split --from-file={saved_splitting_file}\n'
             'to resume uploading.')

    if comment:
        changelist().AddComment(FormatDescriptionOrComment(
            comment, cl_description),
                                publish=True)
def GetFilesSplitByOwners(files, max_depth):
    """Returns a map of files split by OWNERS file.

    Args:
        files: List of (action, path) pairs, as produced by `git status`.
        max_depth: Truncate each file's directory to at most this many path
            components before searching for OWNERS; values < 1 mean no limit.

    Returns:
        A map where keys are paths to directories containing an OWNERS file
        and values are lists of (action, path) pairs sharing that OWNERS
        file.
    """
    result = {}
    for action, path in files:
        # normpath() is important to normalize separators here, in
        # preparation for the str.split() below. It would be nicer to use
        # something like pathlib here but alas...
        owners_dir = os.path.normpath(os.path.dirname(path))
        if max_depth >= 1:
            owners_dir = os.path.join(
                *owners_dir.split(os.path.sep)[:max_depth])
        # Walk up until we reach a directory already bucketed, or one that
        # actually contains an OWNERS file.
        while (owners_dir not in result
               and not os.path.isfile(os.path.join(owners_dir, 'OWNERS'))):
            owners_dir = os.path.dirname(owners_dir)
        result.setdefault(owners_dir, []).append((action, path))
    return result
def PrintClInfo(cl_index, num_cls, cl_description, file_paths, user_description,
                reviewers, cq_dry_run, enable_auto_submit, topic):
    """Prints info about one CL; dry-run counterpart of UploadCl.

    Args:
        cl_index: The index of this CL in the list of CLs to upload.
        num_cls: The total number of CLs that will be uploaded.
        cl_description: Description of this specific CL, e.g. the list of
            affected directories.
        file_paths: A list of files in this CL.
        user_description: Description provided by user.
        reviewers: A set of reviewers for this CL.
        cq_dry_run: If the CL should also be sent to CQ dry run.
        enable_auto_submit: If the CL should also have auto submit enabled.
        topic: Topic to set for this CL.
    """
    full_description = FormatDescriptionOrComment(user_description,
                                                 cl_description)
    indented = '\n'.join(' ' + line for line in full_description.splitlines())
    Emit(f'CL {cl_index}/{num_cls}')
    Emit(f'Paths: {cl_description}')
    Emit('Reviewers: ' + ', '.join(reviewers))
    Emit(f'Auto-Submit: {enable_auto_submit}')
    Emit(f'CQ Dry Run: {cq_dry_run}')
    Emit(f'Topic: {topic}')
    Emit('\n' + indented + '\n')
    Emit('\n'.join(file_paths))
def LoadDescription(description_file, dry_run):
    """Read the CL description from |description_file|.

    During a dry run the file is optional and a placeholder description is
    returned; otherwise a missing file is an error.
    """
    if description_file:
        return gclient_utils.FileRead(description_file)
    if not dry_run:
        # Parser checks this as well, so should be impossible
        raise ValueError(
            "Must provide a description file except during dry runs")
    return ('Dummy description for dry run.\n'
            'description = $description')
def PrintSummary(cl_infos, refactor_branch):
    """Print a brief summary of the splitting so the user
    can review it before uploading.

    Args:
        cl_infos: List of CLInfo describing the planned CLs.
        refactor_branch: Name of the branch being split.
    """
    for info in cl_infos:
        Emit(f'Reviewers: {info.reviewers}, files: {len(info.files)}, '
             f'description: {info.description}')

    num_cls = len(cl_infos)
    Emit(f'\nWill split branch {refactor_branch} into {num_cls} CLs. '
         'Please quickly review them before proceeding.\n')
    # Large splittings have overloaded the CQ in the past; warn loudly.
    if (num_cls > CL_SPLIT_FORCE_LIMIT):
        EmitWarning(
            'Uploading this many CLs may potentially '
            'reach the limit of concurrent runs, imposed on you by the '
            'build infrastructure. Your runs may be throttled as a '
            'result.\n\nPlease email infra-dev@chromium.org if you '
            'have any questions. '
            'The infra team reserves the right to cancel '
            'your jobs if they are overloading the CQ.\n\n'
            '(Alternatively, you can reduce the number of CLs created by '
            'using the --max-depth option, or altering the arguments to '
            '--target-range, as appropriate. Pass --dry-run to examine the '
            'CLs which will be created until you are happy with the '
            'results.)')
def SplitCl(description_file, comment_file, changelist, cmd_upload, dry_run,
            summarize, reviewers_override, cq_dry_run, enable_auto_submit,
            max_depth, topic, target_range, expect_owners_override, from_file,
            repository_root):
    """Splits a branch into smaller branches and uploads CLs.

    Args:
        description_file: File containing the description of uploaded CLs.
        comment_file: File containing the comment of uploaded CLs.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        dry_run: Whether this is a dry run (no branches or CLs created).
        summarize: In a dry run, print only the summary instead of per-CL
            details.
        reviewers_override: Either None or a (possibly empty) list of reviewers
            all CLs should be sent to.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        max_depth: The maximum directory depth to search for OWNERS files. A
            value less than 1 means no limit.
        topic: Topic to associate with split CLs.
        target_range: Either None or a (min_files, max_files) pair selecting
            the experimental clustering-based splitting algorithm.
        expect_owners_override: If True, the clustering algorithm ignores
            ownership and considers only directory structure.
        from_file: Either None or the path of a previously saved splitting
            to resume from.
        repository_root: Root of the repository the file paths are relative
            to.
    Returns:
        0 in case of success. 1 in case of error.
    """
    description = LoadDescription(description_file, dry_run)
    description = AddUploadedByGitClSplitToDescription(
        description, is_experimental=target_range)
    comment = gclient_utils.FileRead(comment_file) if comment_file else None

    EnsureInGitRepository()

    cl = changelist()
    upstream = cl.GetCommonAncestorWithUpstream()
    files = [(action.strip(), f)
             for action, f in scm.GIT.CaptureStatus(repository_root, upstream)]

    if not files:
        Emit('Cannot split an empty CL.')
        return 1

    author = git.run('config', 'user.email').strip() or None
    refactor_branch = git.current_branch()
    assert refactor_branch, "Can't run from detached branch."
    refactor_branch_upstream = git.upstream(refactor_branch)
    assert refactor_branch_upstream, \
        "Branch %s must have an upstream." % refactor_branch

    if not dry_run and not CheckDescriptionBugLink(description):
        return 0

    # Choose the splitting: resume from a file, use the experimental
    # clustering algorithm, or the default OWNERS-based grouping.
    if from_file:
        cl_infos = LoadSplittingFromFile(from_file, files_on_disk=files)
    elif target_range:
        min_files, max_files = target_range
        cl_infos = GroupFilesByDirectory(cl, author, expect_owners_override,
                                         files, min_files, max_files)
    else:
        files_split_by_reviewers = SelectReviewersForFiles(
            cl, author, files, max_depth)
        cl_infos = CLInfoFromFilesAndOwnersDirectoriesDict(
            files_split_by_reviewers)

    # Note that we do this override even if the list is empty (indicating that
    # the user requested CLs not be assigned to any reviewers).
    if reviewers_override != None:
        for info in cl_infos:
            info.reviewers = set(reviewers_override)

    if not dry_run:
        PrintSummary(cl_infos, refactor_branch)
        answer = gclient_utils.AskForData(
            'Proceed? (y/N, or i to edit interactively): ')
        if answer.lower() == 'i':
            cl_infos, saved_splitting_file = EditSplittingInteractively(
                cl_infos, files_on_disk=files)
        else:
            # Save even if we're continuing, so the user can safely resume an
            # aborted upload with the same splitting
            saved_splitting_file = SaveSplittingToTempFile(cl_infos)
            if answer.lower() != 'y':
                return 0
        # Make sure there isn't any clutter left over from a previous run
        if not ValidateExistingBranches(refactor_branch, cl_infos):
            return 0
    elif summarize:
        PrintSummary(cl_infos, refactor_branch)

    cls_per_reviewer = collections.defaultdict(int)
    for cl_index, cl_info in enumerate(cl_infos, 1):
        if dry_run and summarize:
            # Summary already printed above; still tally reviewers below.
            pass
        elif dry_run:
            file_paths = [f for _, f in cl_info.files]
            PrintClInfo(cl_index, len(cl_infos), cl_info.description,
                        file_paths, description, cl_info.reviewers, cq_dry_run,
                        enable_auto_submit, topic)
        else:
            UploadCl(refactor_branch, refactor_branch_upstream,
                     cl_info.description, cl_info.files, description,
                     saved_splitting_file, comment, cl_info.reviewers,
                     changelist, cmd_upload, cq_dry_run, enable_auto_submit,
                     topic, repository_root)

        for reviewer in cl_info.reviewers:
            cls_per_reviewer[reviewer] += 1

    # List the top reviewers that will be sent the most CLs as a result of
    # the split.
    reviewer_rankings = sorted(cls_per_reviewer.items(),
                               key=lambda item: item[1],
                               reverse=True)
    Emit('The top reviewers are:')
    for reviewer, count in reviewer_rankings[:CL_SPLIT_TOP_REVIEWERS]:
        Emit(f'    {reviewer}: {count} CLs')

    if dry_run:
        # Wait until now to save the splitting so the file name doesn't get
        # washed away by the flood of dry-run printing.
        SaveSplittingToTempFile(cl_infos)

    # Go back to the original branch.
    git.run('checkout', refactor_branch)
    return 0
def CheckDescriptionBugLink(description):
    """Verifies that the description contains a bug link.

    Examples:
        Bug: 123
        Bug: chromium:456

    Prompts the user when no bug link is present; returns True if it is safe
    to proceed.
    """
    bug_pattern = re.compile(r"^Bug:\s*(?:[a-zA-Z]+:)?[0-9]+", re.MULTILINE)
    if bug_pattern.search(description):
        return True
    answer = gclient_utils.AskForData(
        'Description does not include a bug link. Proceed? (y/N):')
    return answer.lower() == 'y'
def SelectReviewersForFiles(cl, author, files, max_depth):
    """Groups files by OWNERS directory and suggests reviewers per group.

    Args:
        cl: Changelist class instance
        author: Email of person running 'git cl split'
        files: List of files
        max_depth: The maximum directory depth to search for OWNERS files.
            A value less than 1 means no limit.

    Returns:
        Dict mapping a tuple of suggested reviewers to the
        FilesAndOwnersDirectory collecting everything assigned to them.
    """
    by_owners_dir = GetFilesSplitByOwners(files, max_depth)

    by_reviewers = {}
    for directory, dir_files in by_owners_dir.items():
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        directory = directory.replace(os.path.sep, '/')
        paths = [f for _, f in dir_files]
        # Reviewers are used as a dictionary key, so freeze them as a tuple.
        reviewers = tuple(
            cl.owners_client.SuggestOwners(
                paths, exclude=[author, cl.owners_client.EVERYONE]))
        entry = by_reviewers.setdefault(reviewers,
                                        FilesAndOwnersDirectory([], []))
        entry.files.extend(dir_files)
        entry.owners_directories.append(directory)
    return by_reviewers
################################################################################
# Code for saving, editing, and loading splittings.
################################################################################
def SaveSplittingToFile(cl_infos: List[CLInfo], filename: str, silent=False):
    """
    Writes the listed CLs to |filename|, in a human-readable and
    editable format. Include an explanation of the file format at the top,
    as well as instructions for how to use it.

    Args:
        cl_infos: The CLs to record.
        filename: Path of the file to write.
        silent: If True, don't announce where the splitting was saved.
    """
    preamble = (
        "# CLs in this file must have the following format:\n"
        "# A 'Reviewers: [...]' line, where '...' is a (possibly empty) list "
        "of reviewer emails.\n"
        "# A 'Description: ...' line, where '...' is any string (by default, "
        "the list of directories the files have been pulled from).\n"
        "# One or more file lines, consisting of an <action>, <file> pair, in "
        "the format output by `git status`.\n\n"
        "# Each 'Reviewers' line begins a new CL.\n"
        "# To use the splitting in this file, use the --from-file option.\n\n")

    cl_string = "\n\n".join([info.FormatForPrinting() for info in cl_infos])
    gclient_utils.FileWrite(filename, preamble + cl_string)
    if not silent:
        # Fixed: the message previously contained a literal placeholder
        # instead of interpolating |filename|, so users weren't told where
        # the splitting was saved (needed to resume with --from-file).
        Emit(f"Saved splitting to {filename}")
def SaveSplittingToTempFile(cl_infos: List[CLInfo], silent=False):
    """
    Save the splitting to a fresh file in the system temp directory and
    return that file's path.
    """
    # We can't use gclient_utils.temporary_file because it will be removed
    fd, path = tempfile.mkstemp(prefix="split_cl_")
    os.close(fd)  # Necessary for windows
    SaveSplittingToFile(cl_infos, path, silent)
    return path
class ClSplitParseError(Exception):
    """Raised when a splitting file cannot be parsed or validated."""
    pass


# Matches 'Reviewers: [...]', extracts the ...
reviewers_re = re.compile(r'Reviewers:\s*\[([^\]]*)\]')

# Matches 'Description: ...', extracts the ...
description_re = re.compile(r'Description:\s*(.+)')

# Matches '<action>, <file>', and extracts both
# <action> must be a valid code (either 1 or 2 letters)
file_re = re.compile(r'([MTADRC]{1,2}),\s*(.+)')

# We use regex parsing instead of e.g. json because it lets us use a much more
# human-readable format, similar to the summary printed in dry runs
def ParseSplittings(lines: List[str]) -> List[CLInfo]:
    """
    Parse the text of a splitting file into CLInfo objects.

    Expects a series of lines in the format of CLInfo.FormatForPrinting; for
    each CL, in order:
    - A 'Reviewers: ' line containing a list,
    - A 'Description: ' line containing anything, and
    - A list of <action>, <path> pairs, each on its own line

    This function only transforms the file into a list of CLInfo (if
    possible). It does not validate the information; for that, see
    ValidateSplitting.
    """
    parsed = []
    current = None
    for raw in lines:
        line = raw.strip()
        # Skip empty or commented lines
        if not line or line.startswith('#'):
            continue

        # A Reviewers: line opens a new CL.
        match = reviewers_re.fullmatch(line)
        if match:
            reviewers = [r.strip() for r in match.group(1).split(",")]
            # Account for empty list or trailing comma
            if not reviewers[-1]:
                reviewers.pop()
            if current:
                parsed.append(current)
            current = CLInfo(reviewers=reviewers)
            continue

        if not current:
            # Make sure no nonempty lines appear before the first CL
            raise ClSplitParseError(
                f"Error: Line appears before the first 'Reviewers: ' line:\n{line}"
            )

        # Description is just used as a description, so any string is fine
        match = description_re.fullmatch(line)
        if match:
            if current.description:
                raise ClSplitParseError(
                    f"Error parsing line: CL already has a description entry\n{line}"
                )
            current.description = match.group(1).strip()
            continue

        # Any other line is presumed to be an '<action>, <file>' pair
        match = file_re.fullmatch(line)
        if match:
            current.files.append(match.groups())
            continue

        raise ClSplitParseError("Error parsing line: Does not look like\n"
                                "'Reviewers: [...]',\n"
                                "'Description: ...', or\n"
                                f"a pair of '<action>, <file>':\n{line}")
    if current:
        parsed.append(current)
    return parsed
def ValidateSplitting(cl_infos: List[CLInfo], filename: str,
                      files_on_disk: List[Tuple[str, str]]):
    """
    Ensure that the provided list of CLs is a valid splitting.
    Specifically, check that:
    - Each file is in at most one CL
    - Each file and action appear in the list of changed files reported by git
    - Warn if some files don't appear in any CL
    - Warn if a reviewer string looks wrong, or if a CL is empty

    Args:
        cl_infos: The parsed CLs.
        filename: Name of the splitting file, used in diagnostics. (Fixed:
            messages previously printed a literal placeholder instead of
            this parameter, which was otherwise unused.)
        files_on_disk: The (action, file) pairs reported by git.

    Raises:
        ClSplitParseError: If a file appears in multiple CLs, or a listed
            file doesn't match anything reported by git.
    """
    # Validate the parsed information
    if not cl_infos:
        EmitWarning("No CLs listed in file. No action will be taken.")
        return []

    files_in_loaded_cls = set()
    # Collect all files, ensuring no duplicates
    # Warn on empty CLs or invalid reviewer strings
    for info in cl_infos:
        if not info.files:
            EmitWarning("CL has no files, and will be skipped:\n",
                        info.FormatForPrinting())
        for file_info in info.files:
            if file_info in files_in_loaded_cls:
                raise ClSplitParseError(
                    f"File appears in multiple CLs in {filename}:\n{file_info}")
            files_in_loaded_cls.add(file_info)
        for reviewer in info.reviewers:
            # Rough sanity check, not a full RFC 5322 address validation.
            if not (re.fullmatch(r"[^@]+@[^.]+\..+", reviewer)):
                EmitWarning("reviewer does not look like an email address: ",
                            reviewer)

    # Strip empty CLs
    cl_infos = [info for info in cl_infos if info.files]

    # Ensure the files in the user-provided CL splitting match the files
    # that git reports.
    # Warn if not all the files git reports appear.
    # Fail if the user mentions a file that isn't reported by git
    files_on_disk = set(files_on_disk)
    if not files_in_loaded_cls.issubset(files_on_disk):
        extra_files = files_in_loaded_cls.difference(files_on_disk)
        extra_files_str = "\n".join(f"{action}, {file}"
                                    for (action, file) in extra_files)
        raise ClSplitParseError(
            f"Some files are listed in {filename} but do not match any files "
            f"listed by git:\n{extra_files_str}")

    unmentioned_files = files_on_disk.difference(files_in_loaded_cls)
    if (unmentioned_files):
        EmitWarning(
            f"the following files are not included in any CL in {filename}. "
            "They will not be uploaded:")
        for file in unmentioned_files:
            Emit(file)
def LoadSplittingFromFile(filename: str,
                          files_on_disk: List[Tuple[str, str]]) -> List[CLInfo]:
    """
    Read a splitting file and return the CLInfos it contains, validated
    against the (action, file) pairs reported by git.
    """
    contents = gclient_utils.FileRead(filename)
    infos = ParseSplittings(contents.splitlines())
    ValidateSplitting(infos, filename, files_on_disk)
    return infos
def EditSplittingInteractively(
        cl_infos: List[CLInfo],
        files_on_disk: List[Tuple[str, str]]) -> Tuple[List[CLInfo], str]:
    """
    Allow the user to edit the generated splitting using their default editor.
    Make sure the edited splitting is saved so they can retrieve it if needed.

    Returns:
        A (cl_infos, filename) pair: the edited splitting and the path of
        the temp file it was saved to. (Fixed: the return annotation
        previously claimed List[CLInfo], but the function returns a
        two-tuple, as the caller in SplitCl unpacks.)
    """
    tmp_file = SaveSplittingToTempFile(cl_infos, silent=True)
    splitting = gclient_utils.RunEditor(gclient_utils.FileRead(tmp_file), False)
    cl_infos = ParseSplittings(splitting.splitlines())
    # Save the edited splitting before validation, so the user can go back
    # and edit it if there are any typos
    SaveSplittingToFile(cl_infos, tmp_file)
    ValidateSplitting(cl_infos, "the provided splitting", files_on_disk)
    return cl_infos, tmp_file
################################################################################
# Code for the clustering-based splitting algorithm.
################################################################################
def GroupFilesByDirectory(cl, author: str, expect_owners_override: bool,
                          all_files: List[Tuple[str, str]], min_files: int,
                          max_files: int) -> List[CLInfo]:
    """
    Group the contents of |all_files| into clusters of size between |min_files|
    and |max_files|, inclusive, based on their directory structure. Assign one
    reviewer to each group to create a CL. If |expect_owners_override| is true,
    consider only the directory structure of the files, ignoring ownership.
    May rarely create groups with fewer than |min_files| files, or assign
    multiple reviewers to a single CL.

    Args:
        cl: Changelist class instance, for calling owners methods
        author: Email of person running the script; never assigned as a reviewer
        expect_owners_override: Passed through to ClusterFiles.
        all_files: List of (action, file) pairs, as reported by git.
        min_files: Desired minimum cluster size.
        max_files: Hard maximum cluster size.

    Returns:
        One CLInfo per cluster, with reviewers assigned.
    """
    # Record the actions associated with each file because the clustering
    # algorithm just takes filenames
    actions_by_file = {}
    file_paths = []
    for (action, file) in all_files:
        actions_by_file[file] = action
        file_paths.append(file)

    reviewers_so_far = []
    cls = []
    # Go through the clusters by path length so that we're likely to choose
    # top-level owners earlier
    for (directories, files) in sorted(
            ClusterFiles(expect_owners_override, file_paths, min_files,
                         max_files)):
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        directories = [
            directory.replace(os.path.sep, '/') for directory in directories
        ]
        files_with_actions = [(actions_by_file[file], file) for file in files]
        # Try to find a reviewer. If some of the files have noparent set,
        # we'll likely get multiple reviewers. Don't consider reviewers we've
        # already assigned something to.
        # FIXME: Rather than excluding existing reviewers, it would be better
        # to just penalize them, but still choose them over reviewers who have
        # a worse score. At the moment, owners_client doesn't support anything
        # to do with the score.
        reviewers = cl.owners_client.SuggestMinimalOwners(
            files,
            exclude=[author, cl.owners_client.EVERYONE] + reviewers_so_far)
        # Retry without excluding existing reviewers if we couldn't find any.
        # This is very unlikely since there are many fallback owners.
        # NOTE(review): this retry passes |directories| where the first
        # attempt passed |files| -- confirm that is intentional.
        if not reviewers:
            reviewers = cl.owners_client.SuggestMinimalOwners(
                directories, exclude=[author, cl.owners_client.EVERYONE])
        reviewers_so_far.extend(reviewers)
        cls.append(
            CLInfo(set(reviewers), files_with_actions,
                   FormatDirectoriesForPrinting(directories)))
    return cls
### Trie Code
def FolderHasParent(path: str) -> bool:
    """
    Check if a folder inherits owners from a higher-level directory:
    i.e. it's not at top level, and doesn't have an OWNERS file that contains
    `set noparent`.
    """
    # Treat each top-level directory (and the root directory) as having no
    # parent.
    if len(path.split(os.path.sep)) <= 1:
        return False
    owners_file = os.path.join(path, 'OWNERS')
    if not os.path.isfile(owners_file):
        return True
    with open(owners_file) as f:
        for raw in f:
            # Strip whitespace and comments
            if raw.split('#')[0].strip() == 'set noparent':
                return False
    return True
class DirectoryTrie():
    """
    Trie structure: nested dictionaries representing file paths.

    Each node represents one folder and stores:
    - The path to that folder (its prefix)
    - A list of files that reside directly in that folder
    - Whether that folder inherits owners from a parent folder
    - One child trie per subdirectory
    Files are stored with their entire path, so we don't need to reconstruct
    it every time we read them.
    """

    def __init__(self, expect_owners_override, prefix: str = ""):
        """Create an empty DirectoryTrie with the specified prefix."""
        # yapf: disable
        self.subdirectories : Dict[str, DirectoryTrie] = {}
        self.files : List[str] = []
        self.prefix : str = prefix
        self.has_parent : bool = (expect_owners_override
                                  or FolderHasParent(prefix))
        self.expect_owners_override : bool = expect_owners_override
        # yapf: enable

    def AddFile(self, path: List[str]):
        """
        Add one file to the trie, creating subdirectory nodes as needed.
        The file is given as a list of path components, filename last.
        """
        if len(path) == 1:
            self.files.append(os.path.join(self.prefix, path[0]))
            return
        child_name = path[0]
        child = self.subdirectories.get(child_name)
        if child is None:
            child = DirectoryTrie(self.expect_owners_override,
                                  os.path.join(self.prefix, child_name))
            self.subdirectories[child_name] = child
        child.AddFile(path[1:])

    def AddFiles(self, paths: List[List[str]]):
        """ Convenience function to add many files at once. """
        for path in paths:
            self.AddFile(path)

    def ToList(self) -> List[str]:
        """ Return a list of all files in the trie. """
        result = list(self.files)
        for child in self.subdirectories.values():
            result.extend(child.ToList())
        return result
### Clustering code
# Convenience type: a "bin" represents a collection of files:
# it tracks their prefix(es) and the list of files themselves.
# Both elements are string lists.
Bin = collections.namedtuple("Bin", "prefixes files")


def PackFiles(max_size: int, files_to_pack: List[Bin]) -> List[Bin]:
    """
    Simple bin packing algorithm: given a list of small bins, consolidate them
    into as few larger bins as possible, where each bin can hold at most
    |max_size| files.

    Args:
        max_size: Maximum number of files per output bin.
        files_to_pack: The input bins to consolidate.

    Returns:
        A list of non-empty bins, each with at most |max_size| files.
    """
    total_files = sum(len(bin.files) for bin in files_to_pack)
    # Fixed: with no files at all, the bin-count estimate below divided by
    # zero. There is nothing to pack, so return early.
    if total_files == 0:
        return []
    bins = []
    # Guess how many bins we'll need ahead of time so we can spread things
    # between them. We'll add more bins later if necessary
    expected_bins_needed = math.ceil(total_files / max_size)
    expected_avg_bin_size = math.ceil(total_files / expected_bins_needed)
    for _ in range(expected_bins_needed):
        bins.append(Bin([], []))

    # Sort by number of files, decreasing
    sorted_by_num_files = sorted(files_to_pack,
                                 key=lambda bin: -len(bin.files))

    # Invariant: the least-filled bin is always the first element of |bins|
    # This ensures we spread things between bins as much as possible.
    for (prefixes, files) in sorted_by_num_files:
        b = bins[0]
        if len(b.files) + len(files) <= max_size:
            b.prefixes.extend(prefixes)
            b.files.extend(files)
        else:
            # Since the first bin is the emptiest, if we failed to fit in
            # that we don't need to try any others.
            # If these files alone are too large, split them up into
            # groups of size |expected_avg_bin_size|
            if len(files) > max_size:
                bins.extend([
                    Bin(prefixes, files[i:i + expected_avg_bin_size])
                    for i in range(0, len(files), expected_avg_bin_size)
                ])
            else:
                bins.append(Bin(prefixes, files))
        # Maintain invariant
        bins.sort(key=lambda bin: len(bin.files))

    return [bin for bin in bins if len(bin.files) > 0]
def ClusterFiles(expect_owners_override: bool, files: List[str], min_files: int,
                 max_files: int) -> List[Bin]:
    """
    Group the entries of |files| into clusters of size between |min_files| and
    |max_files|, inclusive. Guarantees that the size does not exceed
    |max_files|, but the size may rarely be less than |min_files|. If
    |expect_owners_override| is true, don't consider ownership when clustering,
    only directory structure.
    Clustering strategy for a given directory:
    1. Try to group each subdirectory independently
    2. Group any remaining files as follows:
    2a. If there are less than |min_files| files and the folder has a parent,
        give up and let the parent folder handle it.
    2b. Otherwise, if there are at most |max_files| files, create one
        cluster.
    2c. Finally, if there are more than |max_files| files, create several
        clusters of size less than |max_files|.

    Args:
        expect_owners_override: skip ownership lookups when building the trie.
        files: file paths to cluster, relative to the repository root.
        min_files: preferred lower bound on cluster size (best-effort).
        max_files: hard upper bound on cluster size.

    Returns:
        A list of Bins, each describing one cluster of files.
    """
    # Build a directory trie from the path components of every file.
    trie = DirectoryTrie(expect_owners_override)
    trie.AddFiles([file.split(os.path.sep) for file in files])
    # Accumulator for finished clusters; ClusterDirectory appends to it.
    clusters: List[Bin] = []
    def ClusterDirectory(current_dir: DirectoryTrie) -> List[str]:
        """
        Attempt to cluster the files for a directory, by grouping them into
        Bins and appending the bins to |clusters|.
        Returns a list of files that weren't able to be clustered (because
        there weren't at least |min_files| files).
        """
        # Track all the files we need to handle in this directory
        unclustered_files: List[Bin] = []
        # Record any files that live in this directory directly
        if len(current_dir.files) > 0:
            unclustered_files.append(
                Bin([current_dir.prefix], current_dir.files))
        # Step 1: Try to cluster each subdirectory independently
        for subdir in current_dir.subdirectories.values():
            unclustered_files_in_subdir = ClusterDirectory(subdir)
            # If not all files were submitted, record them
            if len(unclustered_files_in_subdir) > 0:
                unclustered_files.append(
                    Bin([subdir.prefix], unclustered_files_in_subdir))
        # A flattened list containing just the names of all unclustered files
        unclustered_files_names_only = [
            file for bin in unclustered_files for file in bin.files
        ]
        if len(unclustered_files_names_only) == 0:
            return []
        # Step 2a: If we don't have enough files for a cluster and it's possible
        # to recurse upward, do so
        if (len(unclustered_files_names_only) < min_files
                and current_dir.has_parent):
            return unclustered_files_names_only
        # Step 2b, 2c: Create one or more clusters from the unclustered files
        # by appending to the |clusters| variable in the outer scope
        nonlocal clusters
        if len(unclustered_files_names_only) <= max_files:
            clusters.append(
                Bin([current_dir.prefix], unclustered_files_names_only))
        else:
            clusters += PackFiles(max_files, unclustered_files)
        return []
    unclustered_paths = ClusterDirectory(trie)
    # The root of the trie has no parent, so ClusterDirectory should always
    # have clustered everything by the time it returns from the root.
    if (len(unclustered_paths) > 0):
        EmitWarning(
            'Not all files were assigned to a CL!\n'
            'This should be impossible, file a bug.\n'
            f'{len(unclustered_paths)} Unassigned files: {unclustered_paths}')
    return clusters