diff --git a/download_from_google_storage.py b/download_from_google_storage.py new file mode 100755 index 000000000..b1891755d --- /dev/null +++ b/download_from_google_storage.py @@ -0,0 +1,339 @@ +#!/usr/bin/env python +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Download files from Google Storage based on SHA1 sums.""" + + +import hashlib +import optparse +import os +import Queue +import re +import sys +import threading +import time + +import subprocess2 + + +GSUTIL_DEFAULT_PATH = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + 'third_party', 'gsutil', 'gsutil') + + +class FileNotFoundError(IOError): + pass + + +class InvalidFileError(IOError): + pass + + +# Common utilities +class Gsutil(object): + """Call gsutil with some predefined settings. This is a convenience object, + and is also immutable.""" + def __init__(self, path, boto_path=None, timeout=None): + if not os.path.exists(path): + raise FileNotFoundError('GSUtil not found in %s' % path) + self.path = path + self.timeout = timeout + self.boto_path = boto_path + + def call(self, *args): + env = os.environ.copy() + if self.boto_path: + env['AWS_CREDENTIAL_FILE'] = self.boto_path + return subprocess2.call((sys.executable, self.path) + args, + env=env, + timeout=self.timeout) + + def check_call(self, *args): + env = os.environ.copy() + if self.boto_path: + env['AWS_CREDENTIAL_FILE'] = self.boto_path + ((out, err), code) = subprocess2.communicate( + (sys.executable, self.path) + args, + stdout=subprocess2.PIPE, + stderr=subprocess2.PIPE, + env=env, + timeout=self.timeout) + + # Parse output. + status_code_match = re.search('status=([0-9]+)', err) + if status_code_match: + return (int(status_code_match.group(1)), out, err) + if ('You are attempting to access protected data with ' + 'no configured credentials.' 
in err): + return (403, out, err) + if 'No such object' in err: + return (404, out, err) + return (code, out, err) + + +def check_bucket_permissions(bucket, gsutil): + if not bucket: + print >> sys.stderr, 'Missing bucket.' + return (None, 1) + base_url = 'gs://%s' % bucket + + code, _, ls_err = gsutil.check_call('ls', base_url) + if code == 403: + code, _, _ = gsutil.check_call('config') + if code != 0: + print >> sys.stderr, 'Error while authenticating to %s.' % base_url + elif code == 404: + print >> sys.stderr, '%s not found.' % base_url + elif code != 0: + print >> sys.stderr, ls_err + return (base_url, code) + + +def get_sha1(filename): + sha1 = hashlib.sha1() + with open(filename, 'rb') as f: + while True: + # Read in 1mb chunks, so it doesn't all have to be loaded into memory. + chunk = f.read(1024*1024) + if not chunk: + break + sha1.update(chunk) + return sha1.hexdigest() + + +# Download-specific code starts here

def enumerate_work_queue(input_filename, work_queue, directory, + recursive, ignore_errors, output, sha1_file): + if sha1_file: + if not os.path.exists(input_filename): + if not ignore_errors: + raise FileNotFoundError('%s not found.' % input_filename) + print >> sys.stderr, '%s not found.' % input_filename + return 0 + with open(input_filename, 'rb') as f: + sha1_match = re.match('^([A-Za-z0-9]{40})$', f.read(1024).rstrip()) + if sha1_match: + work_queue.put( + (sha1_match.groups(1)[0], input_filename.replace('.sha1', ''))) + return 1 + if not ignore_errors: + raise InvalidFileError('No sha1 sum found in %s.' % input_filename) + print >> sys.stderr, 'No sha1 sum found in %s.' 
% input_filename + return 0 + + if not directory: + work_queue.put((input_filename, output)) + return 1 + + work_queue_size = 0 + for root, dirs, files in os.walk(input_filename): + if not recursive: + for item in dirs[:]: + dirs.remove(item) + else: + for exclude in ['.svn', '.git']: + if exclude in dirs: + dirs.remove(exclude) + for filename in files: + full_path = os.path.join(root, filename) + if full_path.endswith('.sha1'): + with open(full_path, 'rb') as f: + sha1_match = re.match('^([A-Za-z0-9]{40})$', f.read(1024).rstrip()) + if sha1_match: + work_queue.put( + (sha1_match.groups(1)[0], full_path.replace('.sha1', ''))) + work_queue_size += 1 + else: + if not ignore_errors: + raise InvalidFileError('No sha1 sum found in %s.' % filename) + print >> sys.stderr, 'No sha1 sum found in %s.' % filename + return work_queue_size + + +def _downloader_worker_thread(thread_num, q, force, base_url, + gsutil, out_q, ret_codes): + while True: + input_sha1_sum, output_filename = q.get() + if input_sha1_sum is None: + return + if os.path.exists(output_filename) and not force: + if get_sha1(output_filename) == input_sha1_sum: + out_q.put( + '%d> File %s exists and SHA1 matches. Skipping.' % ( + thread_num, output_filename)) + continue + # Check if file exists. + file_url = '%s/%s' % (base_url, input_sha1_sum) + if gsutil.check_call('ls', file_url)[0] != 0: + out_q.put('%d> File %s for %s does not exist, skipping.' % ( + thread_num, file_url, output_filename)) + ret_codes.put((1, 'File %s for %s does not exist.' % ( + file_url, output_filename))) + continue + # Fetch the file. + out_q.put('%d> Downloading %s...' % ( + thread_num, output_filename)) + code, _, err = gsutil.check_call('cp', '-q', file_url, output_filename) + if code != 0: + out_q.put('%d> %s' % (thread_num, err)) + ret_codes.put((code, err)) + + +def printer_worker(output_queue): + while True: + line = output_queue.get() + # Its plausible we want to print empty lines. 
+ if line is None: + break + print line + + +def download_from_google_storage( + input_filename, base_url, gsutil, num_threads, directory, recursive, + force, output, ignore_errors, sha1_file): + # Start up all the worker threads. + all_threads = [] + download_start = time.time() + stdout_queue = Queue.Queue() + work_queue = Queue.Queue() + ret_codes = Queue.Queue() + ret_codes.put((0, None)) + for thread_num in range(num_threads): + t = threading.Thread( + target=_downloader_worker_thread, + args=[thread_num, work_queue, force, base_url, + gsutil, stdout_queue, ret_codes]) + t.daemon = True + t.start() + all_threads.append(t) + printer_thread = threading.Thread(target=printer_worker, args=[stdout_queue]) + printer_thread.daemon = True + printer_thread.start() + + # Enumerate our work queue. + work_queue_size = enumerate_work_queue( + input_filename, work_queue, directory, recursive, + ignore_errors, output, sha1_file) + for _ in all_threads: + work_queue.put((None, None)) # Used to tell worker threads to stop. + + # Wait for all downloads to finish. + for t in all_threads: + t.join() + stdout_queue.put(None) + printer_thread.join() + + # See if we ran into any errors. + max_ret_code = 0 + for ret_code, message in ret_codes.queue: + max_ret_code = max(ret_code, max_ret_code) + if message: + print >> sys.stderr, message + if not max_ret_code: + print 'Success!' + + print 'Downloading %d files took %1f second(s)' % ( + work_queue_size, time.time() - download_start) + return max_ret_code + + +def main(args): + usage = ('usage: %prog [options] target\n' + 'Target must be:\n' + ' (default) a sha1 sum ([A-Za-z0-9]{40}).\n' + ' (-s or --sha1_file) a .sha1 file, containing a sha1 sum on ' + 'the first line.\n' + ' (-d or --directory) A directory to scan for .sha1 files.') + parser = optparse.OptionParser(usage) + parser.add_option('-o', '--output', + help='Specify the output file name. Defaults to: ' + '(a) Given a SHA1 hash, the name is the SHA1 hash. 
' + '(b) Given a .sha1 file or directory, the name will ' + 'match (.*).sha1.') + parser.add_option('-b', '--bucket', + help='Google Storage bucket to fetch from.') + parser.add_option('-e', '--boto', + help='Specify a custom boto file.') + parser.add_option('-c', '--no_resume', action='store_true', + help='Resume download if file is partially downloaded.') + parser.add_option('-f', '--force', action='store_true', + help='Force download even if local file exists.') + parser.add_option('-i', '--ignore_errors', action='store_true', + help='Don\'t throw error if we find an invalid .sha1 file.') + parser.add_option('-r', '--recursive', action='store_true', + help='Scan folders recursively for .sha1 files. ' + 'Must be used with -d/--directory') + parser.add_option('-t', '--num_threads', default=1, type='int', + help='Number of downloader threads to run.') + parser.add_option('-d', '--directory', action='store_true', + help='The target is a directory. ' + 'Cannot be used with -s/--sha1_file.') + parser.add_option('-s', '--sha1_file', action='store_true', + help='The target is a file containing a sha1 sum. ' + 'Cannot be used with -d/--directory.') + + (options, args) = parser.parse_args() + if not args: + parser.error('Missing target.') + if len(args) > 1: + parser.error('Too many targets.') + if not options.bucket: + parser.error('Missing bucket. Specify bucket with --bucket.') + if options.sha1_file and options.directory: + parser.error('Both --directory and --sha1_file are specified, ' + 'can only specify one.') + if options.recursive and not options.directory: + parser.error('--recursive specified but --directory not specified.') + if options.output and options.directory: + parser.error('--directory is specified, so --output has no effect.') + input_filename = args[0] + + # Set output filename if not specified. + if not options.output and not options.directory: + if not options.sha1_file: + # Target is a sha1 sum, so output filename would also be the sha1 sum. 
options.output = input_filename + elif options.sha1_file: + # Target is a .sha1 file. + if not input_filename.endswith('.sha1'): + parser.error('--sha1_file is specified, but the input filename ' + 'does not end with .sha1, and no --output is specified. ' + 'Either make sure the input filename has a .sha1 ' + 'extension, or specify --output.') + options.output = input_filename[:-5] + else: + parser.error('Unreachable state.') + + # Check if output file already exists. + if not options.directory and not options.force and not options.no_resume: + if os.path.exists(options.output): + parser.error('Output file %s exists and --no_resume is not specified.' + % options.output) + + # Make sure we can find a working instance of gsutil. + if os.path.exists(GSUTIL_DEFAULT_PATH): + gsutil = Gsutil(GSUTIL_DEFAULT_PATH) + else: + gsutil = None + for path in os.environ["PATH"].split(os.pathsep): + if os.path.exists(path) and 'gsutil' in os.listdir(path): + gsutil = Gsutil(os.path.join(path, 'gsutil')) + if not gsutil: + parser.error('gsutil not found in %s, bad depot_tools checkout?' % + GSUTIL_DEFAULT_PATH) + + # Check we have a valid bucket with valid permissions. + base_url, code = check_bucket_permissions(options.bucket, gsutil) + if code: + return code + + return download_from_google_storage( + input_filename, base_url, gsutil, options.num_threads, options.directory, + options.recursive, options.force, options.output, options.ignore_errors, + options.sha1_file) + + +if __name__ == '__main__': + sys.exit(main(sys.argv)) \ No newline at end of file diff --git a/tests/download_from_google_storage_unittests.py b/tests/download_from_google_storage_unittests.py new file mode 100755 index 000000000..abf12d991 --- /dev/null +++ b/tests/download_from_google_storage_unittests.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file.
+# pylint: disable=W0212 + +"""Unit tests for download_from_google_storage.py.""" + +import optparse +import os +import Queue +import shutil +import sys +import tempfile +import threading +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import upload_to_google_storage +import download_from_google_storage + +# ../third_party/gsutil/gsutil +GSUTIL_DEFAULT_PATH = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + 'third_party', 'gsutil', 'gsutil') +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class GsutilMock(object): + def __init__(self, path, boto_path=None, timeout=None): + self.path = path + self.timeout = timeout + self.boto_path = boto_path + self.expected = [] + self.history = [] + self.lock = threading.Lock() + + def add_expected(self, return_code, out, err): + self.expected.append((return_code, out, err)) + + def append_history(self, method, args): + self.history.append((method, args)) + + def call(self, *args): + with self.lock: + self.append_history('call', args) + if self.expected: + return self.expected.pop(0)[0] + else: + return 0 + + def check_call(self, *args): + with self.lock: + self.append_history('check_call', args) + if self.expected: + return self.expected.pop(0) + else: + return (0, '', '') + + +class GstoolsUnitTests(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix='gstools_test') + self.base_path = os.path.join(self.temp_dir, 'test_files') + shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path) + + def cleanUp(self): + shutil.rmtree(self.temp_dir) + + def test_gsutil(self): + gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) + self.assertEqual(gsutil.path, GSUTIL_DEFAULT_PATH) + code, _, err = gsutil.check_call() + self.assertEqual(code, 0) + self.assertEqual(err, '') + + def test_gsutil_version(self): + gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) + _, _, err = 
gsutil.check_call('version') + err_lines = err.splitlines() + self.assertEqual(err_lines[0], 'gsutil version 3.25') + self.assertEqual( + err_lines[1], + 'checksum ce71ac982f1148315e7fa65cff2f83e8 (OK)') + + def test_get_sha1(self): + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') + self.assertEqual( + download_from_google_storage.get_sha1(lorem_ipsum), + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') + + def test_get_md5(self): + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') + self.assertEqual( + upload_to_google_storage.get_md5(lorem_ipsum), + '634d7c1ed3545383837428f031840a1e') + + def test_get_md5_cached_read(self): + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') + # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. + self.assertEqual( + upload_to_google_storage.get_md5_cached(lorem_ipsum), + '734d7c1ed3545383837428f031840a1e') + + def test_get_md5_cached_write(self): + lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt') + lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5') + if os.path.exists(lorem_ipsum2_md5): + os.remove(lorem_ipsum2_md5) + # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. + self.assertEqual( + upload_to_google_storage.get_md5_cached(lorem_ipsum2), + '4c02d1eb455a0f22c575265d17b84b6d') + self.assertTrue(os.path.exists(lorem_ipsum2_md5)) + self.assertEqual( + open(lorem_ipsum2_md5, 'rb').read(), + '4c02d1eb455a0f22c575265d17b84b6d') + os.remove(lorem_ipsum2_md5) # Clean up. 
+ self.assertFalse(os.path.exists(lorem_ipsum2_md5)) + + +class DownloadTests(unittest.TestCase): + def setUp(self): + self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) + self.temp_dir = tempfile.mkdtemp(prefix='gstools_test') + self.checkout_test_files = os.path.join( + TEST_DIR, 'gstools', 'download_test_data') + self.base_path = os.path.join( + self.temp_dir, 'download_test_data') + shutil.copytree(self.checkout_test_files, self.base_path) + self.base_url = 'gs://sometesturl' + self.parser = optparse.OptionParser() + self.queue = Queue.Queue() + self.ret_codes = Queue.Queue() + self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') + self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' + self.maxDiff = None + + def cleanUp(self): + shutil.rmtree(self.temp_dir) + + def test_enumerate_files_non_recursive(self): + queue_size = download_from_google_storage.enumerate_work_queue( + self.base_path, self.queue, True, False, False, None, False) + expected_queue = [ + ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', + os.path.join(self.base_path, 'rootfolder_text.txt')), + ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', + os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt'))] + self.assertEqual(sorted(expected_queue), sorted(self.queue.queue)) + self.assertEqual(queue_size, 2) + + def test_enumerate_files_recursive(self): + queue_size = download_from_google_storage.enumerate_work_queue( + self.base_path, self.queue, True, True, False, None, False) + expected_queue = [ + ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', + os.path.join(self.base_path, 'rootfolder_text.txt')), + ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', + os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')), + ('b5415aa0b64006a95c0c409182e628881d6d6463', + os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt'))] + self.assertEqual(sorted(expected_queue), sorted(self.queue.queue)) + self.assertEqual(queue_size, 3) + + def test_download_worker_single_file(self): + sha1_hash = 
'7871c8e24da15bad8b0be2c36edc9dc77e37727f' + input_filename = '%s/%s' % (self.base_url, sha1_hash) + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') + self.queue.put((sha1_hash, output_filename)) + self.queue.put((None, None)) + stdout_queue = Queue.Queue() + download_from_google_storage._downloader_worker_thread( + 0, self.queue, False, self.base_url, self.gsutil, + stdout_queue, self.ret_codes) + expected_calls = [ + ('check_call', + ('ls', input_filename)), + ('check_call', + ('cp', '-q', input_filename, output_filename))] + expected_output = [ + '0> Downloading %s...' % output_filename] + expected_ret_codes = [] + self.assertEqual(list(stdout_queue.queue), expected_output) + self.assertEqual(self.gsutil.history, expected_calls) + self.assertEqual(list(self.ret_codes.queue), expected_ret_codes) + + def test_download_worker_skips_file(self): + sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe' + output_filename = os.path.join(self.base_path, 'rootfolder_text.txt') + self.queue.put((sha1_hash, output_filename)) + self.queue.put((None, None)) + stdout_queue = Queue.Queue() + download_from_google_storage._downloader_worker_thread( + 0, self.queue, False, self.base_url, self.gsutil, + stdout_queue, self.ret_codes) + expected_output = [ + '0> File %s exists and SHA1 matches. Skipping.' % output_filename + ] + self.assertEqual(list(stdout_queue.queue), expected_output) + self.assertEqual(self.gsutil.history, []) + + def test_download_worker_skips_not_found_file(self): + sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' + input_filename = '%s/%s' % (self.base_url, sha1_hash) + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') + self.queue.put((sha1_hash, output_filename)) + self.queue.put((None, None)) + stdout_queue = Queue.Queue() + self.gsutil.add_expected(1, '', '') # Return error when 'ls' is called. 
+ download_from_google_storage._downloader_worker_thread( + 0, self.queue, False, self.base_url, self.gsutil, + stdout_queue, self.ret_codes) + expected_output = [ + '0> File %s for %s does not exist, skipping.' % ( + input_filename, output_filename), + ] + expected_calls = [ + ('check_call', + ('ls', input_filename)) + ] + expected_ret_codes = [ + (1, 'File %s for %s does not exist.' % ( + input_filename, output_filename)) + ] + self.assertEqual(list(stdout_queue.queue), expected_output) + self.assertEqual(self.gsutil.history, expected_calls) + self.assertEqual(list(self.ret_codes.queue), expected_ret_codes) + + def test_download_cp_fails(self): + sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' + input_filename = '%s/%s' % (self.base_url, sha1_hash) + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') + self.gsutil.add_expected(0, '', '') + self.gsutil.add_expected(101, '', 'Test error message.') + code = download_from_google_storage.download_from_google_storage( + input_filename=sha1_hash, + base_url=self.base_url, + gsutil=self.gsutil, + num_threads=1, + directory=False, + recursive=False, + force=True, + output=output_filename, + ignore_errors=False, + sha1_file=False) + expected_calls = [ + ('check_call', + ('ls', input_filename)), + ('check_call', + ('cp', '-q', input_filename, output_filename)) + ] + self.assertEqual(self.gsutil.history, expected_calls) + self.assertEqual(code, 101) + + def test_download_directory_no_recursive_non_force(self): + sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' + input_filename = '%s/%s' % (self.base_url, sha1_hash) + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') + code = download_from_google_storage.download_from_google_storage( + input_filename=self.base_path, + base_url=self.base_url, + gsutil=self.gsutil, + num_threads=1, + directory=True, + recursive=False, + force=False, + output=None, + ignore_errors=False, + sha1_file=False) + expected_calls = [ + 
('check_call', + ('ls', input_filename)), + ('check_call', + ('cp', '-q', input_filename, output_filename))] + self.assertEqual(self.gsutil.history, expected_calls) + self.assertEqual(code, 0) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/gstools/download_test_data/rootfolder_text.txt b/tests/gstools/download_test_data/rootfolder_text.txt new file mode 100644 index 000000000..6de7b8c69 --- /dev/null +++ b/tests/gstools/download_test_data/rootfolder_text.txt @@ -0,0 +1 @@ +This is a test file. diff --git a/tests/gstools/download_test_data/rootfolder_text.txt.sha1 b/tests/gstools/download_test_data/rootfolder_text.txt.sha1 new file mode 100644 index 000000000..212aed339 --- /dev/null +++ b/tests/gstools/download_test_data/rootfolder_text.txt.sha1 @@ -0,0 +1 @@ +e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe diff --git a/tests/gstools/download_test_data/subfolder/subfolder_text.txt b/tests/gstools/download_test_data/subfolder/subfolder_text.txt new file mode 100644 index 000000000..27ea4400c --- /dev/null +++ b/tests/gstools/download_test_data/subfolder/subfolder_text.txt @@ -0,0 +1,2 @@ +This is a test file. 
+This file exists in a subfolder diff --git a/tests/gstools/download_test_data/subfolder/subfolder_text.txt.sha1 b/tests/gstools/download_test_data/subfolder/subfolder_text.txt.sha1 new file mode 100644 index 000000000..096b6feb8 --- /dev/null +++ b/tests/gstools/download_test_data/subfolder/subfolder_text.txt.sha1 @@ -0,0 +1 @@ +b5415aa0b64006a95c0c409182e628881d6d6463 diff --git a/tests/gstools/download_test_data/uploaded_lorem_ipsum.txt.sha1 b/tests/gstools/download_test_data/uploaded_lorem_ipsum.txt.sha1 new file mode 100644 index 000000000..9723fef71 --- /dev/null +++ b/tests/gstools/download_test_data/uploaded_lorem_ipsum.txt.sha1 @@ -0,0 +1 @@ +7871c8e24da15bad8b0be2c36edc9dc77e37727f diff --git a/tests/gstools/lorem_ipsum.txt b/tests/gstools/lorem_ipsum.txt new file mode 100644 index 000000000..0dc0fba09 --- /dev/null +++ b/tests/gstools/lorem_ipsum.txt @@ -0,0 +1,6 @@ +Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod +tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, +quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo +consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse +cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non +proident, sunt in culpa qui officia deserunt mollit anim id est laborum. diff --git a/tests/gstools/lorem_ipsum.txt.md5 b/tests/gstools/lorem_ipsum.txt.md5 new file mode 100644 index 000000000..3aede178d --- /dev/null +++ b/tests/gstools/lorem_ipsum.txt.md5 @@ -0,0 +1 @@ +734d7c1ed3545383837428f031840a1e diff --git a/tests/gstools/lorem_ipsum2.txt b/tests/gstools/lorem_ipsum2.txt new file mode 100644 index 000000000..76da195ab --- /dev/null +++ b/tests/gstools/lorem_ipsum2.txt @@ -0,0 +1,7 @@ +Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod +tempor incididunt ut labore et dolore magna aliqua. 
Ut enim ad minim veniam, +quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo +consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse +cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non +proident, sunt in culpa qui officia deserunt mollit anim id est laborum. +This is the second file. diff --git a/tests/upload_to_google_storage_unittests.py b/tests/upload_to_google_storage_unittests.py new file mode 100755 index 000000000..8879f5cf8 --- /dev/null +++ b/tests/upload_to_google_storage_unittests.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Unit tests for upload_to_google_storage.py.""" + +import optparse +import os +import Queue +import shutil +import StringIO +import sys +import tempfile +import threading +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import upload_to_google_storage +from download_from_google_storage_unittests import GsutilMock + +# ../third_party/gsutil/gsutil +GSUTIL_DEFAULT_PATH = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + 'third_party', 'gsutil', 'gsutil') +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class UploadTests(unittest.TestCase): + def setUp(self): + self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) + self.temp_dir = tempfile.mkdtemp(prefix='gstools_test') + self.base_path = os.path.join(self.temp_dir, 'gstools') + shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path) + self.base_url = 'gs://sometesturl' + self.parser = optparse.OptionParser() + self.ret_codes = Queue.Queue() + self.stdout_queue = Queue.Queue() + self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') + self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' + + def cleanUp(self): + 
shutil.rmtree(self.temp_dir) + sys.stdin = sys.__stdin__ + + def test_upload_single_file(self): + filenames = [self.lorem_ipsum] + output_filename = '%s.sha1' % self.lorem_ipsum + code = upload_to_google_storage.upload_to_google_storage( + filenames, self.base_url, self.gsutil, True, False, 1, False) + self.assertEqual( + self.gsutil.history, + [('check_call', + ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), + ('check_call', + ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, + self.lorem_ipsum_sha1)))]) + self.assertTrue(os.path.exists(output_filename)) + self.assertEqual( + open(output_filename, 'rb').read(), + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') + os.remove(output_filename) + self.assertEqual(code, 0) + + def test_upload_single_file_remote_exists(self): + filenames = [self.lorem_ipsum] + output_filename = '%s.sha1' % self.lorem_ipsum + etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e' + self.gsutil.add_expected(0, '', '') + self.gsutil.add_expected(0, etag_string, '') + code = upload_to_google_storage.upload_to_google_storage( + filenames, self.base_url, self.gsutil, False, False, 1, False) + self.assertEqual( + self.gsutil.history, + [('check_call', + ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), + ('check_call', + ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))]) + self.assertTrue(os.path.exists(output_filename)) + self.assertEqual( + open(output_filename, 'rb').read(), + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') + os.remove(output_filename) + self.assertEqual(code, 0) + + def test_upload_worker_errors(self): + work_queue = Queue.Queue() + work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1)) + work_queue.put((None, None)) + self.gsutil.add_expected(1, '', '') # For the first ls call. 
+ self.gsutil.add_expected(20, '', 'Expected error message') + # pylint: disable=W0212 + upload_to_google_storage._upload_worker( + 0, + work_queue, + self.base_url, + self.gsutil, + threading.Lock(), + False, + False, + self.stdout_queue, + self.ret_codes) + expected_ret_codes = [ + (20, + 'Encountered error on uploading %s to %s/%s\nExpected error message' % + (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))] + self.assertEqual(list(self.ret_codes.queue), expected_ret_codes) + + def test_skip_hashing(self): + filenames = [self.lorem_ipsum] + output_filename = '%s.sha1' % self.lorem_ipsum + fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f' + with open(output_filename, 'wb') as f: + f.write(fake_hash) # Fake hash. + code = upload_to_google_storage.upload_to_google_storage( + filenames, self.base_url, self.gsutil, False, False, 1, True) + self.assertEqual( + self.gsutil.history, + [('check_call', + ('ls', '%s/%s' % (self.base_url, fake_hash))), + ('check_call', + ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))), + ('check_call', + ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))]) + self.assertEqual( + open(output_filename, 'rb').read(), fake_hash) + os.remove(output_filename) + self.assertEqual(code, 0) + + def test_get_targets_no_args(self): + try: + upload_to_google_storage.get_targets([], self.parser, False) + self.fail() + except SystemExit, e: + self.assertEqual(e.code, 2) + + def test_get_targets_passthrough(self): + result = upload_to_google_storage.get_targets( + ['a', 'b', 'c', 'd', 'e'], + self.parser, + False) + self.assertEqual(result, ['a', 'b', 'c', 'd', 'e']) + + def test_get_targets_multiple_stdin(self): + inputs = ['a', 'b', 'c', 'd', 'e'] + sys.stdin = StringIO.StringIO(os.linesep.join(inputs)) + result = upload_to_google_storage.get_targets( + ['-'], + self.parser, + False) + self.assertEqual(result, inputs) + + def test_get_targets_multiple_stdin_null(self): + inputs = ['a', 'b', 'c', 'd', 'e'] + sys.stdin = 
StringIO.StringIO('\0'.join(inputs))
+    result = upload_to_google_storage.get_targets(
+        ['-'],
+        self.parser,
+        True)
+    self.assertEqual(result, inputs)
+
+
+if __name__ == '__main__':
+  unittest.main()
\ No newline at end of file
diff --git a/upload_to_google_storage.py b/upload_to_google_storage.py
new file mode 100755
--- /dev/null
+++ b/upload_to_google_storage.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Uploads files to Google Storage content addressed."""
+
+import hashlib
+import optparse
+import os
+import Queue
+import re
+import sys
+import threading
+import time
+
+from download_from_google_storage import check_bucket_permissions
+from download_from_google_storage import get_sha1
+from download_from_google_storage import Gsutil
+from download_from_google_storage import printer_worker
+
+GSUTIL_DEFAULT_PATH = os.path.join(
+    os.path.dirname(os.path.abspath(__file__)),
+    'third_party', 'gsutil', 'gsutil')
+
+USAGE_STRING = """%prog [options] target [target2 ...].
+Target is the file intended to be uploaded to Google Storage.
+If target is "-", then a list of files will be taken from standard input
+
+This script will generate a file (original filename).sha1 containing the
+sha1 sum of the uploaded file.
+It is recommended that the .sha1 file is checked into the repository,
+the original file removed from the repository, and a hook added to the
+DEPS file to call download_from_google_storage.py.
+
+Example usages
+--------------
+
+Scan the current directory and upload all files larger than 1MB:
+find . -name .svn -prune -o -size +1000k -type f -print0 | %prog -0 -b bkt -
+(Replace "bkt" with the name of a writable bucket.)
+"""
+
+
+def get_md5(filename):
+  """Calculate the MD5 hex digest of |filename|, reading in 1MB chunks."""
+  md5_calculator = hashlib.md5()
+  with open(filename, 'rb') as f:
+    while True:
+      chunk = f.read(1024*1024)
+      if not chunk:
+        break
+      md5_calculator.update(chunk)
+  return md5_calculator.hexdigest()
+
+
+def get_md5_cached(filename):
+  """Don't calculate the MD5 if we can find a .md5 file."""
+  # See if we can find an existing MD5 sum stored in a file.
+  if os.path.exists('%s.md5' % filename):
+    with open('%s.md5' % filename, 'rb') as f:
+      md5_match = re.search('([a-z0-9]{32})', f.read())
+      if md5_match:
+        return md5_match.group(1)
+  # No .md5 file, or its contents were invalid: hash the file and (re)write
+  # the cache.  (Previously an unparsable cache file made this return None.)
+  md5_hash = get_md5(filename)
+  with open('%s.md5' % filename, 'wb') as f:
+    f.write(md5_hash)
+  return md5_hash
+
+
+def _upload_worker(
+    thread_num, upload_queue, base_url, gsutil, md5_lock, force,
+    use_md5, stdout_queue, ret_codes):
+  """Upload files from |upload_queue| until the (None, None) sentinel."""
+  while True:
+    filename, sha1_sum = upload_queue.get()
+    if not filename:
+      break
+    file_url = '%s/%s' % (base_url, sha1_sum)
+    if gsutil.check_call('ls', file_url)[0] == 0 and not force:
+      # File exists, check MD5 hash.
+      _, out, _ = gsutil.check_call('ls', '-L', file_url)
+      etag_match = re.search(r'ETag:\s+([a-z0-9]{32})', out)
+      if etag_match:
+        remote_md5 = etag_match.group(1)
+        # Calculate the MD5 checksum to match it to Google Storage's ETag.
+        with md5_lock:
+          if use_md5:
+            local_md5 = get_md5_cached(filename)
+          else:
+            local_md5 = get_md5(filename)
+        if local_md5 == remote_md5:
+          stdout_queue.put(
+              '%d> File %s already exists and MD5 matches, upload skipped' %
+              (thread_num, filename))
+          continue
+    stdout_queue.put('%d> Uploading %s...' % (
+        thread_num, filename))
+    code, _, err = gsutil.check_call('cp', '-q', filename, file_url)
+    if code != 0:
+      # Report the failure but keep draining the queue so other files upload.
+      ret_codes.put(
+          (code,
+           'Encountered error on uploading %s to %s\n%s' %
+           (filename, file_url, err)))
+      continue
+
+
+def get_targets(args, parser, use_null_terminator):
+  """Return the list of files to upload; '-' means read the list from stdin."""
+  if not args:
+    parser.error('Missing target.')
+
+  if len(args) == 1 and args[0] == '-':
+    # Take stdin as a newline or null separated list of files.
+    if use_null_terminator:
+      return sys.stdin.read().split('\0')
+    else:
+      return sys.stdin.read().splitlines()
+  else:
+    return args
+
+
+def upload_to_google_storage(
+    input_filenames, base_url, gsutil, force,
+    use_md5, num_threads, skip_hashing):
+  """Hash |input_filenames| and upload them to |base_url| using workers."""
+  # We only want one MD5 calculation happening at a time to avoid HD thrashing.
+  md5_lock = threading.Lock()
+
+  # Start up all the worker threads plus the printer thread.
+  all_threads = []
+  ret_codes = Queue.Queue()
+  ret_codes.put((0, None))
+  upload_queue = Queue.Queue()
+  upload_timer = time.time()
+  stdout_queue = Queue.Queue()
+  printer_thread = threading.Thread(target=printer_worker, args=[stdout_queue])
+  printer_thread.daemon = True
+  printer_thread.start()
+  for thread_num in range(num_threads):
+    t = threading.Thread(
+        target=_upload_worker,
+        args=[thread_num, upload_queue, base_url, gsutil, md5_lock,
+              force, use_md5, stdout_queue, ret_codes])
+    t.daemon = True
+    t.start()
+    all_threads.append(t)
+
+  # We want to hash everything in a single thread since it's faster.
+  # The bottleneck is in disk IO, not CPU.
+  hashing_start = time.time()
+  for filename in input_filenames:
+    if not os.path.exists(filename):
+      stdout_queue.put('Main> Error: %s not found, skipping.' % filename)
+      continue
+    if os.path.exists('%s.sha1' % filename) and skip_hashing:
+      stdout_queue.put(
+          'Main> Found hash for %s, sha1 calculation skipped.' % filename)
+      with open(filename + '.sha1', 'rb') as f:
+        # Strip the trailing newline so it doesn't end up in the upload URL.
+        sha1_file = f.read(1024).rstrip()
+      if not re.match('^([a-z0-9]{40})$', sha1_file):
+        print >> sys.stderr, 'Invalid sha1 hash file %s.sha1' % filename
+        return 1
+      upload_queue.put((filename, sha1_file))
+      continue
+    stdout_queue.put('Main> Calculating hash for %s...' % filename)
+    sha1_sum = get_sha1(filename)
+    with open(filename + '.sha1', 'wb') as f:
+      f.write(sha1_sum)
+    stdout_queue.put('Main> Done calculating hash for %s.' % filename)
+    upload_queue.put((filename, sha1_sum))
+  hashing_duration = time.time() - hashing_start
+
+  # Wait for everything to finish.
+  for _ in all_threads:
+    upload_queue.put((None, None))  # To mark the end of the work queue.
+  for t in all_threads:
+    t.join()
+  stdout_queue.put(None)
+  printer_thread.join()
+
+  # Print timing information.
+  print 'Hashing %s files took %.1f seconds' % (
+      len(input_filenames), hashing_duration)
+  print 'Uploading took %.1f seconds' % (time.time() - upload_timer)
+
+  # See if we ran into any errors.
+  max_ret_code = 0
+  for ret_code, message in ret_codes.queue:
+    max_ret_code = max(ret_code, max_ret_code)
+    if message:
+      print >> sys.stderr, message
+
+  if not max_ret_code:
+    print 'Success!'
+
+  return max_ret_code
+
+
+def main(args):
+  parser = optparse.OptionParser(USAGE_STRING)
+  parser.add_option('-b', '--bucket',
+                    help='Google Storage bucket to upload to.')
+  parser.add_option('-e', '--boto', help='Specify a custom boto file.')
+  parser.add_option('-f', '--force', action='store_true',
+                    help='Force upload even if remote file exists.')
+  parser.add_option('-g', '--gsutil_path', default=GSUTIL_DEFAULT_PATH,
+                    help='Path to the gsutil script.')
+  parser.add_option('-m', '--use_md5', action='store_true',
+                    help='Generate MD5 files when scanning, and don\'t check '
+                    'the MD5 checksum if a .md5 file is found.')
+  parser.add_option('-t', '--num_threads', default=1, type='int',
+                    help='Number of uploader threads to run.')
+  parser.add_option('-s', '--skip_hashing', action='store_true',
+                    help='Skip hashing if .sha1 file exists.')
+  parser.add_option('-0', '--use_null_terminator', action='store_true',
+                    help='Use \\0 instead of \\n when parsing '
+                    'the file list from stdin. This is useful if the input '
+                    'is coming from "find ... -print0".')
+  # Parse |args| (argv-style, args[0] is the program name) instead of
+  # implicitly re-reading sys.argv.
+  (options, args) = parser.parse_args(args[1:])
+
+  # Enumerate our inputs.
+  input_filenames = get_targets(args, parser, options.use_null_terminator)
+
+  # Make sure we can find a working instance of gsutil.
+  if os.path.exists(GSUTIL_DEFAULT_PATH):
+    gsutil = Gsutil(GSUTIL_DEFAULT_PATH)
+  else:
+    gsutil = None
+    for path in os.environ["PATH"].split(os.pathsep):
+      if os.path.exists(path) and 'gsutil' in os.listdir(path):
+        # Use the first match on PATH, consistent with shell lookup.
+        gsutil = Gsutil(os.path.join(path, 'gsutil'))
+        break
+  if not gsutil:
+    parser.error('gsutil not found in %s, bad depot_tools checkout?' %
+                 GSUTIL_DEFAULT_PATH)
+
+  # Check we have a valid bucket with valid permissions.
+  base_url, code = check_bucket_permissions(options.bucket, gsutil)
+  if code:
+    return code
+
+  return upload_to_google_storage(
+      input_filenames, base_url, gsutil, options.force, options.use_md5,
+      options.num_threads, options.skip_hashing)
+
+
+if __name__ == '__main__':
+  sys.exit(main(sys.argv))