You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/tests/auth_test.py

339 lines
13 KiB
Python

#!/usr/bin/env vpython3
# Copyright (c) 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit Tests for auth.py"""
import calendar
import datetime
import json
import os
import unittest
import sys
from unittest import mock
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import auth
import subprocess2
NOW = datetime.datetime(2019, 10, 17, 12, 30, 59, 0)
VALID_EXPIRY = NOW + datetime.timedelta(seconds=31)
class AuthenticatorTest(unittest.TestCase):
def setUp(self):
mock.patch('subprocess2.check_call').start()
mock.patch('subprocess2.check_call_out').start()
mock.patch('auth.datetime_now', return_value=NOW).start()
self.addCleanup(mock.patch.stopall)
def testHasCachedCredentials_NotLoggedIn(self):
subprocess2.check_call_out.side_effect = [
subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
'stderr')
]
self.assertFalse(auth.Authenticator().has_cached_credentials())
def testHasCachedCredentials_LoggedIn(self):
subprocess2.check_call_out.return_value = (json.dumps({
'token': 'token',
'expiry': 12345678
}), '')
self.assertTrue(auth.Authenticator().has_cached_credentials())
def testGetAccessToken_NotLoggedIn(self):
subprocess2.check_call_out.side_effect = [
subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
'stderr')
]
self.assertRaises(auth.LoginRequiredError,
auth.Authenticator().get_access_token)
def testGetAccessToken_CachedToken(self):
authenticator = auth.Authenticator()
authenticator._access_token = auth.Token('token', None)
self.assertEqual(auth.Token('token', None),
authenticator.get_access_token())
subprocess2.check_call_out.assert_not_called()
def testGetAccesstoken_LoggedIn(self):
expiry = calendar.timegm(VALID_EXPIRY.timetuple())
subprocess2.check_call_out.return_value = (json.dumps({
'token': 'token',
'expiry': expiry
}), '')
self.assertEqual(auth.Token('token', VALID_EXPIRY),
auth.Authenticator().get_access_token())
subprocess2.check_call_out.assert_called_with([
'luci-auth', 'token', '-scopes', auth.OAUTH_SCOPE_EMAIL,
'-json-output', '-'
],
stdout=subprocess2.PIPE,
stderr=subprocess2.PIPE)
def testGetAccessToken_DifferentScope(self):
expiry = calendar.timegm(VALID_EXPIRY.timetuple())
subprocess2.check_call_out.return_value = (json.dumps({
'token': 'token',
'expiry': expiry
}), '')
self.assertEqual(auth.Token('token', VALID_EXPIRY),
auth.Authenticator('custom scopes').get_access_token())
subprocess2.check_call_out.assert_called_with([
'luci-auth', 'token', '-scopes', 'custom scopes', '-json-output',
'-'
],
stdout=subprocess2.PIPE,
stderr=subprocess2.PIPE)
def testAuthorize_AccessToken(self):
http = mock.Mock()
http_request = http.request
http_request.__name__ = '__name__'
authenticator = auth.Authenticator()
authenticator._access_token = auth.Token('access_token', None)
authenticator._id_token = auth.Token('id_token', None)
authorized = authenticator.authorize(http)
authorized.request('https://example.com',
method='POST',
body='body',
headers={'header': 'value'})
http_request.assert_called_once_with(
'https://example.com', 'POST', 'body', {
'header': 'value',
'Authorization': 'Bearer access_token'
}, mock.ANY, mock.ANY)
def testGetIdToken_NotLoggedIn(self):
subprocess2.check_call_out.side_effect = [
subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
'stderr')
]
self.assertRaises(auth.LoginRequiredError,
auth.Authenticator().get_id_token)
def testGetIdToken_CachedToken(self):
authenticator = auth.Authenticator()
authenticator._id_token = auth.Token('token', None)
self.assertEqual(auth.Token('token', None),
authenticator.get_id_token())
subprocess2.check_call_out.assert_not_called()
def testGetIdToken_LoggedIn(self):
expiry = calendar.timegm(VALID_EXPIRY.timetuple())
subprocess2.check_call_out.return_value = (json.dumps({
'token': 'token',
'expiry': expiry
}), '')
self.assertEqual(
auth.Token('token', VALID_EXPIRY),
auth.Authenticator(audience='https://test.com').get_id_token())
subprocess2.check_call_out.assert_called_with([
'luci-auth', 'token', '-use-id-token', '-audience',
'https://test.com', '-json-output', '-'
],
stdout=subprocess2.PIPE,
stderr=subprocess2.PIPE)
def testAuthorize_IdToken(self):
http = mock.Mock()
http_request = http.request
http_request.__name__ = '__name__'
authenticator = auth.Authenticator()
authenticator._access_token = auth.Token('access_token', None)
authenticator._id_token = auth.Token('id_token', None)
authorized = authenticator.authorize(http, use_id_token=True)
authorized.request('https://example.com',
method='POST',
body='body',
headers={'header': 'value'})
http_request.assert_called_once_with(
'https://example.com', 'POST', 'body', {
'header': 'value',
'Authorization': 'Bearer id_token'
}, mock.ANY, mock.ANY)
class TokenTest(unittest.TestCase):
def setUp(self):
mock.patch('auth.datetime_now', return_value=NOW).start()
self.addCleanup(mock.patch.stopall)
def testNeedsRefresh_NoExpiry(self):
self.assertFalse(auth.Token('token', None).needs_refresh())
def testNeedsRefresh_Expired(self):
expired = NOW + datetime.timedelta(seconds=30)
self.assertTrue(auth.Token('token', expired).needs_refresh())
def testNeedsRefresh_Valid(self):
self.assertFalse(auth.Token('token', VALID_EXPIRY).needs_refresh())
class HasLuciContextLocalAuthTest(unittest.TestCase):
def setUp(self):
mock.patch('os.environ').start()
mock.patch('builtins.open', mock.mock_open()).start()
self.addCleanup(mock.patch.stopall)
def testNoLuciContextEnvVar(self):
os.environ = {}
self.assertFalse(auth.has_luci_context_local_auth())
def testNonexistentPath(self):
os.environ = {'LUCI_CONTEXT': 'path'}
open.side_effect = OSError
self.assertFalse(auth.has_luci_context_local_auth())
open.assert_called_with('path')
def testInvalidJsonFile(self):
os.environ = {'LUCI_CONTEXT': 'path'}
open().read.return_value = 'not-a-json-file'
self.assertFalse(auth.has_luci_context_local_auth())
open.assert_called_with('path')
def testNoLocalAuth(self):
os.environ = {'LUCI_CONTEXT': 'path'}
open().read.return_value = '{}'
self.assertFalse(auth.has_luci_context_local_auth())
open.assert_called_with('path')
def testNoDefaultAccountId(self):
os.environ = {'LUCI_CONTEXT': 'path'}
open().read.return_value = json.dumps({
'local_auth': {
'secret':
'secret',
'accounts': [{
'email': 'bots@account.iam.gserviceaccount.com',
'id': 'system',
}],
'rpc_port':
1234,
}
})
self.assertFalse(auth.has_luci_context_local_auth())
open.assert_called_with('path')
def testHasLocalAuth(self):
os.environ = {'LUCI_CONTEXT': 'path'}
open().read.return_value = json.dumps({
'local_auth': {
'secret':
'secret',
'accounts': [
{
'email': 'bots@account.iam.gserviceaccount.com',
'id': 'system',
},
{
'email': 'builder@account.iam.gserviceaccount.com',
'id': 'task',
},
],
'rpc_port':
1234,
'default_account_id':
'task',
},
})
self.assertTrue(auth.has_luci_context_local_auth())
open.assert_called_with('path')
class GerritAuthenticatorTest(unittest.TestCase):
def setUp(self):
mock.patch('subprocess2.communicate').start()
self.addCleanup(mock.patch.stopall)
self.authenticator = auth.GerritAuthenticator()
def _set_gcl_result(self, *, exitcode, stdout, stderr):
subprocess2.communicate.return_value = (
(stdout, stderr),
exitcode,
)
def testGetAccessToken(self):
self._set_gcl_result(exitcode=0,
stdout=b"username=git-luci\npassword=decacafe\n",
stderr=b"")
out = self.authenticator.get_access_token()
# Check ReAuth is disabled for get_acess_token().
subprocess2.communicate.assert_called()
args, kwargs = subprocess2.communicate.call_args
self.assertEqual(args[0], ['git-credential-luci', 'get'])
self.assertEqual(kwargs["env"]["LUCI_ENABLE_REAUTH"], '0')
# Check that the access token is extracted correctly.
self.assertEqual(out, "decacafe")
def testGetAccessTokenRequiresLogin(self):
self._set_gcl_result(exitcode=2,
stdout=b"",
stderr=b"interactive login required")
with self.assertRaises(auth.GitLoginRequiredError):
self.authenticator.get_access_token()
def testGetAuthorizationHeader_ReAuthToken(self):
self._set_gcl_result(exitcode=0,
stdout=b"authtype=BearerReAuth\ncredential=cafe\n",
stderr=b"")
context = auth.ReAuthContext(host="chromium", project="infra/infra")
out = self.authenticator.get_authorization_header(context)
# Check we didn't disable ReAuth.
subprocess2.communicate.assert_called()
args, kwargs = subprocess2.communicate.call_args
self.assertEqual(args[0], ['git-credential-luci', 'get'])
self.assertNotIn("env", kwargs)
# Check we pass ReAuth context to `git-credential-luci`.
self.assertEqual(
b'capability[]=authtype\nprotocol=https\nhost=chromium\npath=infra/infra\n',
kwargs["stdin"])
# Check the token is extracted correctly.
self.assertEqual(out, "BearerReAuth cafe")
def testGetAuthorizationHeader_ReAuthNotNeeded(self):
self._set_gcl_result(exitcode=0,
stdout=b"username=git-luci\npassword=decacafe\n",
stderr=b"")
context = auth.ReAuthContext(host="chromium",
project="infra/experimental")
out = self.authenticator.get_authorization_header(context)
# Check the access token is extracted correctly.
self.assertEqual(out, "Bearer decacafe")
def testGetAuthorizationHeader_ReAuthRequired(self):
self._set_gcl_result(exitcode=3, stdout=b"", stderr=b"ReAuth required")
context = auth.ReAuthContext(host="chromium", project="infra/infra")
with self.assertRaises(auth.GitReAuthRequiredError):
self.authenticator.get_authorization_header(context)
def testGetAuthorizationHeader_LoginRequired(self):
self._set_gcl_result(exitcode=2,
stdout=b"",
stderr=b"interactive login required")
context = auth.ReAuthContext(host="chromium", project="infra/infra")
with self.assertRaises(auth.GitLoginRequiredError):
self.authenticator.get_authorization_header(context)
if __name__ == '__main__':
if '-v' in sys.argv:
logging.basicConfig(level=logging.DEBUG)
unittest.main()