Coverage for src/pyrsslocal/helper/download_helper.py: 67%
27 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-04-30 08:45 +0200
1"""
2@file
@brief various functions to get the content from a url
5"""
7from urllib.error import HTTPError, URLError
8import urllib
9import urllib.request
10import socket
11import http
12import gzip
14from pyquickhelper.loghelper import fLOG
def get_url_content_timeout(url, timeout=10, output=None, encoding="utf8"):
    """
    Downloads a file from internet
    (it assumes it is text information, otherwise, encoding should be None).

    @param      url         (str) url
    @param      timeout     (in seconds) after this time, the function drops and
                            returns None, -1 for forever
    @param      output      (str) if not None, the content is stored in that file
    @param      encoding    (str) utf8 by default, but if it is None, the returned
                            information is binary (bytes)
    @return                 content of the url (str or bytes depending on *encoding*),
                            or None if the download failed

    If the function automatically detects that the downloaded data is in gzip
    format, it will decompress it.
    """
    try:
        if timeout != -1:
            with urllib.request.urlopen(url, timeout=timeout) as ur:
                res = ur.read()
        else:
            # -1 means no explicit timeout: rely on the socket default
            with urllib.request.urlopen(url) as ur:
                res = ur.read()
    except (HTTPError, URLError) as error:  # pragma: no cover
        fLOG("[get_url_content_timeout] unable to retrieve content from ",
             url, "exc:", str(error))
        return None
    except socket.timeout:  # pragma: no cover
        fLOG(
            "[get_url_content_timeout] unable to retrieve content from ",
            url,
            " because of timeout: ",
            timeout)
        return None
    except ConnectionResetError as e:  # pragma: no cover
        fLOG(
            "[get_url_content_timeout] unable to retrieve content from ",
            url,
            " because of ConnectionResetError: ",
            e)
        return None
    except http.client.BadStatusLine as e:  # pragma: no cover
        fLOG(
            "[get_url_content_timeout] unable to retrieve content from ",
            url,
            " because of http.client.BadStatusLine: ",
            e)
        return None
    except http.client.IncompleteRead as e:  # pragma: no cover
        fLOG(
            "[get_url_content_timeout] unable to retrieve content from ",
            url,
            " because of http.client.IncompleteRead: ",
            e)
        return None
    except Exception as e:  # pragma: no cover
        fLOG(
            "[get_url_content_timeout] unable to retrieve content from ",
            url,
            " because of unknown exception: ",
            e)
        # unknown failure: log then re-raise with the original traceback
        raise

    # gzip magic number (two bytes): decompress transparently if present
    if res[:2] == b"\x1f\x8b":
        res = gzip.decompress(res)

    if encoding is not None:
        try:
            content = res.decode(encoding)
        except UnicodeDecodeError as e:  # pragma: no cover
            # the requested encoding failed: fall back to common legacy encodings
            laste = [e]
            othenc = ["iso-8859-1", "latin-1"]
            content = None
            for encode in othenc:
                try:
                    content = res.decode(encode)
                    break
                except UnicodeDecodeError as ee:
                    laste.append(ee)

            if content is None:
                mes = ["unable to parse blog post: " + url]
                mes.append("tried:" + str([encoding] + othenc))
                mes.append("beginning:\n" + str([res])[:50])
                for exc in laste:
                    mes.append("Exception: " + str(exc))
                raise ValueError("\n".join(mes)) from e
    else:
        # no decoding requested: hand back the raw bytes
        content = res

    if output is not None:
        # mirror the returned content into *output*, text or binary mode
        # depending on whether a decoding was applied
        if encoding is not None:
            with open(output, "w", encoding=encoding) as f:
                f.write(content)
        else:
            with open(output, "wb") as f:
                f.write(content)

    return content