Source code for pyrsslocal.rss.rss_helper

"""
Various functions to automate the collection of blog posts.


:githublink:`%|py|5`
"""
import os
import webbrowser
import sys
import threading
import datetime
from textwrap import dedent
from jinja2 import Template
from pyquickhelper.filehelper import read_content_ufs
from pyensae.sql.database_main import Database
from .rss_stream import StreamRSS
from .rss_blogpost import BlogPost
from .rss_simple_server import RSSServer


def rss_from_xml_to_database(file, database="database_rss.db3", table="blogs", fLOG=None):
    """
    Parses a list of blogs stored in a :epkg:`XML` file using
    Google Reader format and stores the results in a :epkg:`SQLite` database.

    :param file: (str) xml file containing the list of blogs
        (see the example below)
    :param database: database file (sqlite)
    :param table: table name
    :param fLOG: logging function
    :return: number of stored blogs

    The XML file should contain entries such as the following:

    ::

        <outline text="XD blog" title="XD blog" type="rss"
            xmlUrl="http://www.xavierdupre.fr/blog/xdbrss.xml"
            htmlUrl="http://www.xavierdupre.fr/blog/xd_blog.html" />

    The function does not check whether or not the blogs were already added
    to the database, they will be added a second time.
    If the table does not exist, it will be created.
    """
    # The XML is parsed completely before the database is opened so that a
    # malformed file never leaves a connection behind.
    res = list(StreamRSS.enumerate_stream_from_google_list(file))
    db = Database(database, LOG=fLOG)
    db.connect()
    try:
        StreamRSS.fill_table(db, table, res)
    finally:
        # Always release the connection, even if the insertion fails.
        db.close()
    return len(res)
def rss_download_post_to_database(database="database_rss.db3", table_blog="blogs",
                                  table_post="posts", fLOG=None):
    """
    Downloads all posts from a list of blogs stored in a database by function
    :func:`rss_from_xml_to_database <pyrsslocal.rss.rss_helper.rss_from_xml_to_database>`.

    :param database: database file name (SQLite format)
    :param table_blog: table name of the blogs
    :param table_post: table name of the post
    :param fLOG: logging function
    :return: number of posts downloaded
    """
    db = Database(database, LOG=fLOG)
    db.connect()
    try:
        rss_list = list(db.enumerate_objects(table_blog, StreamRSS))
        list_post = list(
            StreamRSS.enumerate_post_from_rsslist(rss_list, fLOG=fLOG))
        BlogPost.fill_table(db, table_post, list_post, skip_exception=True)
    finally:
        # Downloading posts can fail for many reasons (network, parsing);
        # the connection must be released in every case.
        db.close()
    return len(list_post)
def rss_update_run_server(dbfile, xml_blogs, port=8093, browser=None, period="today",
                          server=None, thread=False, fLOG=None):
    """
    Creates a database if it does not exist, adds a table for blogs and posts,
    updates the database, starts a server and opens a browser.

    :param dbfile: (str) sqlite database to create
    :param xml_blogs: (str) xml description of blogs (google format) (file or string)
    :param port: the main page will be ``http://localhost:port/``
    :param browser: (str) to choose a different browser than the default one
    :param period: (str) when opening the browser, it can show the results
        for last day or last week
    :param server: to set up your own server
    :param thread: to start the server in a separate thread
    :param fLOG: logging function
    :return: see :func:`rss_run_server <pyrsslocal.rss.rss_helper.rss_run_server>`

    You can read the blog post `pyhome3 RSS Reader
    <http://www.xavierdupre.fr/blog/2013-07-28_nojs.html>`_.
    """
    # 1. register the blogs, 2. fetch their posts, 3. serve the result.
    rss_from_xml_to_database(xml_blogs, database=dbfile, fLOG=fLOG)
    rss_download_post_to_database(database=dbfile, fLOG=fLOG)
    server_options = dict(browser=browser, period=period,
                          server=server, thread=thread, fLOG=fLOG)
    return rss_run_server(dbfile, port, **server_options)
def rss_run_server(dbfile, port=8093, browser=None, period="today",
                   server=None, thread=False, fLOG=None):
    """
    Starts a server and opens a browser on a page reading blog posts.

    :param dbfile: (str) sqlite database to read, it must exist
    :param port: the main page will be ``http://localhost:port/``
    :param browser: (str) to choose a different browser than the default one
    :param period: (str) when opening the browser, it can show the results
        for last day or last week
    :param server: to set up your own server
    :param thread: to start the server in a separate thread
    :param fLOG: logging function
    :return: the value returned by ``RSSServer.run_server``
    :raises FileNotFoundError: if *dbfile* does not exist

    You can read the blog post `RSS Reader
    <http://www.xavierdupre.fr/blog/2013-07-28_nojs.html>`_.

    If *browser* is "none", the browser is not started.
    """
    if not os.path.exists(dbfile):
        raise FileNotFoundError(dbfile)

    def get_controller():
        # Returns the webbrowser controller registered under name *browser*.
        try:
            return webbrowser.get(browser)
        except webbrowser.Error:
            if browser != "firefox" or not sys.platform.startswith("win"):
                raise
        # Firefox is frequently not registered on Windows: register it by
        # hand. Both standard install locations are probed (32-bit first,
        # as before); the historical path stays the fallback when none exists.
        candidates = [
            r"C:\Program Files (x86)\Mozilla Firefox\firefox.exe",
            r"C:\Program Files\Mozilla Firefox\firefox.exe",
        ]
        exe = next((p for p in candidates if os.path.exists(p)), candidates[0])
        webbrowser.register('firefox', None, webbrowser.GenericBrowser(exe))
        return webbrowser.get(browser)

    def open_browser():
        url = "http://localhost:%d/rss_reader.html?search=%s" % (port, period)
        if fLOG:
            fLOG("opening ", url)
        if browser is None:
            # default browser
            webbrowser.open(url)
        elif browser not in ("none", "None"):
            get_controller().open(url)
        # webbrowser.open does not return until the browser is closed if the
        # browser was launched with this only tab. If a new tab was created,
        # this function quickly ends.

    # The browser is opened from a separate thread because the call may block
    # (see above) while the server has to start in the current one.
    th = threading.Thread(target=open_browser)
    th.start()
    ret = RSSServer.run_server(
        server, dbfile, port=port, thread=thread, fLOG=fLOG)
    # we should close the thread here if it is still alive
    return ret
def _post_date(post):
    """
    Returns the date of a parsed feed entry.

    :param post: entry (dictionary-like) holding the optional keys
        ``published_parsed`` and ``updated_parsed``
    :return: ``datetime.datetime``

    The first of ``published_parsed``, ``updated_parsed`` which is present
    and not None is converted, the current time is returned if none is
    usable. A value which is not None but cannot be converted still raises
    the exception thrown by ``datetime.datetime``.
    """
    for key in ("published_parsed", "updated_parsed"):
        try:
            struct_time = post[key]
        except KeyError:
            continue
        if struct_time is not None:
            return datetime.datetime(*struct_time[:6])
    return datetime.datetime.now()


def enumerate_post_from_rss(content, rss_stream=None):
    """
    Parses a :epkg:`RSS` stream.

    :param content: :epkg:`RSS` content
    :param rss_stream: object given as first argument to every created
        :class:`BlogPost <pyrsslocal.rss.rss_blogpost.BlogPost>`
    :return: iterator on :class:`BlogPost <pyrsslocal.rss.rss_blogpost.BlogPost>`

    A date in the future is replaced by the current time.
    """
    import feedparser  # pylint: disable=C0415
    d = feedparser.parse(content)
    if d is None:
        return

    for post in d["entries"]:
        titleb = post.get("title", "-")
        url = post.get("link", "")

        # The link is used as identifier when the entry has no id or
        # does not tell whether its guid is a link.
        try:
            id_ = post["id"]
            guid = url if post["guidislink"] else id_
        except KeyError:
            guid = url

        try:
            desc = post["summary_detail"]["value"]
        except KeyError:
            desc = post.get("summary", "")

        isPermaLink = True

        date = _post_date(post)
        now = datetime.datetime.now()
        if date > now:
            date = now

        yield BlogPost(rss_stream, titleb, guid, isPermaLink, url, desc, date)
def enumerate_rss_merge(rss_urls, title="compilation", min_size=None):
    """
    Merges many :epkg:`rss` files or urls into a single stream of posts.

    :param rss_urls: :epkg:`rss` files or urls
    :param title: title given to the merged stream
    :param min_size: fails if the downloaded file is below this size
    :return: iterator on the posts of all sources, attached to a new RSS stream
    """
    # Every post is attached to the same artificial stream.
    merged_stream = StreamRSS(title, None, None, None, None, id=0)
    for source in rss_urls:
        raw = read_content_ufs(source, min_size=min_size)
        yield from enumerate_post_from_rss(raw, rss_stream=merged_stream)
def to_rss(obj, link, description):
    """
    Converts something into :epkg:`RSS`.

    :param obj: non-empty list of objects implementing method ``to_rss_item``
    :param link: link
    :param description: description
    :return: content
    :raises TypeError: if *obj* is not a list
    :raises ValueError: if *obj* is empty

    The title of the channel is the title of the first element if it is
    a ``StreamRSS``, it is empty otherwise.
    """
    if not isinstance(obj, list):
        raise TypeError("Unexpected type {}.".format(type(obj)))
    if not obj:
        raise ValueError("obj cannot be empty.")

    first = obj[0]
    title = first.title if isinstance(first, StreamRSS) else ""
    items = [blog.to_rss_item() for blog in obj]

    # The XML declaration must be the very first characters of the document:
    # the leading whitespace left by the triple-quoted string is removed.
    template = dedent("""
        <?xml version="1.0" encoding="utf-8"?>
        <rss version="2.0">
        <channel>
            <title>{{title}}</title>
            <link>{{link}}</link>
            <description>{{description}}</description>
            {{items}}
        </channel>
        </rss>
        """).lstrip()
    tpl = Template(template)
    return tpl.render(link=link, description=description,
                      items='\n'.join(items), title=title)
# Default jinja2 template used by function to_html. Placeholders: title,
# author, keywords, header, rssfile (link to the RSS file) and items
# (the HTML of the blog posts).
# ``rssfile`` must be referenced as ``{{rssfile}}``: ``{{rssfile.xml}}`` is
# interpreted by jinja2 as attribute ``xml`` of the value and renders empty.
template_html = """
<?xml version="1.0" encoding="utf-8"?>
<html>
<head>
<link href="http://www.xavierdupre.fr/pyhome3.ico" rel="shortcut icon"/>
<link href="http://www.xavierdupre.fr/blog/pMenu.css" rel="stylesheet" type="text/css"/>
<link REL="stylesheet" TYPE="text/css" href="http://www.xavierdupre.fr/blog/javascript/run_prettify.css"/>
<title>{{title}}</title>
<meta content="{{author}}" name="author"/>
<meta content="{{keywords}}" name="keywords"/>
<meta content="text/html; charset=utf-8" http-equiv="Content-Type"/>
<script src="http://www.xavierdupre.fr/blog/javascript/pMenu.js" type="text/javascript"></script>
<script src="http://www.xavierdupre.fr/blog/javascript/latexit.js" type="text/javascript"></script>
<script src="http://www.xavierdupre.fr/blog/javascript/run_prettify.js" type="text/javascript"></script>
<link href="http://www.xavierdupre.fr/blog/javascript/shCore.css" rel="stylesheet" type="text/css"/>
<link href="http://www.xavierdupre.fr/blog/javascript/shThemeDefault.css" rel="stylesheet" type="text/css"/>
<script src="http://www.xavierdupre.fr/blog/javascript/shCore.js" type="text/javascript"></script>
<script src="http://www.xavierdupre.fr/blog/javascript/shAutoloader.js" type="text/javascript"></script>
</head>
<body>
<div class="otherlayer">
<!-- other layer -->
</div>
<div class="sidebar">
</div>
<div class="maintitle">
<h1>{{title}}</h1>
<p><a href="{{rssfile}}"><img src="http://www.xavierdupre.fr/blog/documents/feed-icon-16x16.png"/></a>
<i>{{header}}</i></p>
</div>
<div class="mainbody">
<hr />
{{items}}
<hr />
</div>
<script type="text/javascript">
SyntaxHighlighter.autoloader(
  'js jscript javascript http://www.xavierdupre.fr/blog/javascript/shBrushJScript.js',
  'py python http://www.xavierdupre.fr/blog/javascript/shBrushPython.js',
  'cpp http://www.xavierdupre.fr/blog/javascript/shBrushCpp.js',
  'sql http://www.xavierdupre.fr/blog/javascript/shBrushSql.js',
  'flat plain http://www.xavierdupre.fr/blog/javascript/shBrushPlain.js',
  'vba vb http://www.xavierdupre.fr/blog/javascript/shBrushVb.js',
  'bash http://www.xavierdupre.fr/blog/javascript/shBrushBash.js',
  'cs http://www.xavierdupre.fr/blog/javascript/shBrushCSharp.js',
  'php http://www.xavierdupre.fr/blog/javascript/shBrushPhp.js',
  'css http://www.xavierdupre.fr/blog/javascript/shBrushCss.js',
  'xml html http://www.xavierdupre.fr/blog/javascript/shBrushXml.js'
);
SyntaxHighlighter.all();
</script>
<div id="playscript"/>
</body>
</html>
"""
def to_html(items, template=None, title="BLOG", author="AUTHOR", keywords="blog,python",
            header="", rssfile="rssfile.xml", **context):
    """
    Produces a :epkg:`HTML` page.

    :param items: list of blog posts, every item must define attribute
        ``pubDate`` and method ``to_html_item``
    :param template: template or None to get the default one, it can be
        the source of a jinja2 template (str) or any object
        implementing method ``render``
    :param title: blog title
    :param author: author
    :param keywords: keywords
    :param header: blog description
    :param rssfile: file RSS
    :param context: other information given to the template
    :return: page

    Posts are rendered from the most recent to the oldest one.
    """
    if template is None:
        template_ = Template(template_html)
    elif hasattr(template, "render"):
        # already compiled template (or any compatible object)
        template_ = template
    else:
        template_ = Template(template)

    ordered = sorted(items, reverse=True, key=lambda i: i.pubDate)
    hitems = "\n".join(b.to_html_item() for b in ordered)
    return template_.render(title=title, author=author, keywords=keywords,
                            items=hitems, header=header, rssfile=rssfile,
                            **context)