diff --git a/third_party/upload.py b/third_party/upload.py index 31542663af..37b1131c46 100755 --- a/third_party/upload.py +++ b/third_party/upload.py @@ -54,6 +54,8 @@ import urllib2 import urlparse import webbrowser +from multiprocessing.pool import ThreadPool + # The md5 module was deprecated in Python 2.5. try: from hashlib import md5 @@ -475,7 +477,9 @@ class AbstractRpcServer(object): url_loc = urlparse.urlparse(url) self.host = '%s://%s' % (url_loc[0], url_loc[1]) elif e.code >= 500: - ErrorExit(e.read()) + # TODO: We should error out on a 500, but the server is too flaky + # for that at the moment. + StatusUpdate('Upload got a 500 response: %d' % e.code) else: raise finally: @@ -617,6 +621,9 @@ group.add_option("--account_type", action="store", dest="account_type", help=("Override the default account type " "(defaults to '%default', " "valid choices are 'GOOGLE' and 'HOSTED').")) +group.add_option("-j", "--number-parallel-uploads", + dest="num_upload_threads", default=8, + help="Number of uploads to do in parallel.") # Issue group = parser.add_option_group("Issue options") group.add_option("-t", "--title", action="store", dest="title", @@ -1150,13 +1157,13 @@ class VersionControlSystem(object): else: type = "current" if len(content) > MAX_UPLOAD_SIZE: - print ("Not uploading the %s file for %s because it's too large." % - (type, filename)) + result = ("Not uploading the %s file for %s because it's too large." % + (type, filename)) file_too_large = True content = "" + elif options.verbose: + result = "Uploading %s file for %s" % (type, filename) checksum = md5(content).hexdigest() - if options.verbose > 0 and not file_too_large: - print "Uploading %s file for %s" % (type, filename) url = "/%d/upload_content/%d/%d" % (int(issue), int(patchset), file_id) form_fields = [("filename", filename), ("status", status), @@ -1170,14 +1177,24 @@ class VersionControlSystem(object): form_fields.append(("user", options.email)) ctype, body = EncodeMultipartFormData(form_fields, [("data", filename, content)]) - response_body = rpc_server.Send(url, body, - content_type=ctype) + try: + response_body = rpc_server.Send(url, body, content_type=ctype) + except urllib2.HTTPError, e: + response_body = ("Failed to upload file for %s. Got %d status code." % + (filename, e.code)) + if not response_body.startswith("OK"): StatusUpdate(" --> %s" % response_body) sys.exit(1) + return result + patches = dict() [patches.setdefault(v, k) for k, v in patch_list] + + threads = [] + thread_pool = ThreadPool(options.num_upload_threads) + for filename in patches.keys(): base_content, new_content, is_binary, status = files[filename] file_id_str = patches.get(filename) @@ -1186,9 +1203,17 @@ class VersionControlSystem(object): file_id_str = file_id_str[file_id_str.rfind("_") + 1:] file_id = int(file_id_str) if base_content != None: - UploadFile(filename, file_id, base_content, is_binary, status, True) + t = thread_pool.apply_async(UploadFile, args=(filename, + file_id, base_content, is_binary, status, True)) + threads.append(t) if new_content != None: - UploadFile(filename, file_id, new_content, is_binary, status, False) + t = thread_pool.apply_async(UploadFile, args=(filename, + file_id, new_content, is_binary, status, False)) + threads.append(t) + + for t in threads: + print t.get(timeout=60) + def IsImage(self, filename): """Returns true if the filename has an image extension.""" @@ -2199,26 +2224,48 @@ def UploadSeparatePatches(issue, rpc_server, patchset, data, options): Returns a list of [patch_key, filename] for each file. """ - patches = SplitPatch(data) - rv = [] - for patch in patches: - if len(patch[1]) > MAX_UPLOAD_SIZE: - print ("Not uploading the patch for " + patch[0] + - " because the file is too large.") - continue - form_fields = [("filename", patch[0])] + def UploadFile(filename, data): + form_fields = [("filename", filename)] if not options.download_base: form_fields.append(("content_upload", "1")) - files = [("data", "data.diff", patch[1])] + files = [("data", "data.diff", data)] ctype, body = EncodeMultipartFormData(form_fields, files) url = "/%d/upload_patch/%d" % (int(issue), int(patchset)) - print "Uploading patch for " + patch[0] - response_body = rpc_server.Send(url, body, content_type=ctype) + + try: + response_body = rpc_server.Send(url, body, content_type=ctype) + except urllib2.HTTPError, e: + response_body = ("Failed to upload patch for %s. Got %d status code." % + (filename, e.code)) + lines = response_body.splitlines() if not lines or lines[0] != "OK": StatusUpdate(" --> %s" % response_body) sys.exit(1) - rv.append([lines[1], patch[0]]) + return ("Uploaded patch for " + filename, [lines[1], filename]) + + threads = [] + thread_pool = ThreadPool(options.num_upload_threads) + + patches = SplitPatch(data) + rv = [] + for patch in patches: + if len(patch[1]) > MAX_UPLOAD_SIZE: + print ("Not uploading the patch for " + patch[0] + + " because the file is too large.") + continue + + filename = patch[0] + data = patch[1] + + t = thread_pool.apply_async(UploadFile, args=(filename, data)) + threads.append(t) + + for t in threads: + result = t.get(timeout=60) + print result[0] + rv.append(result[1]) + return rv