diff --git a/auth.py b/auth.py index 1123cf70c..204cfb7d4 100644 --- a/auth.py +++ b/auth.py @@ -57,7 +57,15 @@ class LoginRequiredError(Exception): def has_luci_context_local_auth(): """Returns whether LUCI_CONTEXT should be used for ambient authentication.""" - return bool(os.environ.get('LUCI_CONTEXT')) + ctx_path = os.environ.get('LUCI_CONTEXT') + if not ctx_path: + return False + try: + with open(ctx_path) as f: + loaded = json.load(f) + except (OSError, IOError, ValueError): + return False + return loaded.get('local_auth', {}).get('default_account_id') is not None class Authenticator(object): diff --git a/tests/auth_test.py b/tests/auth_test.py index bdd7add1d..f8e18fd70 100755 --- a/tests/auth_test.py +++ b/tests/auth_test.py @@ -19,6 +19,11 @@ from third_party import mock import auth import subprocess2 +if sys.version_info.major == 2: + BUILTIN_OPEN = '__builtin__.open' +else: + BUILTIN_OPEN = 'builtins.open' + NOW = datetime.datetime(2019, 10, 17, 12, 30, 59, 0) VALID_EXPIRY = NOW + datetime.timedelta(seconds=31) @@ -113,6 +118,72 @@ class AccessTokenTest(unittest.TestCase): self.assertFalse(auth.AccessToken('token', VALID_EXPIRY).needs_refresh()) +class HasLuciContextLocalAuthTest(unittest.TestCase): + def setUp(self): + mock.patch('os.environ').start() + mock.patch(BUILTIN_OPEN, mock.mock_open()).start() + self.addCleanup(mock.patch.stopall) + + def testNoLuciContextEnvVar(self): + os.environ = {} + self.assertFalse(auth.has_luci_context_local_auth()) + + def testUnexistentPath(self): + os.environ = {'LUCI_CONTEXT': 'path'} + open.side_effect = OSError + self.assertFalse(auth.has_luci_context_local_auth()) + open.assert_called_with('path') + + def testInvalidJsonFile(self): + os.environ = {'LUCI_CONTEXT': 'path'} + open().read.return_value = 'not-a-json-file' + self.assertFalse(auth.has_luci_context_local_auth()) + open.assert_called_with('path') + + def testNoLocalAuth(self): + os.environ = {'LUCI_CONTEXT': 'path'} + open().read.return_value = '{}' + self.assertFalse(auth.has_luci_context_local_auth()) + open.assert_called_with('path') + + def testNoDefaultAccountId(self): + os.environ = {'LUCI_CONTEXT': 'path'} + open().read.return_value = json.dumps({ + 'local_auth': { + 'secret': 'secret', + 'accounts': [{ + 'email': 'bots@account.iam.gserviceaccount.com', + 'id': 'system', + }], + 'rpc_port': 1234, + } + }) + self.assertFalse(auth.has_luci_context_local_auth()) + open.assert_called_with('path') + + def testHasLocalAuth(self): + os.environ = {'LUCI_CONTEXT': 'path'} + open().read.return_value = json.dumps({ + 'local_auth': { + 'secret': 'secret', + 'accounts': [ + { + 'email': 'bots@account.iam.gserviceaccount.com', + 'id': 'system', + }, + { + 'email': 'builder@account.iam.gserviceaccount.com', + 'id': 'task', + }, + ], + 'rpc_port': 1234, + 'default_account_id': 'task', + }, + }) + self.assertTrue(auth.has_luci_context_local_auth()) + open.assert_called_with('path') + + if __name__ == '__main__': if '-v' in sys.argv: logging.basicConfig(level=logging.DEBUG)