Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief provides some functionalities to upload file to a website
4"""
5from ftplib import FTP, FTP_TLS, error_perm
6import os
7import sys
8import time
9import datetime
10from io import BytesIO
11from ..loghelper.flog import noLOG
14class CannotReturnToFolderException(Exception):
15 """
16 raised when a transfer is interrupted by an exception
17 and the class cannot return to the original folder
18 """
19 pass
22class CannotCompleteWithoutNewLoginException(Exception):
23 """
24 raised when a transfer is interrupted by a new login
25 """
26 pass
29class TransferFTP:
31 """
32 This class uploads files to a website,
33 if the remote does not exists, it creates it first.
35 .. exref::
36 :title: Transfer files to webste through FTP
38 Simple sketch to transfer a list of ``files`` to
39 a website through FTP
41 ::
43 ftp = TransferFTP('ftp.<website>', alias, password, fLOG=print)
45 issues = [ ]
46 done = [ ]
47 notdone = [ ]
48 for file in files :
50 try :
51 r = ftp.transfer (file, path)
52 if r : done.append( (file, path) )
53 else : notdone.append ( (file, path) )
54 except Exception as e :
55 issues.append( (file, e) )
57 try :
58 ftp.close()
59 except Exception as e :
60 print ("unable to close FTP connection using ftp.close")
62 The class may access to a server using :epkg:`SFTP`
63 protocol but it relies on :epkg:`pysftp` and :epkg:`paramiko`.
64 """
66 errorNoDirectory = "Can't change directory"
67 blockSize = 2 ** 20
69 def __init__(self, site, login, password, ftps='FTP', fLOG=noLOG):
70 """
71 @param site website
72 @param login login
73 @param password password
74 @param ftps if ``'TLS'``, use class :epkg:`*py:ftplib:FTP_TLS`,
75 if ``'FTP'``, use :epkg:`*py:ftplib:TLS`,
76 if ``'SFTP'``, use :epkg:`pysftp`
77 @param fLOG logging function
78 """
79 self._ftps_ = ftps
80 if site is not None:
81 if ftps == 'TLS':
82 cls = FTP_TLS
83 self.is_sftp = False
84 elif ftps == 'SFTP':
85 import pysftp
86 import paramiko
87 import socket
88 sock = socket.socket()
89 sock.connect((site, 22))
90 trans = paramiko.transport.Transport(sock)
91 trans.start_client()
92 k = trans.get_remote_server_key()
94 hk = paramiko.hostkeys.HostKeys()
95 hk.add(site, 'ssh-rsa', k)
96 cnopts = pysftp.CnOpts()
97 cnopts.hostkeys = hk
99 def cls(si, lo, pw, cnopts=cnopts): return pysftp.Connection(
100 si, username=lo, password=pw, cnopts=cnopts)
101 self._login_ = lambda si=site, lo=login, pw=password: cls(
102 si, lo, pw)
103 self.is_sftp = True
105 elif ftps == 'FTP':
106 cls = FTP
107 self.is_sftp = False
108 else:
109 raise RuntimeError( # pragma: no cover
110 "No implementation for '{}'.".format(ftps))
111 if not self.is_sftp:
112 self._ftp = cls(site, login, password)
113 self._logins = [(datetime.datetime.now(), site)]
114 else:
115 # mocking
116 if ftps != 'FTP':
117 raise NotImplementedError( # pragma: no cover
118 "Option ftps is not implemented for mocking.")
119 self._logins = []
120 self._ftp = FTP(site)
121 self.is_sftp = False
122 self.LOG = fLOG
123 self._atts = dict(site=site, login=login, password=password)
125 def _check_can_logged(self):
126 if self.is_sftp and not hasattr(self, '_ftp'):
127 self._ftp = self._login_()
129 @property
130 def Site(self):
131 """
132 return the website
133 """
134 return self._atts["site"]
136 def _private_login(self): # pragma: no cover
137 """
138 logs in
139 """
140 self.LOG("reconnecting to ", self.Site, " - ", len(self._logins))
141 try:
142 if self.is_sftp:
143 self._ftp = self._login_()
144 else:
145 self._ftp.login()
146 self._logins.append((datetime.datetime.now(), self.Site))
147 except Exception as e:
148 se = str(e)
149 if "You're already logged in" in se:
150 return
151 elif (not self.is_sftp and (
152 "An existing connection was forcibly closed by the remote host" in se or
153 "An established connection was aborted by the software in your host machine" in se)):
154 # it starts a new connection
155 self.LOG("reconnecting failed, starting a new connection",
156 self.Site, " - ", len(self._logins))
157 self._ftp = FTP(self.Site, self._atts[
158 "login"], self._atts["password"])
159 self._logins.append((datetime.datetime.now(), self.Site))
160 else:
161 raise e
163 def run_command(self, command, *args, **kwargs):
164 """
165 Runs a FTP command.
167 @param command command
168 @param args list of argument
169 @return output of the command or True for success, False for failure
170 """
171 try:
172 t = command(*args, **kwargs)
173 if (command == self._ftp.pwd or
174 command == getattr(self._ftp, 'dir', None) or
175 command == getattr(self._ftp, 'mlsd', 'listdir')):
176 return t
177 elif command != self._ftp.cwd:
178 pass
179 return True
180 except Exception as e: # pragma: no cover
181 if self.is_sftp and 'No such file' in str(e):
182 raise FileNotFoundError(
183 "Unable to find {}.".format(args)) from e
184 if TransferFTP.errorNoDirectory in str(e):
185 raise e
186 self.LOG(e)
187 self.LOG(" ** run exc ", str(command), str(args))
188 self._private_login()
189 if command == self._ftp.pwd or command is self._ftp.pwd:
190 t = command(self)
191 else:
192 t = command(self, *args, **kwargs)
193 self.LOG(" ** run ", str(command), str(args))
194 return t
196 def print_list(self): # pragma: no cover
197 """
198 Returns the list of files in the current directory
199 the function sends everything to the logging function.
201 @return output of the command or True for success, False for failure
202 """
203 self._check_can_logged()
204 if hasattr(self._ftp, 'retrlines'):
205 return self.run_command(self._ftp.retrlines, 'LIST')
206 raise NotImplementedError(
207 "Not implemented for ftps='{}'.".format(self._ftps_))
209 def mkd(self, path): # pragma: no cover
210 """
211 Creates a directory.
213 @param path path to the directory
214 @return True or False
215 """
216 self._check_can_logged()
217 self.LOG("[mkd]", path)
218 cmd = self._ftp.mkd if hasattr(self._ftp, 'mkd') else self._ftp.mkdir
219 return self.run_command(cmd, path)
221 def cwd(self, path, create=False):
222 """
223 Goes to a directory, if it does not exist, creates it
224 (if *create* is True).
226 @param path path to the directory
227 @param create True to create it
228 @return True or False
229 """
230 self._check_can_logged()
231 try:
232 self.run_command(self._ftp.cwd, path)
233 except EOFError as e: # pragma: no cover
234 raise EOFError("unable to go to: {0}".format(path)) from e
235 except FileNotFoundError as e: # pragma: no cover
236 if create:
237 self.mkd(path)
238 self.cwd(path, create)
239 else:
240 raise e
241 except Exception as e: # pragma: no cover
242 if create and TransferFTP.errorNoDirectory in str(e):
243 self.mkd(path)
244 self.cwd(path, create)
245 else:
246 raise e
248 def pwd(self):
249 """
250 Returns the pathname of the current directory on the server.
252 @return pathname
253 """
254 self._check_can_logged()
255 if hasattr(self._ftp, 'getcwd'):
256 r = self._ftp.getcwd()
257 return self._ftp.pwd
258 else:
259 return self.run_command(self._ftp.pwd)
261 def dir(self, path='.'):
262 """
263 Lists the content of a path.
265 @param path path
266 @return list of path
268 See :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>`
269 """
270 return list(self.enumerate_ls(path))
272 def ls(self, path='.'):
273 """
274 Lists the content of a path.
276 @param path path
277 @return list of path
279 see :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>`
281 .. exref::
282 :title: List files from FTP site
284 ::
286 from pyquickhelper.filehelper import TransferFTP
287 ftp = TransferFTP("ftp....", "login", "password")
288 res = ftp.ls("path")
289 for v in res:
290 print(v["name"])
291 ftp.close()
292 """
293 return list(self.enumerate_ls(path))
295 def enumerate_ls(self, path='.'):
296 """
297 Enumerates the content of a path.
299 @param path path
300 @return list of dictionaries
302 One dictionary::
304 {'name': 'www',
305 'type': 'dir',
306 'unique': 'aaaa',
307 'unix.uid': '1111',
308 'unix.mode': '111',
309 'sizd': '5',
310 'unix.gid': '000',
311 'modify': '111111'}
312 """
313 self._check_can_logged()
314 if not self.is_sftp:
315 for a in self.run_command(self._ftp.mlsd, path):
316 r = dict(name=a[0])
317 r.update(a[1])
318 yield r
319 else:
320 with self._ftp.cd(path):
321 for name in self._ftp.listdir():
322 yield dict(name=name)
324 def transfer(self, file, to, name, debug=False, blocksize=None, callback=None):
325 """
326 Transfers a file.
328 @param file file name or stream (binary, BytesIO)
329 @param to destination (a folder)
330 @param name name of the stream on the website
331 @param debug if True, displays more information
332 @param blocksize see :tpl:`py,m='ftplib',o='FTP.storbinary'`
333 @param callback see :tpl:`py,m='ftplib',o='FTP.storbinary'`
334 @return status
336 When an error happens, the original current directory is restored.
337 """
338 self._check_can_logged()
339 path = to.split("/")
340 path = [_ for _ in path if len(_) > 0]
341 nb_logins = len(self._logins)
342 cpwd = self.pwd()
344 done = []
345 exc = None
346 for i, p in enumerate(path):
347 p_ = ('/' + '/'.join(path[:i + 1])) if self.is_sftp else p
348 try:
349 self.cwd(p_, True)
350 except Exception as e: # pragma: no cover
351 exc = e
352 break
353 done.append(p)
355 if nb_logins != len(self._logins):
356 raise CannotCompleteWithoutNewLoginException( # pragma: no cover
357 "Cannot reach folder '{0}' without new login".format(to))
359 bs = blocksize if blocksize else TransferFTP.blockSize
360 if not self.is_sftp:
361 def runc(name, f, bs, callback):
362 return self.run_command(
363 self._ftp.storbinary, 'STOR ' + name, f, bs, callback)
364 else:
365 def runc(name, f, bs, callback):
366 return self.run_command(
367 self._ftp.putfo, remotepath=name, flo=f, file_size=bs,
368 callback=None)
369 if exc is None:
370 try:
371 if isinstance(file, str):
372 if not os.path.exists(file):
373 raise FileNotFoundError(file) # pragma: no cover
374 with open(file, "rb") as f:
375 r = runc(name, f, bs, callback)
376 elif isinstance(file, BytesIO):
377 r = runc(name, file, bs, callback)
378 elif isinstance(file, bytes):
379 st = BytesIO(file)
380 r = runc(name, st, bs, callback)
381 else:
382 r = runc(name, file, bs, callback)
383 except Exception as ee: # pragma: no cover
384 exc = ee
386 if nb_logins != len(self._logins): # pragma: no cover
387 try:
388 self.cwd(cpwd)
389 done = []
390 except Exception as e:
391 raise CannotCompleteWithoutNewLoginException(
392 "Cannot transfer '{0}' without new login".format(to))
394 # It may fail here, it hopes not.
395 nbtry = 0
396 nbth = len(done) * 2 + 1
397 while len(done) > 0:
398 if nb_logins != len(self._logins): # pragma: no cover
399 try:
400 self.cwd(cpwd)
401 break
402 except Exception as e:
403 raise CannotCompleteWithoutNewLoginException(
404 "Cannot return to original folder'{0}' without new login".format(to)) from e
406 nbtry += 1
407 try:
408 self.cwd("..")
409 done.pop()
410 except Exception as e: # pragma: no cover
411 time.sleep(0.5)
412 self.LOG(
413 " issue with command .. len(done) == {0}".format(len(done)))
414 if nbtry > nbth:
415 raise CannotReturnToFolderException(
416 "len(path)={0} nbtry={1} exc={2} nbl={3} act={4}".format(
417 len(done), nbtry, exc, nb_logins, len(self._logins))) from e
419 if exc is not None:
420 raise exc # pragma: no cover
421 return r
423 def retrieve(self, fold, name, file=None, debug=False):
424 """
425 Downloads a file.
427 @param file file name or stream (binary, BytesIO)
428 @param fold full remote path
429 @param name name of the stream on the website
430 @param debug if True, displays more information
431 @return status
432 """
433 self._check_can_logged()
435 if self.is_sftp:
436 self.cwd(fold, False)
437 else:
438 path = fold.split("/")
439 path = [_ for _ in path if len(_) > 0]
441 for p in path:
442 self.cwd(p, True)
444 raise_exc = None
446 if not self.is_sftp:
448 def _retrbinary_(name, callback, size, f):
449 r = self.run_command(self._ftp.retrbinary,
450 'RETR ' + name, callback, size)
451 if isinstance(r, (bytes, str)):
452 f.write(r)
453 return r
455 runc = _retrbinary_
456 else:
457 def runc(name, callback, size, f): return self.run_command(
458 self._ftp.getfo, remotepath=name, flo=f, callback=None)
460 if isinstance(file, str):
461 with open(file, "wb") as f:
462 def callback(block):
463 f.write(block)
464 try:
465 runc(name, callback, TransferFTP.blockSize, f)
466 r = True
467 except error_perm as e: # pragma: no cover
468 raise_exc = e
469 r = False
470 elif isinstance(file, BytesIO):
471 def callback(block):
472 file.write(block)
473 try:
474 r = runc(name, callback, TransferFTP.blockSize, file)
475 except error_perm as e: # pragma: no cover
476 raise_exc = e
477 r = False
478 else:
479 b = BytesIO()
481 def callback(block):
482 b.write(block)
483 try:
484 runc(name, callback, TransferFTP.blockSize, b)
485 except error_perm as e: # pragma: no cover
486 raise_exc = e
488 r = b.getvalue()
490 if not self.is_sftp:
491 for p in path:
492 self.cwd("..")
494 if raise_exc:
495 raise raise_exc # pragma: no cover
497 return r
499 def close(self):
500 """
501 Closes the connection.
502 """
503 self._check_can_logged()
504 self._ftp.close()
505 if self.is_sftp:
506 self._ftp = None