You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			142 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			142 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
#!/usr/bin/env python3
 | 
						|
# Copyright (c) 2011 The Chromium Authors. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
"""Watchlists
 | 
						|
 | 
						|
Watchlists is a mechanism that allow a developer (a "watcher") to watch over
 | 
						|
portions of code that they are interested in. A "watcher" will be cc-ed to
 | 
						|
changes that modify that portion of code, thereby giving them an opportunity
 | 
						|
to make comments on chromium-review.googlesource.com even before the change is
 | 
						|
committed.
 | 
						|
Refer:
 | 
						|
https://chromium.googlesource.com/chromium/src/+/HEAD/docs/infra/watchlists.md
 | 
						|
 | 
						|
When invoked directly from the base of a repository, this script lists out
 | 
						|
the watchers for files given on the command line. This is useful to verify
 | 
						|
changes to WATCHLISTS files.
 | 
						|
"""
 | 
						|
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import sys
 | 
						|
 | 
						|
 | 
						|
class Watchlists(object):
 | 
						|
    """Manage Watchlists.
 | 
						|
 | 
						|
  This class provides mechanism to load watchlists for a repo and identify
 | 
						|
  watchers.
 | 
						|
  Usage:
 | 
						|
    wl = Watchlists("/path/to/repo/root")
 | 
						|
    watchers = wl.GetWatchersForPaths(["/path/to/file1",
 | 
						|
                                       "/path/to/file2",])
 | 
						|
  """
 | 
						|
 | 
						|
    _RULES = "WATCHLISTS"
 | 
						|
    _RULES_FILENAME = _RULES
 | 
						|
    _repo_root = None
 | 
						|
    _defns = {}  # Definitions
 | 
						|
    _path_regexps = {}  # Name -> Regular expression mapping
 | 
						|
    _watchlists = {}  # name to email mapping
 | 
						|
 | 
						|
    def __init__(self, repo_root):
 | 
						|
        self._repo_root = repo_root
 | 
						|
        self._LoadWatchlistRules()
 | 
						|
 | 
						|
    def _GetRulesFilePath(self):
 | 
						|
        """Returns path to WATCHLISTS file."""
 | 
						|
        return os.path.join(self._repo_root, self._RULES_FILENAME)
 | 
						|
 | 
						|
    def _HasWatchlistsFile(self):
 | 
						|
        """Determine if watchlists are available for this repo."""
 | 
						|
        return os.path.exists(self._GetRulesFilePath())
 | 
						|
 | 
						|
    def _ContentsOfWatchlistsFile(self):
 | 
						|
        """Read the WATCHLISTS file and return its contents."""
 | 
						|
        try:
 | 
						|
            watchlists_file = open(self._GetRulesFilePath())
 | 
						|
            contents = watchlists_file.read()
 | 
						|
            watchlists_file.close()
 | 
						|
            return contents
 | 
						|
        except IOError as e:
 | 
						|
            logging.error("Cannot read %s: %s" % (self._GetRulesFilePath(), e))
 | 
						|
            return ''
 | 
						|
 | 
						|
    def _LoadWatchlistRules(self):
 | 
						|
        """Load watchlists from WATCHLISTS file. Does nothing if not present."""
 | 
						|
        if not self._HasWatchlistsFile():
 | 
						|
            return
 | 
						|
 | 
						|
        contents = self._ContentsOfWatchlistsFile()
 | 
						|
        watchlists_data = None
 | 
						|
        try:
 | 
						|
            watchlists_data = eval(contents, {'__builtins__': None}, None)
 | 
						|
        except SyntaxError as e:
 | 
						|
            logging.error("Cannot parse %s. %s" % (self._GetRulesFilePath(), e))
 | 
						|
            return
 | 
						|
 | 
						|
        defns = watchlists_data.get("WATCHLIST_DEFINITIONS")
 | 
						|
        if not defns:
 | 
						|
            logging.error("WATCHLIST_DEFINITIONS not defined in %s" %
 | 
						|
                          self._GetRulesFilePath())
 | 
						|
            return
 | 
						|
        watchlists = watchlists_data.get("WATCHLISTS")
 | 
						|
        if not watchlists:
 | 
						|
            logging.error("WATCHLISTS not defined in %s" %
 | 
						|
                          self._GetRulesFilePath())
 | 
						|
            return
 | 
						|
        self._defns = defns
 | 
						|
        self._watchlists = watchlists
 | 
						|
 | 
						|
        # Compile the regular expressions ahead of time to avoid creating them
 | 
						|
        # on-the-fly multiple times per file.
 | 
						|
        self._path_regexps = {}
 | 
						|
        for name, rule in defns.items():
 | 
						|
            filepath = rule.get('filepath')
 | 
						|
            if not filepath:
 | 
						|
                continue
 | 
						|
            self._path_regexps[name] = re.compile(filepath)
 | 
						|
 | 
						|
        # Verify that all watchlist names are defined
 | 
						|
        for name in watchlists:
 | 
						|
            if name not in defns:
 | 
						|
                logging.error("%s not defined in %s" %
 | 
						|
                              (name, self._GetRulesFilePath()))
 | 
						|
 | 
						|
    def GetWatchersForPaths(self, paths):
 | 
						|
        """Fetch the list of watchers for |paths|
 | 
						|
 | 
						|
    Args:
 | 
						|
      paths: [path1, path2, ...]
 | 
						|
 | 
						|
    Returns:
 | 
						|
      [u1@chromium.org, u2@gmail.com, ...]
 | 
						|
    """
 | 
						|
        watchers = set()  # A set, to avoid duplicates
 | 
						|
        for path in paths:
 | 
						|
            path = path.replace(os.sep, '/')
 | 
						|
            for name, rule in self._path_regexps.items():
 | 
						|
                if name not in self._watchlists:
 | 
						|
                    continue
 | 
						|
                if rule.search(path):
 | 
						|
                    for watchlist in self._watchlists[name]:
 | 
						|
                        watchers.add(watchlist)
 | 
						|
        return sorted(watchers)
 | 
						|
 | 
						|
 | 
						|
def main(argv):
 | 
						|
    # Confirm that watchlists can be parsed and spew out the watchers
 | 
						|
    if len(argv) < 2:
 | 
						|
        print("Usage (from the base of repo):")
 | 
						|
        print("  %s [file-1] [file-2] ...." % argv[0])
 | 
						|
        return 1
 | 
						|
    wl = Watchlists(os.getcwd())
 | 
						|
    watchers = wl.GetWatchersForPaths(argv[1:])
 | 
						|
    print(watchers)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    main(sys.argv)
 |