|
|
|
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
|
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
|
|
# found in the LICENSE file.
|
|
|
|
|
|
|
|
"""A database of OWNERS files.
|
|
|
|
|
|
|
|
OWNERS files indicate who is allowed to approve changes in a specific directory
|
|
|
|
(or who is allowed to make changes without needing approval of another OWNER).
|
|
|
|
Note that all changes must still be reviewed by someone familiar with the code,
|
|
|
|
so you may need approval from both an OWNER and a reviewer in many cases.
|
|
|
|
|
|
|
|
The syntax of the OWNERS file is, roughly:
|
|
|
|
|
|
|
|
lines := (\s* line? \s* "\n")*
|
|
|
|
|
|
|
|
line := directive
|
|
|
|
| "per-file" \s+ glob \s* "=" \s* directive
|
|
|
|
| comment
|
|
|
|
|
|
|
|
directive := "set noparent"
|
|
|
|
| "file:" glob
|
|
|
|
| email_address
|
|
|
|
| "*"
|
|
|
|
|
|
|
|
glob := [a-zA-Z0-9_-*?]+
|
|
|
|
|
|
|
|
comment := "#" [^"\n"]*
|
|
|
|
|
|
|
|
Email addresses must follow the foo@bar.com short form (exact syntax given
|
|
|
|
in BASIC_EMAIL_REGEXP, below). Filename globs follow the simple unix
|
|
|
|
shell conventions, and relative and absolute paths are not allowed (i.e.,
|
|
|
|
globs only refer to the files in the current directory).
|
|
|
|
|
|
|
|
If a user's email is one of the email_addresses in the file, the user is
|
|
|
|
considered an "OWNER" for all files in the directory.
|
|
|
|
|
|
|
|
If the "per-file" directive is used, the line only applies to files in that
|
|
|
|
directory that match the filename glob specified.
|
|
|
|
|
|
|
|
If the "set noparent" directive is used, then only entries in this OWNERS file
|
|
|
|
apply to files in this directory; if the "set noparent" directive is not
|
|
|
|
used, then entries in OWNERS files in enclosing (upper) directories also
|
|
|
|
apply (up until a "set noparent" is encountered).
|
|
|
|
|
|
|
|
If "per-file glob=set noparent" is used, then global directives are ignored
|
|
|
|
for the glob, and only the "per-file" owners are used for files matching that
|
|
|
|
glob.
|
|
|
|
|
|
|
|
If the "file:" directive is used, the referred to OWNERS file will be parsed and
|
|
|
|
considered when determining the valid set of OWNERS. If the filename starts with
|
|
|
|
"//" it is relative to the root of the repository, otherwise it is relative to
|
|
|
|
the current file.
|
|
|
|
|
|
|
|
Examples for all of these combinations can be found in tests/owners_unittest.py.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import collections
|
|
|
|
import fnmatch
|
|
|
|
import random
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
|
|
# A line consisting solely of this token means that everyone can review.
EVERYONE = '*'


# Pattern for plain 'X@Y' email addresses. Very simplistic: it only checks
# that both sides of the '@' are built from word characters and '-+%.'.
BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
|
|
|
|
|
|
|
|
|
|
|
|
def _assert_is_collection(obj):
|
|
|
|
assert not isinstance(obj, basestring)
|
|
|
|
# Module 'collections' has no 'Iterable' member
|
|
|
|
# pylint: disable=no-member
|
|
|
|
if hasattr(collections, 'Iterable') and hasattr(collections, 'Sized'):
|
|
|
|
assert (isinstance(obj, collections.Iterable) and
|
|
|
|
isinstance(obj, collections.Sized))
|
|
|
|
|
|
|
|
|
|
|
|
class SyntaxErrorInOwnersFile(Exception):
    """Error describing an unparseable line in an OWNERS file.

    The offending location and the explanation are kept both as the single
    (path, lineno, msg) tuple in `args` and as individual attributes.
    """

    def __init__(self, path, lineno, msg):
        """Args:
          path: path of the OWNERS file containing the bad line.
          lineno: 1-based number of the bad line.
          msg: human-readable description of the problem.
        """
        details = (path, lineno, msg)
        super(SyntaxErrorInOwnersFile, self).__init__(details)
        self.path, self.lineno, self.msg = details

    def __str__(self):
        """Returns the error formatted as 'path:lineno syntax error: msg'."""
        fields = (self.path, self.lineno, self.msg)
        return '%s:%d syntax error: %s' % fields
|
|
|
|
|
|
|
|
|
|
|
|
class Database(object):
    """A database of OWNERS files for a repository.

    This class allows you to find a suggested set of reviewers for a list
    of changed files, and see if a list of changed files is covered by a
    list of reviewers.
    """

    def __init__(self, root, fopen, os_path):
        """Args:
          root: the path to the root of the repository.
          fopen: function callback to open a text file for reading; the
            returned object is iterated line by line.
          os_path: module/object callback with fields for 'abspath',
            'dirname', 'exists', 'isabs', 'join', and 'relpath'.
        """
        self.root = root
        self.fopen = fopen
        self.os_path = os_path

        # Pick a default email regexp to use; callers can override as desired.
        self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)

        # Mapping of owners to the paths or globs they own.
        self._owners_to_paths = {EVERYONE: set()}

        # Mapping of paths to authorized owners.
        self._paths_to_owners = {}

        # Mapping reviewers to the preceding comment per file in the OWNERS
        # files.
        self.comments = {}

        # Cache of compiled regexes for _fnmatch()
        self._fnmatch_cache = {}

        # Set of paths that stop us from looking above them for owners.
        # (This is implicitly true for the root directory).
        self._stop_looking = set([''])

        # Set of files which have already been read.
        self.read_files = set()

    def reviewers_for(self, files, author):
        """Returns a suggested set of reviewers that will cover the files.

        files is a sequence of paths relative to (and under) self.root.
        If author is nonempty, we ensure it is not included in the set
        returned in order to avoid suggesting the author as a reviewer for
        their own changes.

        If the only possible suggestion is EVERYONE, the set
        {'<anyone>'} is returned instead. Directories for which no eligible
        owner exists contribute nothing to the result.
        """
        self._check_paths(files)
        self.load_data_needed_for(files)

        suggested_owners = self._covering_set_of_owners_for(files, author)
        if EVERYONE in suggested_owners:
            if len(suggested_owners) > 1:
                # A concrete person is a more useful suggestion than '*'.
                suggested_owners.remove(EVERYONE)
            else:
                suggested_owners = set(['<anyone>'])
        return suggested_owners

    def files_not_covered_by(self, files, reviewers):
        """Returns the files not owned by one of the reviewers.

        Args:
          files is a sequence of paths relative to (and under) self.root.
          reviewers is a sequence of strings matching self.email_regexp.
        """
        self._check_paths(files)
        self._check_reviewers(reviewers)
        self.load_data_needed_for(files)

        return set(f for f in files
                   if not self._is_obj_covered_by(f, reviewers))

    def _check_paths(self, files):
        """Asserts that files is a collection of relative paths under root."""
        def _is_under(f, pfx):
            joined = self.os_path.join(pfx, f)
            return self.os_path.abspath(joined).startswith(pfx)

        _assert_is_collection(files)
        root = self.os_path.abspath(self.root)
        assert all(not self.os_path.isabs(f) and _is_under(f, root)
                   for f in files)

    def _check_reviewers(self, reviewers):
        """Asserts that reviewers is a collection of valid email addresses."""
        _assert_is_collection(reviewers)
        assert all(self.email_regexp.match(r) for r in reviewers)

    def _is_obj_covered_by(self, objname, reviewers):
        """Returns True if any reviewer (or EVERYONE) owns objname.

        Checks objname itself and then each enclosing directory in turn,
        stopping at the first path for which _should_stop_looking() is true.
        """
        reviewers = list(reviewers) + [EVERYONE]
        while True:
            for reviewer in reviewers:
                owned_patterns = self._owners_to_paths.get(reviewer, set())
                for owned_pattern in owned_patterns:
                    if fnmatch.fnmatch(objname, owned_pattern):
                        return True
            if self._should_stop_looking(objname):
                break
            objname = self.os_path.dirname(objname)
        return False

    def _enclosing_dir_with_owners(self, objname):
        """Returns the innermost enclosing directory that has an OWNERS file.

        objname itself is returned when owners are registered for it (for
        example through a matching per-file glob).
        """
        dirpath = objname
        while not self._owners_for(dirpath):
            if self._should_stop_looking(dirpath):
                break
            dirpath = self.os_path.dirname(dirpath)
        return dirpath

    def load_data_needed_for(self, files):
        """Reads the OWNERS files needed to determine the owners of files.

        For each file, walks upward from its directory reading OWNERS files
        until a directory with known owners, or one that stops the upward
        search, is reached.
        """
        for f in files:
            dirpath = self.os_path.dirname(f)
            while not self._owners_for(dirpath):
                self._read_owners(self.os_path.join(dirpath, 'OWNERS'))
                if self._should_stop_looking(dirpath):
                    break
                dirpath = self.os_path.dirname(dirpath)

    def _should_stop_looking(self, objname):
        """Returns True if objname matches a path that has 'set noparent'."""
        return any(self._fnmatch(objname, stop_looking)
                   for stop_looking in self._stop_looking)

    def _owners_for(self, objname):
        """Returns the set of owners whose path or glob matches objname."""
        obj_owners = set()
        # items() rather than iteritems() so this runs on Python 2 and 3.
        for owned_path, path_owners in self._paths_to_owners.items():
            if self._fnmatch(objname, owned_path):
                obj_owners |= path_owners
        return obj_owners

    def _read_owners(self, path):
        """Parses the OWNERS file at path (relative to root), at most once.

        Does nothing if the file does not exist or has already been read.

        Raises:
          SyntaxErrorInOwnersFile: if a line cannot be parsed.
        """
        owners_path = self.os_path.join(self.root, path)
        if not self.os_path.exists(owners_path):
            return

        if owners_path in self.read_files:
            return

        self.read_files.add(owners_path)

        comment = []
        dirpath = self.os_path.dirname(path)
        in_comment = False
        owners_file = self.fopen(owners_path)
        try:
            for lineno, line in enumerate(owners_file, 1):
                line = line.strip()
                if line.startswith('#'):
                    # A comment line that does not directly continue another
                    # comment starts a new comment block.
                    if not in_comment:
                        comment = []
                    comment.append(line[1:].strip())
                    in_comment = True
                    continue
                if line == '':
                    continue
                in_comment = False

                if line == 'set noparent':
                    self._stop_looking.add(dirpath)
                    continue

                m = re.match('per-file (.+)=(.+)', line)
                if m:
                    glob_string = m.group(1).strip()
                    directive = m.group(2).strip()
                    if '/' in glob_string or '\\' in glob_string:
                        raise SyntaxErrorInOwnersFile(
                            owners_path, lineno,
                            'per-file globs cannot span directories or use '
                            'escapes: "%s"' % line)
                    full_glob_string = self.os_path.join(
                        self.root, dirpath, glob_string)
                    relative_glob_string = self.os_path.relpath(
                        full_glob_string, self.root)
                    self._add_entry(relative_glob_string, directive,
                                    'per-file line', owners_path, lineno,
                                    '\n'.join(comment))
                    continue

                if line.startswith('set '):
                    raise SyntaxErrorInOwnersFile(
                        owners_path, lineno,
                        'unknown option: "%s"' % line[4:].strip())

                self._add_entry(dirpath, line, 'line', owners_path, lineno,
                                ' '.join(comment))
        finally:
            # Release the handle even when a syntax error aborts parsing.
            # fopen is caller-supplied, so only close what can be closed.
            close = getattr(owners_file, 'close', None)
            if close is not None:
                close()

    def _add_entry(self, path, directive,
                   line_type, owners_path, lineno, comment):
        """Records a single directive for path.

        Args:
          path: directory or per-file glob the directive applies to.
          directive: 'set noparent', 'file:<path>', an email address or '*'.
          line_type: description of the line, used in error messages.
          owners_path: OWNERS file containing the directive.
          lineno: line number of the directive within owners_path.
          comment: comment text preceding the directive.

        Raises:
          SyntaxErrorInOwnersFile: if the directive is not recognized or a
            'file:' include does not refer to an existing file.
        """
        if directive == 'set noparent':
            self._stop_looking.add(path)
        elif directive.startswith('file:'):
            owners_file = self._resolve_include(directive[5:], owners_path)
            if not owners_file:
                raise SyntaxErrorInOwnersFile(
                    owners_path, lineno,
                    ('%s does not refer to an existing file.' %
                     directive[5:]))

            self._read_owners(owners_file)

            # Everyone owning the included file's directory also owns path.
            dirpath = self.os_path.dirname(owners_file)
            for key in self._owners_to_paths:
                if dirpath not in self._owners_to_paths[key]:
                    continue
                self._owners_to_paths[key].add(path)

            if dirpath in self._paths_to_owners:
                self._paths_to_owners.setdefault(path, set()).update(
                    self._paths_to_owners[dirpath])

        elif self.email_regexp.match(directive) or directive == EVERYONE:
            self.comments.setdefault(directive, {})
            self.comments[directive][path] = comment
            self._owners_to_paths.setdefault(directive, set()).add(path)
            self._paths_to_owners.setdefault(path, set()).add(directive)
        else:
            raise SyntaxErrorInOwnersFile(
                owners_path, lineno,
                ('%s is not a "set" directive, file include, "*", '
                 'or an email address: "%s"' % (line_type, directive)))

    def _resolve_include(self, path, start):
        """Returns the root-relative path of an included OWNERS file.

        Args:
          path: target of a 'file:' directive; a leading '//' makes it
            relative to the repository root, otherwise it is relative to
            the directory of start.
          start: path (under self.root) of the file holding the directive.

        Returns:
          The include path relative to self.root, or None if no such file
          exists.
        """
        if path.startswith('//'):
            include_path = path[2:]
        else:
            assert start.startswith(self.root)
            start = self.os_path.dirname(
                self.os_path.relpath(start, self.root))
            include_path = self.os_path.join(start, path)

        owners_path = self.os_path.join(self.root, include_path)
        if not self.os_path.exists(owners_path):
            return None

        return include_path

    def _covering_set_of_owners_for(self, files, author):
        """Returns a set of owners that together cover files.

        Repeatedly picks the lowest cost owner until every directory that
        can be covered is covered. Directories with no eligible owner (for
        example when author is the only owner) are left out rather than
        causing an error.
        """
        dirs_remaining = set(self._enclosing_dir_with_owners(f)
                             for f in files)
        all_possible_owners = self.all_possible_owners(dirs_remaining,
                                                       author)

        # Only keep directories somebody can actually cover; otherwise
        # lowest_cost_owner() would be asked to choose among no candidates.
        coverable_dirs = set(el[0]
                             for entries in all_possible_owners.values()
                             for el in entries)
        dirs_remaining &= coverable_dirs

        suggested_owners = set()
        while dirs_remaining:
            owner = self.lowest_cost_owner(all_possible_owners,
                                           dirs_remaining)
            suggested_owners.add(owner)
            dirs_to_remove = set(el[0] for el in all_possible_owners[owner])
            dirs_remaining -= dirs_to_remove
        return suggested_owners

    def all_possible_owners(self, dirs, author):
        """Returns a dict mapping each potential owner to a list of
        (dir, distance-from-dir) tuples; a distance of 1 is the
        lowest/closest possible distance (which makes the subsequent math
        easier). author, if nonempty, is never included."""
        all_possible_owners = {}
        for current_dir in dirs:
            dirname = current_dir
            distance = 1
            while True:
                for owner in self._owners_for(dirname):
                    if author and owner == author:
                        continue
                    entries = all_possible_owners.setdefault(owner, [])
                    # If the same person is in multiple OWNERS files above a
                    # given directory, only count the closest one.
                    if not any(current_dir == el[0] for el in entries):
                        entries.append((current_dir, distance))
                if self._should_stop_looking(dirname):
                    break
                dirname = self.os_path.dirname(dirname)
                distance += 1
        return all_possible_owners

    def _fnmatch(self, filename, pattern):
        """Same as fnmatch.fnmatch(), but internally caches the compiled
        regexes."""
        matcher = self._fnmatch_cache.get(pattern)
        if matcher is None:
            matcher = re.compile(fnmatch.translate(pattern)).match
            self._fnmatch_cache[pattern] = matcher
        return matcher(filename)

    @staticmethod
    def total_costs_by_owner(all_possible_owners, dirs):
        """Returns a dict mapping owners to the cost of choosing them.

        Only owners covering at least one directory in dirs are included;
        a lower cost is better.
        """
        # We want to minimize both the number of reviewers and the distance
        # from the files/dirs needing reviews. The "pow(X, 1.75)" below is
        # an arbitrarily-selected scaling factor that seems to work well - it
        # will select one reviewer in the parent directory over three
        # reviewers in subdirs, but not one reviewer over just two.
        result = {}
        for owner in all_possible_owners:
            total_distance = 0
            num_directories_owned = 0
            for dirname, distance in all_possible_owners[owner]:
                if dirname in dirs:
                    total_distance += distance
                    num_directories_owned += 1
            if num_directories_owned:
                result[owner] = (total_distance /
                                 pow(num_directories_owned, 1.75))
        return result

    @staticmethod
    def lowest_cost_owner(all_possible_owners, dirs):
        """Returns the owner with the lowest total cost for dirs.

        Raises:
          ValueError: if no owner covers any directory in dirs.
        """
        total_costs_by_owner = Database.total_costs_by_owner(
            all_possible_owners, dirs)
        # Return the lowest cost owner. In the case of a tie, pick one
        # randomly.
        lowest_cost = min(total_costs_by_owner.values())
        # A real list is required: random.choice() needs a sequence, and
        # filter() only returns one on Python 2.
        lowest_cost_owners = [
            owner for owner in total_costs_by_owner
            if total_costs_by_owner[owner] == lowest_cost]
        return random.Random().choice(lowest_cost_owners)
|