Source code for pymmails.grabber.mailboximap

"""
Defines a mailbox using IMAP


:githublink:`%|py|5`
"""

import imaplib
import re
import email
import email.message
from pyquickhelper.loghelper import noLOG
from .mail_exception import MailException
from .email_message import EmailMessage


[docs]class MailBoxImap: """ Defines a mail box with :epkg:`IMAP` interface. .. exref:: :title: Fetch mails from a gmail account :: user = "address no domain" pwd = "password" server = "imap.gmail.com" box = MailBoxImap(user, pwd, server, ssl=True) box.login() # ... fetch emails box.logout() :githublink:`%|py|35` """ expFolderName = re.compile('\\"(.*?)\\"')
[docs] def __init__(self, user, pwd, server, ssl=False, fLOG=noLOG): """ :param user: user :param pwd: password :param server: server something like ``imap.domain.ext`` :param ssl: select ``IMPA_SSL`` or ``IMAP`` :param fLOG: logging function For gmail, it is ``imap.gmail.com`` and ssl must be true. :githublink:`%|py|48` """ self.M = imaplib.IMAP4_SSL(server) if ssl else imaplib.IMAP4(server) self._user = user self._password = pwd self.fLOG = fLOG
[docs] def login(self): """ login :githublink:`%|py|57` """ self.M.login(self._user, self._password)
[docs] def logout(self): """ logout :githublink:`%|py|63` """ self.M.logout()
[docs] def folders(self): """ Returns the list of folder of the mail box. :githublink:`%|py|69` """ folders = self.M.list() if folders[0] != "OK": raise MailException( "unable to retrieve the folder list for " + self._user) res = [] for f in folders[1]: s = f.decode("utf8") if r"\Noselect" in s: continue # s looks like this: (\HasNoChildren) "/" "INBOX/Something" exp = MailBoxImap.expFolderName.findall(s) name = exp[-1] res.append(name) return res
[docs] def enumerate_mails_in_folder( self, folder, skip_function=None, date=None, pattern="ALL", body=True): """ Enumerates all mails in folder folder. :param folder: folder name :param skip_function: if not None, use this function on the header/body to avoid loading the entire message (and skip it) :param pattern: search pattern (see below) :param date: add a date to the pattern :param body: add body :return: iterator on (message) The search pattern can be used to look for a subset of email. It follows these `specifications <http://tools.ietf.org/html/rfc3501#page-49>`_. If a folder is a subfolder, the syntax should be ``folder/subfolder``. .. exref:: :title: Search pattern :: pattern='FROM "xavier" SINCE 1-Feb-2013' pattern='FROM "xavier" SINCE 1-Feb-2013 BEFORE 5-May-2013' pattern='FROM "xavier" SINCE 1-Feb-2013 BEFORE 5-May-2013 (UNANSWERED)' pattern='CC "jacques" FROM "xavier" (DELETED)' pattern='TEXT "github"' pattern='LARGER 10000 SMALLER 1000000' pattern='SUBJECT "programmation"' pattern='TO "student" (FLAGGED)' pattern='(UNSEEN)' If the function generates an error such as:: imaplib.error: command: SEARCH => got more than 10000 bytes The keyword RECENT will be added to the search pattern in order to retreive the newest mails. :githublink:`%|py|125` """ if isinstance(folder, list): for fold in folder: iter = self.enumerate_mails_in_folder(folder=fold, skip_function=skip_function, date=date, pattern=pattern, body=body) for mail in iter: yield mail else: qfold = self.M._quote(folder) self.M.select(qfold, readonly=True) if date is not None: pdat = 'SINCE {0}'.format(date) if pattern == "ALL": pattern = pdat else: pattern += " " + pdat try: pattern.encode('ascii') charset = None except UnicodeEncodeError: charset = 'UTF8' pattern = pattern.encode('utf-8') pattern = "".join(chr(b) for b in pattern) try: try: _, data = self.M.search(charset, pattern) except UnicodeEncodeError as e: charset = None pattern = pattern.encode( 'ascii', errors='ignore').decode("ascii") _, data = self.M.search(None, pattern) except Exception as e: if "SEARCH => got more " in str(e): if pattern == "ALL": pattern = "RECENT" else: pattern += " RECENT" pattern = pattern.strip() self.fLOG("[MailBoxImap.enumerate_mails_in_folder] limit email " "search for folder '{0}' to recent emails with " "pattern '{1}'".format(folder, pattern)) data = self.M.search(charset, pattern)[1] else: raise MailException( "Unable to search for pattern: '{0}' " "(charset='{1}')\nin subfolder {2}\n" "check the folder you search for is right." .format(pattern, charset, qfold)) from e spl = data[0].split() self.fLOG("MailBoxImap.enumerate_mails_in_folder [folder={0} nbm={1} body={2} pattern={3}]".format( folder, len(spl), body, pattern)) for num in spl: if skip_function is not None: data = self.M.fetch(num, '(BODY[HEADER])')[1] emailBody = data[0][1] mail = email.message_from_bytes( emailBody, _class=EmailMessage) if skip_function(mail): continue if body: data = self.M.fetch(num, '(RFC822)')[1] emailBody = data[0][1] mail = email.message_from_bytes( emailBody, _class=EmailMessage) elif skip_function is None: data = self.M.fetch(num, '(BODY[HEADER])')[1] emailBody = data[0][1] mail = email.message_from_bytes( emailBody, _class=EmailMessage) yield mail self.M.close()
[docs] def enumerate_search_person(self, person, folder, skip_function=None, date=None, max_dest=5, body=True): """ Enumerates all mails in folder folder from a user or sent to a user. :param person: person to look for or persons to look for :param folder: folder name :param skip_function: if not None, use this function on the header/body to avoid loading the entire message (and skip it) :param pattern: search pattern (see below) :param max_dest: maximum number of receivers :param body: get the body :return: iterator on (message) If *person* is a list, the function iterates on the list of persons to look for. It returns only unique mails. :githublink:`%|py|219` """ if isinstance(person, list): unique_id = set() for p in person: mail_set = self.enumerate_search_person(p, folder=folder, skip_function=skip_function, date=date, max_dest=max_dest, body=body) for mail in mail_set: uid = mail.UniqueID if uid not in unique_id: unique_id.add(uid) yield mail else: pat1 = 'FROM "{0}"'.format(person) if date is not None: pat1 += ' SINCE {0}'.format(date) for mail in self.enumerate_mails_in_folder(folder, skip_function=skip_function, pattern=pat1, body=body): yield mail pat2 = 'TO "{0}"'.format(person) if date is not None: pat2 += ' SINCE {0}'.format(date) for mail in self.enumerate_mails_in_folder( folder, skip_function=skip_function, pattern=pat2): if max_dest > 0: tos = mail.get_to() if tos: ll = len(tos) if ll <= max_dest: yield mail else: yield mail
[docs] def enumerate_search_subject(self, subject, folder, skip_function=None, date=None, max_dest=5): """ Enumerates all mails in folder folder with a subject verifying a regular expression. :param subject: subject to look for :param folder: folder name :param skip_function: if not None, use this function on the header/body to avoid loading the entire message (and skip it) :param pattern: search pattern (see below) :param max_dest: maximum number of receivers :return: iterator on (message) :githublink:`%|py|268` """ pat1 = 'SUBJECT "{0}"'.format(subject) for mail in self.enumerate_mails_in_folder( folder, skip_function=skip_function, pattern=pat1): yield mail