#!/usr/bin/env vpython3
# coding=utf-8
# Copyright (c) 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import print_function
from __future__ import unicode_literals


import base64
import httplib2
import json
import os
import sys
import unittest

if sys.version_info.major == 2:
  from cStringIO import StringIO
  import mock
else:
  from io import StringIO
  from unittest import mock

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import gerrit_util
import gclient_utils
import metrics
import metrics_utils
import subprocess2


class CookiesAuthenticatorTest(unittest.TestCase):
  _GITCOOKIES = '\n'.join([
      '\t'.join([
          'chromium.googlesource.com',
          'FALSE',
          '/',
          'TRUE',
          '2147483647',
          'o',
          'git-user.chromium.org=1/chromium-secret',
      ]),
      '\t'.join([
          'chromium-review.googlesource.com',
          'FALSE',
          '/',
          'TRUE',
          '2147483647',
          'o',
          'git-user.chromium.org=1/chromium-secret',
      ]),
      '\t'.join([
          '.example.com',
          'FALSE',
          '/',
          'TRUE',
          '2147483647',
          'o',
          'example-bearer-token',
      ]),
      '\t'.join([
          'another-path.example.com',
          'FALSE',
          '/foo',
          'TRUE',
          '2147483647',
          'o',
          'git-example.com=1/another-path-secret',
      ]),
      '\t'.join([
          'another-key.example.com',
          'FALSE',
          '/',
          'TRUE',
          '2147483647',
          'not-o',
          'git-example.com=1/another-key-secret',
      ]),
      '#' + '\t'.join([
          'chromium-review.googlesource.com',
          'FALSE',
          '/',
          'TRUE',
          '2147483647',
          'o',
          'git-invalid-user.chromium.org=1/invalid-chromium-secret',
      ]),
      'Some unrelated line\t that should not be here',
  ])

  def setUp(self):
    mock.patch('gclient_utils.FileRead', return_value=self._GITCOOKIES).start()
    mock.patch('os.getenv', return_value={}).start()
    mock.patch('os.environ', {'HOME': '$HOME'}).start()
    mock.patch('os.path.exists', return_value=True).start()
    mock.patch(
        'subprocess2.check_output',
        side_effect=[
            subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'out', 'err')],
    ).start()
    self.addCleanup(mock.patch.stopall)
    self.maxDiff = None

  def testGetNewPasswordUrl(self):
    auth = gerrit_util.CookiesAuthenticator()
    self.assertEqual(
        'https://chromium-review.googlesource.com/new-password',
        auth.get_new_password_url('chromium.googlesource.com'))
    self.assertEqual(
        'https://chrome-internal-review.googlesource.com/new-password',
        auth.get_new_password_url('chrome-internal-review.googlesource.com'))

  def testGetNewPasswordMessage(self):
    auth = gerrit_util.CookiesAuthenticator()
    self.assertIn(
        'https://chromium-review.googlesource.com/new-password',
        auth.get_new_password_message('chromium-review.googlesource.com'))
    self.assertIn(
        'https://chrome-internal-review.googlesource.com/new-password',
        auth.get_new_password_message('chrome-internal.googlesource.com'))

  def testGetGitcookiesPath(self):
    self.assertEqual(
        os.path.expanduser(os.path.join('~', '.gitcookies')),
        gerrit_util.CookiesAuthenticator().get_gitcookies_path())

    subprocess2.check_output.side_effect = [b'http.cookiefile']
    self.assertEqual(
        'http.cookiefile',
        gerrit_util.CookiesAuthenticator().get_gitcookies_path())
    subprocess2.check_output.assert_called_with(
        ['git', 'config', '--path', 'http.cookiefile'])

    os.getenv.return_value = 'git-cookies-path'
    self.assertEqual(
        'git-cookies-path',
        gerrit_util.CookiesAuthenticator().get_gitcookies_path())
    os.getenv.assert_called_with('GIT_COOKIES_PATH')

  def testGitcookies(self):
    auth = gerrit_util.CookiesAuthenticator()
    self.assertEqual(auth.gitcookies, {
        'chromium.googlesource.com':
            ('git-user.chromium.org', '1/chromium-secret'),
        'chromium-review.googlesource.com':
            ('git-user.chromium.org', '1/chromium-secret'),
        '.example.com':
            ('', 'example-bearer-token'),
    })

  def testGetAuthHeader(self):
    expected_chromium_header = (
        'Basic Z2l0LXVzZXIuY2hyb21pdW0ub3JnOjEvY2hyb21pdW0tc2VjcmV0')

    auth = gerrit_util.CookiesAuthenticator()
    self.assertEqual(
        expected_chromium_header,
        auth.get_auth_header('chromium.googlesource.com'))
    self.assertEqual(
        expected_chromium_header,
        auth.get_auth_header('chromium-review.googlesource.com'))
    self.assertEqual(
        'Bearer example-bearer-token',
        auth.get_auth_header('some-review.example.com'))

  def testGetAuthEmail(self):
    auth = gerrit_util.CookiesAuthenticator()
    self.assertEqual(
        'user@chromium.org',
        auth.get_auth_email('chromium.googlesource.com'))
    self.assertEqual(
        'user@chromium.org',
        auth.get_auth_email('chromium-review.googlesource.com'))
    self.assertIsNone(auth.get_auth_email('some-review.example.com'))


class GceAuthenticatorTest(unittest.TestCase):
  def setUp(self):
    super(GceAuthenticatorTest, self).setUp()
    mock.patch('httplib2.Http').start()
    mock.patch('os.getenv', return_value=None).start()
    mock.patch('gerrit_util.time_sleep').start()
    mock.patch('gerrit_util.time_time').start()
    self.addCleanup(mock.patch.stopall)
    # GceAuthenticator has class variables that cache the results. Build a new
    # class for every test to avoid inter-test dependencies.
    class GceAuthenticator(gerrit_util.GceAuthenticator):
      pass
    self.GceAuthenticator = GceAuthenticator

  def testIsGce_EnvVarSkip(self, *_mocks):
    os.getenv.return_value = '1'
    self.assertFalse(self.GceAuthenticator.is_gce())
    os.getenv.assert_called_once_with('SKIP_GCE_AUTH_FOR_GIT')

  def testIsGce_Error(self):
    httplib2.Http().request.side_effect = httplib2.HttpLib2Error
    self.assertFalse(self.GceAuthenticator.is_gce())

  def testIsGce_500(self):
    httplib2.Http().request.return_value = (mock.Mock(status=500), None)
    self.assertFalse(self.GceAuthenticator.is_gce())
    last_call = gerrit_util.time_sleep.mock_calls[-1]
    self.assertLessEqual(last_call, mock.call(43.0))

  def testIsGce_FailsThenSucceeds(self):
    response = mock.Mock(status=200)
    response.get.return_value = 'Google'
    httplib2.Http().request.side_effect = [
        (mock.Mock(status=500), None),
        (response, 'who cares'),
    ]
    self.assertTrue(self.GceAuthenticator.is_gce())

  def testIsGce_MetadataFlavorIsNotGoogle(self):
    response = mock.Mock(status=200)
    response.get.return_value = None
    httplib2.Http().request.return_value = (response, 'who cares')
    self.assertFalse(self.GceAuthenticator.is_gce())
    response.get.assert_called_once_with('metadata-flavor')

  def testIsGce_ResultIsCached(self):
    response = mock.Mock(status=200)
    response.get.return_value = 'Google'
    httplib2.Http().request.side_effect = [(response, 'who cares')]
    self.assertTrue(self.GceAuthenticator.is_gce())
    self.assertTrue(self.GceAuthenticator.is_gce())
    httplib2.Http().request.assert_called_once()

  def testGetAuthHeader_Error(self):
    httplib2.Http().request.side_effect = httplib2.HttpLib2Error
    self.assertIsNone(self.GceAuthenticator().get_auth_header(''))

  def testGetAuthHeader_500(self):
    httplib2.Http().request.return_value = (mock.Mock(status=500), None)
    self.assertIsNone(self.GceAuthenticator().get_auth_header(''))

  def testGetAuthHeader_Non200(self):
    httplib2.Http().request.return_value = (mock.Mock(status=403), None)
    self.assertIsNone(self.GceAuthenticator().get_auth_header(''))

  def testGetAuthHeader_OK(self):
    httplib2.Http().request.return_value = (
        mock.Mock(status=200),
        '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}')
    gerrit_util.time_time.return_value = 0
    self.assertEqual('TYPE TOKEN', self.GceAuthenticator().get_auth_header(''))

  def testGetAuthHeader_Cache(self):
    httplib2.Http().request.return_value = (
        mock.Mock(status=200),
        '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}')
    gerrit_util.time_time.return_value = 0
    self.assertEqual('TYPE TOKEN', self.GceAuthenticator().get_auth_header(''))
    self.assertEqual('TYPE TOKEN', self.GceAuthenticator().get_auth_header(''))
    httplib2.Http().request.assert_called_once()

  def testGetAuthHeader_CacheOld(self):
    httplib2.Http().request.return_value = (
        mock.Mock(status=200),
        '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}')
    gerrit_util.time_time.side_effect = [0, 100, 200]
    self.assertEqual('TYPE TOKEN', self.GceAuthenticator().get_auth_header(''))
    self.assertEqual('TYPE TOKEN', self.GceAuthenticator().get_auth_header(''))
    self.assertEqual(2, len(httplib2.Http().request.mock_calls))


class GerritUtilTest(unittest.TestCase):
  def setUp(self):
    super(GerritUtilTest, self).setUp()
    mock.patch('gerrit_util.LOGGER').start()
    mock.patch('gerrit_util.time_sleep').start()
    mock.patch('metrics.collector').start()
    mock.patch(
        'metrics_utils.extract_http_metrics',
        return_value='http_metrics').start()
    self.addCleanup(mock.patch.stopall)

  def testQueryString(self):
    self.assertEqual('', gerrit_util._QueryString([]))
    self.assertEqual(
        'first%20param%2B', gerrit_util._QueryString([], 'first param+'))
    self.assertEqual(
        'key:val+foo:bar',
         gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')]))
    self.assertEqual(
        'first%20param%2B+key:val+foo:bar',
         gerrit_util._QueryString(
            [('key', 'val'), ('foo', 'bar')], 'first param+'))

  @mock.patch('gerrit_util.Authenticator')
  def testCreateHttpConn_Basic(self, mockAuth):
    mockAuth.get().get_auth_header.return_value = None
    conn = gerrit_util.CreateHttpConn('host.example.com', 'foo/bar')
    self.assertEqual('host.example.com', conn.req_host)
    self.assertEqual({
        'uri': 'https://host.example.com/foo/bar',
        'method': 'GET',
        'headers': {},
        'body': None,
    }, conn.req_params)

  @mock.patch('gerrit_util.Authenticator')
  def testCreateHttpConn_Authenticated(self, mockAuth):
    mockAuth.get().get_auth_header.return_value = 'Bearer token'
    conn = gerrit_util.CreateHttpConn(
        'host.example.com', 'foo/bar', headers={'header': 'value'})
    self.assertEqual('host.example.com', conn.req_host)
    self.assertEqual({
        'uri': 'https://host.example.com/a/foo/bar',
        'method': 'GET',
        'headers': {'Authorization': 'Bearer token', 'header': 'value'},
        'body': None,
    }, conn.req_params)

  @mock.patch('gerrit_util.Authenticator')
  def testCreateHttpConn_Body(self, mockAuth):
    mockAuth.get().get_auth_header.return_value = None
    conn = gerrit_util.CreateHttpConn(
        'host.example.com', 'foo/bar', body={'l': [1, 2, 3], 'd': {'k': 'v'}})
    self.assertEqual('host.example.com', conn.req_host)
    self.assertEqual({
        'uri': 'https://host.example.com/foo/bar',
        'method': 'GET',
        'headers': {'Content-Type': 'application/json'},
        'body': '{"d": {"k": "v"}, "l": [1, 2, 3]}',
    }, conn.req_params)

  def testReadHttpResponse_200(self):
    conn = mock.Mock()
    conn.req_params = {'uri': 'uri', 'method': 'method'}
    conn.request.return_value = (mock.Mock(status=200), b'content\xe2\x9c\x94')

    content = gerrit_util.ReadHttpResponse(conn)
    self.assertEqual('content✔', content.getvalue())
    metrics.collector.add_repeated.assert_called_once_with(
        'http_requests', 'http_metrics')

  def testReadHttpResponse_AuthenticationIssue(self):
    for status in (302, 401, 403):
      response = mock.Mock(status=status)
      response.get.return_value = None
      conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
      conn.request.return_value = (response, b'')

      with mock.patch('sys.stdout', StringIO()):
        with self.assertRaises(gerrit_util.GerritError) as cm:
          gerrit_util.ReadHttpResponse(conn)

        self.assertEqual(status, cm.exception.http_status)
        self.assertIn(
            'Your Gerrit credentials might be misconfigured',
            sys.stdout.getvalue())

  def testReadHttpResponse_ClientError(self):
    conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
    conn.request.return_value = (mock.Mock(status=404), b'')

    with self.assertRaises(gerrit_util.GerritError) as cm:
      gerrit_util.ReadHttpResponse(conn)

    self.assertEqual(404, cm.exception.http_status)

  def readHttpResponse_ServerErrorHelper(self, status):
    conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
    conn.request.return_value = (mock.Mock(status=status), b'')

    with self.assertRaises(gerrit_util.GerritError) as cm:
      gerrit_util.ReadHttpResponse(conn)

    self.assertEqual(status, cm.exception.http_status)
    self.assertEqual(gerrit_util.TRY_LIMIT, len(conn.request.mock_calls))
    last_call = gerrit_util.time_sleep.mock_calls[-1]
    self.assertLessEqual(last_call, mock.call(422.0))

  def testReadHttpResponse_ServerError(self):
    self.readHttpResponse_ServerErrorHelper(status=404)
    self.readHttpResponse_ServerErrorHelper(status=409)
    self.readHttpResponse_ServerErrorHelper(status=429)
    self.readHttpResponse_ServerErrorHelper(status=500)

  def testReadHttpResponse_ServerErrorAndSuccess(self):
    conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
    conn.request.side_effect = [
        (mock.Mock(status=500), b''),
        (mock.Mock(status=200), b'content\xe2\x9c\x94'),
    ]

    self.assertEqual('content✔', gerrit_util.ReadHttpResponse(conn).getvalue())
    self.assertEqual(2, len(conn.request.mock_calls))
    gerrit_util.time_sleep.assert_called_once_with(10.0)

  def testReadHttpResponse_Expected404(self):
    conn = mock.Mock()
    conn.req_params = {'uri': 'uri', 'method': 'method'}
    conn.request.return_value = (mock.Mock(status=404), b'content\xe2\x9c\x94')

    content = gerrit_util.ReadHttpResponse(conn, (404,))
    self.assertEqual('', content.getvalue())

  @mock.patch('gerrit_util.ReadHttpResponse')
  def testReadHttpJsonResponse_NotJSON(self, mockReadHttpResponse):
    mockReadHttpResponse.return_value = StringIO('not json')
    with self.assertRaises(gerrit_util.GerritError) as cm:
      gerrit_util.ReadHttpJsonResponse(None)
    self.assertEqual(cm.exception.http_status, 200)
    self.assertEqual(
        cm.exception.message, '(200) Unexpected json output: not json')

  @mock.patch('gerrit_util.ReadHttpResponse')
  def testReadHttpJsonResponse_EmptyValue(self, mockReadHttpResponse):
    mockReadHttpResponse.return_value = StringIO(')]}\'')
    self.assertIsNone(gerrit_util.ReadHttpJsonResponse(None))

  @mock.patch('gerrit_util.ReadHttpResponse')
  def testReadHttpJsonResponse_JSON(self, mockReadHttpResponse):
    expected_value = {'foo': 'bar', 'baz': [1, '2', 3]}
    mockReadHttpResponse.return_value = StringIO(
        ')]}\'\n' + json.dumps(expected_value))
    self.assertEqual(expected_value, gerrit_util.ReadHttpJsonResponse(None))

  @mock.patch('gerrit_util.CreateHttpConn')
  @mock.patch('gerrit_util.ReadHttpJsonResponse')
  def testQueryChanges(self, mockJsonResponse, mockCreateHttpConn):
    gerrit_util.QueryChanges(
        'host', [('key', 'val'), ('foo', 'bar')], 'first param', limit=500,
        o_params=['PARAM_A', 'PARAM_B'], start='start')
    mockCreateHttpConn.assert_called_once_with(
        'host',
        ('changes/?q=first%20param+key:val+foo:bar'
         '&start=start'
         '&n=500'
         '&o=PARAM_A'
         '&o=PARAM_B'))

  def testQueryChanges_NoParams(self):
    self.assertRaises(RuntimeError, gerrit_util.QueryChanges, 'host', [])

  @mock.patch('gerrit_util.QueryChanges')
  def testGenerateAllChanges(self, mockQueryChanges):
    mockQueryChanges.side_effect = [
        # First results page
        [
            {'_number': '4'},
            {'_number': '3'},
            {'_number': '2', '_more_changes': True},
        ],
        # Second results page, there are new changes, so second page includes
        # some results from the first page.
        [
            {'_number': '2'},
            {'_number': '1'},
        ],
        # GenerateAllChanges queries again from the start to get any new
        # changes (5 in this case).
        [
            {'_number': '5'},
            {'_number': '4'},
            {'_number': '3', '_more_changes': True},

        ],
    ]

    changes = list(gerrit_util.GenerateAllChanges('host', 'params'))
    self.assertEqual(
        [
            {'_number': '4'},
            {'_number': '3'},
            {'_number': '2', '_more_changes': True},
            {'_number': '1'},
            {'_number': '5'},
        ],
        changes)
    self.assertEqual(
        [
            mock.call('host', 'params', None, 500, None, 0),
            mock.call('host', 'params', None, 500, None, 3),
            mock.call('host', 'params', None, 500, None, 0),
        ],
        mockQueryChanges.mock_calls)


if __name__ == '__main__':
  unittest.main()