From ff53b42b6fb1f06b3974308c061d01588fae5c90 Mon Sep 17 00:00:00 2001 From: John Doty Date: Wed, 10 Jul 2024 07:30:23 +0900 Subject: [PATCH] Subscribe to feeds --- cry/cli.py | 41 +++++--- cry/database.py | 263 +++++++++++++++++++++++++++++++++++++++--------- cry/feed.py | 229 ++++++++++++++++++++++++++--------------- 3 files changed, 391 insertions(+), 142 deletions(-) diff --git a/cry/cli.py b/cry/cli.py index d6248fe..d7ea8e6 100644 --- a/cry/cli.py +++ b/cry/cli.py @@ -1,5 +1,10 @@ +# https://simonwillison.net/2023/Sep/30/cli-tools-python/ +import asyncio import click +from . import feed +from . import database + @click.group() @click.version_option() @@ -7,15 +12,27 @@ def cli(): "Command line feed reader" -@cli.command(name="command") -@click.argument( - "example" -) -@click.option( - "-o", - "--option", - help="An example option", -) -def first_command(example, option): - "Command description goes here" - click.echo("Here is some output") +@cli.command(name="subscribe") +@click.argument("url") +def subscribe(url): + "Subscribe to a feed at the specified URL." + + db = database.Database.local() + + click.echo(f"Fetching {url} ...") + meta = feed.FeedMeta.from_url(url, db.origin) + d, meta = asyncio.run(feed.fetch_feed(meta)) + if d is None: + click.echo(f"Unable to fetch {url}") + return 1 + + # Check to see if this URL is already in the database. 
+ existing = db.load_feed(meta.url) + if existing is not None: + click.echo(f"This feed already exists (as {meta.url})") + return 1 + + f = feed.Feed.from_parsed(d, meta) + db.store_feed(f) + + click.echo(f"Subscribed to {meta.url}") diff --git a/cry/database.py b/cry/database.py index 13dba9d..86b9797 100644 --- a/cry/database.py +++ b/cry/database.py @@ -1,25 +1,13 @@ import pathlib +import random +import socket import sqlite3 +import string import typing +import platformdirs -def get_property(db: sqlite3.Connection, prop: str, default=None) -> typing.Any: - cursor = db.execute("SELECT value FROM properties WHERE name=?", (prop,)) - result = cursor.fetchone() - if result is None: - return default - return result[0] - - -def set_property(db: sqlite3.Connection, prop: str, value): - db.execute( - """ - INSERT INTO properties (name, value) VALUES (?, ?) - ON CONFLICT DO UPDATE SET value=excluded.value - """, - (prop, value), - ) - +from . import feed SCHEMA_STATEMENTS = [ """ @@ -35,12 +23,13 @@ SCHEMA_STATEMENTS = [ ); CREATE TABLE entries( - id VARCHAR NOT NULL PRIMARY KEY, + id VARCHAR NOT NULL, inserted_at INTEGER NOT NULL, - feed_url VARCHAR, + feed_url VARCHAR NOT NULL, title VARCHAR, link VARCHAR, - FOREIGN KEY feed_url REFERENCES feeds(url) + PRIMARY KEY (id, feed_url), + FOREIGN KEY (feed_url) REFERENCES feeds(url) ON UPDATE CASCADE ON DELETE CASCADE ); @@ -48,38 +37,214 @@ SCHEMA_STATEMENTS = [ ] -def ensure_database_schema(db: sqlite3.Connection): - with db: - db.execute( - """ - CREATE TABLE IF NOT EXISTS properties ( - name VARCHAR NOT NULL PRIMARY KEY, - value VARCHAR NOT NULL - ) - """ +def origin_path() -> pathlib.Path: + return platformdirs.user_data_path("cry", "cry") / "origin" + + +def local_origin(path: pathlib.Path | None = None) -> str: + if path is None: + path = origin_path() + + if path.exists(): + with open(path, "r", encoding="utf-8") as f: + return f.read().strip() + + host = socket.gethostname() + slug = "".join( + random.choices( + 
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8 ) - version = int(get_property(db, "version", 0)) - for script in SCHEMA_STATEMENTS[version:]: - for statement in script.split(";"): - db.execute(statement) - set_property(db, "version", len(SCHEMA_STATEMENTS)) + ) + origin = f"{host}-{slug}" - -def database_path() -> pathlib.Path: - # TODO: Determine the name/slug from local state if necessary - return pathlib.Path.home() / "Dropbox" / "cry" / "testing-slug.db" - - -def connect_database(path: pathlib.Path) -> sqlite3.Connection: path.parent.mkdir(parents=True, exist_ok=True) - connection = sqlite3.Connection(str(path), autocommit=False) - connection.execute("PRAGMA foreign_keys = ON") - return connection + with open(path, "w", encoding="utf-8") as f: + f.write(origin) + + return origin -def setup_database() -> sqlite3.Connection: - db_path = database_path() - db = connect_database(db_path) - ensure_database_schema(db) +def database_path(origin: str) -> pathlib.Path: + # TODO: Determine the name/slug from local state if necessary + return pathlib.Path.home() / "Dropbox" / "cry" / f"{origin}.db" - return db + +class Database: + db: sqlite3.Connection + origin: str + + def __init__(self, path: pathlib.Path | str, origin: str): + if not isinstance(path, str): + path.parent.mkdir(parents=True, exist_ok=True) + db = sqlite3.Connection(str(path), autocommit=False) + db.execute("PRAGMA foreign_keys = ON") + self.db = db + self.origin = origin + + @classmethod + def local(cls, origin: str | None = None) -> "Database": + if origin is None: + origin = local_origin() + + db = Database(database_path(origin), origin) + db.ensure_database_schema() + return db + + def get_property(self, prop: str, default=None) -> typing.Any: + cursor = self.db.execute("SELECT value FROM properties WHERE name=?", (prop,)) + result = cursor.fetchone() + if result is None: + return default + return result[0] + + def set_property(self, prop: str, value): + self.db.execute( + """ + 
INSERT INTO properties (name, value) VALUES (?, ?) + ON CONFLICT DO UPDATE SET value=excluded.value + """, + (prop, value), + ) + + def ensure_database_schema(self): + with self.db: + self.db.execute( + """ + CREATE TABLE IF NOT EXISTS properties ( + name VARCHAR NOT NULL PRIMARY KEY, + value VARCHAR NOT NULL + ) + """ + ) + version = int(self.get_property("version", 0)) + for script in SCHEMA_STATEMENTS[version:]: + for statement in script.split(";"): + try: + self.db.execute(statement) + except Exception as e: + raise Exception(f"Error executing:\n{statement}") from e + self.set_property("version", len(SCHEMA_STATEMENTS)) + self.set_property("origin", self.origin) + + def load_feed(self, url: str) -> feed.Feed | None: + cursor = self.db.execute( + """ + SELECT + last_fetched_ts, + retry_after_ts, + status, + etag, + modified, + title, + link + FROM feeds + WHERE url=? + """, + [url], + ) + + row = cursor.fetchone() + if row is None: + return None + + last_fetched_ts, retry_after_ts, status, etag, modified, title, link = row + meta = feed.FeedMeta( + url=url, + last_fetched_ts=last_fetched_ts, + retry_after_ts=retry_after_ts, + status=status, + etag=etag, + modified=modified, + origin=self.origin, + ) + + cursor = self.db.execute( + """ + SELECT + id, + inserted_at, + title, + link + FROM entries + WHERE feed_url=? + """, + [url], + ) + + rows = cursor.fetchall() + entries = [ + feed.Entry(id=id, inserted_at=inserted_at, title=title, link=link) + for id, inserted_at, title, link in rows + ] + + return feed.Feed(meta=meta, title=title, link=link, entries=entries) + + def store_feed(self, f: feed.Feed): + with self.db: + self.db.execute( + """ + INSERT INTO feeds ( + url, + last_fetched_ts, + retry_after_ts, + status, + etag, + modified, + title, + link + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT DO UPDATE + SET + last_fetched_ts=excluded.last_fetched_ts, + retry_after_ts=excluded.retry_after_ts, + status=excluded.status, + etag=excluded.etag, + modified=excluded.modified, + title=excluded.title, + link=excluded.link + """, + [ + f.meta.url, + f.meta.last_fetched_ts, + f.meta.retry_after_ts, + f.meta.status, + f.meta.etag, + f.meta.modified, + f.title, + f.link, + ], + ) + + self.db.executemany( + """ + INSERT INTO entries ( + id, + inserted_at, + feed_url, + title, + link + ) VALUES (?, ?, ?, ?, ?) + ON CONFLICT DO UPDATE + SET + -- NOTE: This is also part of the feed merge algorithm, BUT + -- we implement it here because feeds tend to be rolling + -- windows over some external content and we don't want + -- to read and write the entire feed just to update the + -- few new items. But we can't just do ON CONFLICT DO + -- NOTHING because we *might* be storing a feed where we + -- resolved conflicts with another instance. So we want + -- to handle all the cases. (In theory we could make two + -- different INSERTs to handle the two cases but that is + -- more complexity than it is worth.) 
+ inserted_at=MIN(inserted_at, excluded.inserted_at), + title=CASE + WHEN inserted_at < excluded.inserted_at THEN title + ELSE excluded.title + END, + link=CASE + WHEN inserted_at < excluded.inserted_at THEN link + ELSE excluded.link + END + """, + [(e.id, e.inserted_at, f.meta.url, e.title, e.link) for e in f.entries], + ) diff --git a/cry/feed.py b/cry/feed.py index 172d445..3539543 100644 --- a/cry/feed.py +++ b/cry/feed.py @@ -5,7 +5,6 @@ import functools import logging import time import typing -import pathlib import hashlib import html.parser import io @@ -15,8 +14,6 @@ import feedparser import requests import requests.structures -import database -import opml LOG = logging.getLogger(__name__) @@ -37,9 +34,10 @@ class FeedMeta: status: int etag: str | None modified: str | None + origin: str @classmethod - def from_url(cls, url: str) -> "FeedMeta": + def from_url(cls, url: str, origin: str) -> "FeedMeta": return FeedMeta( url=url, last_fetched_ts=0, @@ -47,24 +45,10 @@ class FeedMeta: status=FEED_STATUS_ALIVE, etag=None, modified=None, + origin=origin, ) -@dataclasses.dataclass(frozen=True) -class Entry: - id: str - title: str - link: str | None - - -@dataclasses.dataclass(frozen=True) -class Feed: - meta: FeedMeta - title: str - link: str - entries: list[Entry] - - def the_worst_element_hash(value) -> str: """Compute a content hash for the given feed element, to use as an ID. @@ -142,39 +126,6 @@ def clean_text(text: str) -> str: return MULTI_SPACES.sub(" ", writer.getvalue()) -def entry_from_feed(entry: feedparser.FeedParserDict) -> Entry: - """Convert an entry from feedparser into an Entry by extracting the - things we care about, fudging things and substituting things as - necessary. 
- """ - title = entry.get("title") - if not title: - title = entry.get("description") - - id = entry.get("id") - - link = entry.get("link") - if id and not link: - linkid = str(id).lower() - if linkid.startswith("http:") or linkid.startswith("https:"): - link = linkid - - if link and not id: - id = link - if title and not id: - id = title - if not id: - id = entry.get("published") - if not id: - id = the_worst_element_hash(entry) - - assert isinstance(id, str) - assert link is None or isinstance(link, str) - - title = clean_text(str(title)) - return Entry(id=id, title=title, link=link) - - async def fetch_feed( feed: FeedMeta, ) -> typing.Tuple[feedparser.FeedParserDict | None, FeedMeta]: @@ -288,40 +239,156 @@ async def fetch_feed( return (parsed, feed) -async def main() -> None: - database.setup_database() +@dataclasses.dataclass(frozen=True) +class Entry: + id: str + inserted_at: int + title: str + link: str | None - feeds = [ - FeedMeta.from_url(url) - for url in opml.load_opml(pathlib.Path.home() / "Downloads" / "fraidycat.opml") - ] - async with asyncio.TaskGroup() as group: - tasks = [group.create_task(fetch_feed(f)) for f in feeds] - results = [t.result() for t in tasks] + @classmethod + def from_parsed(cls, entry: feedparser.FeedParserDict, insert_time: int) -> "Entry": + """Convert an entry from feedparser into an Entry by extracting the + things we care about, fudging things and substituting things as + necessary. - for d, meta in results: - if d is not None: - title = None - page_url = None + The one thing we need from the outside is the "insert time", which + is *almost* `int(time.time())` but needs a little bit of fudging in + order to ensure that we can keep the items in order when we get a lot + of them all at once. 
+ """ + title = entry.get("title") + if not title: + title = entry.get("description") - if d.feed is not None: - title = d.feed.get("title") - page_url = d.feed.get("link") + id = entry.get("id") - if title is None or title == "": - title = meta.url - if page_url is None: - page_url = meta.url + link = entry.get("link") + if id and not link: + linkid = str(id).lower() + if linkid.startswith("http:") or linkid.startswith("https:"): + link = linkid - print(f"[{title}]({page_url})") - print(f"{meta}") + if link and not id: + id = link + if title and not id: + id = title + if not id: + id = entry.get("published") + if not id: + id = the_worst_element_hash(entry) - entries = [entry_from_feed(e) for e in d.entries] - for entry in entries: - print(f" - {entry.title} ({entry.id})") - print(f" {entry.link}") - print() + assert isinstance(id, str) + assert link is None or isinstance(link, str) + + title = clean_text(str(title)) + return Entry(id=id, inserted_at=insert_time, title=title, link=link) -if __name__ == "__main__": - asyncio.run(main()) +@dataclasses.dataclass(frozen=True) +class Feed: + meta: FeedMeta + title: str + link: str + entries: list[Entry] + + @classmethod + def from_parsed(cls, d: feedparser.FeedParserDict, meta: FeedMeta) -> "Feed": + title = None + link = None + + if d.feed is not None: + title = d.feed.get("title") + link = d.feed.get("link") + + if title is None or title == "": + title = meta.url + if link is None: + link = meta.url + + # ===================================================================== + # FEED AND ENTRY ORDERING! + # ===================================================================== + # In many ways this is the most critical part of a feed reader: in + # what order do we show the items in the feed? + # + # RSS is pretty unspecified in general, but also in what the meaning + # of the order of the entries in the feed actually is. 
(I can't
+        # remember if this is something that Atom specifies but it doesn't
+        # matter because RSS is still really popular, even in the ungodly
+        # late year of 2024.)
+        #
+        # *We* want to show posts in reverse chronological order, of course,
+        # but we still have problems. You *cannot* trust the dates and times
+        # in the entries. Sure, sure, Atom does a great job of specifying at
+        # least three different timestamps in the feed, and they are supposed
+        # to have time zones and whatnot. But:
+        #
+        # a) Any kind of timestamp is optional in RSS, and
+        # b) Even if the timestamp is present, it can come in a variety of
+        #    formats (which theoretically `feedparser` handles), but
+        # c) Even if you can parse the timestamp, many feed implementations
+        #    just PUT THE WRONG TIME IN THERE.
+        #
+        # The only coherent thing to do is to ignore the dates in the feeds
+        # and just rely on our own sense of time. This comes with its own
+        # problems, of course: our clock can be highly unreliable. But in
+        # general it's good enough to work with, and feeds don't update so
+        # frequently that we need to worry about most of these problems if we
+        # use unix timestamps as our basis.
+        #
+        # If we just use our own timestamps, then what do we do with feed
+        # updates where multiple items are inserted at once? We want to
+        # preserve that ordering too! Our hack is to multiply the unix
+        # timestamp by 1000, and then use the lower three digits as a sequence
+        # number. (Maybe it looks like everything was posted a millisecond
+        # apart?) There's a *chance* of conflict if:
+        #
+        # a) a feed has more than 1000 items in it, and
+        # b) we update the feed again less than a second later
+        #
+        # But given the other rate limiting features in this RSS system (the
+        # `retry_after_ts` field, etc.) it's not a very likely thing to
+        # happen.
+        #
+        insert_time = int(time.time()) * 1000
+        entries = [
+            Entry.from_parsed(e, insert_time + i)
+            for i, e in enumerate(reversed(d.entries))
+        ]
+        entries.reverse()
+
+        return Feed(meta=meta, title=title, link=link, entries=entries)
+
+
+def merge_feeds(a: Feed, b: Feed) -> Feed:
+    """Merge two known feeds. There are two conflict resolution policies:
+
+    1. The newer fetch of feed metadata wins.
+    2. The older fetch of a feed item wins.
+
+    This means that the merge order between feeds *should* be consistent,
+    unless somehow the feeds updated at the exact same time. In that case,
+    the feed with the lexicographically smallest origin wins.
+    """
+    results = {e.id: e for e in a.entries}
+    for entry in b.entries:
+        existing = results.get(entry.id)
+        if existing is None or existing.inserted_at > entry.inserted_at:
+            results[entry.id] = entry
+
+    entries = sorted(results.values(), key=lambda e: e.inserted_at, reverse=True)
+    source_feed = a
+    if a.meta.last_fetched_ts > b.meta.last_fetched_ts:
+        source_feed = a
+    elif a.meta.last_fetched_ts == b.meta.last_fetched_ts:
+        source_feed = a if a.meta.origin < b.meta.origin else b
+    else:
+        source_feed = b
+
+    return Feed(
+        meta=source_feed.meta,
+        title=source_feed.title,
+        link=source_feed.link,
+        entries=entries,
+    )