#!/usr/bin/env vpython3
# coding=utf-8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for fetch.py."""

import contextlib
import logging
import optparse
import os
import subprocess
import sys
import tempfile
import unittest

import distutils

if sys.version_info.major == 2:
  from StringIO import StringIO
  import mock
else:
  from io import StringIO
  from unittest import mock

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import fetch


class SystemExitMock(Exception):
  pass


class UsageMock(Exception):
  pass


class TestUtilityFunctions(unittest.TestCase):
  """This test case is against utility functions"""

  @mock.patch('sys.stdout', StringIO())
  @mock.patch('os.listdir', return_value=['foo.py', 'bar'])
  @mock.patch('sys.exit', side_effect=SystemExitMock)
  def test_usage_with_message(self, exit_mock, listdir):
    with self.assertRaises(SystemExitMock):
      fetch.usage('foo')
    exit_mock.assert_called_once_with(1)
    listdir.assert_called_once()

    stdout = sys.stdout.getvalue()
    self.assertTrue(stdout.startswith('Error: foo'))

    self._usage_static_message(stdout)

  @mock.patch('sys.stdout', StringIO())
  @mock.patch('os.listdir', return_value=['foo.py', 'bar'])
  @mock.patch('sys.exit', side_effect=SystemExitMock)
  def test_usage_with_no_message(self, exit_mock, listdir):
    with self.assertRaises(SystemExitMock):
      fetch.usage()
    exit_mock.assert_called_once_with(0)
    listdir.assert_called_once()

    self._usage_static_message(sys.stdout.getvalue())

  def _usage_static_message(self, stdout):
    valid_fetch_config_text = 'Valid fetch configs:'
    self.assertIn(valid_fetch_config_text, stdout)

    # split[0] contains static text, whereas split[1] contains list of configs
    split = stdout.split(valid_fetch_config_text)
    self.assertEqual(2, len(split))

    # verify a few fetch_configs
    self.assertIn('foo', split[1])
    self.assertNotIn('bar', split[1])

  @mock.patch('fetch.usage', side_effect=UsageMock)
  def test_handle_args_invalid_usage(self, usage):
    with self.assertRaises(UsageMock):
      fetch.handle_args(['filename', '-h'])
    usage.assert_called_with()

    with self.assertRaises(UsageMock):
      fetch.handle_args(['filename'])
    usage.assert_called_with('Must specify a config.')

    with self.assertRaises(UsageMock):
      fetch.handle_args(['filename', '--foo'])
    usage.assert_called_with('Invalid option --foo.')

    bad_arguments = [
        '--not-valid-param', '-not-valid-param=1', '--not-valid-param=1=2'
    ]

    for bad_argument in bad_arguments:
      with self.assertRaises(UsageMock):
        fetch.handle_args(['filename', 'foo', bad_argument])
      usage.assert_called_with('Got bad arguments [\'%s\']' % (bad_argument))

  @mock.patch('optparse.Values', return_value=None)
  def test_handle_args_valid_usage(self, values):
    response = fetch.handle_args(['filename', 'foo'])
    values.assert_called_with({
        'dry_run': False,
        'nohooks': False,
        'no_history': False,
        'force': False
    })
    self.assertEqual((None, 'foo', []), response)

    response = fetch.handle_args([
        'filename', '-n', '--dry-run', '--nohooks', '--no-history', '--force',
        'foo', '--some-param=1', '--bar=2'
    ])
    values.assert_called_with({
        'dry_run': True,
        'nohooks': True,
        'no_history': True,
        'force': True
    })
    self.assertEqual((None, 'foo', ['--some-param=1', '--bar=2']), response)

  @mock.patch('os.path.exists', return_value=False)
  @mock.patch('sys.stdout', StringIO())
  @mock.patch('sys.exit', side_effect=SystemExitMock)
  def test_run_config_fetch_not_found(self, exit_mock, exists):
    with self.assertRaises(SystemExitMock):
      fetch.run_config_fetch('foo', [])
    exit_mock.assert_called_with(1)
    exists.assert_called_once()

    self.assertEqual(1, len(exists.call_args[0]))
    self.assertTrue(exists.call_args[0][0].endswith('foo.py'))

    stdout = sys.stdout.getvalue()
    self.assertEqual('Could not find a config for foo\n', stdout)

  def test_run_config_fetch_integration(self):
    config = fetch.run_config_fetch('depot_tools', [])
    url = 'https://chromium.googlesource.com/chromium/tools/depot_tools.git'
    spec = {
        'type': 'gclient_git',
        'gclient_git_spec': {
            'solutions': [{
                'url': url,
                'managed': False,
                'name': 'depot_tools',
                'deps_file': 'DEPS',
            }],
        }
    }
    self.assertEqual((spec, 'depot_tools'), config)

  def test_checkout_factory(self):
    with self.assertRaises(KeyError):
      fetch.CheckoutFactory('invalid', {}, {}, "root")

    gclient = fetch.CheckoutFactory('gclient', {}, {}, "root")
    self.assertTrue(isinstance(gclient, fetch.GclientCheckout))


class TestCheckout(unittest.TestCase):
  def setUp(self):
    mock.patch('sys.stdout', StringIO()).start()
    self.addCleanup(mock.patch.stopall)

    self.opts = optparse.Values({'dry_run': False})
    self.checkout = fetch.Checkout(self.opts, {}, '')

  @contextlib.contextmanager
  def _temporary_file(self):
    """Creates a temporary file and removes it once it's out of scope"""
    name = tempfile.mktemp()
    try:
      with open(name, 'w+') as f:
        yield f
    finally:
      os.remove(name)

  def test_run_dry(self):
    self.opts.dry_run = True
    self.checkout.run(['foo-not-found'])
    self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue())

  def test_run_non_existing_command(self):
    with self.assertRaises(OSError):
      self.checkout.run(['foo-not-found'])
    self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue())

  def test_run_non_existing_command_return_stdout(self):
    with self.assertRaises(OSError):
      self.checkout.run(['foo-not-found'], return_stdout=True)
    self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue())

  @mock.patch('sys.stderr', StringIO())
  @mock.patch('sys.exit', side_effect=SystemExitMock)
  def test_run_wrong_param(self, exit_mock):
    # mocked version of sys.std* is not passed to subprocess, use temp files
    with self._temporary_file() as f:
      with self.assertRaises(subprocess.CalledProcessError):
        self.checkout.run([sys.executable, '-invalid-param'],
                          return_stdout=True,
                          stderr=f)
      f.seek(0)
      # Expect some message to stderr
      self.assertNotEqual('', f.read())
    self.assertEqual('', sys.stderr.getvalue())

    with self._temporary_file() as f:
      with self.assertRaises(SystemExitMock):
        self.checkout.run([sys.executable, '-invalid-param'], stderr=f)
      f.seek(0)
      # Expect some message to stderr
      self.assertNotEqual('', f.read())
    self.assertIn('Subprocess failed with return code', sys.stdout.getvalue())
    exit_mock.assert_called_once()

  def test_run_return_as_value(self):
    cmd = [sys.executable, '-c', 'print("foo")']

    response = self.checkout.run(cmd, return_stdout=True)
    # we expect no response other than information about command
    self.assertNotIn('foo', sys.stdout.getvalue().split('\n'))
    # this file should be included in response
    self.assertEqual('foo', response.strip())

  def test_run_print_to_stdout(self):
    cmd = [sys.executable, '-c', 'print("foo")']

    # mocked version of sys.std* is not passed to subprocess, use temp files
    with self._temporary_file() as stdout:
      with self._temporary_file() as stderr:
        response = self.checkout.run(cmd, stdout=stdout, stderr=stderr)
        stdout.seek(0)
        stderr.seek(0)
        self.assertEqual('foo\n', stdout.read())
        self.assertEqual('', stderr.read())

    stdout = sys.stdout.getvalue()
    self.assertEqual('', response)


class TestGClientCheckout(unittest.TestCase):
  def setUp(self):
    self.run = mock.patch('fetch.Checkout.run').start()

    self.opts = optparse.Values({'dry_run': False})
    self.checkout = fetch.GclientCheckout(self.opts, {}, '/root')

    self.addCleanup(mock.patch.stopall)

  @mock.patch('distutils.spawn.find_executable', return_value=True)
  def test_run_gclient_executable_found(self, find_executable):
    self.checkout.run_gclient('foo', 'bar', baz='qux')
    find_executable.assert_called_once_with('gclient')
    self.run.assert_called_once_with(('gclient', 'foo', 'bar'), baz='qux')

  @mock.patch('distutils.spawn.find_executable', return_value=False)
  def test_run_gclient_executable_not_found(self, find_executable):
    self.checkout.run_gclient('foo', 'bar', baz='qux')
    find_executable.assert_called_once_with('gclient')
    args = self.run.call_args[0][0]
    kargs = self.run.call_args[1]

    self.assertEqual(4, len(args))
    self.assertEqual(sys.executable, args[0])
    self.assertTrue(args[1].endswith('gclient.py'))
    self.assertEqual(('foo', 'bar'), args[2:])
    self.assertEqual({'baz': 'qux'}, kargs)


class TestGclientGitCheckout(unittest.TestCase):
  def setUp(self):
    self.run_gclient = mock.patch('fetch.GclientCheckout.run_gclient').start()
    self.run_git = mock.patch('fetch.GitCheckout.run_git').start()

    self.opts = optparse.Values({
        'dry_run': False,
        'nohooks': True,
        'no_history': False
    })
    specs = {
        'solutions': [{
            'foo': 'bar',
            'baz': 1
        }, {
            'foo': False
        }],
        'with_branch_heads': True,
    }

    self.checkout = fetch.GclientGitCheckout(self.opts, specs, '/root')

    self.addCleanup(mock.patch.stopall)

  def test_init(self):
    self.checkout.init()
    self.assertEqual(2, self.run_gclient.call_count)
    self.assertEqual(3, self.run_git.call_count)

    # Verify only expected commands and ignore arguments to avoid copying
    # commands from fetch.py
    self.assertEqual(['config', 'sync'],
                     [a[0][0] for a in self.run_gclient.call_args_list])
    self.assertEqual(['submodule', 'config', 'config'],
                     [a[0][0] for a in self.run_git.call_args_list])

    # First call to gclient, format spec is expected to be called so "foo" is
    # expected to be present
    args = self.run_gclient.call_args_list[0][0]
    self.assertEqual('config', args[0])
    self.assertIn('foo', args[2])


if __name__ == '__main__':
  logging.basicConfig(
      level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
  unittest.main()