@ -52,8 +52,8 @@ class AuthenticatorTest(unittest.TestCase):
def testGetAccessToken_CachedToken ( self ) :
authenticator = auth . Authenticator ( )
authenticator . _access_token = auth . Access Token( ' token ' , None )
self . assertEqual ( auth . Access Token( ' token ' , None ) ,
authenticator . _access_token = auth . Token( ' token ' , None )
self . assertEqual ( auth . Token( ' token ' , None ) ,
authenticator . get_access_token ( ) )
subprocess2 . check_call_out . assert_not_called ( )
@ -63,7 +63,7 @@ class AuthenticatorTest(unittest.TestCase):
' token ' : ' token ' ,
' expiry ' : expiry
} ) , ' ' )
self . assertEqual ( auth . Access Token( ' token ' , VALID_EXPIRY ) ,
self . assertEqual ( auth . Token( ' token ' , VALID_EXPIRY ) ,
auth . Authenticator ( ) . get_access_token ( ) )
subprocess2 . check_call_out . assert_called_with ( [
' luci-auth ' , ' token ' , ' -scopes ' , auth . OAUTH_SCOPE_EMAIL ,
@ -78,7 +78,7 @@ class AuthenticatorTest(unittest.TestCase):
' token ' : ' token ' ,
' expiry ' : expiry
} ) , ' ' )
self . assertEqual ( auth . Access Token( ' token ' , VALID_EXPIRY ) ,
self . assertEqual ( auth . Token( ' token ' , VALID_EXPIRY ) ,
auth . Authenticator ( ' custom scopes ' ) . get_access_token ( ) )
subprocess2 . check_call_out . assert_called_with ( [
' luci-auth ' , ' token ' , ' -scopes ' , ' custom scopes ' , ' -json-output ' ,
@ -87,41 +87,92 @@ class AuthenticatorTest(unittest.TestCase):
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . PIPE )
def testAuthorize ( self ) :
def testAuthorize _AccessToken ( self ) :
http = mock . Mock ( )
http_request = http . request
http_request . __name__ = ' __name__ '
authenticator = auth . Authenticator ( )
authenticator . _access_token = auth . AccessToken ( ' token ' , None )
authenticator . _access_token = auth . Token ( ' access_token ' , None )
authenticator . _id_token = auth . Token ( ' id_token ' , None )
authorized = authenticator . authorize ( http )
authorized . request ( ' https://example.com ' ,
method = ' POST ' ,
body = ' body ' ,
headers = { ' header ' : ' value ' } )
http_request . assert_called_once_with ( ' https://example.com ' , ' POST ' ,
' body ' , {
' header ' : ' value ' ,
' Authorization ' : ' Bearer token '
} , mock . ANY , mock . ANY )
http_request . assert_called_once_with (
' https://example.com ' , ' POST ' , ' body ' , {
' header ' : ' value ' ,
' Authorization ' : ' Bearer access_token '
} , mock . ANY , mock . ANY )
def testGetIdToken_NotLoggedIn ( self ) :
subprocess2 . check_call_out . side_effect = [
subprocess2 . CalledProcessError ( 1 , [ ' cmd ' ] , ' cwd ' , ' stdout ' ,
' stderr ' )
]
self . assertRaises ( auth . LoginRequiredError ,
auth . Authenticator ( ) . get_id_token )
def testGetIdToken_CachedToken ( self ) :
authenticator = auth . Authenticator ( )
authenticator . _id_token = auth . Token ( ' token ' , None )
self . assertEqual ( auth . Token ( ' token ' , None ) ,
authenticator . get_id_token ( ) )
subprocess2 . check_call_out . assert_not_called ( )
def testGetIdToken_LoggedIn ( self ) :
expiry = calendar . timegm ( VALID_EXPIRY . timetuple ( ) )
subprocess2 . check_call_out . return_value = ( json . dumps ( {
' token ' : ' token ' ,
' expiry ' : expiry
} ) , ' ' )
self . assertEqual (
auth . Token ( ' token ' , VALID_EXPIRY ) ,
auth . Authenticator ( audience = ' https://test.com ' ) . get_id_token ( ) )
subprocess2 . check_call_out . assert_called_with ( [
' luci-auth ' , ' token ' , ' -use-id-token ' , ' -audience ' ,
' https://test.com ' , ' -json-output ' , ' - '
] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . PIPE )
def testAuthorize_IdToken ( self ) :
http = mock . Mock ( )
http_request = http . request
http_request . __name__ = ' __name__ '
authenticator = auth . Authenticator ( )
authenticator . _access_token = auth . Token ( ' access_token ' , None )
authenticator . _id_token = auth . Token ( ' id_token ' , None )
authorized = authenticator . authorize ( http , use_id_token = True )
authorized . request ( ' https://example.com ' ,
method = ' POST ' ,
body = ' body ' ,
headers = { ' header ' : ' value ' } )
http_request . assert_called_once_with (
' https://example.com ' , ' POST ' , ' body ' , {
' header ' : ' value ' ,
' Authorization ' : ' Bearer id_token '
} , mock . ANY , mock . ANY )
class AccessTokenTest ( unittest . TestCase ) :
class TokenTest( unittest . TestCase ) :
def setUp ( self ) :
mock . patch ( ' auth.datetime_now ' , return_value = NOW ) . start ( )
self . addCleanup ( mock . patch . stopall )
def testNeedsRefresh_NoExpiry ( self ) :
self . assertFalse ( auth . AccessToken ( ' token ' , None ) . needs_refresh ( ) )
self . assertFalse ( auth . Token( ' token ' , None ) . needs_refresh ( ) )
def testNeedsRefresh_Expired ( self ) :
expired = NOW + datetime . timedelta ( seconds = 30 )
self . assertTrue ( auth . AccessToken ( ' token ' , expired ) . needs_refresh ( ) )
self . assertTrue ( auth . Token( ' token ' , expired ) . needs_refresh ( ) )
def testNeedsRefresh_Valid ( self ) :
self . assertFalse (
auth . AccessToken ( ' token ' , VALID_EXPIRY ) . needs_refresh ( ) )
self . assertFalse ( auth . Token ( ' token ' , VALID_EXPIRY ) . needs_refresh ( ) )
class HasLuciContextLocalAuthTest ( unittest . TestCase ) :