# Source code for tomputils.downloader.downloader

# -*- coding: utf8 -*-
"""
Download a single file. Download file segments concurrently if supported by
the remote server.

Inspired by:
https://github.com/dragondjf/QMusic/blob/master/test/pycurldownload.py

"""
from __future__ import absolute_import, division, print_function, unicode_literals
from future.builtins import *  # NOQA
import logging
import sys
import os
import time

import pycurl
from six import BytesIO

if os.name == "posix":
    import signal

    signal.signal(signal.SIGPIPE, signal.SIG_IGN)
    del signal

STATUS_OK = (200, 203, 206)
STATUS_ERROR = range(400, 600)
DEFAULT_MIN_SEG_SIZE = 16 * 1024
DEFAULT_MAX_CON = 4
DEFAULT_MAX_RETRY = 5

LOG = logging.getLogger(__name__)


class Connection(object):
    """
    Wrap a pycurl handle plus the bookkeeping for one download segment.

    Parameters
    ----------
    req_url : str
        URL to request.
    can_segment : bool
        True if the remote server honors Range requests.
    """

    def __init__(self, req_url, can_segment):
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
        self.curl.setopt(pycurl.MAXREDIRS, 5)
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        self.curl.setopt(pycurl.TIMEOUT, 300)
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.write_cb)
        self.curl.setopt(pycurl.URL, req_url)
        self.can_segment = can_segment
        # Back-reference so CurlMulti callbacks can find this wrapper.
        self.curl.connection = self
        self.total_downloaded = 0
        self.id = None
        self.name = None
        self.segment_size = None
        self.segment = None
        self.link_downloaded = None
        self.segment_downloaded = None
        self.retried = None
        self.out_file = None

    def prepare(self, out_file, segment):
        """
        Reset state for a new download task.

        Parameters
        ----------
        out_file : file object
            Output file, shared among all connections.
        segment : list or int
            ``[id, first_byte, last_byte]`` when segmenting is supported,
            otherwise the total file size in bytes.
        """
        if isinstance(segment, list):
            self.id = segment[0]
            self.name = "Segment % 02d" % segment[0]
            # Request only this segment's byte range.
            self.curl.setopt(pycurl.RANGE, "%d-%d" % (segment[1], segment[2]))
            self.segment_size = segment[2] - segment[1] + 1
        else:
            self.id = 0
            self.name = "TASK"
            self.segment_size = segment

        self.link_downloaded = 0
        self.segment_downloaded = 0
        self.retried = 0
        self.out_file = out_file
        # Original code assigned self.segment twice (None, then overwritten
        # unconditionally); a single assignment has the same net effect.
        self.segment = segment

    def prepare_retry(self):
        """Narrow the Range to the undelivered bytes and count the retry."""
        if self.can_segment:
            self.curl.setopt(
                pycurl.RANGE,
                "%d-%d" % (self.segment[1] + self.segment_downloaded, self.segment[2]),
            )
        if self.link_downloaded:
            # Progress was made on the last attempt; don't count it as a retry.
            self.link_downloaded = 0
        else:
            self.retried += 1

    def close(self):
        """Release the underlying curl handle."""
        self.curl.close()

    def write_cb(self, buf):
        """
        pycurl WRITEFUNCTION callback: write *buf* to the output file and
        update the download counters.

        The two original branches were identical except for the seek; only
        segmented downloads need to position the shared file first.
        """
        if self.can_segment:
            # Position at this segment's next unwritten byte.
            self.out_file.seek(self.segment[1] + self.segment_downloaded, 0)
        self.out_file.write(buf)
        self.out_file.flush()
        size = len(buf)
        self.link_downloaded += size
        self.segment_downloaded += size
        self.total_downloaded += size


class Downloader(object):
    """
    Download a file, possibly in segments.

    Parameters
    ----------
    max_retry : int, optional
        Maximum attempts that will be made to retrieve a segment.
    min_seg_size : int, optional
        Largest file size, in bytes, that will not trigger segmenting.
    max_con : int, optional
        Maximum number of concurrent connections to the remote server.
    """

    def __init__(
        self,
        max_retry=DEFAULT_MAX_RETRY,
        min_seg_size=DEFAULT_MIN_SEG_SIZE,
        max_con=DEFAULT_MAX_CON,
    ):
        self.min_seg_size = min_seg_size
        self.max_retry = max_retry
        self.max_con = max_con

    def fetch(self, req_url, output=None):
        """
        Fetch a file.

        Parameters
        ----------
        req_url : str
            URL of the file to retrieve
        output : str, optional
            filename, possibly with path, of the downloaded file.

        Raises
        ------
        RuntimeError
            If the output filename cannot be derived from the URL, or a
            segment exhausts its retries.

        TODO: test can_segment == false
        """
        (eurl, size, can_segment) = _check_headers(req_url)
        if output is None:
            output = os.path.split(eurl)[1]
            if len(output) < 1:
                raise RuntimeError(
                    "Output file must be provided if URL points " "to a directory."
                )

        LOG.info("Downloading %s, (%d bytes)", output, size)
        segments = self._get_segments(size, can_segment)

        # Allocate file space up front so segments can be written in place.
        afile = open(output, str("wb"))
        if size > 0:
            afile.truncate(size)
        afile.close()
        out_file = open(output, str("r+b"))

        connections = [Connection(eurl, can_segment) for _ in range(len(segments))]
        con = {"connections": connections, "free": connections[:], "working": []}

        start_time = time.time()
        elapsed = None
        mcurl = pycurl.CurlMulti()
        while True:
            # Hand pending segments to idle connections.
            while segments and con["free"]:
                p = segments.pop(0)
                c = con["free"].pop(0)
                c.prepare(out_file, p)
                con["working"].append(c)
                mcurl.add_handle(c.curl)
                LOG.debug("%s:Start downloading", c.name)

            # Drive all transfers until curl has no immediate work left.
            while True:
                ret, handles_num = mcurl.perform()
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break

            # Reap finished handles, requeueing failures.
            while True:
                num_q, ok_list, err_list = mcurl.info_read()
                for curl in ok_list:
                    curl.errno = pycurl.E_OK
                    mcurl.remove_handle(curl)
                    c = curl.connection
                    con["working"].remove(c)
                    c.errno = curl.errno
                    c.errmsg = None
                    c.code = curl.getinfo(pycurl.RESPONSE_CODE)
                    if c.code in STATUS_OK:
                        LOG.info(
                            "%s: Download successful. (%d/%d)",
                            c.name,
                            c.segment_downloaded,
                            c.segment_size,
                        )
                        con["free"].append(c)
                    elif c.code in STATUS_ERROR:
                        # Bug fix: original called str.format() on a %-style
                        # format string, so nothing was substituted.
                        LOG.error(
                            "%s:Error <%d>! Connection will be closed",
                            c.name,
                            c.code,
                        )
                        con["connections"].remove(c)
                        c.close()
                        segments.append(c.segment)
                        # Bug fix: Connection() requires (url, can_segment);
                        # the old code called a nonexistent c.getopt().
                        new_c = Connection(eurl, can_segment)
                        con["connections"].append(new_c)
                        con["free"].append(new_c)
                    else:
                        msg = "%s: Unhandled http status code %d"
                        # Bug fix: %-interpolation, not .format().
                        raise Exception(msg % (c.name, c.code))
                for curl, errno, errmsg in err_list:
                    curl.errno = errno
                    curl.errmsg = errmsg
                    mcurl.remove_handle(curl)
                    c = curl.connection
                    c.errno = curl.errno
                    c.errmsg = curl.errmsg
                    con["working"].remove(c)
                    LOG.error("%s:Download failed < %s >", c.name, c.errmsg)
                    if c.can_segment and c.retried < self.max_retry:
                        c.prepare_retry()
                        con["working"].append(c)
                        mcurl.add_handle(c.curl)
                        LOG.error("%s:Try again", c.name)
                    else:
                        raise RuntimeError(c.errmsg)
                if num_q == 0:
                    break

            elapsed = time.time() - start_time
            downloaded = sum(
                connection.total_downloaded for connection in connections
            )
            _show_progress(size, downloaded, elapsed)
            if not con["working"]:
                break
            # Wait for activity on any handle before looping again.
            mcurl.select(1.0)

        # Bug fix: the output file was never closed.
        out_file.close()
        LOG.info("Download Succeeded! Total Elapsed {}s".format(elapsed))

    def _get_segments(self, file_size, can_segment):
        """
        Calculate segments to request.

        Parameters
        ----------
        file_size : int
            Length of file in bytes.
        can_segment : bool
            If True, remote server supports segmented downloads.

        Returns
        -------
        list
            A list of ``[id, first_byte, last_byte]`` segments, or a
            single-element list holding the file size when segmenting is
            not supported.
        """
        if can_segment:
            num = self.max_con
            # Shrink the connection count until each segment meets the
            # minimum size, always keeping at least one.
            while num * self.min_seg_size > file_size and num > 1:
                num -= 1
            segment_size = int(file_size / num + 0.5)
            segments = [
                [i, i * segment_size, (i + 1) * segment_size - 1] for i in range(num)
            ]
            # The rounding above can over- or under-shoot; pin the last byte.
            segments[-1][2] = file_size - 1
        else:
            segments = [file_size]

        LOG.debug("Using %d segments.", len(segments))
        return segments
def _check_headers(url): """ Request and parse file headers in preparation of file retireval. Parameters ---------- url : str URL of the file to be retrieved. Returns ------- (str, str, str) Three-tuple of effective URL, size of the file in bytes, and true if the remote server supports segmented downloads. """ headers = BytesIO() curl = pycurl.Curl() curl.setopt(pycurl.FOLLOWLOCATION, 1) curl.setopt(pycurl.MAXREDIRS, 5) curl.setopt(pycurl.CONNECTTIMEOUT, 30) curl.setopt(pycurl.TIMEOUT, 300) curl.setopt(pycurl.NOSIGNAL, 1) curl.setopt(pycurl.NOPROGRESS, 1) curl.setopt(pycurl.NOBODY, 1) curl.setopt(pycurl.HEADERFUNCTION, headers.write) curl.setopt(pycurl.URL, url) curl.perform() response_code = curl.getinfo(pycurl.RESPONSE_CODE) if curl.errstr() or response_code not in STATUS_OK: msg = "Cannot retrieve %s. (%s)".format(url, pycurl.RESPONSE_CODE) raise RuntimeError(msg) eurl = curl.getinfo(pycurl.EFFECTIVE_URL) size = int(curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)) headers = headers.getvalue().decode("UTF-8") can_segment = headers.find("Accept-Ranges") != -1 if size < 1: can_segment = False return (eurl, size, can_segment) def _show_progress(size, downloaded, elapsed): if not sys.stdout.isatty(): return percent = min(100, downloaded * 100 / size) if elapsed != 0: rate = downloaded * 1.0 / 1024.0 / elapsed info = " D / L:%d / %d ( % 6.2f%%) - Avg:%4.1fkB / s" % ( downloaded, size, percent, rate, ) space = " " * (60 - len(info)) prog_len = int(percent * 20 / 100) prog = "|" + "o" * prog_len + "." * (20 - prog_len) + "|" sys.stdout.write(info + space + prog) sys.stdout.flush() sys.stdout.write("\b" * 82)
def fetch(req_url, output=None):
    """
    Fetch a single URL using default settings.

    Parameters
    ----------
    req_url : unicode or str
        URL to request.  File will be written to the current working
        directory.
    output : str, optional
        Filename, possibly with path, of the downloaded file.
    """
    downloader = Downloader()
    downloader.fetch(req_url, output)