Source code for pyquickhelper.filehelper.ftp_transfer

"""
provides some functionalities to upload file to a website


:githublink:`%|py|5`
"""
from ftplib import FTP, FTP_TLS, error_perm
import os
import sys
import time
import datetime
from io import BytesIO
from ..loghelper.flog import noLOG


[docs]class CannotReturnToFolderException(Exception): """ raised when a transfer is interrupted by an exception and the class cannot return to the original folder :githublink:`%|py|18` """ pass
[docs]class CannotCompleteWithoutNewLoginException(Exception): """ raised when a transfer is interrupted by a new login :githublink:`%|py|25` """ pass
[docs]class TransferFTP: """ This class uploads files to a website, if the remote does not exists, it creates it first. .. exref:: :title: Transfer files to webste through FTP Simple sketch to transfer a list of ``files`` to a website through FTP :: ftp = TransferFTP('ftp.<website>', alias, password, fLOG=print) issues = [ ] done = [ ] notdone = [ ] for file in files : try : r = ftp.transfer (file, path) if r : done.append( (file, path) ) else : notdone.append ( (file, path) ) except Exception as e : issues.append( (file, e) ) try : ftp.close() except Exception as e : print ("unable to close FTP connection using ftp.close") The class may access to a server using :epkg:`SFTP` protocol but it relies on :epkg:`pysftp` and :epkg:`paramiko`. :githublink:`%|py|64` """ errorNoDirectory = "Can't change directory" blockSize = 2 ** 20
[docs] def __init__(self, site, login, password, ftps='FTP', fLOG=noLOG): """ :param site: website :param login: login :param password: password :param ftps: if ``'TLS'``, use class :epkg:`*py:ftplib:FTP_TLS`, if ``'FTP'``, use :epkg:`*py:ftplib:TLS`, if ``'SFTP'``, use :epkg:`pysftp` :param fLOG: logging function :githublink:`%|py|78` """ self._ftps_ = ftps if site is not None: if ftps == 'TLS': cls = FTP_TLS self.is_sftp = False elif ftps == 'SFTP': import pysftp import paramiko import socket sock = socket.socket() sock.connect((site, 22)) trans = paramiko.transport.Transport(sock) trans.start_client() k = trans.get_remote_server_key() hk = paramiko.hostkeys.HostKeys() hk.add(site, 'ssh-rsa', k) cnopts = pysftp.CnOpts() cnopts.hostkeys = hk def cls(si, lo, pw, cnopts=cnopts): return pysftp.Connection( si, username=lo, password=pw, cnopts=cnopts) self._login_ = lambda si=site, lo=login, pw=password: cls( si, lo, pw) self.is_sftp = True elif ftps == 'FTP': cls = FTP self.is_sftp = False else: raise RuntimeError("No implementation for '{}'.".format(ftps)) if not self.is_sftp: self._ftp = cls(site, login, password) self._logins = [(datetime.datetime.now(), site)] else: # mocking if ftps != 'FTP': raise NotImplementedError( "Option ftps is not implemented for mocking.") self._logins = [] self._ftp = FTP(site) self.is_sftp = False self.LOG = fLOG self._atts = dict(site=site, login=login, password=password)
[docs] def _check_can_logged(self): if self.is_sftp and not hasattr(self, '_ftp'): self._ftp = self._login_()
@property def Site(self): """ return the website :githublink:`%|py|132` """ return self._atts["site"]
[docs] def _private_login(self): """ logs in :githublink:`%|py|138` """ self.LOG("reconnecting to ", self.Site, " - ", len(self._logins)) try: if self.is_sftp: self._ftp = self._login_() else: self._ftp.login() self._logins.append((datetime.datetime.now(), self.Site)) except Exception as e: se = str(e) if "You're already logged in" in se: return elif (not self.is_sftp and ( "An existing connection was forcibly closed by the remote host" in se or "An established connection was aborted by the software in your host machine" in se)): # it starts a new connection self.LOG("reconnecting failed, starting a new connection", self.Site, " - ", len(self._logins)) self._ftp = FTP(self.Site, self._atts[ "login"], self._atts["password"]) self._logins.append((datetime.datetime.now(), self.Site)) else: raise e
[docs] def run_command(self, command, *args, **kwargs): """ Runs a FTP command. :param command: command :param args: list of argument :return: output of the command or True for success, False for failure :githublink:`%|py|169` """ try: t = command(*args, **kwargs) if (command == self._ftp.pwd or command == getattr(self._ftp, 'dir', None) or command == getattr(self._ftp, 'mlsd', 'listdir')): return t elif command != self._ftp.cwd: pass return True except Exception as e: # pragma: no cover if self.is_sftp and 'No such file' in str(e): raise FileNotFoundError( "Unable to find {}.".format(args)) from e if TransferFTP.errorNoDirectory in str(e): raise e self.LOG(e) self.LOG(" ** run exc ", str(command), str(args)) self._private_login() if command == self._ftp.pwd or command is self._ftp.pwd: t = command(self) else: t = command(self, *args, **kwargs) self.LOG(" ** run ", str(command), str(args)) return t
[docs] def print_list(self): """ Returns the list of files in the current directory the function sends everything to the logging function. :return: output of the command or True for success, False for failure :githublink:`%|py|201` """ self._check_can_logged() if hasattr(self._ftp, 'retrlines'): return self.run_command(self._ftp.retrlines, 'LIST') raise NotImplementedError( "Not implemented for ftps='{}'.".format(self._ftps_))
[docs] def mkd(self, path): """ Creates a directory. :param path: path to the directory :return: True or False :githublink:`%|py|214` """ self._check_can_logged() self.LOG("[mkd]", path) cmd = self._ftp.mkd if hasattr(self._ftp, 'mkd') else self._ftp.mkdir return self.run_command(cmd, path)
[docs] def cwd(self, path, create=False): """ Goes to a directory, if it does not exist, creates it (if *create* is True). :param path: path to the directory :param create: True to create it :return: True or False :githublink:`%|py|228` """ self._check_can_logged() try: self.run_command(self._ftp.cwd, path) except EOFError as e: # pragma: no cover raise EOFError("unable to go to: {0}".format(path)) from e except FileNotFoundError as e: # pragma: no cover if create: self.mkd(path) self.cwd(path, create) else: raise e except Exception as e: # pragma: no cover if create and TransferFTP.errorNoDirectory in str(e): self.mkd(path) self.cwd(path, create) else: raise e
[docs] def pwd(self): """ Returns the pathname of the current directory on the server. :return: pathname :githublink:`%|py|252` """ self._check_can_logged() if hasattr(self._ftp, 'getcwd'): r = self._ftp.getcwd() return self._ftp.pwd else: return self.run_command(self._ftp.pwd)
[docs] def dir(self, path='.'): """ Lists the content of a path. :param path: path :return: list of path See :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>` :githublink:`%|py|268` """ return list(self.enumerate_ls(path))
[docs] def ls(self, path='.'): """ Lists the content of a path. :param path: path :return: list of path see :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>` .. exref:: :title: List files from FTP site :: from pyquickhelper.filehelper import TransferFTP ftp = TransferFTP("ftp....", "login", "password") res = ftp.ls("path") for v in res: print(v["name"]) ftp.close() :githublink:`%|py|291` """ return list(self.enumerate_ls(path))
[docs] def enumerate_ls(self, path='.'): """ Enumerates the content of a path. :param path: path :return: list of dictionaries One dictionary:: {'name': 'www', 'type': 'dir', 'unique': 'aaaa', 'unix.uid': '1111', 'unix.mode': '111', 'sizd': '5', 'unix.gid': '000', 'modify': '111111'} :githublink:`%|py|311` """ self._check_can_logged() if not self.is_sftp: for a in self.run_command(self._ftp.mlsd, path): r = dict(name=a[0]) r.update(a[1]) yield r else: with self._ftp.cd(path): for name in self._ftp.listdir(): yield dict(name=name)
[docs] def transfer(self, file, to, name, debug=False, blocksize=None, callback=None): """ Transfers a file. :param file: file name or stream (binary, BytesIO) :param to: destination (a folder) :param name: name of the stream on the website :param debug: if True, displays more information :param blocksize: see :tpl:`py,m='ftplib',o='FTP.storbinary'` :param callback: see :tpl:`py,m='ftplib',o='FTP.storbinary'` :return: status When an error happens, the original current directory is restored. :githublink:`%|py|336` """ self._check_can_logged() path = to.split("/") path = [_ for _ in path if len(_) > 0] nb_logins = len(self._logins) cpwd = self.pwd() done = [] exc = None for i, p in enumerate(path): p_ = ('/' + '/'.join(path[:i + 1])) if self.is_sftp else p try: self.cwd(p_, True) except Exception as e: # pragma: no cover exc = e break done.append(p) if nb_logins != len(self._logins): raise CannotCompleteWithoutNewLoginException( "Cannot reach folder '{0}' without new login".format(to)) bs = blocksize if blocksize else TransferFTP.blockSize if not self.is_sftp: def runc(name, f, bs, callback): return self.run_command( self._ftp.storbinary, 'STOR ' + name, f, bs, callback) else: def runc(name, f, bs, callback): return self.run_command( self._ftp.putfo, remotepath=name, flo=f, file_size=bs, callback=None) if exc is None: try: if isinstance(file, str): if not os.path.exists(file): raise FileNotFoundError(file) with open(file, "rb") as f: r = runc(name, f, bs, callback) elif isinstance(file, BytesIO): r = runc(name, file, bs, callback) elif isinstance(file, bytes): st = BytesIO(file) r = runc(name, st, bs, callback) else: r = runc(name, file, bs, callback) except Exception as ee: exc = ee if nb_logins != len(self._logins): try: self.cwd(cpwd) done = [] except Exception as e: raise CannotCompleteWithoutNewLoginException( "Cannot transfer '{0}' without new login".format(to)) # It may fail here, it hopes not. nbtry = 0 nbth = len(done) * 2 + 1 while len(done) > 0: if nb_logins != len(self._logins): try: self.cwd(cpwd) break except Exception as e: raise CannotCompleteWithoutNewLoginException( "Cannot return to original folder'{0}' without new login".format(to)) from e nbtry += 1 try: self.cwd("..") done.pop() except Exception as e: # pragma: no cover time.sleep(0.5) self.LOG( " issue with command .. len(done) == {0}".format(len(done))) if nbtry > nbth: raise CannotReturnToFolderException( "len(path)={0} nbtry={1} exc={2} nbl={3} act={4}".format( len(done), nbtry, exc, nb_logins, len(self._logins))) from e if exc is not None: raise exc else: return r
[docs] def retrieve(self, fold, name, file=None, debug=False): """ Downloads a file. :param file: file name or stream (binary, BytesIO) :param fold: full remote path :param name: name of the stream on the website :param debug: if True, displays more information :return: status :githublink:`%|py|429` """ self._check_can_logged() if self.is_sftp: self.cwd(fold, False) else: path = fold.split("/") path = [_ for _ in path if len(_) > 0] for p in path: self.cwd(p, True) raise_exc = None if not self.is_sftp: def _retrbinary_(name, callback, size, f): r = self.run_command(self._ftp.retrbinary, 'RETR ' + name, callback, size) if isinstance(r, (bytes, str)): f.write(r) return r runc = _retrbinary_ else: def runc(name, callback, size, f): return self.run_command( self._ftp.getfo, remotepath=name, flo=f, callback=None) if isinstance(file, str): with open(file, "wb") as f: def callback(block): f.write(block) try: runc(name, callback, TransferFTP.blockSize, f) r = True except error_perm as e: # pragma: no cover raise_exc = e r = False elif isinstance(file, BytesIO): def callback(block): file.write(block) try: r = runc(name, callback, TransferFTP.blockSize, file) except error_perm as e: raise_exc = e r = False else: b = BytesIO() def callback(block): b.write(block) try: runc(name, callback, TransferFTP.blockSize, b) except error_perm as e: raise_exc = e r = b.getvalue() if not self.is_sftp: for p in path: self.cwd("..") if raise_exc: raise raise_exc # pragma: no cover return r
[docs] def close(self): """ Closes the connection. :githublink:`%|py|499` """ self._check_can_logged() self._ftp.close() if self.is_sftp: self._ftp = None