Source code for pyquickhelper.filehelper.download_helper

"""
Functions to download the content of a url.

:githublink:`%|py|5`
"""
import os
from datetime import datetime
import socket
import gzip
import warnings
import hashlib
import urllib.error as urllib_error
import urllib.request as urllib_request
import http.client as http_client
try:
    from http.client import InvalidURL
except ImportError:
    InvalidURL = ValueError


class InternetException(Exception):
    """
    Exception for the function
    :func:`get_url_content_timeout <pyquickhelper.filehelper.download_helper.get_url_content_timeout>`.

    :githublink:`%|py|24`
    """
    pass

def get_url_content_timeout(url, timeout=10, output=None, encoding="utf8",
                            raise_exception=True, chunk=None, fLOG=None):
    """
    Downloads a file from internet (by default, it assumes
    it is text information, otherwise, encoding should be None).

    :param url: (str) url
    :param timeout: (int) in seconds, after this time, the function stops
        and returns None, -1 for forever
    :param output: (str) if not None, the content is stored in that file
    :param encoding: (str) utf8 by default, but if it is None, the returned
        information is binary
    :param raise_exception: (bool) True to raise an exception,
        False to issue a warning
    :param chunk: (int|None) save data every chunk (only if output is not None)
    :param fLOG: logging function (only applies when chunk is not None)
    :return: content of the url

    If the function automatically detects that the downloaded data is
    in gzip format, it will decompress it.

    The function raises the exception :class:`InternetException
    <pyquickhelper.filehelper.download_helper.InternetException>`.

    :githublink:`%|py|47`
    """
    def save_content(content, append=False):
        "local function"
        app = "a" if append else "w"
        if encoding is not None:
            with open(output, app, encoding=encoding) as f:
                f.write(content)
        else:
            with open(output, app + "b") as f:
                f.write(content)

    try:
        if chunk is not None:
            if output is None:
                raise ValueError(
                    "output cannot be None if chunk is not None")
            app = [False]
            size = [0]

            def _local_loop(ur):
                while True:
                    res = ur.read(chunk)
                    size[0] += len(res)  # pylint: disable=E1137
                    if fLOG is not None:
                        fLOG("[get_url_content_timeout] downloaded",
                             size, "bytes")
                    if len(res) > 0:
                        if encoding is not None:
                            res = res.decode(encoding=encoding)
                        save_content(res, app[0])
                    else:
                        break
                    app[0] = True  # pylint: disable=E1137

            if timeout != -1:
                with urllib_request.urlopen(url, timeout=timeout) as ur:
                    _local_loop(ur)
            else:
                with urllib_request.urlopen(url) as ur:
                    _local_loop(ur)
            app = app[0]
            size = size[0]
        else:
            if timeout != -1:
                with urllib_request.urlopen(url, timeout=timeout) as ur:
                    res = ur.read()
            else:
                with urllib_request.urlopen(url) as ur:
                    res = ur.read()
    except (urllib_error.HTTPError, urllib_error.URLError,
            ConnectionRefusedError) as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' exc: {1}".format(url, e),
            ResourceWarning)
        return None
    except socket.timeout as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of timeout "
            "{1}: {2}".format(url, timeout, e), ResourceWarning)
        return None
    except ConnectionResetError as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of "
            "ConnectionResetError: {1}".format(url, e), ResourceWarning)
        return None
    except http_client.BadStatusLine as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of "
            "http.client.BadStatusLine: {1}".format(url, e), ResourceWarning)
        return None
    except http_client.IncompleteRead as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of "
            "http.client.IncompleteRead: {1}".format(url, e), ResourceWarning)
        return None
    except (ValueError, InvalidURL) as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}'".format(url)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of "
            "{1}".format(url, e), ResourceWarning)
        return None
    except Exception as e:
        if raise_exception:
            raise InternetException(
                "Unable to retrieve content, url='{0}', exc={1}".format(
                    url, e)) from e
        warnings.warn(
            "Unable to retrieve content from '{0}' because of unknown "
            "exception: {1}".format(url, e), ResourceWarning)
        raise e

    if chunk is None:
        if len(res) >= 2 and res[:2] == b"\x1f\x8B":
            # gzip format
            res = gzip.decompress(res)

        if encoding is not None:
            try:
                content = res.decode(encoding)
            except UnicodeDecodeError as e:
                # it tries different encodings
                laste = [e]
                othenc = ["iso-8859-1", "latin-1"]
                for encode in othenc:
                    try:
                        content = res.decode(encode)
                        break
                    except UnicodeDecodeError as e:
                        laste.append(e)
                        content = None
                if content is None:
                    mes = ["Unable to parse text from '{0}'.".format(url)]
                    mes.append("tried:" + str([encoding] + othenc))
                    mes.append("beginning:\n" + str([res])[:50])
                    for e in laste:
                        mes.append("Exception: " + str(e))
                    raise ValueError("\n".join(mes))
        else:
            content = res
    else:
        content = None

    if output is not None and chunk is None:
        save_content(content)

    return content

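# Usage sketch for get_url_content_timeout: download a page as text and
# catch InternetException on failure. "https://example.com/" is a
# placeholder url, not one the module depends on.
if __name__ == "__main__":
    try:
        page = get_url_content_timeout("https://example.com/", timeout=5)
        print(page[:100])
    except InternetException as e:
        print("download failed:", e)
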
def _hash_url(url):
    m = hashlib.sha256()
    m.update(url.encode('utf-8'))
    return m.hexdigest()[:25]

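# Sketch: _hash_url maps a url to a short, stable 25-character hexadecimal
# stem, so every download of the same url targets the same cache file.
# The url below is a placeholder.
if __name__ == "__main__":
    print(_hash_url("https://example.com/data.csv"))
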
def get_urls_content_timeout(urls, timeout=10, folder=None, encoding=None,
                             raise_exception=True, chunk=None, fLOG=None):
    """
    Downloads data from urls (by default, it assumes
    it is text information, otherwise, encoding should be None).

    :param urls: urls
    :param timeout: in seconds, after this time, the function stops
        and returns None, -1 for forever
    :param folder: destination folder for the downloaded files,
        it cannot be None
    :param encoding: None by default (binary content), set an encoding
        to decode the downloaded content
    :param raise_exception: True to raise an exception,
        False to issue a warning
    :param chunk: save data every chunk
        (see :func:`get_url_content_timeout`)
    :param fLOG: logging function
    :return: list of dictionaries describing the downloaded content
        (url, size, date, dest)

    If the function automatically detects that the downloaded data is
    in gzip format, it will decompress it.

    The function raises the exception :class:`InternetException
    <pyquickhelper.filehelper.download_helper.InternetException>`.

    :githublink:`%|py|212`
    """
    import pandas
    import pandas.errors
    if not isinstance(urls, list):
        raise TypeError("urls must be a list")
    if folder is None:
        raise ValueError("folder should not be None")
    summary = os.path.join(folder, "summary.csv")
    if os.path.exists(summary):
        try:
            df = pandas.read_csv(summary)
        except pandas.errors.EmptyDataError:
            df = None
    else:
        df = None
    if df is not None:
        all_obs = [dict(url=df.loc[i, 'url'], size=df.loc[i, 'size'],
                        date=df.loc[i, 'date'], dest=df.loc[i, 'dest'])
                   for i in range(df.shape[0])]
        done = set(d['dest'] for d in all_obs)
    else:
        all_obs = []
        done = set()
    for i, url in enumerate(urls):
        dest = _hash_url(url)
        if dest in done:
            continue
        full_dest = os.path.join(folder, dest + '.bin')
        content = get_url_content_timeout(url, timeout=timeout,
                                          output=full_dest,
                                          encoding=encoding, chunk=chunk,
                                          raise_exception=raise_exception)
        if content is None:
            continue
        if fLOG is not None:
            fLOG("{}/{} downloaded {} bytes from '{}' to '{}'.".format(
                i + 1, len(urls), len(content), url, dest + '.bin'))
        obs = dict(url=url, size=len(content), date=datetime.now(),
                   dest=dest)
        all_obs.append(obs)
        done.add(dest)
    new_df = pandas.DataFrame(all_obs)
    new_df.to_csv(summary, index=False)
    return all_obs

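# Sketch: download several urls into a cache folder; the summary.csv file
# written in that folder records what was fetched, so a second call skips
# urls already downloaded. The folder name and urls are placeholders;
# pandas must be installed.
if __name__ == "__main__":
    os.makedirs("cache", exist_ok=True)
    fetched = get_urls_content_timeout(
        ["https://example.com/", "https://example.org/"],
        folder="cache", raise_exception=False)
    for obs in fetched:
        print(obs["url"], "->", obs["dest"] + ".bin", obs["size"], "bytes")
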
def local_url(url, folder=None, envvar='REPO_LOCAL_URLS'):
    """
    Replaces the url by a local file in *folder*, or in the folder
    given by environment variable *envvar* if *folder* is None.

    :param url: url to replace
    :param folder: local folder
    :param envvar: environment variable
    :return: local file or url

    :githublink:`%|py|270`
    """
    if folder is None:
        folder = os.environ.get(envvar, None)  # pragma: no cover
    if folder is None:
        raise FileNotFoundError(
            "Unable to find local folder '{}' or environment variable "
            "'{}'.".format(folder, envvar))
    loc = _hash_url(url)
    name = os.path.join(folder, loc + '.bin')
    if os.path.exists(name):
        return name
    return url

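# Sketch: local_url returns the cached file when the url was previously
# downloaded into the folder (same _hash_url naming scheme as above), and
# the url itself otherwise. "cache" is a placeholder folder.
if __name__ == "__main__":
    print(local_url("https://example.com/", folder="cache"))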