from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import time
from datetime import datetime, timedelta
import random
from functools import wraps
import logging
import sys
import os
import threading
from concurrent.futures import ThreadPoolExecutor, CancelledError # ,wait
from math import trunc
import re
from operator import itemgetter
# Extracted from `futures`
try:
from multiprocessing import cpu_count
except ImportError:
# some platforms don't have multiprocessing
def cpu_count():
return None
try:
from urllib.parse import unquote, urlparse
unichr = chr
except ImportError:
from urllib import unquote
from urlparse import urlparse
try:
import cPickle as pickle
except ImportError:
import pickle
from distutils.dir_util import mkpath, remove_tree
from distutils.errors import DistutilsFileError
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests import Session
from requests.cookies import cookielib, RequestsCookieJar
from clint.textui import progress as clint_progress
# Directory containing this module; the package's ``VERSION`` file is read from here at import time
here = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(here, 'VERSION'), mode='r') as fd:
    __version__ = fd.read().strip()
_py3plus = (sys.version_info[0] >= 3)  # Is Python version 3 and above?
if not _py3plus:
    from functools import partial
    # Python 2.x only: shortcut that terminates the process immediately via ``os._exit(-1)``
    _os_exit_force = partial(os._exit, -1)
# Default retry configuration
#: int: Default number of retries factor for :data:`_requests_extended_retries_factor`.
REQUESTS_EXTENDED_RETRIES_FACTOR = 3
#: int: Default number of retries on exception set through ``urllib3``'s `Retry` mechanism.
URLLIB3_BUILTIN_RETRIES_ON_EXCEPTION = 3
#: int: Default number of retries on exceptions raised while streaming the request content.
REQUESTS_RETRIES_ON_STREAM_EXCEPTION = 10
#: float: Default retry backoff factor.
RETRY_BACKOFF_FACTOR = 0.1
#: frozenset: Default status codes to retry on intended for the underlying ``urllib3``.
URLLIB3_RETRY_STATUS_CODES = frozenset([413, 429, 500, 502, 503, 504])
# One or more `key=value` pairs; keys exclude ',', ';', ' ' and '=', values exclude ',', ';' and ' ' (so a value
# may itself contain '='). Pairs are separated by whitespace and/or a semicolon.
COOKIE_STR_REGEX = re.compile(r'\s*(?:[^,; =]+=[^,; ]+\s*(?:$|\s+|;\s*))+\s*')
"""A compiled regular expression object used to match the cookie string in the form of key/value pairs.
See also :meth:`BDownloader.__init__()` for more details about `cookies`.
"""
_requests_extended_retries_factor = REQUESTS_EXTENDED_RETRIES_FACTOR
"""int: Number of retries that complements and extends the builtin `Retry` mechanism of ``urllib3``.
This global variable is meant for the decorator :func:`retry_requests()`, and its value can be modified through the
module level function :func:`set_requests_retries_factor`. It is initialized to :const:`REQUESTS_EXTENDED_RETRIES_FACTOR`
by default, and usually you don't want to change it.
Together with ``urllib3``'s builtin retry logic, they determine the total number of the retries on exceptions and bad
status codes at requests for downloading. For more details on the retry mechanisms, see :func:`requests_retry_session`.
Notes:
    Don't mix these two retry mechanisms up with the retries at failed connections while streaming the request content.
"""
#: int: The highest pickle protocol number valid for both Python 2.x and Python 3.x.
PICKLE_PROTOCOL_NUMBER = 2
def _cpu_count():
    """Return the number of CPUs, shielding callers from ``NotImplementedError``.

    Returns:
        int or None: Whatever ``cpu_count()`` reports, or `None` when the platform cannot determine it.
    """
    try:
        return cpu_count()
    except NotImplementedError:
        # Some platforms cannot report the CPU count; treat it as unknown
        return None
def set_requests_retries_factor(retries):
    """Update the module-wide retries factor consumed by the :func:`retry_requests` decorator.

    Args:
        retries (int): Number of extra attempts made when a decorated ``requests`` method raises an exception or
            returns a bad status code. Values that are not greater than ``0`` are ignored.

    Returns:
        None.
    """
    global _requests_extended_retries_factor
    if not (retries > 0):
        # Guard clause: non-positive factors would disable the extended retries, so leave the setting untouched
        return
    _requests_extended_retries_factor = retries
def retry_requests(exceptions, backoff_factor=0.1, logger=None):
    r"""A decorator that retries calling the wrapped ``requests``' function using an exponential backoff on exception.

    The retry attempt will be activated in the event of `exceptions` being caught and for all the bad status codes (i.e.
    codes ranging from 400 to 600), since the returned response's ``raise_for_status()`` is called after each attempt.

    Args:
        exceptions (:obj:`Exception` or :obj:`tuple` of :obj:`Exception`\ s): The exceptions to check against.
        backoff_factor (float): The backoff factor to apply between retries.
        logger (logging.Logger): An event logger. Defaults to ``logging.getLogger(__name__)``.

    Returns:
        The wrapper function.

    Raises:
        `exceptions`: Re-raise the last caught exception, with its original traceback, when retries are exhausted.

    Notes:
        This function has an external dependency on the global variable :data:`_requests_extended_retries_factor`, whose
        value can be changed through the function :func:`set_requests_retries_factor`. Also, it should be greater than
        ``0``, thus allowing the decorated method to retry at least once to cover the edge cases of exceptions and bad
        status codes. The global is read at call time, so later changes take effect for already-decorated functions.

    References:
        [1] http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
        [2] https://en.wikipedia.org/wiki/Exponential_backoff
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    random.seed()

    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            # The retry could be bypassed if the factor is not set through :func:`set_requests_retries_factor`,
            # e.g. by setting it to ``0`` directly. This behavior is intentionally not disabled.
            ntries = 0
            while True:
                try:
                    r = f(*args, **kwargs)  # `r` is an instance of the ``requests.Response`` object
                    r.raise_for_status()  # turn 4xx/5xx responses into retryable exceptions
                    return r
                except exceptions as e:
                    ntries += 1
                    if ntries > _requests_extended_retries_factor:
                        # Bare `raise` keeps the original traceback (``raise e`` would lose it on Python 2)
                        raise
                    # Randomized exponential backoff: wait between 0 and (2**ntries - 1) backoff steps
                    steps = random.randrange(0, 2**ntries)
                    backoff = steps * backoff_factor
                    logger.warning("Retrying %d/%d in %.2f seconds: '%r'",
                                   ntries, _requests_extended_retries_factor, backoff, e)
                    time.sleep(backoff)
        return f_retry  # true decorator
    return deco_retry
class RequestsSessionWrapper(Session):
    """Subclass of the ``requests.Session`` class with extended `retry-on-exception` behavior for the ``get`` method.

    Note:
        The retry mechanism here is independent from that built into ``urllib3`` (see :data:`_requests_extended_retries_factor`
        and :func:`retry_requests`). That is, the decorated retry attempts will be triggered whenever the ``get`` method
        raised on some ``requests.RequestException`` or for any bad status code, regardless of whether or not the builtin
        Retry of ``urllib3`` is enabled. Nevertheless, they together determine the number of the total retries.
        See :func:`requests_retry_session` for more information about their cooperation.
    """
    #: Default timeouts: the connect timeout value defaults to 3.05 seconds, and the read timeout 6 seconds.
    TIMEOUT = (3.05, 6)

    def __init__(self, timeout=None, proxy=None, cookies=None, user_agent=None, referrer=None, verify=True, cert=None,
                 requester_cb=None):
        """Initialize the ``Session`` instance.

        The HTTP header ``User-Agent`` of the session is set to a default value of `bdownload/VERSION`, if not provided,
        with `VERSION` being replaced by the package's version number.

        Args:
            timeout (float or 2-tuple of float): Timeout value(s) as a float or ``(connect, read)`` tuple for both the
                ``connect`` and the ``read`` timeouts, respectively. A list is accepted in place of a tuple. If set to
                ``None``, ``0`` or ``()``, whether the whole or any item thereof, it will take a default value from
                :attr:`TIMEOUT`, accordingly. Negative values are replaced by the defaults, too.
            proxy (str): Same as for :meth:`BDownloader.__init__()`.
            cookies (str, dict or CookieJar): Same as for :meth:`BDownloader.__init__()`. A ``dict`` is converted into
                a ``CookieJar``, since ``requests.Session`` requires its `cookies` to be one.
            user_agent (str): Same as for :meth:`BDownloader.__init__()`.
            referrer (str): Same as for :meth:`BDownloader.__init__()`.
            verify (bool or str): Same as for :meth:`requests.request()`.
            cert (str or tuple): Same as for :meth:`requests.request()`.
            requester_cb (func): The callback function provided by the downloader that uses the instantiated
                session object as the HTTP(S) requester. It will get called when making an HTTP GET request.

        Raises:
            ValueError: Raised when `cookies` is a :obj:`str` not in valid format.
        """
        super(RequestsSessionWrapper, self).__init__()

        timeout = timeout or self.TIMEOUT
        if isinstance(timeout, (tuple, list)):
            # Pad a short sequence with the defaults, then drop any surplus items
            timeout = tuple(timeout) + self.TIMEOUT[len(timeout):]
            timeout = timeout[:len(self.TIMEOUT)]
            # Replace every missing (None/0) or negative item with its default counterpart
            timeout = tuple(dflt if not tm or tm < 0 else tm for tm, dflt in zip(timeout, self.TIMEOUT))
        elif timeout < 0:  # a single float for both the connect and the read timeouts
            timeout = self.TIMEOUT
        self.timeout = timeout

        self.requester_cb = requester_cb

        self.referrer = referrer.strip() if referrer is not None else referrer
        if self.referrer:
            # A referrer of '*' is overridden per request with the request URL, see :meth:`get`
            self.headers.update({'Referer': self.referrer})

        default_user_agent = 'bdownload/{}'.format(__version__)
        self.user_agent = user_agent if user_agent and user_agent.strip() else default_user_agent
        self.headers.update({'User-Agent': self.user_agent})

        if proxy is not None:
            self.proxies = dict(http=proxy, https=proxy)

        if cookies is not None:
            if isinstance(cookies, cookielib.CookieJar):
                self.cookies = cookies
            elif isinstance(cookies, dict):
                # ``Session`` persists response cookies via ``CookieJar.extract_cookies()``, which a plain dict lacks
                self.cookies = requests.cookies.cookiejar_from_dict(cookies)
            else:
                self.cookies = self._build_cookiejar_from_kvp(cookies)

        self.verify = verify
        self.cert = cert

    @retry_requests(requests.RequestException, backoff_factor=RETRY_BACKOFF_FACTOR)
    def get(self, url, **kwargs):
        """Wrapper around ``requests.Session``'s `get` method decorated with the :func:`retry_requests` decorator.

        Args:
            url: URL for the file to download from.
            **kwargs: Same arguments as that ``requests.Session.get`` takes.

        Returns:
            ``requests.Response``: The response to the HTTP ``GET`` request.

        Raises:
            :class:`BDownloaderException`: Raised when the termination or cancellation flag has been set, for example,
                if :attr:`RequestsSessionWrapper.requester_cb` is initialized to :meth:`BDownloader.raise_on_interrupted`.
            ExceptionByRequesterCB: Same exception(s) as that raised by :attr:`RequestsSessionWrapper.requester_cb`, if any.
        """
        if self.requester_cb:
            self.requester_cb()  # e.g. jump instantly out of the retries when interrupted by user

        kwargs.setdefault('timeout', self.timeout)
        if self.referrer == '*':
            # Attach the per-URL referrer to this request only: mutating the shared ``self.headers`` would race with
            # other threads issuing requests through the same session. A caller-supplied Referer still takes priority.
            headers = requests.structures.CaseInsensitiveDict(kwargs.get('headers') or {})
            if 'Referer' not in headers:
                headers['Referer'] = url
            kwargs['headers'] = headers

        return super(RequestsSessionWrapper, self).get(url, **kwargs)

    @staticmethod
    def _build_cookiejar_from_kvp(key_values):
        """Build a CookieJar from cookies in the form of key/value pairs.

        Args:
            key_values (str): The cookies must take the form of ``'cookie_key=cookie_value'``, with multiple pairs separated
                by whitespace and/or semicolon if applicable, e.g. ``'key1=val1 key2=val2; key3=val3'``. A value may
                contain ``'='`` (e.g. base64 padding); only the first ``'='`` of a pair separates key from value.

        Returns:
            ``requests.cookies.RequestsCookieJar``: The built CookieJar for ``requests`` sessions. It is empty when
            `key_values` is empty.

        Raises:
            ValueError: Raised when the cookies string `key_values` is not in valid format.
        """
        cookiejar = RequestsCookieJar()
        if key_values:
            matched = COOKIE_STR_REGEX.match(key_values)
            # ``match()`` only anchors at the start, so also require the pattern to consume the whole string
            if not matched or matched.end() != len(key_values):
                msg = 'Cookie {!r} is not in valid format!'.format(key_values)
                raise ValueError(msg)

            # Convert semicolons to whitespaces for ease of split
            for kvp in key_values.replace(';', ' ').split():
                key, value = kvp.split('=', 1)
                cookiejar.set(key, value)

        return cookiejar
def requests_retry_session(builtin_retries=None, backoff_factor=0.1, status_forcelist=None,
                           session=None, num_pools=20, pool_maxsize=20, **kwargs):
    """Create a session object of the class :class:`RequestsSessionWrapper` by default.

    Aside from the retry mechanism implemented by the wrapper decorator, the created session also leverages the built-in
    retries bound to ``urllib3``. When both of them are enabled, they cooperate to determine the total retry attempts.
    The worst-case retries is determined using the following formula:

        `builtin_retries` * (:data:`_requests_extended_retries_factor` + 1)

    which applies to all the exceptions and those status codes that fall into the `status_forcelist`. For other status
    codes, the maximum retries shall be :data:`_requests_extended_retries_factor`.

    Args:
        builtin_retries (int): Maximum number of retry attempts allowed on errors and interested status codes, which will
            apply to the retry logic of the underlying ``urllib3``. If set to `None` or ``0``, it will default to
            :const:`URLLIB3_BUILTIN_RETRIES_ON_EXCEPTION`.
        backoff_factor (float): The backoff factor to apply between retries.
        status_forcelist (set of int): A set of HTTP status codes that a retry should be enforced on. The default status
            forcelist shall be :const:`URLLIB3_RETRY_STATUS_CODES` if not given (or empty).
        session (:obj:`requests.Session`): An instance of the class ``requests.Session`` or its customized subclass.
            When not provided, it will use :class:`RequestsSessionWrapper` to create by default.
        num_pools (int): The number of connection pools to cache, which has the same meaning as `num_pools` in
            ``urllib3.PoolManager`` and will eventually be passed to it.
        pool_maxsize (int): The maximum number of connections to save that can be reused in the ``urllib3`` connection
            pool, which will be passed to the underlying ``requests.adapters.HTTPAdapter``.
        **kwargs: Same arguments as that :meth:`RequestsSessionWrapper.__init__()` takes. They are only used when
            `session` is not provided; otherwise they are ignored.

    Returns:
        ``requests.Session``: The session instance with retry capability, with the retrying adapter mounted for both
        ``http://`` and ``https://``.

    References:
        https://www.peterbe.com/plog/best-practice-with-retries-with-requests
    """
    if session is None:
        session = RequestsSessionWrapper(**kwargs)
    builtin_retries = builtin_retries or URLLIB3_BUILTIN_RETRIES_ON_EXCEPTION
    status_forcelist = status_forcelist or URLLIB3_RETRY_STATUS_CODES

    # Initialize the built-in retry mechanism and the connection pools
    max_retries = Retry(
        total=builtin_retries,
        read=builtin_retries,
        connect=builtin_retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    # `pool_block=True`: when all `pool_maxsize` connections are busy, wait for one instead of opening extra ones
    adapter = HTTPAdapter(max_retries=max_retries, pool_connections=num_pools, pool_maxsize=pool_maxsize, pool_block=True)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    return session
def unquote_unicode(string):
    """Unquote a percent-encoded string.

    Both the two-digit ``%xx`` escapes and the four-hex-digit ``%uxxxx`` escapes are decoded. Two consecutive ``%uxxxx``
    escapes that form a UTF-16 surrogate pair (high U+D800..U+DBFF followed by low U+DC00..U+DFFF) are combined into the
    single non-BMP code point they encode. A ``%u`` that is not followed by exactly four hex digits is kept verbatim.

    Args:
        string (str): A %xx- and %uxxxx- encoded string.

    Returns:
        str: The unquoted unicode string.

    Raises:
        UnicodeDecodeError: On Python 2.7 only, when the ``%xx``-decoded bytes are not valid UTF-8.

    References:
        https://stackoverflow.com/questions/300445
    """
    try:
        text_type, to_char = unicode, unichr  # python 2.7
    except NameError:  # python 3.x has no type called `unicode`
        text_type, to_char = str, chr

    if text_type is str:  # python 3.x
        string = unquote(string)  # handle two-digit %hh components first
    else:  # python 2.7
        if isinstance(string, text_type):
            # Py2's unquote() maps %hh to single code points on unicode input, so unquote the UTF-8 bytes instead
            string = string.encode('utf-8')
        # Decode right away so the %u processing below never mixes bytes with unicode
        string = unquote(string).decode('utf-8')

    parts = string.split(u'%u')
    if len(parts) == 1:
        return string

    res = [parts[0]]
    for part in parts[1:]:
        digits = part[:4].lower()
        # int(x, 16) would also accept forms like '0x1f', ' 1f ' or '+1f', so check the digits explicitly
        if len(digits) < 4 or any(c not in u'0123456789abcdef' for c in digits):
            res.append(u'%u')
            res.append(part)
            continue

        ch = to_char(int(digits, 16))
        # `res` ends with [..., previous_char, text_between]; a pair requires nothing in between. The length check
        # guards against a leading low surrogate, where no previous char exists.
        if (
            len(res) >= 2 and
            not res[-1] and
            u'\uDC00' <= ch <= u'\uDFFF' and
            u'\uD800' <= res[-2] <= u'\uDBFF'
        ):
            # UTF-16 surrogate pair, replace with single non-BMP codepoint
            res[-2] = (res[-2] + ch).encode('utf-16', 'surrogatepass').decode('utf-16')
        else:
            res.append(ch)
        res.append(part[4:])

    return u''.join(res)
class MillProgress(object):
    """Print a mill while progressing.

    This class is adapted from ``clint.textui.progress``, with added support for unknown `expected_size`.

    References:
        https://github.com/kennethreitz-archive/clint/blob/master/clint/textui/progress.py
    """
    STREAM = sys.stderr
    MILL_TEMPLATE = '{} {} {:,d}/{:<} {} {} {}\r'
    MILL_CHARS = ['|', '/', '-', '\\']
    NULL_EXPECTED_DISP = '--'
    NULL_EXPECTED_WIDTH = len(NULL_EXPECTED_DISP)

    # How long to wait before recalculating the ETA
    ETA_INTERVAL = 1
    # How many intervals (excluding the current one) to calculate the simple moving average
    ETA_SMA_WINDOW = 9

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.done()
        return False

    def __init__(self, label='', hide=None, expected_size=None, every=1, eta_tag='eta:', elapsed_tag='elapsed:'):
        """Set up the progress mill and draw its initial line.

        Args:
            label (str): Text printed at the start of the line.
            hide (bool): Whether to suppress output. If `None`, output is shown only when :attr:`STREAM` is a TTY.
            expected_size (int): The total expected amount; a falsy value means unknown.
            every (int): Redraw granularity in units of progress.
            eta_tag (str): Label shown before the ETA.
            elapsed_tag (str): Label shown before the elapsed time.
        """
        self.label = label
        self.width = 0  # length of the last line written, used to blank out leftovers of a longer previous line
        timetag_width = max(len(eta_tag), len(elapsed_tag))
        self.eta_tag = '{: >{width}}'.format(eta_tag, width=timetag_width)
        self.elapsed_tag = '{: >{width}}'.format(elapsed_tag, width=timetag_width)

        self.hide = hide
        # Only show bar in terminals by default (better for piping, logging etc.)
        if hide is None:
            try:
                self.hide = not self.STREAM.isatty()
            except AttributeError:  # output does not support isatty()
                self.hide = True

        self.expected_size = expected_size
        self.every = every
        self.last_progress = 0
        self.delta_progress = 0
        self.every_progress = 0
        self.start = time.time()
        self.elapsed = 0
        self.ittimes = []
        self.eta = 0
        self.etadelta = time.time()
        self.etadisp = self.format_time(self.eta)
        self.show(0)

    @staticmethod
    def format_time(seconds):
        """Format a duration in seconds as ``HH:MM:SS``; hours are not wrapped at 24 and negatives clamp to zero."""
        seconds = max(0, int(seconds))
        hours, remainder = divmod(seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, secs)

    def mill_char(self, progress):
        """Return the mill character for `progress`, cycling through :attr:`MILL_CHARS` every :attr:`every` units."""
        return self.MILL_CHARS[(progress // self.every) % len(self.MILL_CHARS)]

    def _pad_line(self, mill_bar):
        """Pad `mill_bar` so that it fully overwrites a longer line written before it, and remember its length.

        The padding goes before the trailing carriage return: spaces written after it would overwrite the start of the
        line and shift the following output.
        """
        line = mill_bar.rstrip('\r')
        tail = mill_bar[len(line):]
        line_len = len(line)
        if self.width > line_len:
            line += ' ' * (self.width - line_len)
        self.width = line_len
        return line + tail

    def show(self, progress, count=None):
        """Redraw the mill line for the current `progress`.

        Args:
            progress (int): The amount done so far.
            count (int): If given, it replaces :attr:`expected_size`.
        """
        if count is not None:
            self.expected_size = count

        self.last_progress = progress
        if self.expected_size and progress <= self.expected_size:
            # Refresh the ETA at most once per ETA_INTERVAL seconds, from a simple moving average of time per unit
            if (time.time() - self.etadelta) > self.ETA_INTERVAL:
                self.etadelta = time.time()
                self.ittimes = \
                    self.ittimes[-self.ETA_SMA_WINDOW:] + \
                    [(time.time() - self.start) / (progress + 1)]
                self.eta = \
                    sum(self.ittimes) / len(self.ittimes) * \
                    (self.expected_size - progress)
                self.etadisp = self.format_time(self.eta)
            time_disp = self.etadisp
            time_label = self.eta_tag
        else:
            # Unknown or exceeded size: no ETA can be estimated, so show the elapsed time instead
            self.elapsed = time.time() - self.start
            elapsed_disp = self.format_time(self.elapsed)
            time_disp = elapsed_disp
            time_label = self.elapsed_tag

        expected_disp = '{:<{width},d}'.format(self.expected_size, width=self.NULL_EXPECTED_WIDTH) if self.expected_size else self.NULL_EXPECTED_DISP
        percent_disp = '{:6.2f}%'.format(trunc(progress/self.expected_size*100*100)/100) if self.expected_size else ''

        if not self.hide:
            if ((progress % self.every) == 0 or (progress - self.delta_progress) // self.every >= 1 or  # True every "every" updates
                    (self.expected_size and progress == self.expected_size)):  # And when we're done
                mill_bar = self.MILL_TEMPLATE.format(self.label, self.mill_char(self.every_progress),
                                                     progress, expected_disp, percent_disp, time_label, time_disp)
                self.STREAM.write(self._pad_line(mill_bar))
                self.STREAM.flush()
                self.delta_progress = progress
                self.every_progress += self.every

    def done(self):
        """Draw the final line (elapsed time, no mill character) and terminate it with a newline."""
        self.elapsed = time.time() - self.start
        elapsed_disp = self.format_time(self.elapsed)
        time_label = self.elapsed_tag
        expected_disp = '{:<{width},d}'.format(self.expected_size, width=self.NULL_EXPECTED_WIDTH) if self.expected_size else self.NULL_EXPECTED_DISP
        percent_disp = '{:6}%'.format(trunc(self.last_progress/self.expected_size*100)) if self.expected_size else ''

        if not self.hide:
            mill_bar = self.MILL_TEMPLATE.format(self.label, ' ', self.last_progress, expected_disp, percent_disp,
                                                 time_label, elapsed_disp)
            self.STREAM.write(self._pad_line(mill_bar))
            self.STREAM.write('\n')
            self.STREAM.flush()
[docs]class BDownloader(object):
"""The class for executing and managing download jobs.
The context of the current downloading job is structured as::
ctx = {
"total_size": 2000, # total size of all the to-be-downloaded files, maybe inaccurate due to chunked transfer encoding
"accurate": True, # Is `total_size` accurate?
"orig_path_urls": [('file1', 'url1\turl2\turl3'), ('file2', 'url4\turl5\turl6')], # originally added downloads,
# which don't necessarily correspond to `files` e.g. due to duplicate or interruption
"file_cnt": 2, # number of current downloading files
"alt_files": [("full_path_to_file1", `ctx_file1_obj`), ("full_path_to_file2", `ctx_file2_obj`)], # flattened `files`
"files":{
"full_path_to_file1":{
"length": 2000, # 0 means 'unknown', i.e. file size can't be pre-determined through any one of provided URLs
"progress": 0, # `SUCCEEDED` downloaded bytes: initialized to 0, set to the last progress when
# resuming and updated on completion (SUCCEEDED only!) of every task (`Future`)
"last_progress": 0, # CONSTANT: the loaded progress of last run upon resuming from interruption
"downloaded": 0, # downloaded bytes: initialized to 0, set to the last progress when resuming
# and updated on completion (SUCCEEDED, FAILED, CANCELLED) of every task (`Future`)
"resumable": True,
"resuming_from_intr": False, # Are we resuming from keyboard interruption?
"download_state": "inprocess",
"cancelled_on_exception": False,
"futures": [future1, future2],
"tsk_num": 2, # number of the `ranges` and `futures`
"orig_path_url": ('file1', 'url1\turl2\turl3'), # (path, url) as a subparameter passed to :meth:`downloads`
"path_url": ('full_path_to_file1', 'url1\turl2\turl3'), # (full_pathname, active_URLs)
"urls":{"url1":{"accept_ranges": "bytes", "refcnt": 1, "interrupted": 2, "succeeded": -5},
"url2":{"accept_ranges": "none", "refcnt": 0, "interrupted": 0, "succeeded": 0},
"url3":{"accept_ranges": "bytes", "refcnt": 1, "interrupted": 0, "succeeded": -2}},
"ranges":{
"bytes=0-999": {
"start": 0, # start byte position
"end": 999, # end byte position, None for 'unkown', see above
"offset": 0, # current pointer position relative to 'start'(i.e. 0)
"start_time": 0,
"rt_dl_speed": 0, # x seconds interval
"download_state": "inprocess",
"future": future1,
"url": [url1],
"alt_urls": {}
},
"bytes=1000-1999": {
"start":1000,
"end":1999,
"offset": 0, # current pointer position relative to 'start'(i.e. 1000)
"start_time": 0,
"rt_dl_speed": 0, # x seconds interval
"download_state": "inprocess",
"future": future2,
"url": [url3],
"alt_urls": {}
}
}
},
"full_path_to_file2":{
}
},
"futures": {
future1: {"file": "full_path_to_file1", "range": "bytes=0-999"},
future2: {"file": "full_path_to_file1", "range": "bytes=1000-1999"}
}
}
"""
INPROCESS_EXT = '.bdl' # extension for the file in downloading (i.e. not succeeded yet)
RESUM_PARTS_EXT = '.bdl.par' # extension for the resumption parts file
# Possible download states of the files and ranges
PENDING = 'pending' # submitted but not yet processed
INPROCESS = 'inprocess' # in downloading
FAILED = 'failed' # aborted with exception raised
SUCCEEDED = 'succeeded' # finished without error
CANCELLED = 'cancelled' # aborted without being processed
_COMPLETED = [FAILED, CANCELLED, SUCCEEDED]
_FILE_STATES = [PENDING, INPROCESS, FAILED, CANCELLED, SUCCEEDED]
_RANGE_STATES = [PENDING, INPROCESS, FAILED, CANCELLED, SUCCEEDED]
# Progress bar styles
PROGRESS_BS_MILL = 'mill'
PROGRESS_BS_BAR = 'bar'
PROGRESS_BS_NONE = 'none'
_PROGRESS_BAR_STYLES = [PROGRESS_BS_MILL, PROGRESS_BS_BAR, PROGRESS_BS_NONE]
# Default value for `max_workers`
_MAX_WORKERS = (_cpu_count() or 1) * 5 # In line with `futures`
# Default chunk size for streaming the download
_STREAM_CHUNK_SIZE = 7168
# The timeout value to allow the waiting event to be interruptible
_INTERRUPTIBLE_WAIT_TIMEOUT = 0.5
# The number of time to wait in seconds before shutdown when interrupted on Python2.x
_PY2_SIGINT_WAIT_TIMEOUT = 3
# The number of time to wait in seconds for joining the thread when interrupted on Python2.x
_PY2_SIGINT_JOIN_TIMEOUT = 1
def __enter__(self):
    """Enter the runtime context; the downloader itself is bound by ``with ... as``."""
    downloader = self
    return downloader
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the runtime context by calling :meth:`close`; any in-flight exception is propagated, never suppressed."""
    self.close()
    suppress_exception = False
    return suppress_exception
def __init__(self, max_workers=None, min_split_size=1024*1024, chunk_size=1024*100, proxy=None, cookies=None,
             user_agent=None, logger=None, progress='mill', num_pools=20, pool_maxsize=20, request_timeout=None,
             request_retries=None, status_forcelist=None, resumption_retries=None, continuation=True, referrer=None,
             check_certificate=True, ca_certificate=None, certificate=None):
    """Create and initialize a :class:`BDownloader` object.

    Args:
        max_workers (int): The `max_workers` parameter specifies the number of the parallel downloading threads,
            whose default value is determined by ``#num_of_processor * 5`` if set to `None` (or ``0``). The resolved
            value is used both for :attr:`max_workers` and to size the underlying thread pool.
        min_split_size (int): `min_split_size` denotes the size in bytes of file pieces split to be downloaded
            in parallel, which defaults to 1024*1024 bytes (i.e. 1MB).
        chunk_size (int): The `chunk_size` parameter specifies the chunk size in bytes of every http range request,
            which will take a default value of 1024*100 (i.e. 100KB) if not provided.
        proxy (str): The `proxy` supports both HTTP and SOCKS proxies in the form of ``'http://[user:pass@]host:port'``
            and ``'socks5://[user:pass@]host:port'``, respectively.
        cookies (str, dict or CookieJar): If `cookies` needs to be set, it must either take the form of ``'cookie_key=cookie_value'``,
            with multiple pairs separated by whitespace and/or semicolon if applicable, e.g. ``'key1=val1 key2=val2;key3=val3'``,
            be packed into a ``dict``, or be an instance of ``CookieJar``, i.e. ``cookielib.CookieJar`` for Python27,
            ``http.cookiejar.CookieJar`` for Python3.x or ``RequestsCookieJar`` from ``requests``.
        user_agent (str): When `user_agent` is not given, it will default to ``'bdownload/VERSION'``, with ``VERSION``
            being replaced by the package's version number.
        logger (logging.Logger): The `logger` parameter specifies an event logger. If `logger` is not `None`,
            it must be an object of class :class:`logging.Logger` or of its customized subclass. Otherwise,
            it will use a default module-level logger returned by ``logging.getLogger(__name__)``.
        progress (str): `progress` determines the style of the progress bar displayed while downloading files.
            Possible values are ``'mill'``, ``'bar'`` and ``'none'``. ``'mill'`` is the default. To disable this
            feature, e.g. while scripting or multi-instanced, set it to ``'none'``. An invalid value is logged as an
            error and replaced by ``'mill'``.
        num_pools (int): The `num_pools` parameter has the same meaning as `num_pools` in ``urllib3.PoolManager``
            and will eventually be passed to it. Specifically, `num_pools` specifies the number of connection pools
            to cache.
        pool_maxsize (int): `pool_maxsize` will be passed to the underlying ``requests.adapters.HTTPAdapter``.
            It specifies the maximum number of connections to save that can be reused in the urllib3 connection pool.
        request_timeout (float or 2-tuple of float): The `request_timeout` parameter specifies the timeouts for the
            internal ``requests`` session. The timeout value(s) as a float or ``(connect, read)`` tuple is intended
            for both the ``connect`` and the ``read`` timeouts, respectively. If set to ``None``, it will take a
            default value of :attr:`RequestsSessionWrapper.TIMEOUT`.
        request_retries (int): `request_retries` specifies the maximum number of retry attempts allowed on exceptions
            and interested status codes(i.e. `status_forcelist`) for the builtin Retry logic of ``urllib3``. It will
            default to :const:`URLLIB3_BUILTIN_RETRIES_ON_EXCEPTION` if not given.

            Notes:
                There are two retry mechanisms that jointly determine the total retries of a request. One is the
                above-mentioned Retry logic that is built into ``urllib3``, and the other is the extended high-level
                retry factor that is meant to complement the builtin retry mechanism. The total retries is bounded by
                the following formula:

                    `request_retries` * (:data:`_requests_extended_retries_factor` + 1)

                See :func:`retry_requests`, :class:`RequestsSessionWrapper` and :func:`requests_retry_session` for
                more details on the retry mechanisms.
        status_forcelist (set of int): `status_forcelist` specifies a set of HTTP status codes that a retry should
            be enforced on. The default set of status codes shall be :const:`URLLIB3_RETRY_STATUS_CODES` if not given.
        resumption_retries (int): The `resumption_retries` parameter specifies the maximum allowable number of retries
            on error at resuming the interrupted download while streaming the request content. The default value of it
            is :const:`REQUESTS_RETRIES_ON_STREAM_EXCEPTION` when not provided.
        continuation (bool): The `continuation` parameter specifies whether, if possible, to resume the partially
            downloaded files before, e.g. when the downloads had been terminated by the user by pressing `Ctrl-C`.
            When not present, it will default to `True`.
        referrer (str): `referrer` specifies an HTTP request header ``Referer`` that applies to all downloads.
            If set to ``'*'``, the request URL shall be used as the referrer per download.
        check_certificate (bool): The `check_certificate` parameter specifies whether to verify the server's TLS
            certificate or not. It defaults to `True`.
        ca_certificate (str): The `ca_certificate` parameter specifies a path to the preferred CA bundle file (.pem)
            or directory with certificates in PEM format of trusted CAs. If set to a path to a directory, the directory
            must have been processed using the ``c_rehash`` utility supplied with OpenSSL, according to ``requests``.
            NB the cert files in the directory each only contain one CA certificate.
        certificate (str or tuple): `certificate` specifies a client certificate. It has the same meaning as that of
            `cert` in :meth:`requests.request()`.

    Raises:
        ValueError: Raised when the `cookies` is of the :obj:`str` type and not in valid format.
    """
    if not resumption_retries or resumption_retries < 0:
        # Fall back on the defaults if None, 0 or a negative number is given
        resumption_retries = REQUESTS_RETRIES_ON_STREAM_EXCEPTION
    self.resumption_retries = resumption_retries

    if not request_retries or request_retries < 0:
        # Fall back on the defaults if None, 0 or a negative number is given
        request_retries = URLLIB3_BUILTIN_RETRIES_ON_EXCEPTION

    # A CA bundle path is only meaningful when verification is enabled; otherwise pass the boolean through
    verify = ca_certificate if check_certificate and ca_certificate else check_certificate
    session = RequestsSessionWrapper(timeout=request_timeout, proxy=proxy, cookies=cookies, user_agent=user_agent,
                                     referrer=referrer, verify=verify, cert=certificate, requester_cb=self.raise_on_interrupted)
    self.requester = requests_retry_session(session=session, builtin_retries=request_retries,
                                            backoff_factor=RETRY_BACKOFF_FACTOR,
                                            status_forcelist=status_forcelist,
                                            num_pools=num_pools, pool_maxsize=pool_maxsize)

    # Resolve the worker count first and size the pool with it, so that the executor really runs `self.max_workers`
    # threads. Passing `None` through would let ``ThreadPoolExecutor`` apply its own, version-dependent default
    # (``min(32, cpu_count + 4)`` since Python 3.8), contradicting the documented ``#num_of_processor * 5``.
    self.max_workers = max_workers or self._MAX_WORKERS
    self.executor = ThreadPoolExecutor(self.max_workers)

    self.progress_thread = None
    self.mgmnt_thread = None
    self.all_done_event = threading.Event()  # Event signaling the completion of all the download jobs
    self.all_done = False  # Flag denoting the completion of all the download jobs
    # Flag indicating that **all** the download tasks have been submitted, i.e. no more downloads to be added
    self.all_submitted = False
    self.sigint = False  # Received the SIGINT (i.e. `KeyboardInterrupt`) signal, e.g. raised by hitting `Ctrl-C`?
    self.cmdquit = False  # Received the QUIT command, e.g. triggered by pressing `q`?
    # Flag indicating that cancellation of all the tasks have been done on demand, e.g. by pressing `Ctrl-C` or `q`
    self.cancelled_on_interrupt = False
    self.stop = False  # Flag signaling waiting threads to exit

    # The download context that maintains the status of the downloading files and the corresponding chunks
    self._dl_ctx = {"total_size": 0, "accurate": True, "orig_path_urls": [],
                    "file_cnt": 0, "alt_files": [], "files": {}, "futures": {}}

    # list: A downloadable subset of all the `(path, url)`\ s that were passed to :meth:`BDownloader.download` or
    # :meth:`BDownloader.downloads`.
    self.active_downloads_added = []
    # list: The non-downloadable `(path, url)`\ s that were filtered out before downloading actually begins.
    # Together with :attr:`BDownloader.active_downloads_added`, they form the whole of the input downloads.
    self.failed_downloads_on_addition = []
    # list: A subset of :attr:`BDownloader.active_downloads_added`, the downloading of which aborted abnormally.
    self.failed_downloads_in_running = []
    # list: The succeeded downloads, being a subset of :attr:`BDownloader.active_downloads_added`.
    self.succeeded_downloads_in_running = []
    # list: The downloads whose desired files already exist out there, without the need to re-download.
    self.succeeded_downloads_on_addition = []

    if logger is None:
        logger = logging.getLogger(__name__)
    self._logger = logger

    self.min_split_size = min_split_size
    self.chunk_size = chunk_size

    self.progress = progress
    if self.progress not in self._PROGRESS_BAR_STYLES:
        self._logger.error("Error: invalid ProgressBar parameter '%s', default to '%s'",
                           self.progress, self.PROGRESS_BS_MILL)
        self.progress = self.PROGRESS_BS_MILL

    self.continuation = continuation
@staticmethod
def calc_req_ranges(req_len, split_size, req_start=0):
    """Cut a request of `req_len` bytes, beginning at `req_start`, into consecutive pieces of `split_size` bytes.

    Args:
        req_len (int): The total length of the request to cut up.
        split_size (int): The length of every full piece; the last piece may be shorter.
        req_start (int): The byte offset where the first piece begins.

    Returns:
        list of tuple: Inclusive ``(start, end)`` byte positions, one 2-tuple per piece, in ascending order.
    """
    full_pieces, remainder = divmod(req_len, split_size)
    ranges = [(req_start + idx * split_size, req_start + (idx + 1) * split_size - 1)
              for idx in range(full_pieces)]

    if remainder:
        # A trailing, shorter piece covers whatever the full pieces left over
        ranges.append((req_start + full_pieces * split_size, req_start + req_len - 1))

    return ranges
@staticmethod
def list_split(li, chunk_size=5):
    """Lazily cut `li` into consecutive slices of `chunk_size` items.

    Args:
        li (list): The list to split.
        chunk_size (int): The number of items per slice; the last slice may hold fewer.

    Yields:
        list: The next slice of `li`, in order.
    """
    step = chunk_size
    for begin in range(0, len(li), step):
        stop = begin + step
        yield li[begin:stop]
[docs] def _is_parallel_downloadable(self, path_name):
"""Check if the file can be downloaded in parallel, i.e. using multi-threads to download the file pieces simultaneously.
Args:
path_name (str): The full path name of the file to be downloaded.
Returns:
bool: ``True`` if the file length is known and the server accepts its range requests, otherwise ``False``.
"""
ctx_file = self._dl_ctx['files'][path_name]
parallel = True if ctx_file['length'] and ctx_file['resumable'] else False
return parallel
[docs] def _is_download_resumable(self, path_name):
"""Check if the current download of the file can be resumed from the point of last interruption through retrying.
Args:
path_name (str): The full path name of the file being downloaded.
Returns:
bool: ``True`` if the server accepts range requests for the file, otherwise ``False``.
"""
return True if self._dl_ctx['files'][path_name]['resumable'] else False
def raise_on_interrupted(self):
    """Abort the current operation if the user asked to stop the downloads.

    Raises:
        :class:`BDownloaderException`: Raised when either the ``SIGINT`` flag or the QUIT-command flag is set.
    """
    interrupted = self.sigint or self.cmdquit
    if not interrupted:
        return
    raise BDownloaderException("The download was intentionally interrupted by the user!")
[docs] def _get_remote_file_multipart(self, path_name, req_range):
"""The worker thread body for downloading an assigned piece of a file.
Args:
path_name (str): The full path name of the file to be downloaded.
req_range (str): A chunk of the file `path_name` as a range request of the form ``'bytes=start-end'``.
Returns:
None.
Raises:
:class:`BDownloaderException`: Raised when connect timeouts, read timeouts, failed connections or bad status
codes occurred and the retries is exhausted.
EnvironmentError: Raised when file operations failed.
"""
ctx_file = self._dl_ctx['files'][path_name]
ctx_range = ctx_file['ranges'][req_range]
url = ctx_range['url'][0]
alt_urls = None # get-on-error alternative URLs
alt_try = 0 # number of tries at alternative URLs
max_retries = self.resumption_retries
ctx_range['download_state'] = self.INPROCESS
if ctx_file['download_state'] == self.PENDING:
ctx_file['download_state'] = self.INPROCESS
path_name_inprocess = path_name + self.INPROCESS_EXT
try:
with open(path_name_inprocess, mode='r+b') as fd:
for tr in range(max_retries+1):
# request start position and end position, maybe resuming from a previous failed request
range_start, range_end = ctx_range['start'] + ctx_range['offset'], ctx_range['end']
ranges = self.calc_req_ranges(range_end - range_start + 1, self.chunk_size, range_start)
fd.seek(range_start)
for start, end in ranges:
req_range_new = "bytes={}-{}".format(start, end)
headers = {"Range": req_range_new}
try:
r = self.requester.get(url, headers=headers, allow_redirects=True, stream=True)
if r.status_code == requests.codes.partial:
try:
for chunk in r.iter_content(chunk_size=self._STREAM_CHUNK_SIZE):
fd.write(chunk)
ctx_range['offset'] += len(chunk)
self.raise_on_interrupted()
except requests.RequestException as e:
self._logger.error("Error while downloading '%s'(range:%d-%d/%d-%d): '%r'",
os.path.basename(path_name), start, end, ctx_range['start'],
ctx_range['end'], e)
# increment the interrupted connection count
ctx_url = ctx_range['alt_urls'].setdefault(url, {})
ctx_url['interrupted'] = ctx_url.get('interrupted', 0) + 1
break
else:
msg = "Unexpected status code {}, which should have been {}.".format(r.status_code, requests.codes.partial)
raise requests.RequestException(msg)
except requests.RequestException as e:
msg = "Error while downloading '{}'(range:{}-{}/{}-{}): '{!r}'".format(
os.path.basename(path_name), start, end, ctx_range['start'], ctx_range['end'], e)
self._logger.error(msg)
# increment the failed connection count
ctx_url = ctx_range['alt_urls'].setdefault(url, {})
ctx_url['interrupted'] = ctx_url.get('interrupted', 0) + 1
if alt_urls is None:
# Get alternative URLs from sorted sources used to resume downloading from
alt_urls_sorted = self._get_alt_urls(path_name)
alt_urls = [alt_url for alt_url in alt_urls_sorted if alt_url != url]
if not alt_urls or alt_try >= len(alt_urls):
raise BDownloaderException(msg)
else:
url = alt_urls[alt_try]
alt_try += 1
ctx_range['alt_urls'][url] = {'refcnt': 1}
ctx_range['url'].append(url)
break
else:
break
if tr < max_retries:
self._logger.error("Retrying %d/%d: '%s' at '%s'",
tr + 1, max_retries, os.path.basename(path_name), url)
time.sleep(0.1)
else:
msg = "Task error while downloading '{}'(range: {}-{})".format(os.path.basename(path_name),
ctx_range['start'], ctx_range['end'])
raise BDownloaderException(msg)
except EnvironmentError as e:
self._logger.error("Error while operating on '%s': 'Error number %d: %s'", path_name_inprocess, e.errno, e.strerror)
raise
[docs] def _get_remote_file_singlewhole(self, path_name, req_range):
"""The worker thread body for downloading the whole of a file, as opposed to :meth:`_get_remote_file_multipart`.
Args:
path_name (str): The full path name of the file to be downloaded.
req_range (str): The whole chunk of the file `path_name` as a mock range request of the form ``'bytes=0-None'``.
Returns:
None.
Raises:
:class:`BDownloaderException`: Raised when connect timeouts, read timeouts, failed connections or bad status
codes occurred and the retries is exhausted.
EnvironmentError: Raised when file operations failed.
"""
ctx_file = self._dl_ctx['files'][path_name]
ctx_range = ctx_file['ranges'][req_range]
url = ctx_range['url'][0]
# Get alternative URLs from sorted sources used to resume downloading from
alt_urls = [alt_url for alt_url in self._get_alt_urls(path_name) if alt_url != url]
alt_try = 0 # number of tries at alternative URLs
max_retries = self.resumption_retries
range_req_satisfiable = True # The serve may choose to ignore the `Range` header
range_end = file_end = ctx_file['length'] if ctx_file['length'] else ''
ctx_range['download_state'] = self.INPROCESS
if ctx_file['download_state'] == self.PENDING:
ctx_file['download_state'] = self.INPROCESS
path_name_inprocess = path_name + self.INPROCESS_EXT
try:
with open(path_name_inprocess, mode='r+b') as fd:
for tr in range(max_retries+1):
if self._is_download_resumable(path_name) and range_req_satisfiable and ctx_range['offset']:
# request start position and end position(which here we don't care about), maybe resuming from a previous failed request
range_start = ctx_range['start'] + ctx_range['offset']
req_range_new = "bytes={}-{}".format(range_start, '')
headers = {"Range": req_range_new}
status_code = requests.codes.partial
else:
range_start = ctx_range['start']
headers = {}
status_code = requests.codes.ok
fd.seek(range_start)
try:
r = self.requester.get(url, headers=headers, allow_redirects=True, stream=True)
if r.status_code == status_code:
try:
for chunk in r.iter_content(chunk_size=self._STREAM_CHUNK_SIZE):
fd.write(chunk)
ctx_range['offset'] += len(chunk)
if headers:
range_start = ctx_range['start'] + ctx_range['offset']
self.raise_on_interrupted()
break
except requests.RequestException as e:
self._logger.error("Error while downloading '%s'(range:%s-%s/%s-%s): '%r'",
os.path.basename(path_name), range_start, range_end,
ctx_range['start'], file_end, e)
else:
msg = "Unexpected status code {}, which should have been {}. " \
"This may be caused by ignored range request.".format(r.status_code, status_code)
self._logger.error(msg)
# In case the server responds with a '200' status code against a range request
if (not alt_urls or alt_try >= len(alt_urls)) and r.status_code == requests.codes.ok:
range_req_satisfiable = False
else:
raise requests.RequestException(msg)
except requests.RequestException as e:
msg = "Error while downloading '{}'(range:{}-{}/{}-{}): '{!r}'".format(
os.path.basename(path_name), range_start, range_end, ctx_range['start'], file_end, e)
self._logger.error(msg)
if not alt_urls or alt_try >= len(alt_urls):
raise BDownloaderException(msg)
else:
url = alt_urls[alt_try]
alt_try += 1
if tr < max_retries:
self._logger.error("Retrying %d/%d: '%s' at '%s'",
tr + 1, max_retries, os.path.basename(path_name), url)
time.sleep(0.1)
else:
msg = "Task error while downloading '{}'(range: {}-{})".format(os.path.basename(path_name),
ctx_range['start'], "")
raise BDownloaderException(msg)
except EnvironmentError as e:
self._logger.error("Error while operating on '%s': 'Error number %d: %s'", path_name_inprocess, e.errno, e.strerror)
raise
[docs] def _pick_file_url(self, path_name):
"""Select one URL from the multiple sources of the file to download from.
Args:
path_name (str): The full path name of the file to be downloaded.
Yields:
list: A list of URL(s) to download the file from using a strategy of ``Round Robin``.
"""
STRIPE_WIDTH = 3
ctx_file_urls = self._dl_ctx['files'][path_name]['urls']
if self._is_download_resumable(path_name):
urls = [url for url, ctx_url in ctx_file_urls.items() if ctx_url['accept_ranges'] == 'bytes']
else:
urls = list(ctx_file_urls.keys())
# Round Robin scheduling
while True:
for url in urls:
ctx_url = ctx_file_urls[url]
ctx_url['refcnt'] += 1
yield [url]
while ctx_url['refcnt'] % STRIPE_WIDTH:
ctx_url['refcnt'] += 1
yield [url]
[docs] def _get_alt_urls(self, path_name):
"""Get alternative URLs from the multiple sources of the file to resume downloading from.
Args:
path_name (str): The full path name of the file to be downloaded.
Returns:
list: The alternative source URLs sorted by descending succeeded downloads, then by ascending interrupted
and references.
"""
ctx_file_urls = self._dl_ctx['files'][path_name]['urls']
if self._is_download_resumable(path_name):
url_ctxs = [(url, ctx_url['succeeded'], ctx_url['interrupted'], ctx_url['refcnt'])
for url, ctx_url in ctx_file_urls.items() if ctx_url['accept_ranges'] == 'bytes']
else:
url_ctxs = [(url, ctx_url['succeeded'], ctx_url['interrupted'], ctx_url['refcnt'])
for url, ctx_url in ctx_file_urls.items()]
url_ctxs_sorted = sorted(url_ctxs, key=itemgetter(1, 2, 3))
return [url_ctx[0] for url_ctx in url_ctxs_sorted]
@staticmethod
def _get_fname_from_hdr(content_disposition):
    """Get the file name from the HTTP response header.

    The extended ``filename*`` parameter takes precedence over the plain ``filename`` parameter. A UTF-8 extended
    value, with or without a language tag (e.g. ``UTF-8''na%C3%AFve.txt`` or ``UTF-8'en'na%C3%AFve.txt``), is
    percent-decoded. Any other extended value is returned as is.

    Args:
        content_disposition (str): Content of the ``Content-Disposition`` field of the response header.

    Returns:
        str: The extracted file name, or ``''`` if the header carries none.

    References:
        https://stackoverflow.com/questions/37060344
    """
    fname = re.findall(r"filename\*=([^;]+)", content_disposition, flags=re.IGNORECASE)
    if fname:
        ext_value = fname[0].strip().strip('"')
        # RFC 5987 ext-value: charset "'" [ language ] "'" percent-encoded-chars
        utf8_value = re.match(r"utf-8'[^']*'(.*)", ext_value, flags=re.IGNORECASE | re.DOTALL)
        if utf8_value:
            fname = unquote_unicode(utf8_value.group(1))
        else:
            fname = ext_value
    else:
        fname = re.findall(r"filename=([^;]+)", content_disposition, flags=re.IGNORECASE)
        fname = fname[0] if fname else ''
    return fname.strip().strip('"')
@staticmethod
def _get_fname_from_url(url):
    """Derive a file name from the download URL.

    The last component of the percent-decoded URL path is used. If the path ends without one, a name is made
    from the host (``:`` replaced by ``_``) and the path segments joined by ``_``. The result keeps at most its
    last 250 characters.

    Args:
        url (str): A URL referencing the intended file.

    Returns:
        str: The automatically generated file name.
    """
    parts = urlparse(url)
    decoded_path = unquote_unicode(parts.path)
    name = os.path.basename(decoded_path)
    if not name:
        segments = decoded_path.replace('/', ' ').split()
        name = parts.netloc.replace(':', '_') + '-' + '_'.join(segments)
    # limit the length of the filename to 250
    return name[-250:].strip()
@staticmethod
def _topmost_missing_dir(path):
    """Find the highest directory in `path` that does not exist yet.

    The absolute form of `path` is walked upwards, one separator at a time, while each parent is missing. The walk
    stops at the drive root.

    Args:
        path (str): A path to the directory to save the downloaded file in.

    Returns:
        str: The uppermost missing directory, or ``None`` if `path` is empty or already exists.
    """
    if not path or os.path.exists(path):
        return None
    current = os.path.abspath(path)
    drive_len = len(os.path.splitdrive(current)[0])
    topmost = current
    sep_pos = current.rfind(os.sep)
    while sep_pos > drive_len:
        parent = current[:sep_pos]
        if os.path.exists(parent):
            break
        topmost = current = parent
        sep_pos = current.rfind(os.sep)
    return topmost
[docs] def _rename_existing_file(self, full_pathname):
"""Rename the file or directory with the given pathname if present.
Args:
full_pathname (str): The full path name of the file to check for duplicate.
"""
if not os.path.exists(full_pathname):
return
file_path, file_name = os.path.split(full_pathname)
n = 2
while True:
new_name = "{}.({})".format(file_name, n)
new_pathname = os.path.join(file_path, new_name)
if not os.path.exists(new_pathname): # neither regular file nor directory
os.rename(full_pathname, new_pathname)
self._logger.warning("The existed file '%s' has been renamed '%s'", full_pathname, new_pathname)
break
else:
n += 1
continue
[docs] def _load_resumption_ctx(self, the_file, ctx_file):
"""Load from the resumption parts file to restore the download context.
Args:
the_file (str): The full path name of the file to download.
ctx_file (dict): The download context of the file `the_file`.
Returns:
(bool, dict): A 2-tuple ``(is_resuming, resumption_ctx)``, where ``is_resuming`` indicates whether the
download is resuming from last interruption, and if this is the case (``True``), ``resumption_ctx`` holds
the successfully loaded resumption context.
"""
is_resuming, resumption_ctx = False, None
if self.continuation:
file_inprocess = the_file + self.INPROCESS_EXT
file_resumption = the_file + self.RESUM_PARTS_EXT
if os.path.isfile(file_inprocess) and os.path.isfile(file_resumption):
try:
with open(file_resumption, "rb") as f:
resumption_ctx = pickle.load(f)
is_resuming = True
# verify that the loaded and newly obtained sizes match
last_len, cur_len = resumption_ctx['length'], ctx_file['length']
if last_len and cur_len and last_len != cur_len: # This should not usually happen
is_resuming = False
self._logger.warning("The resumption of the download from interruption couldn't be accomplished"
" due to the inconsistent file lengths, yet download will continue right "
"from the start of the file: (new: '%r' -- old: '%r')",
ctx_file['orig_path_url'], resumption_ctx['path_url'])
self._rename_existing_file(file_inprocess)
self._rename_existing_file(file_resumption)
except Exception as e:
self._logger.error("Error while loading the resumption parts info from '%s': '%s'", file_resumption, e)
return is_resuming, resumption_ctx
def _build_ctx_internal(self, path_name, url):
    """The helper method that actually does the build of the downloading context of the file.

    Every TAB-separated mirror in `url` is probed with a streamed ``GET``. A mirror answering ``200`` becomes
    active unless it reports a ``Content-Length`` different from the first length obtained. The target file name
    comes from `path_name`; failing that, from the ``Content-Disposition`` header; failing that, from the first
    active URL.

    For a downloadable file, the file context and its request ranges are then registered in ``self._dl_ctx``. The
    ranges are freshly calculated, or restored from the resumption context of a previous interrupted run.

    Args:
        path_name (str): The full path name of the file to download.
        url (str): The URL referencing the target file.

    Returns:
        tuple: A 3-tuple ``'(downloadable, (path, url), (orig_path, orig_url))'``, where the ``downloadable``
        indicates whether the desired file is downloadable, unavailable or existing by ``True``, ``False`` or
        ``None`` respectively, ``(path, url)`` denotes the converted full pathname and the URL that consists
        only of active URLs, and ``(orig_path, orig_url)`` denotes the originally input pathname and URL.

    Raises:
        :class:`BDownloaderException`: Raised when the termination or cancellation flag has been set.
            NOTE(review): no statement in this method raises it directly; confirm whether a callee does.
    """
    path_url, orig_path_url = (path_name, url), (path_name, url)  # original `(path, url)`
    if not path_name:
        path_name = '.'
    if url is None:
        url = ''
    # Check whether `path_name` refers to a file (perhaps prefixed with a path) or a directory.
    # If it is a directory, then a file name should be determined through the `url`.
    path_head, path_tail = os.path.split(path_name)
    if not path_tail or os.path.isdir(path_name):
        file_name = ''
        file_path = path_name
    else:
        file_name = path_tail
        file_path = path_head
    orig_urls = url.split('\t')  # maybe TAB-separated URLs
    ctx_file = {'length': 0, 'progress': 0, 'last_progress': 0, 'downloaded': 0, 'resumable': False,
                'resuming_from_intr': False, 'download_state': self.PENDING, 'cancelled_on_exception': False,
                'futures': [], 'tsk_num': 0, 'orig_path_url': orig_path_url, 'urls': {}, 'ranges': {}}
    active_urls = []
    downloadable = False  # Must have at least one active URL to download the file
    # Probe every mirror; only the responsive ones with a consistent size (if reported) become active
    for mirror_url in orig_urls:
        try:
            r = self.requester.get(mirror_url, allow_redirects=True, stream=True)
            if r.status_code == requests.codes.ok:
                # NOTE(review): a malformed Content-Length makes int() raise ValueError, which is not caught here
                file_len = int(r.headers.get('Content-Length', 0))
                if file_len:
                    if not ctx_file['length']:
                        ctx_file['length'] = file_len
                    else:
                        if file_len != ctx_file['length']:
                            self._logger.error("Error: the size of the file '%s' obtained from '%s' happened to "
                                               "mismatch with that from others, download will continue but the "
                                               "downloaded file may not be the intended one", file_name, mirror_url)
                            # skip this mirror: release its streamed connection and don't record it as active
                            r.close()
                            continue
                ctx_url = ctx_file['urls'][mirror_url] = {'accept_ranges': "none", 'refcnt': 0, 'interrupted': 0,
                                                          'succeeded': 0}
                accept_ranges = r.headers.get('Accept-Ranges')
                # a single range-capable mirror is enough to flag the whole file as resumable
                if "bytes" == accept_ranges:
                    ctx_url['accept_ranges'] = accept_ranges
                    ctx_file['resumable'] = True
                if not file_name:
                    content_disposition = r.headers.get('Content-Disposition')
                    if content_disposition:
                        file_name = self._get_fname_from_hdr(content_disposition)
                downloadable = True
                active_urls.append(mirror_url)
            else:
                self._logger.warning("Unexpected status code %d: trying to determine the size of the file '%s' "
                                     "using '%s'", r.status_code, file_name, mirror_url)
            # only the headers were needed: close the streamed response without reading its body
            r.close()
        except requests.RequestException as e:
            self._logger.error("Error while trying to determine the size of the file '%s' using '%s': '%r'",
                               file_name, mirror_url, e)
    if downloadable:
        if not file_name:
            file_name = self._get_fname_from_url(active_urls[0])
        file_path_name = os.path.abspath(os.path.join(file_path, file_name))
        path_url = (file_path_name, '\t'.join(active_urls))
        # save the full pathname and active URLs. cf. 'orig_path_url'
        ctx_file['path_url'] = path_url
        # check for conflicting `file_path_name` in downloading jobs
        if file_path_name in self._dl_ctx['files']:
            dup_orig_path_url = self._dl_ctx['files'][file_path_name]['orig_path_url']
            self._logger.error("Full path name conflicting error: '%s'. Intended: '%r';already in downloading: '%r'",
                               file_path_name, orig_path_url, dup_orig_path_url)
            return False, path_url, orig_path_url
        is_resuming, resumption_ctx = self._load_resumption_ctx(file_path_name, ctx_file)
        ctx_file['resuming_from_intr'] = is_resuming
        # when resuming, all the progress counters start from the bytes recorded by the previous run
        ctx_file['progress'] = \
            ctx_file['downloaded'] = ctx_file['last_progress'] = resumption_ctx['progress'] if is_resuming else 0
        # check whether the desired file already exists or not
        if self.continuation and not is_resuming and os.path.isfile(file_path_name):
            file_len = os.stat(file_path_name).st_size
            if ctx_file['length'] and file_len == ctx_file['length']:
                self._logger.warning("The desired file '%s' already exists out there, so that there is no need to "
                                     "re-download it: '%r'", file_path_name, ctx_file['orig_path_url'])
                return None, path_url, orig_path_url
            else:
                self._logger.warning("A file with the desired name '%s' has been detected, but its size cannot be "
                                     "validated, so the download will start anew, and the existing file will be "
                                     "renamed on completion: '%r'", file_path_name, ctx_file['orig_path_url'])
        if not is_resuming:
            # Prepare the necessary directory structure and file template
            file_inprocess = file_path_name + self.INPROCESS_EXT
            top_missing_dir = self._topmost_missing_dir(file_path)
            try:
                if top_missing_dir:
                    mkpath(file_path)
                # create (or truncate) the empty in-process file that the workers write into
                with open(file_inprocess, mode='w') as _:
                    pass
            except (EnvironmentError, DistutilsFileError) as e:
                self._logger.error("Error while operating on '%s': '%s'; Try downloading: '%r'", file_inprocess, e,
                                   orig_path_url)
                # best-effort rollback of the directories created above
                if top_missing_dir:
                    try:
                        remove_tree(top_missing_dir)
                    except Exception:
                        pass
                return False, path_url, orig_path_url
        # Add the file to the list ready to download
        self._dl_ctx['files'][file_path_name] = ctx_file
        self._dl_ctx['total_size'] += ctx_file['length']
        # an unknown length (0) makes the overall expected size only approximate
        if not ctx_file['length']:
            self._dl_ctx['accurate'] = False
        iter_url = self._pick_file_url(file_path_name)
        if not is_resuming:
            # calculate request ranges
            if self._is_parallel_downloadable(file_path_name) and self.max_workers > 1:
                ranges = self.calc_req_ranges(ctx_file['length'], self.min_split_size, 0)
            else:
                # one task for the whole file; its end position is unknown or irrelevant
                ranges = [(0, None)]
            ctx_file['tsk_num'] = len(ranges)  # How many tasks to complete the download job of the file
            for start, end in ranges:
                req_range = "bytes={}-{}".format(start, end)
                ctx_range = ctx_file['ranges'][req_range] = {}
                ctx_range.update({
                    'start': start,
                    'end': end,
                    'offset': 0,
                    'start_time': 0,
                    'rt_dl_speed': 0,
                    'download_state': self.PENDING,
                    'url': next(iter_url),
                    'alt_urls': {}})
        else:
            # restore only the ranges left unfinished by the previous run, each with a freshly picked URL
            ctx_file['ranges'] = resumption_ctx['failed_ranges']
            ctx_file['tsk_num'] = len(ctx_file['ranges'])
            for ctx_range in ctx_file['ranges'].values():
                ctx_range['url'] = next(iter_url)
                ctx_range['alt_urls'] = {}
        # make the file visible to the world
        self._dl_ctx['alt_files'].append((file_path_name, ctx_file))
        self._dl_ctx['file_cnt'] += 1
    return downloadable, path_url, orig_path_url
[docs] def _build_ctx(self, path_urls):
"""Build the context for downloading the file(s).
Args:
path_urls (list of tuple): Paths and URLs for the file(s) to download, see :meth:`downloads` for details.
Returns:
A 6-tuple of lists ``'(active, active_orig, failed, failed_orig, existing, existing_orig)'``, where the
:obj:`list`\ s ``active`` and ``active_orig`` contain the active ``(path, url)``'s, converted and original
respectively; ``failed`` and ``failed_orig`` contain the same ``(path, url)``'s that are not downloadable;
``existing`` and ``existing_orig`` contain the downloads whose desired files already exist out there.
Raises:
:class:`BDownloaderException`: Raised when the termination or cancellation flag has been set.
"""
active, active_orig = [], []
failed, failed_orig = [], []
existing, existing_orig = [], []
for path_name, url in path_urls:
downloadable, path_url, orig_path_url = self._build_ctx_internal(path_name, url)
if downloadable:
active.append(path_url)
active_orig.append(orig_path_url)
elif downloadable is None:
existing.append(path_url)
existing_orig.append(orig_path_url)
else:
failed.append(path_url)
failed_orig.append(orig_path_url)
return active, active_orig, failed, failed_orig, existing, existing_orig
[docs] def _submit_dl_tasks(self, path_urls):
"""Submit the download tasks of the files to the thread pool.
Args:
path_urls (list of tuple): The meaning and format of the `path_urls` is similar to the parameter for
:meth:`downloads`.
Returns:
None.
"""
for path_name, _ in path_urls:
ctx_file = self._dl_ctx['files'][path_name]
if len(ctx_file['ranges']) > 1:
tsk = self._get_remote_file_multipart
else:
tsk = self._get_remote_file_singlewhole
for req_range, ctx_range in ctx_file['ranges'].items():
future = self.executor.submit(tsk, path_name, req_range)
ctx_file['futures'].append(future)
ctx_range['future'] = future
ctx_range['start_time'] = time.time()
self._dl_ctx['futures'][future] = {
"file": path_name,
"range": req_range
}
[docs] def _is_all_done(self):
"""Check if all the tasks have completed.
Returns:
bool: ``True`` if all the ``Future``\ s have been done, meaning that all the files have finished downloading,
whether successfully or not; ``False`` otherwise.
"""
return self.all_submitted and self._dl_ctx['file_cnt'] == (
len(self.succeeded_downloads_in_running) + len(self.failed_downloads_in_running))
[docs] def _backup_resumption_ctx(self, the_file, ctx_file):
"""Back up the necessary context of the unsuccessful download for resuming later.
Args:
the_file (str): The full path name of the file being downloaded.
ctx_file (dict): The download context of the file `the_file`.
Returns:
dict: The resumption context for the file `the_file`.
"""
ctx = {
'file_name': os.path.basename(the_file),
'path_url': ctx_file['orig_path_url'],
'length': ctx_file['length'],
'progress': ctx_file['progress'], # successfully downloaded bytes in this run
'failed_ranges': {} # ranges with a state in `_RANGE_STATES` except `SUCCEEDED`
}
for req_range, ctx_range in ctx_file['ranges'].items():
if ctx_range['download_state'] != self.SUCCEEDED:
crange = ctx['failed_ranges'][req_range] = {}
crange.update({
'start': ctx_range['start'],
'end': ctx_range['end'],
'offset': ctx_range['offset'],
'start_time': 0,
'rt_dl_speed': 0,
'download_state': self.PENDING
})
return ctx
[docs] def _on_succeeded(self, the_file, ctx_file):
"""When transitioning to the `SUCCEEDED` state, convert from in-process to finished file and do the cleanup."""
file_inprocess = the_file + self.INPROCESS_EXT
file_resumption = the_file + self.RESUM_PARTS_EXT
try:
self._rename_existing_file(the_file) # rename the existing conflicting file if present
os.rename(file_inprocess, the_file)
# delete the download progress file (i.e. `*.bdl.par` for resumption parts info) if present
if os.path.isfile(file_resumption):
os.remove(file_resumption)
self._logger.info("The download of the file '%s' has succeeded: '%r'", the_file, ctx_file['orig_path_url'])
except EnvironmentError as e:
self._logger.error("Error while operating on '%s': 'Error number %d: %s'", e.filename, e.errno, e.strerror)
[docs] def _on_failed(self, the_file, ctx_file):
"""When transitioning to the `FAILED` state, save the resumption ctx or remove the intermediate files."""
file_inprocess = the_file + self.INPROCESS_EXT
file_resumption = the_file + self.RESUM_PARTS_EXT
try:
if self.continuation and ctx_file['resumable']:
with open(file_resumption, "wb") as fd:
pickle.dump(self._backup_resumption_ctx(the_file, ctx_file), fd, PICKLE_PROTOCOL_NUMBER)
self._logger.warning("The download of the file '%s' has failed, but can be resumed by re-running it: "
"'%r'", the_file, ctx_file['orig_path_url'])
else:
os.remove(file_inprocess)
# delete the download progress file (i.e. `*.bdl.par` for resumption parts info) if present
if os.path.isfile(file_resumption):
os.remove(file_resumption)
self._logger.warning("The download of the file '%s' has failed, and can't be resumed either because "
"the resumption feature is intentionally disabled or because the URLs don't "
"support this; Accordingly, the broken file(s) has been removed: '%r'", the_file,
ctx_file['orig_path_url'])
except EnvironmentError as e:
self._logger.error("Error while operating on '%s': 'Error number %d: %s'", e.filename, e.errno, e.strerror)
except pickle.PicklingError as e:
self._logger.error("Error while dumping the resumption context into '%s': '%r'", file_resumption, e)
[docs] def _on_cancelled(self, the_file, ctx_file):
"""When transitioning to the `CANCELLED` state, remove the empty, obsolete files."""
file_inprocess = the_file + self.INPROCESS_EXT
file_resumption = the_file + self.RESUM_PARTS_EXT
try:
if not ctx_file['resuming_from_intr']:
os.remove(file_inprocess)
# delete the download progress file (i.e. `*.bdl.par` for resumption parts info) if present
if os.path.isfile(file_resumption):
os.remove(file_resumption)
self._logger.info("The download of the file '%s' has been cancelled on demand, and the broken file(s) "
"have been removed accordingly: '%r'", the_file, ctx_file['orig_path_url'])
else:
self._logger.warning("The download of the file '%s' has been cancelled on demand, but can be resumed "
"by re-running it: '%r'", the_file, ctx_file['orig_path_url'])
except EnvironmentError as e:
self._logger.error("Error while operating on '%s': 'Error number %d: %s'", e.filename, e.errno, e.strerror)
[docs] def _cancel_all_on_interrupted(self):
"""Cancel all the downloading tasks when receiving the ``SIGINT`` signal or the QUIT command."""
if self.all_submitted and not self.cancelled_on_interrupt:
for f in self._dl_ctx['futures']:
f.cancel()
self.cancelled_on_interrupt = True
self._logger.warning("The user terminated the downloads %s!",
"by pressing the interrupt key" if self.sigint else "by typing the QUIT command")
[docs] def _finalize_on_interrupted_py2(self):
"""When interrupted under Python2.x, perform state transitions manually and act accordingly."""
for the_file, ctx_file in self._dl_ctx['files'].items():
if ctx_file['download_state'] == self.INPROCESS:
self._on_failed(the_file, ctx_file)
elif ctx_file['download_state'] == self.PENDING:
self._on_cancelled(the_file, ctx_file)
[docs] def _state_mgmnt(self):
"""Perform the state-related operations of file downloading.
This method updates the download status of the files and their related chunks when the associated worker threads
completed, either because of finished without error, raised on exception or cancelled intentionally.
Returns:
None.
"""
# Cancel the downloading tasks repeatedly on the downloading queue when interrupted
if self.sigint or self.cmdquit:
self._cancel_all_on_interrupted()
for fi in range(self._dl_ctx['file_cnt']):
_, ctx_file = self._dl_ctx['alt_files'][fi]
if ctx_file['download_state'] not in self._COMPLETED:
ranges_all_done = True
future_aborted = False
fs = ctx_file['futures']
fs_num = len(fs)
tsk_num = ctx_file['tsk_num']
if fs_num == tsk_num: # Make sure that all the tasks of the file have been submitted
for ctx_range in ctx_file['ranges'].values():
future = ctx_range['future']
if future.done():
if ctx_range['download_state'] not in self._COMPLETED:
ctx_file['downloaded'] += ctx_range['offset']
try:
exception = future.exception()
if exception is None:
ctx_range['download_state'] = self.SUCCEEDED
ctx_file['progress'] += ctx_range['offset']
# Accumulate the download statistics of the source URLs
for url, ctx_url_range in ctx_range['alt_urls'].items():
ctx_url_file = ctx_file['urls'][url]
ctx_url_file['refcnt'] += ctx_url_range.get('refcnt', 0)
ctx_url_file['interrupted'] += ctx_url_range.get('interrupted', 0)
# use MINUS for ease of multi-level sorting
ctx_file['urls'][ctx_range['url'][-1]]['succeeded'] -= 1
else: # exception raised
ctx_range['download_state'] = self.FAILED
future_aborted = True
if not (self.cancelled_on_interrupt or ctx_file['cancelled_on_exception']):
# Cancel the download of the failed file
for f in fs:
f.cancel()
ctx_file['cancelled_on_exception'] = True
except CancelledError:
ctx_range['download_state'] = self.CANCELLED
future_aborted = True
else:
ranges_all_done = False
else:
ranges_all_done = False
if ranges_all_done:
the_file = ctx_file['path_url'][0]
if not future_aborted:
ctx_file['download_state'] = self.SUCCEEDED
self.succeeded_downloads_in_running.append(ctx_file['orig_path_url'])
self._on_succeeded(the_file, ctx_file)
else:
if ctx_file['download_state'] != self.PENDING:
ctx_file['download_state'] = self.FAILED
self._on_failed(the_file, ctx_file)
else:
ctx_file['download_state'] = self.CANCELLED
self._on_cancelled(the_file, ctx_file)
self.failed_downloads_in_running.append(ctx_file['orig_path_url'])
[docs] def _mgmnt_task(self):
"""The management thread body.
This thread manages the downloading process of the whole job queue, currently including state management only.
When all the tasks have been done, it signals the waiting thread and exits immediately.
Returns:
None.
"""
while not self.all_done:
self._state_mgmnt()
if self._is_all_done():
self.all_done = True
self.all_done_event.set()
continue
time.sleep(0.1)
def _calc_completed(self):
    """Sum up the bytes downloaded so far over all the files of the job context.

    A file whose ``download_state`` is one of ``_COMPLETED`` contributes its ``downloaded`` count.
    Any other file contributes its ``last_progress`` plus the ``offset`` of each of its ranges
    (a range without an ``offset`` counts as 0; a file without ranges adds nothing beyond
    ``last_progress``).

    Returns:
        int: The size in bytes of the downloaded pieces.
    """
    dl_ctx = self._dl_ctx
    total = 0
    for idx in range(dl_ctx['file_cnt']):
        _path_url, file_ctx = dl_ctx['alt_files'][idx]
        if file_ctx['download_state'] in self._COMPLETED:
            total += file_ctx['downloaded']
            continue
        total += file_ctx['last_progress']
        # Still in flight: add what every range has fetched so far.
        for range_ctx in (file_ctx.get('ranges') or {}).values():
            total += range_ctx.get('offset', 0)
    return total
def _progress_task(self):
    """Thread body that keeps the download progress display up to date.

    The indicator is a mill (``MillProgress``) when ``progress`` equals ``PROGRESS_BS_MILL`` and a
    clint bar otherwise. Until ``stop`` is raised, the indicator is refreshed every 0.1 s with the
    bytes completed so far (:meth:`_calc_completed`) against ``_dl_ctx['total_size']``. The first
    time the job context is found to be inaccurate (``_dl_ctx['accurate']`` falsy), the label is
    switched to the approximate one and, if a clint bar was in use, it is replaced by a mill and
    the switch is logged.

    Once ``stop`` is raised the indicator is finalized. It reports ``total_size`` when the size is
    accurate and there were no failed downloads and no interruption (``sigint``/``cmdquit``);
    otherwise it reports the bytes actually completed.

    Returns:
        None.
    """
    if self.progress == self.PROGRESS_BS_MILL:
        indicator = MillProgress(label='Dl/Expect:', every=1024)
    else:
        indicator = clint_progress.Bar()
    went_approximate = False  # has an unknown-sized download already been handled?

    while not self.stop:
        if not (self._dl_ctx['accurate'] or went_approximate):
            if self.progress != self.PROGRESS_BS_MILL:
                # Unknown-sized download(s): swap the clint bar for a mill.
                indicator = MillProgress(every=1024)
                self._logger.info("The progress bar has been changed to a mill due to unknown-sized download(s)")
            indicator.label = 'Dl/Expect(approx.):'
            went_approximate = True
        indicator.show(self._calc_completed(), count=self._dl_ctx['total_size'])
        time.sleep(0.1)

    # `stop` has been raised: draw the final state and close the indicator.
    finished_cleanly = self._dl_ctx['accurate'] and not (
        self.failed_downloads_in_running or self.sigint or self.cmdquit)
    indicator.last_progress = self._dl_ctx['total_size'] if finished_cleanly else self._calc_completed()
    indicator.expected_size = self._dl_ctx['total_size']
    indicator.done()
def downloads(self, path_urls):
    """Submit multiple downloading jobs at a time to the downloading queue.

    Every entry is first recorded in ``_dl_ctx['orig_path_urls']``. The entries are then processed in
    chunks of two through :meth:`_build_ctx`. For each chunk that has active downloads, the progress
    thread (unless ``progress`` is ``PROGRESS_BS_NONE``) and the management thread are started if not
    yet running, and the active jobs are submitted. Entries reported as already existing are recorded
    as succeeded on addition; those reported as failed are recorded as failed on addition.

    Args:
        path_urls (:obj:`list` of :obj:`tuple`\\ s): `path_urls` accepts a list of tuples of the form ``(path, url)``,
            where ``path`` should be a pathname, optionally prefixed with absolute or relative paths, and ``url`` should
            be a URL string, which may consist of multiple TAB-separated URLs pointing to the same file.
            Any other iterable of such tuples (e.g. a generator) is accepted as well; it is consumed exactly once.
            A valid `path_urls`, for example, could be [('/opt/files/bar.tar.bz2', ``'https://foo.cc/bar.tar.bz2'``),
            ('./sanguoshuowen.pdf', ``'https://bar.cc/sanguoshuowen.pdf\\thttps://foo.cc/sanguoshuowen.pdf'``),
            ('/**to**/**be**/created/', ``'https://flash.jiefang.rmy/lc-cl/gaozhuang/chelsia/rockspeaker.tar.gz'``),
            ('/path/to/**existing**-dir', ``'https://ghosthat.bar/foo/puretonecone81.xz\\thttps://tpot.horn/foo/puretonecone81.xz\\thttps://hawkhill.bar/foo/puretonecone81.xz'``)].

    Returns:
        None.

    Raises:
        :class:`BDownloaderException`: Raised when the downloads were interrupted, e.g. by calling :meth:`cancel`
            in a ``SIGINT`` signal handler, in the process of submitting the download requests.

    Notes:
        The method is not thread-safe, which means it should not be called at the same time in multiple threads
        with one instance.

        When multi-instanced (e.g. one instance per thread), the file paths specified in one instance should not
        overlap those in another to avoid potential race conditions. File loss may occur, for example, if a failed
        download task in one instance tries to delete a directory that is being accessed by some download tasks in
        other instances.

        However, this limitation doesn't apply to the file paths specified in a same instance.
    """
    # Materialize once: `path_urls` is traversed twice below (recorded, then chunked), so a one-shot
    # iterator such as a generator would otherwise be exhausted before any job gets submitted.
    path_urls = list(path_urls)
    self._dl_ctx['orig_path_urls'].extend(path_urls)

    for chunk_path_urls in self.list_split(path_urls, chunk_size=2):
        active, active_orig, _, failed_orig, _, existing_orig = self._build_ctx(chunk_path_urls)

        if active:
            # Start the helper threads lazily, on the first chunk that has something to download.
            if self.progress != self.PROGRESS_BS_NONE and self.progress_thread is None:
                self.progress_thread = threading.Thread(target=self._progress_task)
                self.progress_thread.start()
            if self.mgmnt_thread is None:
                self.mgmnt_thread = threading.Thread(target=self._mgmnt_task)
                self.mgmnt_thread.start()
            self._submit_dl_tasks(active)
            self.active_downloads_added.extend(active_orig)

        if existing_orig:
            self.succeeded_downloads_on_addition.extend(existing_orig)
        if failed_orig:
            self.failed_downloads_on_addition.extend(failed_orig)
def download(self, path_name, url):
    """Queue a single downloading job.

    Thin convenience wrapper: the pair ``(path_name, url)`` is wrapped in a one-element list and
    handed to :meth:`downloads`, whose return value is passed through.

    Args:
        path_name (str): The full path name of the file to be downloaded.
        url (str): The URL referencing the target file.

    Returns:
        None.

    Raises:
        Same as in :meth:`downloads`.

    Notes:
        The limitation on the method and the `path_name` parameter herein is the same as in :meth:`downloads`.
    """
    single_job = (path_name, url)
    return self.downloads([single_job])
def _result(self):
    """Return both the succeeded and failed downloads when all done or interrupted by user.

    Returns:
        tuple of list: Same as that returned by :meth:`wait_for_all`. ``succeeded`` lists the downloads that
        succeeded while running, followed by those that already existed on addition; ``failed`` lists every
        originally submitted entry that is not in ``succeeded``, in submission order.
    """
    succeeded = self.succeeded_downloads_in_running + self.succeeded_downloads_on_addition
    failed = [path_url for path_url in self._dl_ctx['orig_path_urls'] if path_url not in succeeded]
    return succeeded, failed
def _wait_py3(self):
    """Block until all the jobs are done (Python 3.x and newer).

    Waits on ``all_done_event`` in bounded slices of ``_INTERRUPTIBLE_WAIT_TIMEOUT`` seconds rather
    than indefinitely, re-checking ``all_done`` after every slice (presumably to stay interruptible,
    as the constant's name suggests).
    """
    while True:
        if self.all_done:
            return
        self.all_done_event.wait(self._INTERRUPTIBLE_WAIT_TIMEOUT)
def _wait_py2(self):
    """Block until all the jobs are done (Python 2.x).

    While not interrupted, waits on ``all_done_event`` in slices of ``_INTERRUPTIBLE_WAIT_TIMEOUT``
    seconds. Once ``sigint`` is raised, performs one final wait of ``_PY2_SIGINT_WAIT_TIMEOUT``
    seconds and returns, even if the jobs are not all done.
    """
    while not self.all_done:
        if self.sigint:
            # https://github.com/agronholm/pythonfutures/issues/25
            self.all_done_event.wait(self._PY2_SIGINT_WAIT_TIMEOUT)
            return
        self.all_done_event.wait(self._INTERRUPTIBLE_WAIT_TIMEOUT)
def wait_for_all(self):
    """Wait for all the downloading jobs to complete.

    Marks the job queue as fully submitted (``all_submitted``). If any download was actually scheduled
    (``active_downloads_added`` is non-empty), blocks until the whole queue is done; on Python 2.x the
    wait may also end early on ``SIGINT``. On Python 2.x an interruption by ``SIGINT`` is reported as a
    warning.

    Returns:
        tuple of list: A 2-tuple of lists ``'(succeeded, failed)'``. The first list ``succeeded`` contains the
        originally passed ``(path, url)``\\ s that finished successfully, while the second list ``failed`` contains
        the raised and cancelled ones.
    """
    self.all_submitted = True
    if self.active_downloads_added:
        wait4all = self._wait_py3 if _py3plus else self._wait_py2
        wait4all()

    succeeded, failed = self._result()
    if self.sigint and not _py3plus:
        self._logger.warning('The download was interrupted by the user: '
                             '"succeeded in downloading: %r; failed to download: %r"', succeeded, failed)

    return succeeded, failed
def results(self):
    """Get both the succeeded and failed downloads when all done or interrupted by user.

    If the queue has already been marked as fully submitted, the current results are returned
    directly. Otherwise a warning is logged and :meth:`wait_for_all` is called, which marks the
    queue submitted, waits, and returns the results.

    Returns:
        tuple of list: Same as that returned by :meth:`wait_for_all`.
    """
    if self.all_submitted:
        return self._result()
    self._logger.warning('All the downloads may not have been submitted!')
    return self.wait_for_all()
def result(self):
    """Return the final download status as an exit-style code.

    Success requires that no interruption happened (neither ``sigint`` nor ``cmdquit``), that at
    least one download was requested, and that every requested download succeeded, whether on
    addition or while running.

    Returns:
        int: 0 for success, and -1 failure.
    """
    n_requested = len(self._dl_ctx['orig_path_urls'])
    n_ok = len(self.succeeded_downloads_on_addition) + len(self.succeeded_downloads_in_running)
    interrupted = self.sigint or self.cmdquit
    if interrupted or not n_requested or n_requested != n_ok:
        return -1
    return 0
def cancel(self, keyboard_interrupt=True):
    """Request cancellation of all the download jobs.

    This only raises a flag (``sigint`` or ``cmdquit``); other parts of the downloader are expected
    to check it.

    Args:
        keyboard_interrupt (bool): ``True`` (the default) when the user hit the interrupt key (e.g. Ctrl-C),
            which sets ``sigint``; otherwise ``cmdquit`` is set instead.

    Returns:
        None.
    """
    flag_name = 'sigint' if keyboard_interrupt else 'cmdquit'
    setattr(self, flag_name, True)
def close(self):
    """Shut the downloader down and perform the cleanup.

    If not all downloads have been marked as submitted, a warning is logged and :meth:`wait_for_all`
    is called first. Then ``stop`` is raised, the progress and management threads (whichever were
    started) are joined, and the executor is shut down.

    On Python 2.x after a ``SIGINT`` the shutdown is non-graceful. :meth:`_finalize_on_interrupted_py2`
    runs, stdout/stderr are flushed, the joins are bounded by ``_PY2_SIGINT_JOIN_TIMEOUT``, and the
    process is terminated via ``os._exit(-1)`` instead of shutting the executor down.

    Returns:
        None.
    """
    if not self.all_submitted:
        self._logger.warning('All the downloads may not have been submitted!')
        self.wait_for_all()

    # The actual closing starts here.
    self.stop = True
    if self.sigint and not _py3plus:
        join_timeout = self._PY2_SIGINT_JOIN_TIMEOUT
        finish = _os_exit_force
        self._finalize_on_interrupted_py2()
        # os._exit() skips the normal interpreter cleanup, so flush stdio buffers by hand.
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
    else:
        join_timeout, finish = None, self.executor.shutdown

    for thread_attr in ('progress_thread', 'mgmnt_thread'):
        worker = getattr(self, thread_attr)
        if worker is not None:
            worker.join(join_timeout)
    finish()
class BDownloaderException(Exception):
    """Error signalling that something went wrong while executing the download tasks."""