[git_common] Fix all type checking errors

Bug: b/351071334
Change-Id: I130300bebd25a62a14ea10202ea9d369f825ed50
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/5757941
Reviewed-by: Yiwei Zhang <yiwzhang@google.com>
Commit-Queue: Allen Li <ayatane@chromium.org>
changes/41/5757941/5
Allen Li 12 months ago committed by LUCI CQ
parent b4e97035af
commit cdf5599e67

@ -40,6 +40,9 @@ import signal
import tempfile
import textwrap
import time
import typing
from typing import AnyStr
from typing import Optional
from typing import Tuple
import scm
@ -369,8 +372,9 @@ def branch_config_map(option):
try:
reg = re.compile(r'^branch\.(.*)\.%s$' % option)
return {
reg.match(k).group(1): v
m.group(1): v
for k, v in get_config_regexp(reg.pattern)
if (m := reg.match(k)) is not None
}
except subprocess2.CalledProcessError:
return {}
@ -382,7 +386,9 @@ def branches(use_limit=True, *args):
key = 'depot-tools.branch-limit'
limit = get_config_int(key, 20)
raw_branches = run('branch', *args).splitlines()
output = run('branch', *args)
assert isinstance(output, str)
raw_branches = output.splitlines()
num = len(raw_branches)
@ -407,10 +413,14 @@ def branches(use_limit=True, *args):
def get_config(option: str, default: Optional[str] = None) -> Optional[str]:
return scm.GIT.GetConfig(os.getcwd(), option, default)
def get_config_int(option, default=0):
def get_config_int(option: str, default: int = 0) -> int:
assert isinstance(default, int)
val = get_config(option)
if val is None:
return default
try:
return int(get_config(option, default))
return int(val)
except ValueError:
return default
@ -426,6 +436,7 @@ def get_config_regexp(pattern):
def is_fsmonitor_enabled():
"""Returns true if core.fsmonitor is enabled in git config."""
fsmonitor = get_config('core.fsmonitor', 'False')
assert isinstance(fsmonitor, str)
return fsmonitor.strip().lower() == 'true'
@ -435,6 +446,7 @@ def warn_submodule():
# they have fsmonitor enabled.
if sys.platform.startswith('darwin') and is_fsmonitor_enabled():
version_string = run('--version')
assert isinstance(version_string, str)
if version_string.endswith('goog'):
return
version_tuple = _extract_git_tuple(version_string)
@ -458,11 +470,11 @@ def current_branch():
return None
def del_branch_config(branch, option, scope='local'):
def del_branch_config(branch, option, scope: scm.GitConfigScope = 'local'):
del_config('branch.%s.%s' % (branch, option), scope=scope)
def del_config(option, scope='local'):
def del_config(option, scope: scm.GitConfigScope = 'local'):
try:
scm.GIT.SetConfig(os.getcwd(), option, scope=scope)
except subprocess2.CalledProcessError:
@ -583,7 +595,7 @@ def get_branch_tree(use_limit=False):
return skipped, branch_tree
def get_or_create_merge_base(branch, parent=None) -> str:
def get_or_create_merge_base(branch, parent=None) -> Optional[str]:
"""Finds the configured merge base for branch.
If parent is supplied, it's used instead of calling upstream(branch).
@ -593,7 +605,8 @@ def get_or_create_merge_base(branch, parent=None) -> str:
parent = parent or upstream(branch)
if parent is None or branch is None:
return None
actual_merge_base: str = run('merge-base', parent, branch)
actual_merge_base = run('merge-base', parent, branch)
assert isinstance(actual_merge_base, str)
if base_upstream != parent:
base = None
@ -635,6 +648,7 @@ def hash_one(reflike, short=False):
def in_rebase():
git_dir = run('rev-parse', '--git-dir')
assert isinstance(git_dir, str)
return (os.path.exists(os.path.join(git_dir, 'rebase-merge'))
or os.path.exists(os.path.join(git_dir, 'rebase-apply')))
@ -761,7 +775,7 @@ def repo_root():
return run('rev-parse', '--show-toplevel')
def upstream_default():
def upstream_default() -> str:
"""Returns the default branch name of the origin repository."""
try:
ret = run('rev-parse', '--abbrev-ref', 'origin/HEAD')
@ -773,6 +787,7 @@ def upstream_default():
ret = run('rev-parse', '--abbrev-ref', 'origin/HEAD')
except subprocess2.CalledProcessError:
pass
assert isinstance(ret, str)
return ret
except subprocess2.CalledProcessError:
return 'origin/main'
@ -802,10 +817,11 @@ def less(): # pragma: no cover
# -R: Don't escape ANSI color codes.
# -X: Don't clear the screen before starting.
cmd = ('less', '-FRX')
proc = subprocess2.Popen(cmd, stdin=subprocess2.PIPE)
try:
proc = subprocess2.Popen(cmd, stdin=subprocess2.PIPE)
yield proc.stdin
finally:
assert proc.stdin is not None
try:
proc.stdin.close()
except BrokenPipeError:
@ -828,7 +844,7 @@ def run_with_retcode(*cmd, **kwargs):
return cpe.returncode
def run_stream(*cmd, **kwargs):
def run_stream(*cmd, **kwargs) -> typing.IO[AnyStr]:
"""Runs a git command. Returns stdout as a PIPE (file-like object).
stderr is dropped to avoid races if the process outputs to both stdout and
@ -839,6 +855,7 @@ def run_stream(*cmd, **kwargs):
kwargs.setdefault('shell', False)
cmd = (GIT_EXE, '-c', 'color.ui=never') + cmd
proc = subprocess2.Popen(cmd, **kwargs)
assert proc.stdout is not None
return proc.stdout
@ -855,8 +872,8 @@ def run_stream_with_retcode(*cmd, **kwargs):
kwargs.setdefault('stdout', subprocess2.PIPE)
kwargs.setdefault('shell', False)
cmd = (GIT_EXE, '-c', 'color.ui=never') + cmd
proc = subprocess2.Popen(cmd, **kwargs)
try:
proc = subprocess2.Popen(cmd, **kwargs)
yield proc.stdout
finally:
retcode = proc.wait()
@ -918,28 +935,31 @@ def _run_with_stderr(*cmd, **kwargs) -> Tuple[str, str] | Tuple[bytes, bytes]:
cmd = (GIT_EXE, '-c', 'color.ui=never') + cmd
proc = subprocess2.Popen(cmd, **kwargs)
ret, err = proc.communicate(indata)
stdout, stderr = proc.communicate(indata)
retcode = proc.wait()
if retcode not in accepted_retcodes:
raise subprocess2.CalledProcessError(retcode, cmd, os.getcwd(), ret,
err)
raise subprocess2.CalledProcessError(retcode, cmd, os.getcwd(), stdout,
stderr)
if autostrip:
ret = (ret or b'').strip()
err = (err or b'').strip()
stdout = (stdout or b'').strip()
stderr = (stderr or b'').strip()
if decode:
ret = ret.decode('utf-8', 'replace')
err = err.decode('utf-8', 'replace')
return stdout.decode('utf-8',
'replace'), stderr.decode('utf-8', 'replace')
return ret, err
return stdout, stderr
def set_branch_config(branch, option, value, scope='local'):
def set_branch_config(branch,
option,
value,
scope: scm.GitConfigScope = 'local'):
set_config('branch.%s.%s' % (branch, option), value, scope=scope)
def set_config(option, value, scope='local'):
def set_config(option, value, scope: scm.GitConfigScope = 'local'):
scm.GIT.SetConfig(os.getcwd(), option, value, scope=scope)
@ -1028,8 +1048,9 @@ def squash_current_branch(header=None, merge_base=None):
log_msg = header + '\n'
if log_msg:
log_msg += '\n'
log_msg += run('log', '--reverse', '--format=%H%n%B',
'%s..HEAD' % merge_base)
output = run('log', '--reverse', '--format=%H%n%B', '%s..HEAD' % merge_base)
assert isinstance(output, str)
log_msg += output
run('reset', '--soft', merge_base)
if not get_dirty_files():
@ -1060,6 +1081,7 @@ def thaw():
for sha in stream:
sha = sha.strip().decode('utf-8')
msg = run('show', '--format=%f%b', '-s', 'HEAD', '--')
assert isinstance(msg, str)
match = FREEZE_MATCHER.match(msg)
if not match:
if not took_action:
@ -1203,6 +1225,7 @@ def get_branches_info(include_tracking_status):
info_map = {}
data = run('for-each-ref', format_string, 'refs/heads')
assert isinstance(data, str)
BranchesInfo = collections.namedtuple('BranchesInfo',
'hash upstream commits behind')
for line in data.splitlines():

Loading…
Cancel
Save