Source code for pyrsslocal.rss.rss_database
"""
Description of a :epkg:`RSS` stream.
:githublink:`%|py|5`
"""
import datetime
import os
from pyquickhelper.loghelper import fLOG
from pyensae.sql.database_main import Database
from .rss_blogpost import BlogPost
from .rss_stream import StreamRSS
[docs]class DatabaseRSS (Database):
"""
Database specific to :epkg:`RSS`.
:githublink:`%|py|18`
"""
[docs] @staticmethod
def schema_table(table):
"""
returns the schema for a specific table
:param table: name (in ["stats", "event"])
:return: dictionary
:githublink:`%|py|27`
"""
if table == "stats":
return {0: ("id_post", int),
1: ("dtime", datetime.datetime),
2: ("status", str),
3: ("rate", int),
4: ("comment", str),
}
elif table == "event":
return {-1: ("id_event", int, "PRIMARYKEY", "AUTOINCREMENT"),
0: ("dtime", datetime.datetime),
1: ("uuid", str),
2: ("type1", str),
3: ("type2", str),
4: ("args", str),
}
else:
raise Exception("unexpected table name")
[docs] def __init__(self, dbfile,
table_blogs="blogs",
table_posts="posts",
table_stats="posts_stat",
table_event="events",
LOG=fLOG):
"""
:param dbfile: file database
:param table_blogs: table name for the blogs
:param table_posts: table name for the posts
:param table_stats: table name for the posts stats
:param table_event: table name for the events
:param LOG: logging function
:githublink:`%|py|59`
"""
if not os.path.exists(dbfile):
raise FileNotFoundError(dbfile)
Database.__init__(self, dbfile, LOG=LOG)
self.dbfile = dbfile
self.table_blogs = table_blogs
self.table_posts = table_posts
self.table_stats = table_stats
self.table_event = table_event
self.connect()
for tbl in [table_blogs, table_posts]:
if not self.has_table(tbl):
raise Exception("table %s not found in %s" % (tbl, dbfile))
self.create_missing_table()
self.close()
[docs] def create_missing_table(self):
"""
Creates the missing tables.
:githublink:`%|py|79`
"""
if self.has_table(self.table_stats) and len(self.get_table_columns(
self.table_stats)) != len(DatabaseRSS.schema_table("stats")):
self.remove_table(self.table_stats)
if not self.has_table(self.table_stats):
schema = DatabaseRSS.schema_table("stats")
self.create_table(self.table_stats, schema)
self.commit()
self.create_index(
"id_post_" +
self.table_stats,
self.table_stats,
"id_post",
False)
self.commit()
if not self.has_table(self.table_event):
schema = DatabaseRSS.schema_table("event")
self.create_table(self.table_event, schema)
self.commit()
[docs] def __str__(self):
"""
usual
:githublink:`%|py|105`
"""
return "file:%s, t-blogs:%s, t-posts:%s" % (
self.dbfile, self.table_blogs, self.table_posts)
specific_search = {
"today": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{1}'",
"twoday": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{2}'",
"week": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{3}'",
"frequent": """SELECT id_rss FROM (
SELECT id_rss, SUM(nb)*1.0/ (MAX(day) - MIN(day)+1) AS avg_nb FROM (
SELECT id_rss, day, COUNT(*) AS nb FROM (
SELECT id_rss, getdayn(pubDate) AS day FROM {0} WHERE pubDate >= '{4}'
) GROUP BY id_rss, day
) GROUP BY id_rss
) WHERE avg_nb >= {5}""",
"notfrequent": """SELECT id_rss FROM (
SELECT id_rss, SUM(nb)*1.0/ (MAX(day) - MIN(day)+1) AS avg_nb FROM (
SELECT id_rss, day, COUNT(*) AS nb FROM (
SELECT id_rss, getdayn(pubDate) AS day FROM {0} WHERE pubDate >= '{4}'
) GROUP BY id_rss, day
) GROUP BY id_rss
) WHERE avg_nb < {5}""",
}
[docs] @staticmethod
def getday(dt):
"""
Returns the same datetime but with no time.
:param dt: datetime
:return: datetime which correspond to the beginning of the day
:githublink:`%|py|136`
"""
if isinstance(dt, str):
res = dt.split(" ")
return res[0]
else:
res = datetime.datetime(dt.year, dt.month, dt.day)
return res
[docs] @staticmethod
def getdayn(dt):
"""
Returns the same datetime but with no time.
:param dt: datetime
:return: datetime which correspond to the beginning of the day
:githublink:`%|py|151`
"""
if isinstance(dt, str):
dt = dt.split()[0]
ymd = dt.split("-")
res = datetime.datetime(int(ymd[0]), int(ymd[1]), int(ymd[2]))
else:
res = datetime.datetime(dt.year, dt.month, dt.day)
one = datetime.datetime(2000, 1, 1)
d = res - one
return d.days
[docs] def enumerate_blogs(self, sorted_=True, specific=None, daily_freq=1.5,
now=None, addstat=False):
"""
Enumerates all the blogs from the database.
:param sorted_: sorted by title
:param specific: specific search
- None: all blogs
- today: get all blogs for today
- twoday: get all blogs for today and yesterday
- week: get all blogs for last week
- notfrequent: get all blogs publishing less posts in a day than ``daily_freq``
- frequent: get all blogs publishing more posts in a day than ``daily_freq``
:param daily_freq: see parameter specific
:param now: if None, today means today, if not None, ``now`` will have the meaning of today
:param addstat: if True, the function will a field corresponding to the number of posts from this blog
:return: enumeration of :class:`StreamRSS <pyrsslocal.rss.rss_stream.StreamRSS>`
:githublink:`%|py|179`
"""
if addstat:
sqlstatjoinA = "SELECT A.*, nbpost FROM ("
sqlstatjoinB = """) AS A INNER JOIN (SELECT id_rss, COUNT(*) AS nbpost FROM {0}
GROUP BY id_rss) ON id_rss == A.id""".format(self.table_posts)
orderby = "nbpost DESC"
else:
sqlstatjoinA = ""
sqlstatjoinB = ""
orderby = "titleb"
if isinstance(specific, list):
if len(specific) == 1:
specific = specific[0]
else:
raise TypeError(
"unable to process if specific is a list:" +
str(specific))
if specific in [None, ""]:
self.connect()
sql = "%sSELECT titleb, type, xmlUrl, htmlUrl, keywordsb, id FROM %s%s" % (
sqlstatjoinA, self.table_blogs, sqlstatjoinB)
if sorted_:
sql += " ORDER BY " + orderby
for row in self.execute(sql):
bl = StreamRSS(*row)
yield bl
self.close()
elif specific in DatabaseRSS.specific_search.keys():
today = datetime.datetime.now() if now is None else now
day = datetime.datetime(2013, 1, 2) - datetime.datetime(2013, 1, 1)
yesday = today - day
yes2 = yesday - day
yesweek = today - (day * 7)
yeshalf = today - (day * 180)
self.connect()
self.add_function("getdayn", 1, DatabaseRSS.getdayn)
sql = "%sSELECT titleb, type, xmlUrl, htmlUrl, keywordsb, id FROM %s WHERE id IN (%s)%s" % \
(sqlstatjoinA, self.table_blogs,
DatabaseRSS.specific_search[specific].format(
self.table_posts,
yesday,
yes2,
yesweek,
yeshalf,
daily_freq),
sqlstatjoinB)
if sorted_:
sql += " ORDER BY " + orderby
for row in self.execute(sql):
bl = StreamRSS(*row)
yield bl
self.close()
else:
raise ValueError(
"unable to interpret value %s for parameter specific" %
specific)
[docs] def enumerate_latest_status(self, postid, nb=1, connect=True):
"""
Retrieves the latest status for a post.
:param postid: post id
:param nb: number of desired status
:param connect: connect (True) or skip connection (False)
:return: enumerate on values from ``table_stats`` ordered by decreasing time
:githublink:`%|py|250`
"""
if connect:
self.connect()
sch = DatabaseRSS.schema_table("stats")
sql = "SELECT * FROM {0} WHERE id_post=={1} ORDER BY dtime DESC".format(
self.table_stats,
postid)
for row in self.execute(sql):
nb -= 1
if nb < 0:
break
yield {sch[i][0]: row[i] for i in range(len(row))}
if connect:
self.close()
[docs] def private_process_condition(self, blog_selection=None, post_selection=None,
sorted_=True, specific=None, now=None,
searchterm=None):
"""
Returns a :epkg:`SQL` query corresponding to list of posts.
:param blog_selection: list of blogs to consider (or empty for all)
:param post_selection: list of posts to consider
:param sorted_: sorted by date
:param specific: specific search
- None: all posts
- today: get all posts for today
- week: get all posts for last week
:param searchterm: if not None, filters using a SQL like search (using ``%``)
:param now: if None, today means today, if not None, ``now`` will have the meaning of today
:return: SQL query
:githublink:`%|py|281`
"""
if blog_selection is None:
blog_selection = []
if post_selection is None:
post_selection = []
if searchterm is not None:
if not searchterm.startswith("+") and "%" not in searchterm:
searchterm = "%{0}%".format(searchterm)
searchterm = searchterm.replace("'", "\\'").replace('"', '\\"')
where = "WHERE UPPER(title) LIKE '{0}'".format(searchterm.upper())
else:
where = ""
sql = """SELECT id_rss, title, guid, isPermaLink, link, description, pubDate, keywords, {0}.id AS id,
titleb, type, xmlUrl, htmlUrl, keywordsb, {1}.id AS idblog
FROM {0}
INNER JOIN {1}
ON {0}.id_rss == {1}.id
{2}
""".format(self.table_posts, self.table_blogs, where)
cond = []
if len(blog_selection) > 0:
condition = ",".join(map(str, blog_selection))
cond.append(" id_rss in (%s)" % condition)
if len(post_selection) > 0:
condition = ",".join(map(str, post_selection))
cond.append("%s.id in (%s)" % (self.table_posts, condition))
if specific in ["today", "week", "twoday"]:
today = datetime.datetime.now() if now is None else now
day = datetime.datetime(2013, 1, 2) - datetime.datetime(2013, 1, 1)
dec = {"week": 7, "today": 1, "twoday": 2}.get(specific, 7)
mdat = today - day * dec
st = "pubDate >= '{0}'".format(mdat)
cond.append(st)
if len(cond) > 0:
sql += " WHERE " + " AND ".join(cond)
if sorted_:
sql += " ORDER BY pubDate DESC"
return sql
[docs] def enumerate_posts(self, blog_selection=None, post_selection=None, sorted_=True,
first=1000, specific=None, daily_freq=1.5, now=None,
addstatus=False, searchterm=None):
"""
Enumerates all the posts from the database if the blog id
belongs to a selection (or all if blog_selection is empty).
:param blog_selection: list of blogs to consider (or empty for all)
:param post_selection: list of posts to consider
:param sorted_: sorted by date
:param first: we only consider the first ``first``
:param specific: specific search
- None: all posts
- today: get all posts for today
- week: get all posts for last week
:param daily_freq: see parameter specific
:param now: if None, today means today, if not None, ``now`` will have the meaning of today
:param addstatus: if True, fetches the status of a blog
:param searchterm: if not None, filters using a SQL like search (using ``%``)
:return: enumeration of :class:`BlogPost <pyrsslocal.rss.rss_blogpost.BlogPost>`
:githublink:`%|py|344`
"""
if blog_selection is None:
blog_selection = []
if post_selection is None:
post_selection = []
self.connect()
sql = self.private_process_condition(
blog_selection, post_selection, sorted_,
specific, now, searchterm)
sql += " LIMIT %d" % first
for row in self.execute(sql):
row = list(row)
row[-2] = row[-2].split(",")
row[3] = row[3] == 1
blog = StreamRSS(* (row[-6:]))
row = row[:-6]
row[0] = blog
bl = BlogPost(*row)
if addstatus:
for st in self.enumerate_latest_status(bl.id, connect=False):
bl.add_status(st)
yield bl
self.close()
[docs] def enumerate_posts_status(self, blog_selection=None, post_selection=None,
sorted_=True, specific=None, now=None,
searchterm=None):
"""
Enumerate status.
:param blog_selection: list of blogs to consider (or empty for all)
:param post_selection: list of posts to consider
:param sorted_: sorted by date
:param specific: specific search
- None: all posts
- today: get all posts for today
- week: get all posts for last week
:param now: if None, today means today, if not None, ``now`` will have the meaning of today
:param searchterm: if not None, filters using a SQL like search (using ``%``)
:return: enumerate on values from ``table_stats`` ordered by decreasing time
:githublink:`%|py|387`
"""
if blog_selection is None:
blog_selection = []
if post_selection is None:
post_selection = []
self.connect()
sql_po = self.private_process_condition(
blog_selection, post_selection, sorted_,
specific, now, searchterm)
sql_st = """SELECT A.id_post, status, A.dtime FROM (
SELECT id_post, MAX(dtime) AS dtime FROM {0}
GROUP BY id_post) AS A
INNER JOIN {0}
ON A.id_post == {0}.id_post""".format(self.table_stats)
sql = """SELECT DISTINCT id_rss, title, guid, isPermaLink, link, description, pubDate, keywords, id,
titleb, type, xmlUrl, htmlUrl, keywordsb, idblog, status, dtime
FROM (
{0}
)
AS tA
INNER JOIN (
{1}
) AS tB
ON tA.id == tB.id_post""". format(sql_po, sql_st)
for row in self.execute(sql):
row = list(row)
row[-4] = row[-4].split(",")
row[3] = row[3] == 1
blog = StreamRSS(* (row[-8:-2]))
st = {"status": row[-2], "dtime": row[-1]}
row = row[:-8]
row[0] = blog
bl = BlogPost(*row)
bl.add_status(st)
yield bl
self.close()