Source code for pyquickhelper.filehelper.transfer_api_ftp

"""
API to move files using FTP


:githublink:`%|py|5`
"""
import ftplib
from io import BytesIO
from ..loghelper import noLOG
from .transfer_api import TransferAPI
from .ftp_transfer import TransferFTP
from .ftp_mock import MockTransferFTP


[docs]class TransferAPIFtp(TransferAPI): """ Defines an API to transfer files over a remote location through :epkg:`FTP`. :githublink:`%|py|17` """
[docs] def __init__(self, site, login, password, root="backup", ftps='FTP', fLOG=noLOG): """ :param site: website :param login: login :param password: password :param root: root on the website :param ftps: protocol, see :class:`TransferFTP <pyquickhelper.filehelper.ftp_transfer.TransferFTP>` :param fLOG: logging function :githublink:`%|py|28` """ TransferAPI.__init__(self, fLOG=fLOG) self._ftp = (TransferFTP(site, login, password, fLOG=fLOG, ftps=ftps) if site else MockTransferFTP(ftps=ftps, fLOG=fLOG)) self._root = root
[docs] def connect(self): """ connect :githublink:`%|py|37` """ pass
# self._ftp.connect()
[docs] def close(self): """ close the connection :githublink:`%|py|44` """ pass
# self._ftp._close()
[docs] def transfer(self, path, data): """ It assumes a data holds in memory, tansfer data to path. :param data: bytes :param path: path to remove location :return: boolean :githublink:`%|py|56` """ spl = path.rsplit("/") to = self._root + "/" + "/".join(spl[:-1]) to = to.rstrip("/") byt = BytesIO(data) r = self._ftp.transfer(byt, to, spl[-1]) return r
[docs] def retrieve(self, path, exc=True): """ retrieve data from path :param path: remove location :param exc: keep exception :return: data :githublink:`%|py|71` """ spl = path.rsplit("/") src = self._root + "/" + "/".join(spl[:-1]) src = src.rstrip("/") if exc: r = self._ftp.retrieve(src, spl[-1], None) else: try: r = self._ftp.retrieve(src, spl[-1], None) except ftplib.error_perm: r = None return r