#!/usr/bin/env vpython3 # Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Test gsutil.py.""" from __future__ import unicode_literals import base64 import hashlib import io import json import os import shutil import subprocess import sys import tempfile import unittest import zipfile try: import urllib2 as urllib except ImportError: # For Py3 compatibility import urllib.request as urllib # Add depot_tools to path THIS_DIR = os.path.dirname(os.path.abspath(__file__)) DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR) sys.path.append(DEPOT_TOOLS_DIR) import gsutil class TestError(Exception): pass class FakeCall(object): def __init__(self): self.expectations = [] def add_expectation(self, *args, **kwargs): returns = kwargs.pop('_returns', None) self.expectations.append((args, kwargs, returns)) def __call__(self, *args, **kwargs): if not self.expectations: raise TestError('Got unexpected\n%s\n%s' % (args, kwargs)) exp_args, exp_kwargs, exp_returns = self.expectations.pop(0) if args != exp_args or kwargs != exp_kwargs: message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args, exp_kwargs) message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs) raise TestError(message) return exp_returns class GsutilUnitTests(unittest.TestCase): def setUp(self): self.fake = FakeCall() self.tempdir = tempfile.mkdtemp() self.old_urlopen = getattr(urllib, 'urlopen') self.old_call = getattr(subprocess, 'call') setattr(urllib, 'urlopen', self.fake) setattr(subprocess, 'call', self.fake) def tearDown(self): self.assertEqual(self.fake.expectations, []) shutil.rmtree(self.tempdir) setattr(urllib, 'urlopen', self.old_urlopen) setattr(subprocess, 'call', self.old_call) def test_download_gsutil(self): version = '4.2' filename = 'gsutil_%s.zip' % version full_filename = os.path.join(self.tempdir, filename) fake_file = b'This is gsutil.zip' fake_file2 = b'This is other gsutil.zip' url = '%s%s' % (gsutil.GSUTIL_URL, filename) self.fake.add_expectation(url, _returns=io.BytesIO(fake_file)) self.assertEqual( gsutil.download_gsutil(version, self.tempdir), full_filename) with open(full_filename, 'rb') as f: self.assertEqual(fake_file, f.read()) metadata_url = gsutil.API_URL + filename md5_calc = hashlib.md5() md5_calc.update(fake_file) b64_md5 = base64.b64encode(md5_calc.hexdigest().encode('utf-8')) self.fake.add_expectation( metadata_url, _returns=io.BytesIO( json.dumps({'md5Hash': b64_md5.decode('utf-8')}).encode('utf-8'))) self.assertEqual( gsutil.download_gsutil(version, self.tempdir), full_filename) with open(full_filename, 'rb') as f: self.assertEqual(fake_file, f.read()) self.assertEqual(self.fake.expectations, []) self.fake.add_expectation( metadata_url, _returns=io.BytesIO( json.dumps({ 'md5Hash': base64.b64encode(b'aaaaaaa').decode('utf-8') # Bad MD5 }).encode('utf-8'))) self.fake.add_expectation(url, _returns=io.BytesIO(fake_file2)) self.assertEqual( gsutil.download_gsutil(version, self.tempdir), full_filename) with open(full_filename, 'rb') as f: self.assertEqual(fake_file2, f.read()) self.assertEqual(self.fake.expectations, []) def test_ensure_gsutil_full(self): version = '4.2' gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') gsutil_bin = os.path.join(gsutil_dir, 'gsutil') gsutil_flag = os.path.join(gsutil_dir, 'install.flag') os.makedirs(gsutil_dir) zip_filename = 'gsutil_%s.zip' % version url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename) _, tempzip = tempfile.mkstemp() fake_gsutil = 'Fake gsutil' with zipfile.ZipFile(tempzip, 'w') as zf: zf.writestr('gsutil/gsutil', fake_gsutil) with open(tempzip, 'rb') as f: self.fake.add_expectation(url, _returns=io.BytesIO(f.read())) # This should write the gsutil_bin with 'Fake gsutil' gsutil.ensure_gsutil(version, self.tempdir, False) self.assertTrue(os.path.exists(gsutil_bin)) with open(gsutil_bin, 'r') as f: self.assertEqual(f.read(), fake_gsutil) self.assertTrue(os.path.exists(gsutil_flag)) self.assertEqual(self.fake.expectations, []) def test_ensure_gsutil_short(self): version = '4.2' gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') gsutil_bin = os.path.join(gsutil_dir, 'gsutil') gsutil_flag = os.path.join(gsutil_dir, 'install.flag') os.makedirs(gsutil_dir) with open(gsutil_bin, 'w') as f: f.write('Foobar') with open(gsutil_flag, 'w') as f: f.write('Barbaz') self.assertEqual( gsutil.ensure_gsutil(version, self.tempdir, False), gsutil_bin) if __name__ == '__main__': unittest.main()