You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
808 lines
31 KiB
Python
808 lines
31 KiB
Python
#!/usr/bin/env vpython3
|
|
# coding=utf-8
|
|
# Copyright (c) 2019 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
import unittest
|
|
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from unittest import mock
|
|
|
|
import httplib2
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import scm_mock
|
|
|
|
import gerrit_util
|
|
import metrics
|
|
import scm
|
|
import subprocess2
|
|
|
|
RUN_SUBPROC_TESTS = 'RUN_SUBPROC_TESTS' in os.environ
|
|
|
|
|
|
def makeConn(host: str) -> gerrit_util.HttpConn:
|
|
"""Makes an empty gerrit_util.HttpConn for the given host."""
|
|
return gerrit_util.HttpConn(
|
|
req_uri='???',
|
|
req_method='GET',
|
|
req_host=host,
|
|
req_headers={},
|
|
req_body=None,
|
|
)
|
|
|
|
|
|
class CookiesAuthenticatorTest(unittest.TestCase):
|
|
_GITCOOKIES = '\n'.join([
|
|
'\t'.join([
|
|
'chromium.googlesource.com',
|
|
'FALSE',
|
|
'/',
|
|
'TRUE',
|
|
'2147483647',
|
|
'o',
|
|
'git-user.chromium.org=1/chromium-secret',
|
|
]),
|
|
'\t'.join([
|
|
'chromium-review.googlesource.com',
|
|
'FALSE',
|
|
'/',
|
|
'TRUE',
|
|
'2147483647',
|
|
'o',
|
|
'git-user.chromium.org=1/chromium-secret',
|
|
]),
|
|
'\t'.join([
|
|
'.example.com',
|
|
'FALSE',
|
|
'/',
|
|
'TRUE',
|
|
'2147483647',
|
|
'o',
|
|
'example-bearer-token',
|
|
]),
|
|
'\t'.join([
|
|
'another-path.example.com',
|
|
'FALSE',
|
|
'/foo',
|
|
'TRUE',
|
|
'2147483647',
|
|
'o',
|
|
'git-example.com=1/another-path-secret',
|
|
]),
|
|
'\t'.join([
|
|
'another-key.example.com',
|
|
'FALSE',
|
|
'/',
|
|
'TRUE',
|
|
'2147483647',
|
|
'not-o',
|
|
'git-example.com=1/another-key-secret',
|
|
]),
|
|
'#' + '\t'.join([
|
|
'chromium-review.googlesource.com',
|
|
'FALSE',
|
|
'/',
|
|
'TRUE',
|
|
'2147483647',
|
|
'o',
|
|
'git-invalid-user.chromium.org=1/invalid-chromium-secret',
|
|
]),
|
|
'Some unrelated line\t that should not be here',
|
|
])
|
|
|
|
def setUp(self):
|
|
mock.patch('gclient_utils.FileRead',
|
|
return_value=self._GITCOOKIES).start()
|
|
mock.patch('os.getenv', return_value={}).start()
|
|
mock.patch('os.environ', {'HOME': '$HOME'}).start()
|
|
mock.patch('os.getcwd', return_value='/fame/cwd').start()
|
|
mock.patch('os.path.exists', return_value=True).start()
|
|
mock.patch(
|
|
'git_common.run',
|
|
side_effect=[
|
|
subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'out', 'err')
|
|
],
|
|
).start()
|
|
scm_mock.GIT(self)
|
|
|
|
self.addCleanup(mock.patch.stopall)
|
|
self.maxDiff = None
|
|
|
|
def assertAuthenticatedConnAuth(self,
|
|
auth: gerrit_util.CookiesAuthenticator,
|
|
host: str, expected: str):
|
|
conn = makeConn(host)
|
|
auth.authenticate(conn)
|
|
self.assertEqual(conn.req_headers['Authorization'], expected)
|
|
|
|
def testGetNewPasswordUrl(self):
|
|
auth = gerrit_util.CookiesAuthenticator()
|
|
self.assertEqual('https://chromium.googlesource.com/new-password',
|
|
auth.get_new_password_url('chromium.googlesource.com'))
|
|
self.assertEqual(
|
|
'https://chrome-internal.googlesource.com/new-password',
|
|
auth.get_new_password_url(
|
|
'chrome-internal-review.googlesource.com'))
|
|
|
|
def testGetNewPasswordMessage(self):
|
|
auth = gerrit_util.CookiesAuthenticator()
|
|
self.assertIn(
|
|
'https://chromium.googlesource.com/new-password',
|
|
auth._get_new_password_message('chromium-review.googlesource.com'))
|
|
self.assertIn(
|
|
'https://chrome-internal.googlesource.com/new-password',
|
|
auth._get_new_password_message('chrome-internal.googlesource.com'))
|
|
|
|
def testGetGitcookiesPath(self):
|
|
self.assertEqual(
|
|
os.path.expanduser(os.path.join('~', '.gitcookies')),
|
|
gerrit_util.CookiesAuthenticator().get_gitcookies_path())
|
|
|
|
scm.GIT.SetConfig(os.getcwd(), 'http.cookiefile', '/some/path')
|
|
self.assertEqual(
|
|
'/some/path',
|
|
gerrit_util.CookiesAuthenticator().get_gitcookies_path())
|
|
|
|
os.getenv.return_value = 'git-cookies-path'
|
|
self.assertEqual(
|
|
'git-cookies-path',
|
|
gerrit_util.CookiesAuthenticator().get_gitcookies_path())
|
|
os.getenv.assert_called_with('GIT_COOKIES_PATH')
|
|
|
|
def testGitcookies(self):
|
|
auth = gerrit_util.CookiesAuthenticator()
|
|
self.assertEqual(
|
|
auth.gitcookies, {
|
|
'chromium.googlesource.com':
|
|
('git-user.chromium.org', '1/chromium-secret'),
|
|
'chromium-review.googlesource.com':
|
|
('git-user.chromium.org', '1/chromium-secret'),
|
|
'.example.com': ('', 'example-bearer-token'),
|
|
})
|
|
|
|
def testGetAuthHeader(self):
|
|
expected_chromium_header = (
|
|
'Basic Z2l0LXVzZXIuY2hyb21pdW0ub3JnOjEvY2hyb21pdW0tc2VjcmV0')
|
|
|
|
auth = gerrit_util.CookiesAuthenticator()
|
|
self.assertAuthenticatedConnAuth(auth, 'chromium.googlesource.com',
|
|
expected_chromium_header)
|
|
self.assertAuthenticatedConnAuth(auth,
|
|
'chromium-review.googlesource.com',
|
|
expected_chromium_header)
|
|
self.assertAuthenticatedConnAuth(auth, 'some-review.example.com',
|
|
'Bearer example-bearer-token')
|
|
|
|
def testGetAuthEmail(self):
|
|
auth = gerrit_util.CookiesAuthenticator()
|
|
self.assertEqual('user@chromium.org',
|
|
auth.get_auth_email('chromium.googlesource.com'))
|
|
self.assertEqual(
|
|
'user@chromium.org',
|
|
auth.get_auth_email('chromium-review.googlesource.com'))
|
|
self.assertIsNone(auth.get_auth_email('some-review.example.com'))
|
|
|
|
|
|
class GceAuthenticatorTest(unittest.TestCase):
|
|
def setUp(self):
|
|
super(GceAuthenticatorTest, self).setUp()
|
|
mock.patch('httplib2.Http').start()
|
|
mock.patch('os.getenv', return_value=None).start()
|
|
mock.patch('gerrit_util.time_sleep').start()
|
|
mock.patch('gerrit_util.time_time').start()
|
|
self.addCleanup(mock.patch.stopall)
|
|
|
|
# GceAuthenticator has class variables that cache the results. Build a
|
|
# new class for every test to avoid inter-test dependencies.
|
|
class GceAuthenticator(gerrit_util.GceAuthenticator):
|
|
pass
|
|
|
|
self.GceAuthenticator = GceAuthenticator
|
|
|
|
def assertAuthenticatedToken(self, token: Optional[str]):
|
|
conn = makeConn('some.example.com')
|
|
self.GceAuthenticator().authenticate(conn)
|
|
if token is None:
|
|
self.assertNotIn('Authorization', conn.req_headers)
|
|
else:
|
|
self.assertEqual(conn.req_headers['Authorization'], token)
|
|
|
|
def testIsGce_EnvVarSkip(self, *_mocks):
|
|
os.getenv.return_value = '1'
|
|
self.assertFalse(self.GceAuthenticator.is_applicable())
|
|
os.getenv.assert_called_once_with('SKIP_GCE_AUTH_FOR_GIT')
|
|
|
|
def testIsGce_Error(self):
|
|
httplib2.Http().request.side_effect = httplib2.HttpLib2Error
|
|
self.assertFalse(self.GceAuthenticator.is_applicable())
|
|
|
|
def testIsGce_500(self):
|
|
httplib2.Http().request.return_value = (mock.Mock(status=500), None)
|
|
self.assertFalse(self.GceAuthenticator.is_applicable())
|
|
last_call = gerrit_util.time_sleep.mock_calls[-1]
|
|
self.assertLessEqual(last_call, mock.call(43.0))
|
|
|
|
def testIsGce_FailsThenSucceeds(self):
|
|
response = mock.Mock(status=200)
|
|
response.get.return_value = 'Google'
|
|
httplib2.Http().request.side_effect = [
|
|
(mock.Mock(status=500), None),
|
|
(response, 'who cares'),
|
|
]
|
|
self.assertTrue(self.GceAuthenticator.is_applicable())
|
|
|
|
def testIsGce_MetadataFlavorIsNotGoogle(self):
|
|
response = mock.Mock(status=200)
|
|
response.get.return_value = None
|
|
httplib2.Http().request.return_value = (response, 'who cares')
|
|
self.assertFalse(self.GceAuthenticator.is_applicable())
|
|
response.get.assert_called_once_with('metadata-flavor')
|
|
|
|
def testIsGce_ResultIsCached(self):
|
|
response = mock.Mock(status=200)
|
|
response.get.return_value = 'Google'
|
|
httplib2.Http().request.side_effect = [(response, 'who cares')]
|
|
self.assertTrue(self.GceAuthenticator.is_applicable())
|
|
self.assertTrue(self.GceAuthenticator.is_applicable())
|
|
httplib2.Http().request.assert_called_once()
|
|
|
|
def testGetAuthHeader_Error(self):
|
|
httplib2.Http().request.side_effect = httplib2.HttpLib2Error
|
|
self.assertAuthenticatedToken(None)
|
|
|
|
def testGetAuthHeader_500(self):
|
|
httplib2.Http().request.return_value = (mock.Mock(status=500), None)
|
|
self.assertAuthenticatedToken(None)
|
|
|
|
def testGetAuthHeader_Non200(self):
|
|
httplib2.Http().request.return_value = (mock.Mock(status=403), None)
|
|
self.assertAuthenticatedToken(None)
|
|
|
|
def testGetAuthHeader_OK(self):
|
|
httplib2.Http().request.return_value = (
|
|
mock.Mock(status=200),
|
|
'{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}'
|
|
)
|
|
gerrit_util.time_time.return_value = 0
|
|
self.assertAuthenticatedToken('TYPE TOKEN')
|
|
|
|
def testGetAuthHeader_Cache(self):
|
|
httplib2.Http().request.return_value = (
|
|
mock.Mock(status=200),
|
|
'{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}'
|
|
)
|
|
gerrit_util.time_time.return_value = 0
|
|
self.assertAuthenticatedToken('TYPE TOKEN')
|
|
self.assertAuthenticatedToken('TYPE TOKEN')
|
|
httplib2.Http().request.assert_called_once()
|
|
|
|
def testGetAuthHeader_CacheOld(self):
|
|
httplib2.Http().request.return_value = (
|
|
mock.Mock(status=200),
|
|
'{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}'
|
|
)
|
|
gerrit_util.time_time.side_effect = [0, 100, 200]
|
|
self.assertAuthenticatedToken('TYPE TOKEN')
|
|
self.assertAuthenticatedToken('TYPE TOKEN')
|
|
self.assertEqual(2, len(httplib2.Http().request.mock_calls))
|
|
|
|
|
|
class GerritUtilTest(unittest.TestCase):
|
|
def setUp(self):
|
|
super(GerritUtilTest, self).setUp()
|
|
mock.patch('gerrit_util.LOGGER').start()
|
|
mock.patch('gerrit_util.time_sleep').start()
|
|
mock.patch('metrics.collector').start()
|
|
mock.patch('metrics_utils.extract_http_metrics',
|
|
return_value='http_metrics').start()
|
|
self.addCleanup(mock.patch.stopall)
|
|
|
|
def testQueryString(self):
|
|
self.assertEqual('', gerrit_util._QueryString([]))
|
|
self.assertEqual('first%20param%2B',
|
|
gerrit_util._QueryString([], 'first param+'))
|
|
self.assertEqual(
|
|
'key:val+foo:bar',
|
|
gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')]))
|
|
self.assertEqual(
|
|
'first%20param%2B+key:val+foo:bar',
|
|
gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')],
|
|
'first param+'))
|
|
|
|
@mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
|
|
@mock.patch('gerrit_util._Authenticator.get')
|
|
def testCreateHttpConn_Basic(self, mockAuth, cookieAuth):
|
|
mockAuth.return_value = gerrit_util.CookiesAuthenticator()
|
|
cookieAuth.return_value = None
|
|
|
|
conn = gerrit_util.CreateHttpConn('host.example.com', 'foo/bar')
|
|
self.assertEqual('host.example.com', conn.req_host)
|
|
self.assertEqual(
|
|
{
|
|
'uri': 'https://host.example.com/a/foo/bar',
|
|
'method': 'GET',
|
|
'headers': {},
|
|
'body': None,
|
|
}, conn.req_params)
|
|
|
|
@mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
|
|
@mock.patch('gerrit_util._Authenticator.get')
|
|
def testCreateHttpConn_Authenticated(self, mockAuth, cookieAuth):
|
|
mockAuth.return_value = gerrit_util.CookiesAuthenticator()
|
|
cookieAuth.return_value = (None, 'token')
|
|
|
|
conn = gerrit_util.CreateHttpConn('host.example.com',
|
|
'foo/bar',
|
|
headers={'header': 'value'})
|
|
self.assertEqual('host.example.com', conn.req_host)
|
|
self.assertEqual(
|
|
{
|
|
'uri': 'https://host.example.com/a/foo/bar',
|
|
'method': 'GET',
|
|
'headers': {
|
|
'Authorization': 'Bearer token',
|
|
'header': 'value'
|
|
},
|
|
'body': None,
|
|
}, conn.req_params)
|
|
|
|
@mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
|
|
@mock.patch('gerrit_util._Authenticator')
|
|
def testCreateHttpConn_Body(self, mockAuth, cookieAuth):
|
|
mockAuth.return_value = gerrit_util.CookiesAuthenticator()
|
|
cookieAuth.return_value = None
|
|
|
|
conn = gerrit_util.CreateHttpConn('host.example.com',
|
|
'foo/bar',
|
|
body={
|
|
'l': [1, 2, 3],
|
|
'd': {
|
|
'k': 'v'
|
|
}
|
|
})
|
|
self.assertEqual('host.example.com', conn.req_host)
|
|
self.assertEqual(
|
|
{
|
|
'uri': 'https://host.example.com/a/foo/bar',
|
|
'method': 'GET',
|
|
'headers': {
|
|
'Content-Type': 'application/json'
|
|
},
|
|
'body': '{"d": {"k": "v"}, "l": [1, 2, 3]}',
|
|
}, conn.req_params)
|
|
|
|
def testReadHttpResponse_200(self):
|
|
conn = mock.Mock()
|
|
conn.req_params = {'uri': 'uri', 'method': 'method'}
|
|
conn.request.return_value = (mock.Mock(status=200),
|
|
b'content\xe2\x9c\x94')
|
|
|
|
content = gerrit_util.ReadHttpResponse(conn)
|
|
self.assertEqual('content✔', content.getvalue())
|
|
metrics.collector.add_repeated.assert_called_once_with(
|
|
'http_requests', 'http_metrics')
|
|
|
|
def testReadHttpResponse_AuthenticationIssue(self):
|
|
for status in (302, 401, 403):
|
|
response = mock.Mock(status=status)
|
|
response.get.return_value = None
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.return_value = (response, b'')
|
|
|
|
with mock.patch('sys.stdout', StringIO()):
|
|
with self.assertRaises(gerrit_util.GerritError) as cm:
|
|
gerrit_util.ReadHttpResponse(conn)
|
|
|
|
self.assertEqual(status, cm.exception.http_status)
|
|
self.assertIn('Your Gerrit credentials might be misconfigured',
|
|
sys.stdout.getvalue())
|
|
|
|
def testReadHttpResponse_ClientError(self):
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.return_value = (mock.Mock(status=404), b'')
|
|
|
|
with self.assertRaises(gerrit_util.GerritError) as cm:
|
|
gerrit_util.ReadHttpResponse(conn)
|
|
|
|
self.assertEqual(404, cm.exception.http_status)
|
|
|
|
def readHttpResponse_ServerErrorHelper(self, status):
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.return_value = (mock.Mock(status=status), b'')
|
|
|
|
with self.assertRaises(gerrit_util.GerritError) as cm:
|
|
gerrit_util.ReadHttpResponse(conn)
|
|
|
|
self.assertEqual(status, cm.exception.http_status)
|
|
self.assertEqual(gerrit_util.TRY_LIMIT, len(conn.request.mock_calls))
|
|
last_call = gerrit_util.time_sleep.mock_calls[-1]
|
|
self.assertLessEqual(last_call, mock.call(422.0))
|
|
|
|
def testReadHttpResponse_ServerError(self):
|
|
self.readHttpResponse_ServerErrorHelper(status=404)
|
|
self.readHttpResponse_ServerErrorHelper(status=409)
|
|
self.readHttpResponse_ServerErrorHelper(status=429)
|
|
self.readHttpResponse_ServerErrorHelper(status=500)
|
|
|
|
def testReadHttpResponse_ServerErrorAndSuccess(self):
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.side_effect = [
|
|
(mock.Mock(status=500), b''),
|
|
(mock.Mock(status=200), b'content\xe2\x9c\x94'),
|
|
]
|
|
|
|
self.assertEqual('content✔',
|
|
gerrit_util.ReadHttpResponse(conn).getvalue())
|
|
self.assertEqual(2, len(conn.request.mock_calls))
|
|
gerrit_util.time_sleep.assert_called_once_with(12.0)
|
|
|
|
def testReadHttpResponse_TimeoutAndSuccess(self):
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.side_effect = [
|
|
socket.timeout('timeout'),
|
|
(mock.Mock(status=200), b'content\xe2\x9c\x94'),
|
|
]
|
|
|
|
self.assertEqual('content✔',
|
|
gerrit_util.ReadHttpResponse(conn).getvalue())
|
|
self.assertEqual(2, len(conn.request.mock_calls))
|
|
gerrit_util.time_sleep.assert_called_once_with(12.0)
|
|
|
|
def testReadHttpResponse_SetMaxTries(self):
|
|
conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
|
|
conn.request.side_effect = [
|
|
(mock.Mock(status=409), b'error!'),
|
|
(mock.Mock(status=409), b'error!'),
|
|
(mock.Mock(status=409), b'error!'),
|
|
]
|
|
|
|
self.assertRaises(gerrit_util.GerritError,
|
|
gerrit_util.ReadHttpResponse,
|
|
conn,
|
|
max_tries=2)
|
|
self.assertEqual(2, len(conn.request.mock_calls))
|
|
gerrit_util.time_sleep.assert_called_once_with(12.0)
|
|
|
|
def testReadHttpResponse_Expected404(self):
|
|
conn = mock.Mock()
|
|
conn.req_params = {'uri': 'uri', 'method': 'method'}
|
|
conn.request.return_value = (mock.Mock(status=404),
|
|
b'content\xe2\x9c\x94')
|
|
|
|
content = gerrit_util.ReadHttpResponse(conn, (404, ))
|
|
self.assertEqual('', content.getvalue())
|
|
|
|
@mock.patch('gerrit_util.ReadHttpResponse')
|
|
def testReadHttpJsonResponse_NotJSON(self, mockReadHttpResponse):
|
|
mockReadHttpResponse.return_value = StringIO('not json')
|
|
with self.assertRaises(gerrit_util.GerritError) as cm:
|
|
gerrit_util.ReadHttpJsonResponse(None)
|
|
self.assertEqual(cm.exception.http_status, 200)
|
|
self.assertEqual(cm.exception.message,
|
|
'(200) Unexpected json output: not json')
|
|
|
|
@mock.patch('gerrit_util.ReadHttpResponse')
|
|
def testReadHttpJsonResponse_EmptyValue(self, mockReadHttpResponse):
|
|
mockReadHttpResponse.return_value = StringIO(')]}\'')
|
|
self.assertEqual(gerrit_util.ReadHttpJsonResponse(None), {})
|
|
|
|
@mock.patch('gerrit_util.ReadHttpResponse')
|
|
def testReadHttpJsonResponse_JSON(self, mockReadHttpResponse):
|
|
expected_value = {'foo': 'bar', 'baz': [1, '2', 3]}
|
|
mockReadHttpResponse.return_value = StringIO(')]}\'\n' +
|
|
json.dumps(expected_value))
|
|
self.assertEqual(expected_value, gerrit_util.ReadHttpJsonResponse(None))
|
|
|
|
@mock.patch('gerrit_util.CreateHttpConn')
|
|
@mock.patch('gerrit_util.ReadHttpJsonResponse')
|
|
def testQueryChanges(self, mockJsonResponse, mockCreateHttpConn):
|
|
gerrit_util.QueryChanges('host', [('key', 'val'), ('foo', 'bar baz')],
|
|
'first param',
|
|
limit=500,
|
|
o_params=['PARAM_A', 'PARAM_B'],
|
|
start='start')
|
|
mockCreateHttpConn.assert_called_once_with(
|
|
'host', ('changes/?q=first%20param+key:val+foo:bar+baz'
|
|
'&start=start'
|
|
'&n=500'
|
|
'&o=PARAM_A'
|
|
'&o=PARAM_B'),
|
|
timeout=30.0)
|
|
|
|
def testQueryChanges_NoParams(self):
|
|
self.assertRaises(RuntimeError, gerrit_util.QueryChanges, 'host', [])
|
|
|
|
@mock.patch('gerrit_util.QueryChanges')
|
|
def testGenerateAllChanges(self, mockQueryChanges):
|
|
mockQueryChanges.side_effect = [
|
|
# First results page
|
|
[
|
|
{
|
|
'_number': '4'
|
|
},
|
|
{
|
|
'_number': '3'
|
|
},
|
|
{
|
|
'_number': '2',
|
|
'_more_changes': True
|
|
},
|
|
],
|
|
# Second results page, there are new changes, so second page
|
|
# includes some results from the first page.
|
|
[
|
|
{
|
|
'_number': '2'
|
|
},
|
|
{
|
|
'_number': '1'
|
|
},
|
|
],
|
|
# GenerateAllChanges queries again from the start to get any new
|
|
# changes (5 in this case).
|
|
[
|
|
{
|
|
'_number': '5'
|
|
},
|
|
{
|
|
'_number': '4'
|
|
},
|
|
{
|
|
'_number': '3',
|
|
'_more_changes': True
|
|
},
|
|
],
|
|
]
|
|
|
|
changes = list(gerrit_util.GenerateAllChanges('host', 'params'))
|
|
self.assertEqual([
|
|
{
|
|
'_number': '4'
|
|
},
|
|
{
|
|
'_number': '3'
|
|
},
|
|
{
|
|
'_number': '2',
|
|
'_more_changes': True
|
|
},
|
|
{
|
|
'_number': '1'
|
|
},
|
|
{
|
|
'_number': '5'
|
|
},
|
|
], changes)
|
|
self.assertEqual([
|
|
mock.call('host', 'params', None, 500, None, 0),
|
|
mock.call('host', 'params', None, 500, None, 3),
|
|
mock.call('host', 'params', None, 500, None, 0),
|
|
], mockQueryChanges.mock_calls)
|
|
|
|
@mock.patch('gerrit_util.CreateHttpConn')
|
|
@mock.patch('gerrit_util.ReadHttpJsonResponse')
|
|
def testIsCodeOwnersEnabledOnRepo_Disabled(self, mockJsonResponse,
|
|
mockCreateHttpConn):
|
|
mockJsonResponse.return_value = {'status': {'disabled': True}}
|
|
self.assertFalse(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo'))
|
|
|
|
@mock.patch('gerrit_util.CreateHttpConn')
|
|
@mock.patch('gerrit_util.ReadHttpJsonResponse')
|
|
def testIsCodeOwnersEnabledOnRepo_Enabled(self, mockJsonResponse,
|
|
mockCreateHttpConn):
|
|
mockJsonResponse.return_value = {'status': {}}
|
|
self.assertTrue(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo'))
|
|
|
|
|
|
class SSOAuthenticatorTest(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
cls._original_timeout_secs = gerrit_util.SSOAuthenticator._timeout_secs
|
|
return super().setUpClass()
|
|
|
|
def setUp(self) -> None:
|
|
gerrit_util.SSOAuthenticator._sso_info = None
|
|
gerrit_util.SSOAuthenticator._testing_load_expired_cookies = True
|
|
gerrit_util.SSOAuthenticator._timeout_secs = self._original_timeout_secs
|
|
self.sso = gerrit_util.SSOAuthenticator()
|
|
return super().setUp()
|
|
|
|
def tearDown(self) -> None:
|
|
gerrit_util.SSOAuthenticator._sso_info = None
|
|
gerrit_util.SSOAuthenticator._testing_load_expired_cookies = False
|
|
gerrit_util.SSOAuthenticator._timeout_secs = self._original_timeout_secs
|
|
return super().tearDown()
|
|
|
|
@property
|
|
def _input_dir(self) -> Path:
|
|
base = Path(__file__).absolute().with_suffix('.inputs')
|
|
# Here _testMethodName would be a string like "testCmdAssemblyFound"
|
|
return base / self._testMethodName
|
|
|
|
@mock.patch('gerrit_util.ssoHelper.find_cmd',
|
|
return_value='/fake/git-remote-sso')
|
|
def testCmdAssemblyFound(self, _):
|
|
self.assertEqual(self.sso._resolve_sso_cmd(),
|
|
('/fake/git-remote-sso', '-print_config',
|
|
'sso://*.git.corp.google.com'))
|
|
with mock.patch('scm.GIT.GetConfig') as p:
|
|
p.side_effect = ['firefly@google.com']
|
|
self.assertTrue(self.sso.is_applicable())
|
|
|
|
@mock.patch('gerrit_util.ssoHelper.find_cmd', return_value=None)
|
|
def testCmdAssemblyNotFound(self, _):
|
|
self.assertEqual(self.sso._resolve_sso_cmd(), ())
|
|
self.assertFalse(self.sso.is_applicable())
|
|
|
|
def testParseConfigOK(self):
|
|
test_config = {
|
|
'somekey': 'a value with = in it',
|
|
'novalue': '',
|
|
'http.proxy': 'localhost:12345',
|
|
'http.cookiefile': str(self._input_dir / 'cookiefile.txt'),
|
|
'include.path': str(self._input_dir / 'gitconfig'),
|
|
}
|
|
parsed = self.sso._parse_config(test_config)
|
|
self.assertDictEqual(parsed.headers, {
|
|
'Authorization': 'Basic REALLY_COOL_TOKEN',
|
|
})
|
|
self.assertEqual(parsed.proxy.proxy_host, b'localhost')
|
|
self.assertEqual(parsed.proxy.proxy_port, 12345)
|
|
|
|
c = parsed.cookies._cookies
|
|
self.assertEqual(c['login.example.com']['/']['SSO'].value,
|
|
'TUVFUE1PUlAK')
|
|
self.assertEqual(c['.example.com']['/']['__CoolProxy'].value,
|
|
'QkxFRVBCTE9SUAo=')
|
|
|
|
@unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
|
|
def testLaunchHelperOK(self):
|
|
gerrit_util.SSOAuthenticator._sso_cmd = ('python3',
|
|
str(self._input_dir /
|
|
'git-remote-sso.py'))
|
|
|
|
info = self.sso._get_sso_info()
|
|
self.assertDictEqual(info.headers, {
|
|
'Authorization': 'Basic REALLY_COOL_TOKEN',
|
|
})
|
|
self.assertEqual(info.proxy.proxy_host, b'localhost')
|
|
self.assertEqual(info.proxy.proxy_port, 12345)
|
|
c = info.cookies._cookies
|
|
self.assertEqual(c['login.example.com']['/']['SSO'].value,
|
|
'TUVFUE1PUlAK')
|
|
self.assertEqual(c['.example.com']['/']['__CoolProxy'].value,
|
|
'QkxFRVBCTE9SUAo=')
|
|
|
|
@unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
|
|
def testLaunchHelperFailQuick(self):
|
|
gerrit_util.SSOAuthenticator._sso_cmd = ('python3',
|
|
str(self._input_dir /
|
|
'git-remote-sso.py'))
|
|
|
|
with self.assertRaisesRegex(SystemExit, "SSO Failure Message!!!"):
|
|
self.sso._get_sso_info()
|
|
|
|
@unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
|
|
def testLaunchHelperFailSlow(self):
|
|
gerrit_util.SSOAuthenticator._timeout_secs = 0.2
|
|
gerrit_util.SSOAuthenticator._sso_cmd = ('python3',
|
|
str(self._input_dir /
|
|
'git-remote-sso.py'))
|
|
|
|
with self.assertRaises(subprocess.TimeoutExpired):
|
|
self.sso._get_sso_info()
|
|
|
|
|
|
class SSOHelperTest(unittest.TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
self.sso = gerrit_util.SSOHelper()
|
|
return super().setUp()
|
|
|
|
@mock.patch('shutil.which', return_value='/fake/git-remote-sso')
|
|
def testFindCmd(self, _):
|
|
self.assertEqual(self.sso.find_cmd(), '/fake/git-remote-sso')
|
|
|
|
@mock.patch('shutil.which', return_value=None)
|
|
def testFindCmdMissing(self, _):
|
|
self.assertEqual(self.sso.find_cmd(), '')
|
|
|
|
@mock.patch('shutil.which', return_value='/fake/git-remote-sso')
|
|
def testFindCmdCached(self, which):
|
|
self.sso.find_cmd()
|
|
self.sso.find_cmd()
|
|
self.assertEqual(which.called, 1)
|
|
|
|
|
|
class ShouldUseSSOTest(unittest.TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
self.newauth = mock.patch('newauth.Enabled', return_value=True)
|
|
self.newauth.start()
|
|
self.cwd = mock.patch('os.getcwd', return_value='/fake/cwd')
|
|
self.cwd.start()
|
|
self.sso = mock.patch('gerrit_util.ssoHelper.find_cmd',
|
|
return_value='/fake/git-remote-sso')
|
|
self.sso.start()
|
|
scm_mock.GIT(self)
|
|
self.addCleanup(mock.patch.stopall)
|
|
|
|
gerrit_util.ShouldUseSSO.cache_clear()
|
|
return super().setUp()
|
|
|
|
def tearDown(self) -> None:
|
|
super().tearDown()
|
|
self.sso.stop()
|
|
self.newauth.stop()
|
|
|
|
@mock.patch('newauth.Enabled', return_value=False)
|
|
def testDisabled(self, _):
|
|
self.assertFalse(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@google.com'))
|
|
|
|
@mock.patch('gerrit_util.ssoHelper.find_cmd', return_value='')
|
|
def testMissingCommand(self, _):
|
|
self.assertFalse(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@google.com'))
|
|
|
|
def testBadHost(self):
|
|
self.assertFalse(
|
|
gerrit_util.ShouldUseSSO('fake-host.coreboot.org',
|
|
'firefly@google.com'))
|
|
|
|
def testEmptyEmail(self):
|
|
self.assertTrue(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com', ''))
|
|
|
|
def testGoogleEmail(self):
|
|
self.assertTrue(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@google.com'))
|
|
|
|
def testGmail(self):
|
|
self.assertFalse(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@gmail.com'))
|
|
|
|
@mock.patch('gerrit_util.GetAccountEmails',
|
|
return_value=[{
|
|
'email': 'firefly@chromium.org'
|
|
}])
|
|
def testLinkedChromium(self, email):
|
|
self.assertTrue(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@chromium.org'))
|
|
email.assert_called_with('fake-host.googlesource.com',
|
|
'self',
|
|
authenticator=mock.ANY)
|
|
|
|
@mock.patch('gerrit_util.GetAccountEmails',
|
|
return_value=[{
|
|
'email': 'firefly@google.com'
|
|
}])
|
|
def testUnlinkedChromium(self, email):
|
|
self.assertFalse(
|
|
gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
|
|
'firefly@chromium.org'))
|
|
email.assert_called_with('fake-host.googlesource.com',
|
|
'self',
|
|
authenticator=mock.ANY)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|