You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			372 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			372 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
#!/usr/bin/env vpython3
 | 
						|
# coding=utf-8
 | 
						|
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
 | 
						|
from __future__ import print_function
 | 
						|
from __future__ import unicode_literals
 | 
						|
 | 
						|
import io
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
if sys.version_info.major == 2:
 | 
						|
  from StringIO import StringIO
 | 
						|
  import mock
 | 
						|
else:
 | 
						|
  from io import StringIO
 | 
						|
  from unittest import mock
 | 
						|
 | 
						|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | 
						|
 | 
						|
from testing_support import trial_dir
 | 
						|
 | 
						|
import gclient_utils
 | 
						|
import subprocess2
 | 
						|
 | 
						|
 | 
						|
class CheckCallAndFilterTestCase(unittest.TestCase):
 | 
						|
  class ProcessIdMock(object):
 | 
						|
    def __init__(self, test_string, return_code=0):
 | 
						|
      self.stdout = test_string.encode('utf-8')
 | 
						|
      self.pid = 9284
 | 
						|
      self.return_code = return_code
 | 
						|
 | 
						|
    def wait(self):
 | 
						|
      return self.return_code
 | 
						|
 | 
						|
  def PopenMock(self, *args, **kwargs):
 | 
						|
    kid = self.kids.pop(0)
 | 
						|
    stdout = kwargs.get('stdout')
 | 
						|
    os.write(stdout, kid.stdout)
 | 
						|
    return kid
 | 
						|
 | 
						|
  def setUp(self):
 | 
						|
    super(CheckCallAndFilterTestCase, self).setUp()
 | 
						|
    self.printfn = io.StringIO()
 | 
						|
    self.stdout = io.BytesIO()
 | 
						|
    self.kids = []
 | 
						|
    if sys.version_info.major == 2:
 | 
						|
      mock.patch('sys.stdout', self.stdout).start()
 | 
						|
      mock.patch('__builtin__.print', self.printfn.write).start()
 | 
						|
    else:
 | 
						|
      mock.patch('sys.stdout', mock.Mock()).start()
 | 
						|
      mock.patch('sys.stdout.buffer', self.stdout).start()
 | 
						|
      mock.patch('sys.stdout.isatty', return_value=False).start()
 | 
						|
      mock.patch('builtins.print', self.printfn.write).start()
 | 
						|
    mock.patch('sys.stdout.flush', lambda: None).start()
 | 
						|
    self.addCleanup(mock.patch.stopall)
 | 
						|
 | 
						|
  @mock.patch('subprocess2.Popen')
 | 
						|
  def testCheckCallAndFilter(self, mockPopen):
 | 
						|
    cwd = 'bleh'
 | 
						|
    args = ['boo', 'foo', 'bar']
 | 
						|
    test_string = 'ahah\naccb\nallo\naddb\n✔'
 | 
						|
 | 
						|
    self.kids = [self.ProcessIdMock(test_string)]
 | 
						|
    mockPopen.side_effect = self.PopenMock
 | 
						|
 | 
						|
    line_list = []
 | 
						|
    result = gclient_utils.CheckCallAndFilter(
 | 
						|
        args, cwd=cwd, show_header=True, always_show_header=True,
 | 
						|
        filter_fn=line_list.append)
 | 
						|
 | 
						|
    self.assertEqual(result, test_string.encode('utf-8'))
 | 
						|
    self.assertEqual(line_list, [
 | 
						|
        '________ running \'boo foo bar\' in \'bleh\'\n',
 | 
						|
        'ahah',
 | 
						|
        'accb',
 | 
						|
        'allo',
 | 
						|
        'addb',
 | 
						|
        '✔'])
 | 
						|
    self.assertEqual(self.stdout.getvalue(), b'')
 | 
						|
 | 
						|
    mockPopen.assert_called_with(
 | 
						|
        args, cwd=cwd, stdout=mock.ANY, stderr=subprocess2.STDOUT,
 | 
						|
        bufsize=0)
 | 
						|
 | 
						|
  @mock.patch('time.sleep')
 | 
						|
  @mock.patch('subprocess2.Popen')
 | 
						|
  def testCheckCallAndFilter_RetryOnce(self, mockPopen, mockTime):
 | 
						|
    cwd = 'bleh'
 | 
						|
    args = ['boo', 'foo', 'bar']
 | 
						|
    test_string = 'ahah\naccb\nallo\naddb\n✔'
 | 
						|
 | 
						|
    self.kids = [
 | 
						|
        self.ProcessIdMock(test_string, 1),
 | 
						|
        self.ProcessIdMock(test_string, 0)
 | 
						|
    ]
 | 
						|
    mockPopen.side_effect = self.PopenMock
 | 
						|
 | 
						|
    line_list = []
 | 
						|
    result = gclient_utils.CheckCallAndFilter(
 | 
						|
        args, cwd=cwd, show_header=True, always_show_header=True,
 | 
						|
        filter_fn=line_list.append, retry=True)
 | 
						|
 | 
						|
    self.assertEqual(result, test_string.encode('utf-8'))
 | 
						|
 | 
						|
    self.assertEqual(line_list, [
 | 
						|
        '________ running \'boo foo bar\' in \'bleh\'\n',
 | 
						|
        'ahah',
 | 
						|
        'accb',
 | 
						|
        'allo',
 | 
						|
        'addb',
 | 
						|
        '✔',
 | 
						|
        '________ running \'boo foo bar\' in \'bleh\' attempt 2 / 4\n',
 | 
						|
        'ahah',
 | 
						|
        'accb',
 | 
						|
        'allo',
 | 
						|
        'addb',
 | 
						|
        '✔',
 | 
						|
    ])
 | 
						|
 | 
						|
    mockTime.assert_called_with(gclient_utils.RETRY_INITIAL_SLEEP)
 | 
						|
 | 
						|
    self.assertEqual(
 | 
						|
        mockPopen.mock_calls,
 | 
						|
        [
 | 
						|
            mock.call(
 | 
						|
                args, cwd=cwd, stdout=mock.ANY,
 | 
						|
                stderr=subprocess2.STDOUT, bufsize=0),
 | 
						|
            mock.call(
 | 
						|
                args, cwd=cwd, stdout=mock.ANY,
 | 
						|
                stderr=subprocess2.STDOUT, bufsize=0),
 | 
						|
        ])
 | 
						|
 | 
						|
    self.assertEqual(self.stdout.getvalue(), b'')
 | 
						|
    self.assertEqual(
 | 
						|
        self.printfn.getvalue(),
 | 
						|
        'WARNING: subprocess \'"boo" "foo" "bar"\' in bleh failed; will retry '
 | 
						|
        'after a short nap...')
 | 
						|
 | 
						|
  @mock.patch('subprocess2.Popen')
 | 
						|
  def testCheckCallAndFilter_PrintStdout(self, mockPopen):
 | 
						|
    cwd = 'bleh'
 | 
						|
    args = ['boo', 'foo', 'bar']
 | 
						|
    test_string = 'ahah\naccb\nallo\naddb\n✔'
 | 
						|
 | 
						|
    self.kids = [self.ProcessIdMock(test_string)]
 | 
						|
    mockPopen.side_effect = self.PopenMock
 | 
						|
 | 
						|
    result = gclient_utils.CheckCallAndFilter(
 | 
						|
        args, cwd=cwd, show_header=True, always_show_header=True,
 | 
						|
        print_stdout=True)
 | 
						|
 | 
						|
    self.assertEqual(result, test_string.encode('utf-8'))
 | 
						|
    self.assertEqual(self.stdout.getvalue().splitlines(), [
 | 
						|
        b"________ running 'boo foo bar' in 'bleh'",
 | 
						|
        b'ahah',
 | 
						|
        b'accb',
 | 
						|
        b'allo',
 | 
						|
        b'addb',
 | 
						|
        b'\xe2\x9c\x94',
 | 
						|
    ])
 | 
						|
 | 
						|
 | 
						|
class AnnotatedTestCase(unittest.TestCase):
 | 
						|
  def setUp(self):
 | 
						|
    self.out = gclient_utils.MakeFileAnnotated(io.BytesIO())
 | 
						|
    self.annotated = gclient_utils.MakeFileAnnotated(
 | 
						|
        io.BytesIO(), include_zero=True)
 | 
						|
 | 
						|
  def testWrite(self):
 | 
						|
    test_cases = [
 | 
						|
        ('test string\n', b'test string\n'),
 | 
						|
        (b'test string\n', b'test string\n'),
 | 
						|
        ('✔\n', b'\xe2\x9c\x94\n'),
 | 
						|
        (b'\xe2\x9c\x94\n', b'\xe2\x9c\x94\n'),
 | 
						|
        ('first line\nsecondline\n', b'first line\nsecondline\n'),
 | 
						|
        (b'first line\nsecondline\n', b'first line\nsecondline\n'),
 | 
						|
    ]
 | 
						|
 | 
						|
    for test_input, expected_output in test_cases:
 | 
						|
      out = gclient_utils.MakeFileAnnotated(io.BytesIO())
 | 
						|
      out.write(test_input)
 | 
						|
      self.assertEqual(out.getvalue(), expected_output)
 | 
						|
 | 
						|
  def testWrite_Annotated(self):
 | 
						|
    test_cases = [
 | 
						|
        ('test string\n', b'0>test string\n'),
 | 
						|
        (b'test string\n', b'0>test string\n'),
 | 
						|
        ('✔\n', b'0>\xe2\x9c\x94\n'),
 | 
						|
        (b'\xe2\x9c\x94\n', b'0>\xe2\x9c\x94\n'),
 | 
						|
        ('first line\nsecondline\n', b'0>first line\n0>secondline\n'),
 | 
						|
        (b'first line\nsecondline\n', b'0>first line\n0>secondline\n'),
 | 
						|
    ]
 | 
						|
 | 
						|
    for test_input, expected_output in test_cases:
 | 
						|
      out = gclient_utils.MakeFileAnnotated(io.BytesIO(), include_zero=True)
 | 
						|
      out.write(test_input)
 | 
						|
      self.assertEqual(out.getvalue(), expected_output)
 | 
						|
 | 
						|
  def testByteByByteInput(self):
 | 
						|
    self.out.write(b'\xe2')
 | 
						|
    self.out.write(b'\x9c')
 | 
						|
    self.out.write(b'\x94')
 | 
						|
    self.out.write(b'\n')
 | 
						|
    self.out.write(b'\xe2')
 | 
						|
    self.out.write(b'\n')
 | 
						|
    self.assertEqual(self.out.getvalue(), b'\xe2\x9c\x94\n\xe2\n')
 | 
						|
 | 
						|
  def testByteByByteInput_Annotated(self):
 | 
						|
    self.annotated.write(b'\xe2')
 | 
						|
    self.annotated.write(b'\x9c')
 | 
						|
    self.annotated.write(b'\x94')
 | 
						|
    self.annotated.write(b'\n')
 | 
						|
    self.annotated.write(b'\xe2')
 | 
						|
    self.annotated.write(b'\n')
 | 
						|
    self.assertEqual(self.annotated.getvalue(), b'0>\xe2\x9c\x94\n0>\xe2\n')
 | 
						|
 | 
						|
  def testFlush_Annotated(self):
 | 
						|
    self.annotated.write(b'first line\nsecond line')
 | 
						|
    self.assertEqual(self.annotated.getvalue(), b'0>first line\n')
 | 
						|
    self.annotated.flush()
 | 
						|
    self.assertEqual(
 | 
						|
        self.annotated.getvalue(), b'0>first line\n0>second line\n')
 | 
						|
 | 
						|
 | 
						|
class SplitUrlRevisionTestCase(unittest.TestCase):
 | 
						|
  def testSSHUrl(self):
 | 
						|
    url = "ssh://test@example.com/test.git"
 | 
						|
    rev = "ac345e52dc"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    url = "ssh://example.com/test.git"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    url = "ssh://example.com/git/test.git"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    rev = "test-stable"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    url = "ssh://user-name@example.com/~/test.git"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    url = "ssh://user-name@example.com/~username/test.git"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    url = "git@github.com:dart-lang/spark.git"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
 | 
						|
  def testSVNUrl(self):
 | 
						|
    url = "svn://example.com/test"
 | 
						|
    rev = "ac345e52dc"
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision(url)
 | 
						|
    self.assertEqual(out_rev, None)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
    out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
 | 
						|
    self.assertEqual(out_rev, rev)
 | 
						|
    self.assertEqual(out_url, url)
 | 
						|
 | 
						|
 | 
						|
class GClientUtilsTest(trial_dir.TestCase):
 | 
						|
  def testHardToDelete(self):
 | 
						|
    # Use the fact that tearDown will delete the directory to make it hard to do
 | 
						|
    # so.
 | 
						|
    l1 = os.path.join(self.root_dir, 'l1')
 | 
						|
    l2 = os.path.join(l1, 'l2')
 | 
						|
    l3 = os.path.join(l2, 'l3')
 | 
						|
    f3 = os.path.join(l3, 'f3')
 | 
						|
    os.mkdir(l1)
 | 
						|
    os.mkdir(l2)
 | 
						|
    os.mkdir(l3)
 | 
						|
    gclient_utils.FileWrite(f3, 'foo')
 | 
						|
    os.chmod(f3, 0)
 | 
						|
    os.chmod(l3, 0)
 | 
						|
    os.chmod(l2, 0)
 | 
						|
    os.chmod(l1, 0)
 | 
						|
 | 
						|
  def testUpgradeToHttps(self):
 | 
						|
    values = [
 | 
						|
        ['', ''],
 | 
						|
        [None, None],
 | 
						|
        ['foo', 'https://foo'],
 | 
						|
        ['http://foo', 'https://foo'],
 | 
						|
        ['foo/', 'https://foo/'],
 | 
						|
        ['ssh-svn://foo', 'ssh-svn://foo'],
 | 
						|
        ['ssh-svn://foo/bar/', 'ssh-svn://foo/bar/'],
 | 
						|
        ['codereview.chromium.org', 'https://codereview.chromium.org'],
 | 
						|
        ['codereview.chromium.org/', 'https://codereview.chromium.org/'],
 | 
						|
        ['http://foo:10000', 'http://foo:10000'],
 | 
						|
        ['http://foo:10000/bar', 'http://foo:10000/bar'],
 | 
						|
        ['foo:10000', 'http://foo:10000'],
 | 
						|
        ['foo:', 'https://foo:'],
 | 
						|
    ]
 | 
						|
    for content, expected in values:
 | 
						|
      self.assertEqual(
 | 
						|
          expected, gclient_utils.UpgradeToHttps(content))
 | 
						|
 | 
						|
  def testParseCodereviewSettingsContent(self):
 | 
						|
    values = [
 | 
						|
        ['# bleh\n', {}],
 | 
						|
        ['\t# foo : bar\n', {}],
 | 
						|
        ['Foo:bar', {'Foo': 'bar'}],
 | 
						|
        ['Foo:bar:baz\n', {'Foo': 'bar:baz'}],
 | 
						|
        [' Foo : bar ', {'Foo': 'bar'}],
 | 
						|
        [' Foo : bar \n', {'Foo': 'bar'}],
 | 
						|
        ['a:b\n\rc:d\re:f', {'a': 'b', 'c': 'd', 'e': 'f'}],
 | 
						|
        ['an_url:http://value/', {'an_url': 'http://value/'}],
 | 
						|
        [
 | 
						|
          'CODE_REVIEW_SERVER : http://r/s',
 | 
						|
          {'CODE_REVIEW_SERVER': 'https://r/s'}
 | 
						|
        ],
 | 
						|
        ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}],
 | 
						|
    ]
 | 
						|
    for content, expected in values:
 | 
						|
      self.assertEqual(
 | 
						|
          expected, gclient_utils.ParseCodereviewSettingsContent(content))
 | 
						|
 | 
						|
  def testFileRead_Bytes(self):
 | 
						|
    with gclient_utils.temporary_file() as tmp:
 | 
						|
      gclient_utils.FileWrite(
 | 
						|
          tmp, b'foo \xe2\x9c bar', mode='wb', encoding=None)
 | 
						|
      self.assertEqual('foo \ufffd bar', gclient_utils.FileRead(tmp))
 | 
						|
 | 
						|
  def testFileRead_Unicode(self):
 | 
						|
    with gclient_utils.temporary_file() as tmp:
 | 
						|
      gclient_utils.FileWrite(tmp, 'foo ✔ bar')
 | 
						|
      self.assertEqual('foo ✔ bar', gclient_utils.FileRead(tmp))
 | 
						|
 | 
						|
  def testTemporaryFile(self):
 | 
						|
    with gclient_utils.temporary_file() as tmp:
 | 
						|
      gclient_utils.FileWrite(tmp, 'test')
 | 
						|
      self.assertEqual('test', gclient_utils.FileRead(tmp))
 | 
						|
    self.assertFalse(os.path.exists(tmp))
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
  unittest.main()
 | 
						|
 | 
						|
# vim: ts=2:sw=2:tw=80:et:
 |