You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
726 lines
30 KiB
Python
726 lines
30 KiB
Python
# Copyright 2010 Google Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Base class for gsutil commands.
|
|
|
|
In addition to base class code, this file contains helpers that depend on base
class state (such as GetAclCommandHelper, which depends on self.suri_builder,
self.headers, etc.). In general, functions that depend on class state and that
are used by multiple commands belong in this file. Functions that don't depend
on class state belong in util.py, and non-shared helpers belong in individual
subclasses.
|
|
"""
|
|
|
|
import boto
|
|
import getopt
|
|
import gslib
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import platform
|
|
import re
|
|
import sys
|
|
import wildcard_iterator
|
|
import xml.dom.minidom
|
|
|
|
from boto import handler
|
|
from boto.storage_uri import StorageUri
|
|
from getopt import GetoptError
|
|
from gslib import util
|
|
from gslib.exception import CommandException
|
|
from gslib.help_provider import HelpProvider
|
|
from gslib.name_expansion import NameExpansionIterator
|
|
from gslib.name_expansion import NameExpansionIteratorQueue
|
|
from gslib.project_id import ProjectIdHandler
|
|
from gslib.storage_uri_builder import StorageUriBuilder
|
|
from gslib.thread_pool import ThreadPool
|
|
from gslib.util import HAVE_OAUTH2
|
|
from gslib.util import NO_MAX
|
|
|
|
from gslib.wildcard_iterator import ContainsWildcard
|
|
|
|
|
|
def _ThreadedLogger():
  """Creates a logger that resembles 'print' output, but is thread safe.

  The logger displays all messages logged with level INFO or above, formatted
  as the bare message text ('%(message)s', no level or timestamp prefix),
  through a logging.StreamHandler. Log propagation is disabled so ancestor
  loggers' handlers do not emit the same messages again.

  logging.getLogger returns the same logger object for the same name, so the
  handler is only attached the first time this function runs. Later calls
  return the already-configured logger instead of stacking duplicate
  handlers, which would print every message once per call.

  Returns:
    A logger object.
  """
  log = logging.getLogger('threaded-logging')
  log.propagate = False
  log.setLevel(logging.INFO)
  if not log.handlers:
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter('%(message)s'))
    log.addHandler(log_handler)
  return log
|
|
|
|
# command_spec key constants. Each one names an entry of the per-command
# specification dictionary (see Command.command_spec).
COMMAND_NAME = 'command_name'                  # Primary command name.
COMMAND_NAME_ALIASES = 'command_name_aliases'  # Alternate command names.
MIN_ARGS = 'min_args'                          # Minimum number of args.
MAX_ARGS = 'max_args'                          # Maximum number of args.
SUPPORTED_SUB_ARGS = 'supported_sub_args'      # getopt option string.
FILE_URIS_OK = 'file_uri_ok'                   # Are file URIs accepted?
PROVIDER_URIS_OK = 'provider_uri_ok'           # Are provider-only URIs ok?
URIS_START_ARG = 'uris_start_arg'              # Index of the first URI arg.
CONFIG_REQUIRED = 'config_required'            # Must gsutil be configured?

# End-of-work marker placed on work queues. It is a plain string rather than
# a unique object(), presumably so it still compares equal after being
# pickled through a multiprocessing queue.
_EOF_NAME_EXPANSION_RESULT = 'EOF'
|
|
|
|
|
|
class Command(object):
|
|
# Global instance of a threaded logger object.
|
|
THREADED_LOGGER = _ThreadedLogger()
|
|
|
|
REQUIRED_SPEC_KEYS = [COMMAND_NAME]
|
|
|
|
# Each subclass must define the following map, minimally including the
|
|
# keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
|
|
# although for readbility subclasses should specify the complete map.
|
|
command_spec = {
|
|
# Name of command.
|
|
COMMAND_NAME : None,
|
|
# List of command name aliases.
|
|
COMMAND_NAME_ALIASES : [],
|
|
# Min number of args required by this command.
|
|
MIN_ARGS : 0,
|
|
# Max number of args required by this command, or NO_MAX.
|
|
MAX_ARGS : NO_MAX,
|
|
# Getopt-style string specifying acceptable sub args.
|
|
SUPPORTED_SUB_ARGS : '',
|
|
# True if file URIs are acceptable for this command.
|
|
FILE_URIS_OK : False,
|
|
# True if provider-only URIs are acceptable for this command.
|
|
PROVIDER_URIS_OK : False,
|
|
# Index in args of first URI arg.
|
|
URIS_START_ARG : 0,
|
|
# True if must configure gsutil before running command.
|
|
CONFIG_REQUIRED : True,
|
|
}
|
|
_default_command_spec = command_spec
|
|
help_spec = HelpProvider.help_spec
|
|
|
|
"""Define an empty test specification, which derived classes must populate.
|
|
|
|
This is a list of tuples containing the following values:
|
|
|
|
step_name - mnemonic name for test, displayed when test is run
|
|
cmd_line - shell command line to run test
|
|
expect_ret or None - expected return code from test (None means ignore)
|
|
(result_file, expect_file) or None - tuple of result file and expected
|
|
file to diff for additional test
|
|
verification beyond the return code
|
|
(None means no diff requested)
|
|
Notes:
|
|
|
|
- Setting expected_ret to None means there is no expectation and,
|
|
hence, any returned value will pass.
|
|
|
|
- Any occurrences of the string 'gsutil' in the cmd_line parameter
|
|
are expanded to the full path to the gsutil command under test.
|
|
|
|
- The cmd_line, result_file and expect_file parameters may
|
|
contain the following special substrings:
|
|
|
|
$Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
|
|
$On - converted to one of 10 unique-for-testing object names (n=0..9)
|
|
$Fn - converted to one of 10 unique-for-testing file names (n=0..9)
|
|
$G - converted to the directory where gsutil is installed. Useful for
|
|
referencing test data.
|
|
|
|
- The generated file names are full pathnames, whereas the generated
|
|
bucket and object names are simple relative names.
|
|
|
|
- Tests with a non-None result_file and expect_file automatically
|
|
trigger an implicit diff of the two files.
|
|
|
|
- These test specifications, in combination with the conversion strings
|
|
allow tests to be constructed parametrically. For example, here's an
|
|
annotated subset of a test_steps for the cp command:
|
|
|
|
# Copy local file to object, verify 0 return code.
|
|
('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
|
|
# Copy uploaded object back to local file and diff vs. orig file.
|
|
('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
|
|
|
|
- After pattern substitution, the specs are run sequentially, in the
|
|
order in which they appear in the test_steps list.
|
|
"""
|
|
test_steps = []
|
|
|
|
# Define a convenience property for command name, since it's used many places.
|
|
def _GetDefaultCommandName(self):
|
|
return self.command_spec[COMMAND_NAME]
|
|
command_name = property(_GetDefaultCommandName)
|
|
|
|
def __init__(self, command_runner, args, headers, debug, parallel_operations,
|
|
gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver,
|
|
bucket_storage_uri_class, test_method=None,
|
|
bypass_prodaccess=True):
|
|
"""
|
|
Args:
|
|
command_runner: CommandRunner (for commands built atop other commands).
|
|
args: Command-line args (arg0 = actual arg, not command name ala bash).
|
|
headers: Dictionary containing optional HTTP headers to pass to boto.
|
|
debug: Debug level to pass in to boto connection (range 0..3).
|
|
parallel_operations: Should command operations be executed in parallel?
|
|
gsutil_bin_dir: Bin dir from which gsutil is running.
|
|
boto_lib_dir: Lib dir where boto runs.
|
|
config_file_list: Config file list returned by _GetBotoConfigFileList().
|
|
gsutil_ver: Version string of currently running gsutil command.
|
|
bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
|
|
Settable for testing/mocking.
|
|
test_method: Optional general purpose method for testing purposes.
|
|
Application and semantics of this method will vary by
|
|
command and test type.
|
|
bypass_prodaccess: Boolean to ignore the existance of prodaccess.
|
|
|
|
Implementation note: subclasses shouldn't need to define an __init__
|
|
method, and instead depend on the shared initialization that happens
|
|
here. If you do define an __init__ method in a subclass you'll need to
|
|
explicitly call super().__init__(). But you're encouraged not to do this,
|
|
because it will make changing the __init__ interface more painful.
|
|
"""
|
|
# Save class values from constructor params.
|
|
self.command_runner = command_runner
|
|
self.args = args
|
|
self.unparsed_args = args
|
|
self.headers = headers
|
|
self.debug = debug
|
|
self.parallel_operations = parallel_operations
|
|
self.gsutil_bin_dir = gsutil_bin_dir
|
|
self.boto_lib_dir = boto_lib_dir
|
|
self.config_file_list = config_file_list
|
|
self.gsutil_ver = gsutil_ver
|
|
self.bucket_storage_uri_class = bucket_storage_uri_class
|
|
self.test_method = test_method
|
|
self.exclude_symlinks = False
|
|
self.recursion_requested = False
|
|
self.all_versions = False
|
|
self.bypass_prodaccess = bypass_prodaccess
|
|
|
|
# Process sub-command instance specifications.
|
|
# First, ensure subclass implementation sets all required keys.
|
|
for k in self.REQUIRED_SPEC_KEYS:
|
|
if k not in self.command_spec or self.command_spec[k] is None:
|
|
raise CommandException('"%s" command implementation is missing %s '
|
|
'specification' % (self.command_name, k))
|
|
# Now override default command_spec with subclass-specified values.
|
|
tmp = self._default_command_spec
|
|
tmp.update(self.command_spec)
|
|
self.command_spec = tmp
|
|
del tmp
|
|
|
|
# Make sure command provides a test specification.
|
|
if not self.test_steps:
|
|
# TODO: Uncomment following lines when test feature is ready.
|
|
#raise CommandException('"%s" command implementation is missing test '
|
|
#'specification' % self.command_name)
|
|
pass
|
|
|
|
# Parse and validate args.
|
|
try:
|
|
(self.sub_opts, self.args) = getopt.getopt(
|
|
args, self.command_spec[SUPPORTED_SUB_ARGS])
|
|
except GetoptError, e:
|
|
raise CommandException('%s for "%s" command.' % (e.msg,
|
|
self.command_name))
|
|
if (len(self.args) < self.command_spec[MIN_ARGS]
|
|
or len(self.args) > self.command_spec[MAX_ARGS]):
|
|
raise CommandException('Wrong number of arguments for "%s" command.' %
|
|
self.command_name)
|
|
if (not self.command_spec[FILE_URIS_OK]
|
|
and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
|
|
raise CommandException('"%s" command does not support "file://" URIs. '
|
|
'Did you mean to use a gs:// URI?' %
|
|
self.command_name)
|
|
if (not self.command_spec[PROVIDER_URIS_OK]
|
|
and self._HaveProviderUris(
|
|
self.args[self.command_spec[URIS_START_ARG]:])):
|
|
raise CommandException('"%s" command does not support provider-only '
|
|
'URIs.' % self.command_name)
|
|
if self.command_spec[CONFIG_REQUIRED]:
|
|
self._ConfigureNoOpAuthIfNeeded()
|
|
|
|
self.proj_id_handler = ProjectIdHandler()
|
|
self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
|
|
|
|
# Cross-platform path to run gsutil binary.
|
|
self.gsutil_cmd = ''
|
|
# Cross-platform list containing gsutil path for use with subprocess.
|
|
self.gsutil_exec_list = []
|
|
# If running on Windows, invoke python interpreter explicitly.
|
|
if platform.system() == "Windows":
|
|
self.gsutil_cmd += 'python '
|
|
self.gsutil_exec_list += ['python']
|
|
# Add full path to gsutil to make sure we test the correct version.
|
|
self.gsutil_path = os.path.join(self.gsutil_bin_dir, 'gsutil')
|
|
self.gsutil_cmd += self.gsutil_path
|
|
self.gsutil_exec_list += [self.gsutil_path]
|
|
|
|
# We're treating recursion_requested like it's used by all commands, but
|
|
# only some of the commands accept the -R option.
|
|
if self.sub_opts:
|
|
for o, unused_a in self.sub_opts:
|
|
if o == '-r' or o == '-R':
|
|
self.recursion_requested = True
|
|
break
|
|
|
|
def WildcardIterator(self, uri_or_str, all_versions=False):
|
|
"""
|
|
Helper to instantiate gslib.WildcardIterator. Args are same as
|
|
gslib.WildcardIterator interface, but this method fills in most of the
|
|
values from instance state.
|
|
|
|
Args:
|
|
uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
|
|
"""
|
|
return wildcard_iterator.wildcard_iterator(
|
|
uri_or_str, self.proj_id_handler,
|
|
bucket_storage_uri_class=self.bucket_storage_uri_class,
|
|
all_versions=all_versions,
|
|
headers=self.headers, debug=self.debug)
|
|
|
|
def RunCommand(self):
|
|
"""Abstract function in base class. Subclasses must implement this. The
|
|
return value of this function will be used as the exit status of the
|
|
process, so subclass commands should return an integer exit code (0 for
|
|
success, a value in [1,255] for failure).
|
|
"""
|
|
raise CommandException('Command %s is missing its RunCommand() '
|
|
'implementation' % self.command_name)
|
|
|
|
############################################################
|
|
# Shared helper functions that depend on base class state. #
|
|
############################################################
|
|
|
|
def UrisAreForSingleProvider(self, uri_args):
|
|
"""Tests whether the uris are all for a single provider.
|
|
|
|
Returns: a StorageUri for one of the uris on success, None on failure.
|
|
"""
|
|
provider = None
|
|
uri = None
|
|
for uri_str in uri_args:
|
|
# validate=False because we allow wildcard uris.
|
|
uri = boto.storage_uri(
|
|
uri_str, debug=self.debug, validate=False,
|
|
bucket_storage_uri_class=self.bucket_storage_uri_class)
|
|
if not provider:
|
|
provider = uri.scheme
|
|
elif uri.scheme != provider:
|
|
return None
|
|
return uri
|
|
|
|
def SetAclCommandHelper(self):
|
|
"""
|
|
Common logic for setting ACLs. Sets the standard ACL or the default
|
|
object ACL depending on self.command_name.
|
|
"""
|
|
|
|
acl_arg = self.args[0]
|
|
uri_args = self.args[1:]
|
|
# Disallow multi-provider setacl requests, because there are differences in
|
|
# the ACL models.
|
|
storage_uri = self.UrisAreForSingleProvider(uri_args)
|
|
if not storage_uri:
|
|
raise CommandException('"%s" command spanning providers not allowed.' %
|
|
self.command_name)
|
|
|
|
# Determine whether acl_arg names a file containing XML ACL text vs. the
|
|
# string name of a canned ACL.
|
|
if os.path.isfile(acl_arg):
|
|
acl_file = open(acl_arg, 'r')
|
|
acl_arg = acl_file.read()
|
|
|
|
# TODO: Remove this workaround when GCS allows
|
|
# whitespace in the Permission element on the server-side
|
|
acl_arg = re.sub(r'<Permission>\s*(\S+)\s*</Permission>',
|
|
r'<Permission>\1</Permission>', acl_arg)
|
|
|
|
acl_file.close()
|
|
self.canned = False
|
|
else:
|
|
# No file exists, so expect a canned ACL string.
|
|
canned_acls = storage_uri.canned_acls()
|
|
if acl_arg not in canned_acls:
|
|
raise CommandException('Invalid canned ACL "%s".' % acl_arg)
|
|
self.canned = True
|
|
|
|
# Used to track if any ACLs failed to be set.
|
|
self.everything_set_okay = True
|
|
|
|
def _SetAclExceptionHandler(e):
|
|
"""Simple exception handler to allow post-completion status."""
|
|
self.THREADED_LOGGER.error(str(e))
|
|
self.everything_set_okay = False
|
|
|
|
def _SetAclFunc(name_expansion_result):
|
|
exp_src_uri = self.suri_builder.StorageUri(
|
|
name_expansion_result.GetExpandedUriStr())
|
|
# We don't do bucket operations multi-threaded (see comment below).
|
|
assert self.command_name != 'setdefacl'
|
|
self.THREADED_LOGGER.info('Setting ACL on %s...' %
|
|
name_expansion_result.expanded_uri_str)
|
|
if self.canned:
|
|
exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
|
|
self.headers)
|
|
else:
|
|
exp_src_uri.set_xml_acl(acl_arg, exp_src_uri.object_name, False,
|
|
self.headers)
|
|
|
|
# If user specified -R option, convert any bucket args to bucket wildcards
|
|
# (e.g., gs://bucket/*), to prevent the operation from being applied to
|
|
# the buckets themselves.
|
|
if self.recursion_requested:
|
|
for i in range(len(uri_args)):
|
|
uri = self.suri_builder.StorageUri(uri_args[i])
|
|
if uri.names_bucket():
|
|
uri_args[i] = uri.clone_replace_name('*').uri
|
|
else:
|
|
# Handle bucket ACL setting operations single-threaded, because
|
|
# our threading machinery currently assumes it's working with objects
|
|
# (name_expansion_iterator), and normally we wouldn't expect users to need
|
|
# to set ACLs on huge numbers of buckets at once anyway.
|
|
for i in range(len(uri_args)):
|
|
uri_str = uri_args[i]
|
|
if self.suri_builder.StorageUri(uri_str).names_bucket():
|
|
self._RunSingleThreadedSetAcl(acl_arg, uri_args)
|
|
return
|
|
|
|
name_expansion_iterator = NameExpansionIterator(
|
|
self.command_name, self.proj_id_handler, self.headers, self.debug,
|
|
self.bucket_storage_uri_class, uri_args, self.recursion_requested,
|
|
self.recursion_requested, all_versions=self.all_versions)
|
|
|
|
# Perform requests in parallel (-m) mode, if requested, using
|
|
# configured number of parallel processes and threads. Otherwise,
|
|
# perform requests with sequential function calls in current process.
|
|
self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)
|
|
|
|
if not self.everything_set_okay:
|
|
raise CommandException('ACLs for some objects could not be set.')
|
|
|
|
def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
|
|
some_matched = False
|
|
for uri_str in uri_args:
|
|
for blr in self.WildcardIterator(uri_str):
|
|
if blr.HasPrefix():
|
|
continue
|
|
some_matched = True
|
|
uri = blr.GetUri()
|
|
if self.command_name == 'setdefacl':
|
|
print 'Setting default object ACL on %s...' % uri
|
|
if self.canned:
|
|
uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
|
|
else:
|
|
uri.set_def_xml_acl(acl_arg, False, self.headers)
|
|
else:
|
|
print 'Setting ACL on %s...' % uri
|
|
if self.canned:
|
|
uri.set_acl(acl_arg, uri.object_name, False, self.headers)
|
|
else:
|
|
uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
|
|
if not some_matched:
|
|
raise CommandException('No URIs matched')
|
|
|
|
def GetAclCommandHelper(self):
|
|
"""Common logic for getting ACLs. Gets the standard ACL or the default
|
|
object ACL depending on self.command_name."""
|
|
|
|
# Resolve to just one object.
|
|
# Handle wildcard-less URI specially in case this is a version-specific
|
|
# URI, because WildcardIterator().IterUris() would lose the versioning info.
|
|
if not ContainsWildcard(self.args[0]):
|
|
uri = self.suri_builder.StorageUri(self.args[0])
|
|
else:
|
|
uris = list(self.WildcardIterator(self.args[0]).IterUris())
|
|
if len(uris) == 0:
|
|
raise CommandException('No URIs matched')
|
|
if len(uris) != 1:
|
|
raise CommandException('%s matched more than one URI, which is not '
|
|
'allowed by the %s command' % (self.args[0], self.command_name))
|
|
uri = uris[0]
|
|
if not uri.names_bucket() and not uri.names_object():
|
|
raise CommandException('"%s" command must specify a bucket or '
|
|
'object.' % self.command_name)
|
|
if self.command_name == 'getdefacl':
|
|
acl = uri.get_def_acl(False, self.headers)
|
|
else:
|
|
acl = uri.get_acl(False, self.headers)
|
|
# Pretty-print the XML to make it more easily human editable.
|
|
parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
|
|
print parsed_xml.toprettyxml(indent=' ')
|
|
|
|
def GetXmlSubresource(self, subresource, uri_arg):
|
|
"""Print an xml subresource, e.g. logging, for a bucket/object.
|
|
|
|
Args:
|
|
subresource: The subresource name.
|
|
uri_arg: URI for the bucket/object. Wildcards will be expanded.
|
|
|
|
Raises:
|
|
CommandException: if errors encountered.
|
|
"""
|
|
# Wildcarding is allowed but must resolve to just one bucket.
|
|
uris = list(self.WildcardIterator(uri_arg).IterUris())
|
|
if len(uris) != 1:
|
|
raise CommandException('Wildcards must resolve to exactly one item for '
|
|
'get %s' % subresource)
|
|
uri = uris[0]
|
|
xml_str = uri.get_subresource(subresource, False, self.headers)
|
|
# Pretty-print the XML to make it more easily human editable.
|
|
parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
|
|
print parsed_xml.toprettyxml(indent=' ')
|
|
|
|
def Apply(self, func, name_expansion_iterator, thr_exc_handler,
|
|
shared_attrs=None):
|
|
"""Dispatch input URI assignments across a pool of parallel OS
|
|
processes and/or Python threads, based on options (-m or not)
|
|
and settings in the user's config file. If non-parallel mode
|
|
or only one OS process requested, execute requests sequentially
|
|
in the current OS process.
|
|
|
|
Args:
|
|
func: Function to call to process each URI.
|
|
name_expansion_iterator: Iterator of NameExpansionResult.
|
|
thr_exc_handler: Exception handler for ThreadPool class.
|
|
shared_attrs: List of attributes to manage across sub-processes.
|
|
|
|
Raises:
|
|
CommandException if invalid config encountered.
|
|
"""
|
|
|
|
# Set OS process and python thread count as a function of options
|
|
# and config.
|
|
if self.parallel_operations:
|
|
process_count = boto.config.getint(
|
|
'GSUtil', 'parallel_process_count',
|
|
gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
|
|
if process_count < 1:
|
|
raise CommandException('Invalid parallel_process_count "%d".' %
|
|
process_count)
|
|
thread_count = boto.config.getint(
|
|
'GSUtil', 'parallel_thread_count',
|
|
gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
|
|
if thread_count < 1:
|
|
raise CommandException('Invalid parallel_thread_count "%d".' %
|
|
thread_count)
|
|
else:
|
|
# If -m not specified, then assume 1 OS process and 1 Python thread.
|
|
process_count = 1
|
|
thread_count = 1
|
|
|
|
if self.debug:
|
|
self.THREADED_LOGGER.info('process count: %d', process_count)
|
|
self.THREADED_LOGGER.info('thread count: %d', thread_count)
|
|
|
|
if self.parallel_operations and process_count > 1:
|
|
procs = []
|
|
# If any shared attributes passed by caller, create a dictionary of
|
|
# shared memory variables for every element in the list of shared
|
|
# attributes.
|
|
shared_vars = None
|
|
if shared_attrs:
|
|
for name in shared_attrs:
|
|
if not shared_vars:
|
|
shared_vars = {}
|
|
shared_vars[name] = multiprocessing.Value('i', 0)
|
|
# Construct work queue for parceling out work to multiprocessing workers,
|
|
# setting the max queue length of 50k so we will block if workers don't
|
|
# empty the queue as fast as we can continue iterating over the bucket
|
|
# listing. This number may need tuning; it should be large enough to
|
|
# keep workers busy (overlapping bucket list next-page retrieval with
|
|
# operations being fed from the queue) but small enough that we don't
|
|
# overfill memory when runing across a slow network link.
|
|
work_queue = multiprocessing.Queue(50000)
|
|
for shard in range(process_count):
|
|
# Spawn a separate OS process for each shard.
|
|
if self.debug:
|
|
self.THREADED_LOGGER.info('spawning process for shard %d', shard)
|
|
p = multiprocessing.Process(target=self._ApplyThreads,
|
|
args=(func, work_queue, shard,
|
|
thread_count, thr_exc_handler,
|
|
shared_vars))
|
|
procs.append(p)
|
|
p.start()
|
|
|
|
last_name_expansion_result = None
|
|
try:
|
|
# Feed all work into the queue being emptied by the workers.
|
|
for name_expansion_result in name_expansion_iterator:
|
|
last_name_expansion_result = name_expansion_result
|
|
work_queue.put(name_expansion_result)
|
|
except:
|
|
sys.stderr.write('Failed URI iteration. Last result (prior to '
|
|
'exception) was: %s\n'
|
|
% repr(last_name_expansion_result))
|
|
finally:
|
|
# We do all of the process cleanup in a finally cause in case the name
|
|
# expansion iterator throws an exception. This will send EOF to all the
|
|
# child processes and join them back into the parent process.
|
|
|
|
# Send an EOF per worker.
|
|
for shard in range(process_count):
|
|
work_queue.put(_EOF_NAME_EXPANSION_RESULT)
|
|
|
|
# Wait for all spawned OS processes to finish.
|
|
failed_process_count = 0
|
|
for p in procs:
|
|
p.join()
|
|
# Count number of procs that returned non-zero exit code.
|
|
if p.exitcode != 0:
|
|
failed_process_count += 1
|
|
|
|
# Propagate shared variables back to caller's attributes.
|
|
if shared_vars:
|
|
for (name, var) in shared_vars.items():
|
|
setattr(self, name, var.value)
|
|
|
|
# Abort main process if one or more sub-processes failed. Note that this
|
|
# is outside the finally clause, because we only want to raise a new
|
|
# exception if an exception wasn't already raised in the try clause above.
|
|
if failed_process_count:
|
|
plural_str = ''
|
|
if failed_process_count > 1:
|
|
plural_str = 'es'
|
|
raise Exception('unexpected failure in %d sub-process%s, '
|
|
'aborting...' % (failed_process_count, plural_str))
|
|
|
|
else:
|
|
# Using just 1 process, so funnel results to _ApplyThreads using facade
|
|
# that makes NameExpansionIterator look like a Multiprocessing.Queue
|
|
# that sends one EOF once the iterator empties.
|
|
work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
|
|
_EOF_NAME_EXPANSION_RESULT)
|
|
self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
|
|
None)
|
|
|
|
def HaveFileUris(self, args_to_check):
|
|
"""Checks whether args_to_check contain any file URIs.
|
|
|
|
Args:
|
|
args_to_check: Command-line argument subset to check.
|
|
|
|
Returns:
|
|
True if args_to_check contains any file URIs.
|
|
"""
|
|
for uri_str in args_to_check:
|
|
if uri_str.lower().startswith('file://') or uri_str.find(':') == -1:
|
|
return True
|
|
return False
|
|
|
|
######################
|
|
# Private functions. #
|
|
######################
|
|
|
|
def _HaveProviderUris(self, args_to_check):
|
|
"""Checks whether args_to_check contains any provider URIs (like 'gs://').
|
|
|
|
Args:
|
|
args_to_check: Command-line argument subset to check.
|
|
|
|
Returns:
|
|
True if args_to_check contains any provider URIs.
|
|
"""
|
|
for uri_str in args_to_check:
|
|
if re.match('^[a-z]+://$', uri_str):
|
|
return True
|
|
return False
|
|
|
|
def _ConfigureNoOpAuthIfNeeded(self):
|
|
"""Sets up no-op auth handler if no boto credentials are configured."""
|
|
config = boto.config
|
|
if not util.HasConfiguredCredentials(self.bypass_prodaccess):
|
|
if self.config_file_list:
|
|
if (config.has_option('Credentials', 'gs_oauth2_refresh_token')
|
|
and not HAVE_OAUTH2):
|
|
raise CommandException(
|
|
'Your gsutil is configured with OAuth2 authentication '
|
|
'credentials.\nHowever, OAuth2 is only supported when running '
|
|
'under Python 2.6 or later\n(unless additional dependencies are '
|
|
'installed, see README for details); you are running Python %s.' %
|
|
sys.version)
|
|
raise CommandException('You have no storage service credentials in any '
|
|
'of the following boto config\nfiles. Please '
|
|
'add your credentials as described in the '
|
|
'gsutil README file, or else\nre-run '
|
|
'"gsutil config" to re-create a config '
|
|
'file:\n%s' % self.config_file_list)
|
|
else:
|
|
# With no boto config file the user can still access publicly readable
|
|
# buckets and objects.
|
|
from gslib import no_op_auth_plugin
|
|
|
|
def _ApplyThreads(self, func, work_queue, shard, num_threads,
|
|
thr_exc_handler=None, shared_vars=None):
|
|
"""
|
|
Perform subset of required requests across a caller specified
|
|
number of parallel Python threads, which may be one, in which
|
|
case the requests are processed in the current thread.
|
|
|
|
Args:
|
|
func: Function to call for each request.
|
|
work_queue: shared queue of NameExpansionResult to process.
|
|
shard: Assigned subset (shard number) for this function.
|
|
num_threads: Number of Python threads to spawn to process this shard.
|
|
thr_exc_handler: Exception handler for ThreadPool class.
|
|
shared_vars: Dict of shared memory variables to be managed.
|
|
(only relevant, and non-None, if this function is
|
|
run in a separate OS process).
|
|
"""
|
|
# Each OS process needs to establish its own set of connections to
|
|
# the server to avoid writes from different OS processes interleaving
|
|
# onto the same socket (and garbling the underlying SSL session).
|
|
# We ensure each process gets its own set of connections here by
|
|
# closing all connections in the storage provider connection pool.
|
|
connection_pool = StorageUri.provider_pool
|
|
if connection_pool:
|
|
for i in connection_pool:
|
|
connection_pool[i].connection.close()
|
|
|
|
if num_threads > 1:
|
|
thread_pool = ThreadPool(num_threads, thr_exc_handler)
|
|
try:
|
|
while True: # Loop until we hit EOF marker.
|
|
name_expansion_result = work_queue.get()
|
|
if name_expansion_result == _EOF_NAME_EXPANSION_RESULT:
|
|
break
|
|
exp_src_uri = self.suri_builder.StorageUri(
|
|
name_expansion_result.GetExpandedUriStr())
|
|
if self.debug:
|
|
self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
|
|
os.getpid(), shard, exp_src_uri)
|
|
if (self.exclude_symlinks and exp_src_uri.is_file_uri()
|
|
and os.path.islink(exp_src_uri.object_name)):
|
|
self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
|
|
elif num_threads > 1:
|
|
thread_pool.AddTask(func, name_expansion_result)
|
|
else:
|
|
func(name_expansion_result)
|
|
# If any Python threads created, wait here for them to finish.
|
|
if num_threads > 1:
|
|
thread_pool.WaitCompletion()
|
|
finally:
|
|
if num_threads > 1:
|
|
thread_pool.Shutdown()
|
|
# If any shared variables (which means we are running in a separate OS
|
|
# process), increment value for each shared variable.
|
|
if shared_vars:
|
|
for (name, var) in shared_vars.items():
|
|
var.value += getattr(self, name)
|