@ -3,6 +3,8 @@
# found in the LICENSE file.
""" Google OAuth2 related functions. """
from __future__ import annotations
import collections
import datetime
import functools
@ -10,6 +12,7 @@ import httplib2
import json
import logging
import os
from typing import Optional
import subprocess2
@ -58,6 +61,22 @@ class LoginRequiredError(Exception):
return ' luci-auth login -scopes " %s " ' % self . scopes
class GitLoginRequiredError ( Exception ) :
""" Interaction with the user is required to authenticate.
This is for git - credential - luci , not luci - auth .
"""
def __init__ ( self ) :
msg = ( ' You are not logged in. Please login first by running: \n '
' %s ' % self . login_command )
super ( GitLoginRequiredError , self ) . __init__ ( msg )
@property
def login_command ( self ) - > str :
return ' git-credential-luci login '
def has_luci_context_local_auth ( ) :
""" Returns whether LUCI_CONTEXT should be used for ambient authentication. """
ctx_path = os . environ . get ( ' LUCI_CONTEXT ' )
@ -201,3 +220,45 @@ class Authenticator(object):
# stdout/stderr.
logging . error ( ' luci-auth token failed: %s ' , e )
return None
class GerritAuthenticator ( object ) :
""" Object that knows how to refresh access tokens for Gerrit.
Unlike Authenticator , this is specifically for authenticating Gerrit
requests .
"""
def __init__ ( self ) :
self . _access_token : Optional [ str ] = None
def get_access_token ( self ) - > str :
""" Returns AccessToken, refreshing it if necessary.
Raises :
GitLoginRequiredError if user interaction is required .
"""
access_token = self . _get_luci_auth_token ( )
if access_token :
return access_token
logging . debug ( ' Failed to create access token ' )
raise GitLoginRequiredError ( )
def _get_luci_auth_token ( self , use_id_token = False ) - > Optional [ str ] :
logging . debug ( ' Running git-credential-luci ' )
try :
out , err = subprocess2 . check_call_out (
[ ' git-credential-luci ' , ' get ' ] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . PIPE )
logging . debug ( ' git-credential-luci stderr: \n %s ' , err )
for line in out . decode ( ) . splitlines ( ) :
if line . startswith ( ' password= ' ) :
return line [ len ( ' password= ' ) : ] . rstrip ( )
logging . error ( ' git-credential-luci did not return a token ' )
return None
except subprocess2 . CalledProcessError as e :
# subprocess2.CalledProcessError.__str__ nicely formats
# stdout/stderr.
logging . error ( ' git-credential-luci failed: %s ' , e )
return None