Coverage for pyquickhelper/filehelper/internet_helper.py: 63%
98 statements
coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief Gather functions about downloading from internet, ...
4"""
5import os
6import sys
7import shutil
8import urllib.request as urllib_request
9import urllib.error as urllib_error
10from ..loghelper.flog import noLOG, _get_file_url
11from .fexceptions import FileException
12from ..loghelper.flog import _first_more_recent


class ReadUrlException(Exception):
    """
    Raised by @see fn read_url.
    """
    pass
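

# A minimal sketch (not part of the original module) showing how callers can
# reach the underlying urllib error: the functions below raise
# ReadUrlException from the original HTTPError or IOError, so it remains
# available as ``__cause__``. The url is illustrative only.
#
#     try:
#         content = read_url("https://example.com/missing.txt")
#     except ReadUrlException as e:
#         print("failed:", e, "caused by:", e.__cause__)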


def download(url, path_download=".", outfile=None, fLOG=noLOG):
    """
    Downloads a small file.
    If *url* is a url, the function downloads the file and returns
    the downloaded filename. A file which was already downloaded
    is not downloaded again.
    The function raises an exception if the url does not contain
    ``http://``, ``https://`` or ``ftp://``.

    @param      url             url
    @param      path_download   download the file here
    @param      outfile         see below
    @param      fLOG            logging function
    @return                     the filename

    If *outfile* is None, the function builds a name from the last
    part of the url.
    If *outfile* is ``''``, the function removes every weird character.
    Otherwise, the function uses *outfile* as is; it is then relative
    to the current folder, not to *path_download*.
    A commented usage sketch follows this function.
    """
    lurl = url.lower()
    if lurl.startswith("file://"):
        if outfile is None:
            last = os.path.split(url)[-1]
            if last.startswith("__cached__"):
                last = last[len("__cached__"):]
            dest = os.path.join(path_download, last)
        elif outfile == "":
            dest = _get_file_url(url, path_download)
        else:
            dest = outfile

        # on Windows, file:///C:/... keeps a leading slash which must be removed
        src = url[7:].lstrip("/") if sys.platform.startswith("win") else url[7:]
        shutil.copy(src, dest)
        return dest
    elif "http://" in lurl or "https://" in lurl or "ftp://" in lurl:
        if outfile is None:
            dest = os.path.join(path_download, os.path.split(url)[-1])
        elif outfile == "":
            dest = _get_file_url(url, path_download)
        else:
            dest = outfile

        down = False
        # marker file which exists as long as the download is incomplete
        nyet = dest + ".notyet"

        if os.path.exists(dest) and not os.path.exists(nyet):
            # the file is already there: download it again
            # only if the remote version is more recent
            try:
                f1 = urllib_request.urlopen(url)
                down = _first_more_recent(f1, dest)
                newdate = down
                f1.close()
            except urllib_error.HTTPError as e:
                raise ReadUrlException(
                    f"Unable to fetch '{url}'") from e
            except IOError as e:
                raise ReadUrlException(
                    f"Unable to download '{url}'") from e
        else:
            down = True
            newdate = False

        if down:
            if newdate:
                fLOG("[download] downloading (updated) ", url)
            else:
                fLOG("[download] downloading ", url)

            # urlopen returns bytes, so the destination is always written in
            # binary mode; the extension only drives the log message
            ext = os.path.splitext(url)[-1].lower()
            if ext in (".txt", ".csv", ".tsv", ".log", ".tmpl"):
                fLOG("creating text file ", dest)
            else:
                fLOG("creating binary file ", dest)
            mode = "wb"

            if os.path.exists(nyet):
                # an interrupted download left a partial file:
                # resume it with an HTTP Range request
                size = os.stat(dest).st_size
                fLOG("[download] resume downloading (stop at", size, ") from ", url)
                try:
                    request = urllib_request.Request(url)
                    request.add_header("Range", "bytes=%d-" % size)
                    fu = urllib_request.urlopen(request)
                except urllib_error.HTTPError as e:
                    raise ReadUrlException(
                        f"Unable to fetch '{url}'") from e
                # append to the partial file
                f = open(dest, mode.replace("w", "a"))
            else:
                fLOG("[download] downloading ", url)
                try:
                    request = urllib_request.Request(url)
                    fu = urllib_request.urlopen(request)
                except urllib_error.HTTPError as e:
                    raise ReadUrlException(
                        f"Unable to fetch '{url}'") from e
                f = open(dest, mode)

            # the marker file disappears only once the download is complete
            open(nyet, "w").close()
            c = fu.read(2 ** 21)  # 2 MiB chunks
            size = 0
            while len(c) > 0:
                size += len(c)
                fLOG("[download] size", size)
                f.write(c)
                f.flush()
                c = fu.read(2 ** 21)
            fLOG("end downloading")
            f.close()
            fu.close()
            os.remove(nyet)

        url = dest
        return url
    else:
        raise FileException("This does not seem to be a url: " + url)
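

# A minimal usage sketch of ``download`` (not part of the original module);
# the url and destination folder are illustrative only. The file is written
# next to a temporary ``.notyet`` marker and is not fetched again on a later
# call unless the remote copy is more recent.
#
#     name = download("https://example.com/data.csv", path_download=".")
#     print(name)  # ./data.csv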


def read_url(url, encoding=None):
    """
    Reads the content of a url.

    @param      url         url
    @param      encoding    if None, the result type is bytes, str otherwise
    @return                 str (if *encoding* is not None) or bytes
    """
    request = urllib_request.Request(url)
    try:
        with urllib_request.urlopen(request) as fu:
            content = fu.read()
    except Exception as e:
        import urllib.parse as urlparse
        res = urlparse.urlparse(url)
        raise ReadUrlException(
            f"Unable to open url '{url}', scheme: {res}.") from e

    if encoding is None:
        return content
    return content.decode(encoding=encoding)
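

# A minimal usage sketch of ``read_url`` (not part of the original module);
# the url is illustrative only. Without *encoding* the raw payload is
# returned as bytes, with *encoding* it is decoded into str.
#
#     raw = read_url("https://example.com/index.html")
#     text = read_url("https://example.com/index.html", encoding="utf-8")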