Source code for pyrsslocal.rss.rss_stream

"""
Description of an RSS stream.


:githublink:`%|py|5`
"""
import datetime

from ..xmlhelper.xmlfilewalk import xml_filter_iterator
from .rss_blogpost import BlogPost
from ..helper.download_helper import get_url_content_timeout


class StreamRSS:
    """
    Requires :epkg:`feedparser`.
    Description of an :epkg:`RSS` stream.

    ::

        <outline text="Freakonometrics" title="Freakonometrics" type="rss"
                 xmlUrl="http://freakonometrics.hypotheses.org/feed"
                 htmlUrl="http://freakonometrics.hypotheses.org" />

    .. list-table::
        :widths: auto
        :header-rows: 1

        * - attribute
          - meaning
        * - titleb
          - title of the stream
        * - type
          - type
        * - xmlUrl
          - url of the rss stream
        * - htmlUrl
          - main page of the blog
        * - keywordsb
          - list of keywords
    """

    def __init__(self, titleb, type, xmlUrl, htmlUrl,  # pylint: disable=W0622
                 keywordsb, id=-1, nb=None):  # pylint: disable=W0622
        """
        :param titleb: title of the stream
        :param type: type
        :param xmlUrl: url of the rss stream
        :param htmlUrl: main page of the blog
        :param keywordsb: keywords
        :param id: an id
        :param nb: not stored in the database; a statistic which is
            recorded in ``self.stat["nb"]`` only when it is not None
        """
        # ``type`` and ``id`` shadow builtins but are part of the public
        # signature (callers pass them by keyword), so they are kept as is.
        self.titleb = titleb
        self.type = type
        self.xmlUrl = xmlUrl
        self.htmlUrl = htmlUrl
        self.keywordsb = keywordsb
        self.id = id
        self.stat = {}
        if nb is not None:
            self.stat["nb"] = nb

    def __str__(self):
        """
        Returns ``"<type>: <titleb> (<xmlUrl>)"``.
        """
        return "%s: %s (%s)" % (self.type, self.titleb, self.xmlUrl)

    def __lt__(self, o):
        """
        Orders two streams on their lower-cased string representation.

        :param o: the other stream
        :return: boolean
        """
        # Compare against ``o``: building both operands from ``self``
        # always yields False and makes sorting a no-op.
        return str(self).lower() < str(o).lower()

    @property
    def index(self):
        """
        Defines the column to use as an index.
        """
        return "xmlUrl"

    @property
    def asdict(self):
        """
        Returns all members as a dictionary
        (``id`` and the statistics are not included).

        :return: dictionary
        """
        return {
            "titleb": self.titleb,
            "type": self.type,
            "xmlUrl": self.xmlUrl,
            "htmlUrl": self.htmlUrl,
            "keywordsb": self.keywordsb,
        }

    @staticmethod
    def schema_database_read():
        """
        Returns all members names and types as a dictionary
        indexed by column position, ``id`` being column 5.

        :return: dictionary
        """
        return {
            0: ("titleb", str),
            1: ("type", str),
            2: ("xmlUrl", str),
            3: ("htmlUrl", str),
            4: ("keywordsb", str),
            5: ("id", int, "PRIMARYKEY", "AUTOINCREMENT"),
        }

    @property
    def schema_database(self):
        """
        Returns all members names and types as a dictionary.
        Unlike :meth:`schema_database_read`, ``id`` is registered
        under key ``-1`` and :meth:`asrow` gives no value for it.

        :return: dictionary
        """
        return {
            0: ("titleb", str),
            1: ("type", str),
            2: ("xmlUrl", str),
            3: ("htmlUrl", str),
            4: ("keywordsb", str),
            -1: ("id", int, "PRIMARYKEY", "AUTOINCREMENT"),
        }

    @property
    def asrow(self):
        """
        Returns all the values as a row
        (following the schema given by :meth:`schema_database`).
        The keywords are stored as one comma separated string.

        :return: list of values
        """
        keywords = self.keywordsb
        # The schema declares the column as ``str``: when the keywords are
        # already a string, joining would put a comma between every
        # character, so a string is kept unchanged.
        if not isinstance(keywords, str):
            keywords = ",".join(keywords)
        return [self.titleb, self.type, self.xmlUrl, self.htmlUrl, keywords]

    @staticmethod
    def enumerate_stream_from_google_list(file, encoding="utf8", fLOG=None):
        """
        Retrieves the list of :epkg:`RSS` streams from a dump made with
        Google Reader.

        :param file: filename
        :param encoding: encoding
        :param fLOG: logging function
        :return: enumeration of :class:`StreamRSS
            <pyrsslocal.rss.rss_stream.StreamRSS>`
        :raises ValueError: if an enumerated ``outline`` item is a tuple

        The format is the following:

        .. exref::
            :title: An entry in the XML config file

            ::

                <outline text="Freakonometrics"
                         title="Freakonometrics"
                         type="rss"
                         xmlUrl="http://freakonometrics.hypotheses.org/feed"
                         htmlUrl="http://freakonometrics.hypotheses.org" />
        """
        with open(file, "r", encoding=encoding) as ff:
            for o in xml_filter_iterator(ff, lambda f: True, log=True,
                                         xmlformat=False, fLOG=fLOG):
                for oo in o.enumerate_on_tag("outline", recursive=True):
                    if isinstance(oo, tuple):
                        raise ValueError("wrong format file: " + file)
                    # Only nodes with an empty ``other`` and a non empty
                    # ``xmlUrl`` are turned into streams.
                    if (len(oo.other) == 0 and "xmlUrl" in oo and
                            len(oo["xmlUrl"]) > 0):
                        yield StreamRSS(titleb=oo["title"],
                                        type=oo["type"],
                                        xmlUrl=oo["xmlUrl"],
                                        htmlUrl=oo["htmlUrl"],
                                        keywordsb=[])

    @staticmethod
    def fill_table(db, tablename, iterator_on):
        """
        Fills a table of a database, if the table does not exist,
        it creates it.

        :param db: database object (:class:`Database`)
        :param tablename: name of a table (created if it does not exist)
        :param iterator_on: iterator on StreamRSS objects

        Example:

        ::

            res = list(StreamRSS.enumerate_stream_from_google_list(file))
            StreamRSS.fill_table(db, "blogs", res)
        """
        db.fill_table_with_objects(
            tablename, iterator_on, check_existence=True)

    @staticmethod
    def _post_date(post):
        """
        Picks the date of a post: ``published_parsed``, then
        ``updated_parsed``, then the current time. A missing key and a
        ``None`` value are handled the same way. A date in the future is
        replaced by the current time.

        :param post: mapping describing one entry of the stream
        :return: ``datetime.datetime``
        """
        now = datetime.datetime.now()
        for key in ("published_parsed", "updated_parsed"):
            struct_time = post.get(key)
            if struct_time is not None:
                date = datetime.datetime(*struct_time[:6])
                return min(date, now)
        return now

    def enumerate_post(self, path=None, fLOG=None):
        """
        Parses a :epkg:`RSS` stream.

        :param path: if None, use self.xmlUrl, otherwise,
            uses this path (url or local file)
        :param fLOG: logging function
        :return: enumeration of :class:`BlogPost
            <pyrsslocal.rss.rss_blogpost.BlogPost>`

        We expect each entry to look like:

        ::

            {'summary_detail': {'base': '', 'value': '<p> ... </p>',
                                'language': None, 'type': 'text/html'},
             'title_detail': {'base': '',
                              'value': 'Installer pip pour Python',
                              'language': None, 'type': 'text/plain'},
             'published': '2013-06-24 00:00:00',
             'published_parsed': time.struct_time(tm_year=2013, ...),
             'link': 'http://www.xavierdupre.fr/blog/xd_blog.html?date=2013-06-24',
             'summary': '<p> ... </p>',
             'guidislink': False,
             'title': 'Installer pip pour Python',
             'links': [{'href': '...', 'rel': 'alternate',
                        'type': 'text/html'}],
             'id': 'http://www.xavierdupre.fr/blog/xd_blog.html?date=2013-06-24'}

        If there is no date, the function will give the date of today
        (assuming you fetch posts from this blog everyday).
        If the id is not present, the guid will be the url,
        otherwise, it will be the id.
        """
        # Imported here so that the module can be imported
        # without feedparser being installed.
        import feedparser

        if path is None:
            path = self.xmlUrl

        if path.startswith(("http://", "https://")):
            cont = get_url_content_timeout(path)
            if cont is None:
                if fLOG:
                    fLOG(
                        "[enumerate_post] unable to retrieve content for url: '{}'.".format(path))
                return
        else:
            cont = path

        # Only a warning: the content is given to feedparser anyway.
        if "<title>" not in cont and fLOG:
            fLOG("unable to parse content from " + path)

        try:
            d = feedparser.parse(cont)
        except RuntimeError:
            if fLOG:
                fLOG(
                    "[enumerate_post] cannot enumerate post in '{}'.".format(path))
            return

        if not d["entries"] and fLOG:
            fLOG("[enumerate_post] no post for ", path)

        for post in d["entries"]:
            titleb = post.get("title", "-")
            url = post.get("link", "")

            # Without an id (or without guidislink), the url is the guid.
            try:
                id_ = post["id"]
                guid = url if post["guidislink"] else id_
            except KeyError:
                guid = url

            try:
                desc = post["summary_detail"]["value"]
            except KeyError:
                desc = post.get("summary", "")

            isPermaLink = True
            date = StreamRSS._post_date(post)
            yield BlogPost(self, titleb, guid, isPermaLink, url, desc, date)

    @staticmethod
    def enumerate_post_from_rsslist(list_rss_stream, fLOG=None):
        """
        Enumerates all posts found in all rss_streams given as a list.

        :param list_rss_stream: list of rss streams
        :param fLOG: logging function, also given to
            :meth:`enumerate_post` of every stream
        :return: enumeration of blog post
        """
        for rss in list_rss_stream:
            if fLOG:
                try:
                    fLOG("reading post from", rss)
                except UnicodeEncodeError:
                    # The stream is wrapped in a list so that its repr
                    # is logged instead of its str.
                    fLOG("reading post from", [rss], "encoding issue")
            yield from rss.enumerate_post(fLOG=fLOG)

    @property
    def stat_nb(self):
        """
        Returns the statistics nb: ``self.stat.get("nb", 0)``.

        :return: number
        """
        return self.stat.get("nb", 0)

    # Only the source indentation (8 spaces) is removed from the templates:
    # removing every space would glue tags and attributes together.
    templates = {
        "default": """
        <p class="%s"><a href="%s" onmousedown="sendlog('blog/{0.id}/in')">{0.titleb}</a>
        <a href="{0.htmlUrl}" target="_blank" onmousedown="sendlog('blog/{0.id}/outimg')">
        <img src="/arrowi.png" width="12px" /></a></p>
        """.replace(" " * 8, ""),
        "default_stat": """
        <tr class="%s"><td>
        <a href="%s" onmousedown="sendlog('blog/{0.id}/in')">{0.titleb}</a>
        <a href="{0.htmlUrl}" target="_blank" onmousedown="sendlog('blog/{0.id}/outimg')">
        <img src="/arrowi.png" width="12px" /></a>
        </td><td>{0.stat_nb}</td></tr>
        """.replace(" " * 8, ""),
    }

    def html(self, template=None, action="{0.htmlUrl}",
             style="blogtitle", addlog=True):
        """
        Displays the blog in HTML format. The template is processed
        in three steps:

        - the two ``%s`` it must contain receive *style* then *action*,
        - ``__id__`` is replaced by the id of the stream,
        - ``{0.member}`` is replaced by the member of the stream.

        :param template: html template, if not None, it can be the name
            of a default template:

            - default
            - default_stat
        :param action: url to use when clicking on a blog
        :param style: style of the paragraph containing the url
        :param addlog: documented as prefixing the url with
            ``/logs/click/`` in order to be logged, it is not used
            by this method
        :return: html string

        If the template is None, it is replaced by the default one
        (see the class attribute ``templates``).
        """
        if template is None:
            chosen = StreamRSS.templates["default"]
        else:
            # A name which is not a default template is the template itself.
            chosen = StreamRSS.templates.get(template, template)
        filled = (chosen % (style, action)).replace("__id__", str(self.id))
        return filled.format(self)