Source code for pyrsslocal.helper.download_helper

"""
various function to get the content from a url


:githublink:`%|py|6`
"""

from urllib.error import HTTPError, URLError
import urllib
import urllib.request
import socket
import http
import gzip

from pyquickhelper.loghelper import fLOG


[docs]def get_url_content_timeout(url, timeout=10, output=None, encoding="utf8"): """ Downloads a file from internet (it assumes it is text information, otherwise, encoding should be None). :param url: (str) url :param timeout: (in seconds), after this time, the function drops an returns None, -1 for forever :param output: (str) if None, the content is stored in that file :param encoding: (str) utf8 by default, but if it is None, the returned information is binary :return: content of the url If the function automatically detects that the downloaded data is in gzip format, it will decompress it. :githublink:`%|py|30` """ try: if timeout != -1: with urllib.request.urlopen(url, timeout=timeout) as ur: res = ur.read() else: with urllib.request.urlopen(url) as ur: res = ur.read() except (HTTPError, URLError) as error: # pragma: no cover fLOG("[get_url_content_timeout] unable to retrieve content from ", url, "exc:", str(error)) return None except socket.timeout as e: # pragma: no cover fLOG( "[get_url_content_timeout] unable to retrieve content from ", url, " because of timeout: ", timeout) return None except ConnectionResetError as e: # pragma: no cover fLOG( "[get_url_content_timeout] unable to retrieve content from ", url, " because of ConnectionResetError: ", e) return None except http.client.BadStatusLine as e: # pragma: no cover fLOG( "[get_url_content_timeout] unable to retrieve content from ", url, " because of http.client.BadStatusLine: ", e) return None except http.client.IncompleteRead as e: # pragma: no cover fLOG( "[get_url_content_timeout] unable to retrieve content from ", url, " because of http.client.IncompleteRead: ", e) return None except Exception as e: # pragma: no cover fLOG( "[get_url_content_timeout] unable to retrieve content from ", url, " because of unknown exception: ", e) raise e if len(res) >= 2 and res[:2] == b"\x1f\x8B": # gzip format res = gzip.decompress(res) if encoding is not None: try: content = res.decode(encoding) except UnicodeDecodeError as e: # pragma: no cover # we try different encoding laste = [e] othenc = ["iso-8859-1", "latin-1"] for encode in othenc: try: content = res.decode(encode) break except UnicodeDecodeError as e: laste.append(e) content = None if content is None: mes = ["unable to parse blog post: " + url] mes.append("tried:" + str([encoding] + othenc)) mes.append("beginning:\n" + str([res])[:50]) for e in laste: mes.append("Exception: " + str(e)) raise ValueError("\n".join(mes)) from e else: content = res if output is not None: if encoding is not None: with open(output, "w", encoding=encoding) as f: f.write(content) else: with open(output, "wb") as f: f.write(content) return content