You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			340 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			340 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
# Copyright 2010 Google Inc.
 | 
						|
#
 | 
						|
# Permission is hereby granted, free of charge, to any person obtaining a
 | 
						|
# copy of this software and associated documentation files (the
 | 
						|
# "Software"), to deal in the Software without restriction, including
 | 
						|
# without limitation the rights to use, copy, modify, merge, publish, dis-
 | 
						|
# tribute, sublicense, and/or sell copies of the Software, and to permit
 | 
						|
# persons to whom the Software is furnished to do so, subject to the fol-
 | 
						|
# lowing conditions:
 | 
						|
#
 | 
						|
# The above copyright notice and this permission notice shall be included
 | 
						|
# in all copies or substantial portions of the Software.
 | 
						|
#
 | 
						|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 | 
						|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 | 
						|
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 | 
						|
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 | 
						|
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | 
						|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 | 
						|
# IN THE SOFTWARE.
 | 
						|
 | 
						|
import errno
 | 
						|
import httplib
 | 
						|
import os
 | 
						|
import re
 | 
						|
import socket
 | 
						|
import time
 | 
						|
import boto
 | 
						|
from boto import config, storage_uri_for_key
 | 
						|
from boto.connection import AWSAuthConnection
 | 
						|
from boto.exception import ResumableDownloadException
 | 
						|
from boto.exception import ResumableTransferDisposition
 | 
						|
from boto.s3.keyfile import KeyFile
 | 
						|
 | 
						|
"""
 | 
						|
Resumable download handler.
 | 
						|
 | 
						|
Resumable downloads will retry failed downloads, resuming at the byte count
 | 
						|
completed by the last download attempt. If too many retries happen with no
 | 
						|
progress (per configurable num_retries param), the download will be aborted.
 | 
						|
 | 
						|
The caller can optionally specify a tracker_file_name param in the
 | 
						|
ResumableDownloadHandler constructor. If you do this, that file will
 | 
						|
save the state needed to allow retrying later, in a separate process
 | 
						|
(e.g., in a later run of gsutil).
 | 
						|
 | 
						|
Note that resumable downloads work across providers (they depend only
on support for Range GETs), but this code is in the boto.s3 package
because it is the wrong abstraction level to go in the top-level boto
package.
 | 
						|
 | 
						|
TODO: At some point we should refactor the code to have a storage_service
 | 
						|
package where all these provider-independent files go.
 | 
						|
"""
 | 
						|
 | 
						|
 | 
						|
class ByteTranslatingCallbackHandler(object):
    """
    Proxy class that translates progress callbacks made by
    boto.s3.Key.get_file(), taking into account that we're resuming
    a download.

    Each byte count reported to call() is shifted by download_start_point
    before being forwarded to the wrapped callback.
    """
    def __init__(self, proxied_cb, download_start_point):
        """
        :type proxied_cb: function
        :param proxied_cb: callback that receives the translated progress;
            invoked as proxied_cb(bytes_so_far, total_size).

        :type download_start_point: int
        :param download_start_point: offset added to every byte count
            passed to call().
        """
        self.proxied_cb = proxied_cb
        self.download_start_point = download_start_point

    def call(self, total_bytes_uploaded, total_size):
        """
        Forwards a progress report to the proxied callback.

        :type total_bytes_uploaded: int
        :param total_bytes_uploaded: byte count reported by the caller;
            download_start_point is added to it before forwarding.

        :type total_size: int
        :param total_size: passed through to the proxied callback unchanged.
        """
        self.proxied_cb(self.download_start_point + total_bytes_uploaded,
                        total_size)
 | 
						|
 | 
						|
 | 
						|
def get_cur_file_size(fp, position_to_eof=False):
    """
    Returns size of file, optionally leaving fp positioned at EOF.

    :type fp: file
    :param fp: seekable file-like object (or KeyFile) to measure.

    :type position_to_eof: bool
    :param position_to_eof: if True, fp is left positioned at end-of-file;
        if False, the position fp had on entry is restored before returning.

    :rtype: int
    :returns: the offset of end-of-file as reported by fp.tell(), or the
        key's size attribute for a KeyFile when position_to_eof is False.
    """
    if position_to_eof:
        fp.seek(0, os.SEEK_END)
        return fp.tell()
    if isinstance(fp, KeyFile):
        # Avoid EOF seek for KeyFile case as it's very inefficient.
        return fp.getkey().size
    # Measure by seeking to the end, then put the caller's position back.
    saved_pos = fp.tell()
    fp.seek(0, os.SEEK_END)
    cur_file_size = fp.tell()
    fp.seek(saved_pos, os.SEEK_SET)
    return cur_file_size
 | 
						|
 | 
						|
 | 
						|
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    A download is resumed when the local file is non-empty and the etag
    remembered for the current download (loaded from the tracker file or
    saved by an earlier attempt) matches the key's etag; otherwise the
    local file is truncated and the download starts from byte 0.
    """

    # A tracker file holds one line: a 32-character lowercase
    # alphanumeric etag followed by a newline.
    ETAG_REGEX = '([a-z0-9]{32})\n'

    # Exceptions after which get_file() retries the download.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing file containing a readable etag, that etag is
            loaded so a later get_file() can resume the transfer for this
            file; else we'll start a new resumable download.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.) If None, get_file() reads it from the boto config.
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Loads the etag stored in the tracker file, if there is one.

        Leaves etag_value_for_current_download untouched (None after
        construction) when the file is missing, unreadable or malformed;
        the latter two cases print a warning.
        """
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag_line = f.readline()
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download == None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))
            return
        m = re.search(self.ETAG_REGEX, etag_line)
        if m:
            self.etag_value_for_current_download = m.group(1)
        else:
            print('Couldn\'t read etag in tracker file (%s). Restarting '
                  'download from scratch.' % self.tracker_file_name)

    def _save_tracker_info(self, key):
        """
        Remembers the key's etag (with surrounding quotes stripped) and, if
        a tracker file name was given, writes it to that file.

        Raises ResumableDownloadException (disposition ABORT) if the tracker
        file can't be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Deletes the tracker file if one is configured. A file that is
        already absent is not an error.
        """
        if not self.tracker_file_name:
            return
        try:
            os.unlink(self.tracker_file_name)
        except OSError as e:
            # Unlink directly rather than testing for existence first, so
            # a file that vanishes in between doesn't raise.
            if e.errno != errno.ENOENT:
                raise

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id):
        """
        Attempts a resumable download.

        Resumes with a Range GET when fp already holds data and the
        remembered etag matches key.etag; otherwise saves tracker info,
        truncates fp and downloads from the beginning.

        Raises ResumableDownloadException if any problems occur.
        """
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                # Not every file-like object has a name; don't let that
                # hide the real problem behind an AttributeError.
                local_name = getattr(fp, 'name', repr(fp))
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                    'if you re-try this download it will start from scratch' %
                    (local_name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Copy so the caller's dict (reused across retries) never
            # carries a stale Range header.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            if cb:
                # Only wrap a real callback: wrapping None would hand
                # get_file() a truthy callable that then calls None.
                cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file.
            fp.truncate(0)
            # truncate() does not move the file position, which the size
            # check above left at the old EOF; rewind so the data isn't
            # written after a zero-filled gap.
            fp.seek(0, os.SEEK_SET)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None):
        """
        Retrieves a file from a Key, retrying for as long as progress is
        being made.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the files (may be
            None or empty)

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
             progress on the download.  The callback should accept two integer
             parameters, the first representing the number of bytes that have
             been successfully transmitted from the storage service and
             the second representing the total number of bytes that need
             to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
             parameter this parameter determines the granularity of the callback
             by defining the maximum number of times the callback will be
             called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        Raises ResumableDownloadException if a problem occurs during
            the transfer. Dispositions ABORT and ABORT_CUR_PROCESS are
            re-raised immediately (ABORT also removes the tracker file);
            ABORT_CUR_PROCESS is raised here once more than num_retries
            consecutive attempts made no progress.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 5.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 5)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id)
                # Download succeeded, so remove the tracker file (if have one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % repr(e))
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this issues another key.get_file() and
                    # any exception it raises escapes the retry loop;
                    # confirm that is the intended recovery.
                    key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                                 override_num_retries=0)
            except ResumableDownloadException as e:
                if (e.disposition ==
                    ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                    ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Exponential backoff, keyed on consecutive progress-less tries.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)
 |