Subscribe to feeds

This commit is contained in:
John Doty 2024-07-10 07:30:23 +09:00
parent 6be6afdbc3
commit ff53b42b6f
3 changed files with 391 additions and 142 deletions

View file

@ -1,5 +1,10 @@
# https://simonwillison.net/2023/Sep/30/cli-tools-python/
import asyncio
import click
from . import feed
from . import database
@click.group()
@click.version_option()
@ -7,15 +12,27 @@ def cli():
"Command line feed reader"
@cli.command(name="command")
@click.argument(
"example"
)
@click.option(
"-o",
"--option",
help="An example option",
)
def first_command(example, option):
"Command description goes here"
click.echo("Here is some output")
@cli.command(name="subscribe")
@click.argument("url")
def subscribe(url):
    "Subscribe to a feed at the specified URL."
    # Open the local database first: its origin id is stamped onto the
    # feed metadata before we fetch.
    db = database.Database.local()
    click.echo(f"Fetching {url} ...")
    meta = feed.FeedMeta.from_url(url, db.origin)
    parsed, meta = asyncio.run(feed.fetch_feed(meta))
    if parsed is None:
        click.echo(f"Unable to fetch {url}")
        return 1
    # Look up meta.url rather than the raw argument -- the fetch may have
    # produced a canonical URL (NOTE(review): presumably via redirects;
    # confirm against fetch_feed).
    if db.load_feed(meta.url) is not None:
        click.echo(f"This feed already exists (as {meta.url})")
        return 1
    db.store_feed(feed.Feed.from_parsed(parsed, meta))
    click.echo(f"Subscribed to {meta.url}")

View file

@ -1,25 +1,13 @@
import pathlib
import random
import socket
import sqlite3
import string
import typing
import platformdirs
def get_property(db: sqlite3.Connection, prop: str, default=None) -> typing.Any:
    """Look up *prop* in the properties table, returning *default* when absent."""
    row = db.execute(
        "SELECT value FROM properties WHERE name=?", (prop,)
    ).fetchone()
    return default if row is None else row[0]
def set_property(db: sqlite3.Connection, prop: str, value):
    """Insert or overwrite the single property *prop* with *value* (upsert)."""
    upsert = """
    INSERT INTO properties (name, value) VALUES (?, ?)
    ON CONFLICT DO UPDATE SET value=excluded.value
    """
    db.execute(upsert, (prop, value))
from . import feed
SCHEMA_STATEMENTS = [
"""
@ -35,12 +23,13 @@ SCHEMA_STATEMENTS = [
);
CREATE TABLE entries(
id VARCHAR NOT NULL PRIMARY KEY,
id VARCHAR NOT NULL,
inserted_at INTEGER NOT NULL,
feed_url VARCHAR,
feed_url VARCHAR NOT NULL,
title VARCHAR,
link VARCHAR,
FOREIGN KEY feed_url REFERENCES feeds(url)
PRIMARY KEY (id, feed_url),
FOREIGN KEY (feed_url) REFERENCES feeds(url)
ON UPDATE CASCADE
ON DELETE CASCADE
);
@ -48,38 +37,214 @@ SCHEMA_STATEMENTS = [
]
def ensure_database_schema(db: sqlite3.Connection):
with db:
db.execute(
"""
CREATE TABLE IF NOT EXISTS properties (
name VARCHAR NOT NULL PRIMARY KEY,
value VARCHAR NOT NULL
)
"""
def origin_path() -> pathlib.Path:
return platformdirs.user_data_path("cry", "cry") / "origin"
def local_origin(path: pathlib.Path | None = None) -> str:
if path is None:
path = origin_path()
if path.exists():
with open(path, "r", encoding="utf-8") as f:
return f.read().strip()
host = socket.gethostname()
slug = "".join(
random.choices(
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8
)
version = int(get_property(db, "version", 0))
for script in SCHEMA_STATEMENTS[version:]:
for statement in script.split(";"):
db.execute(statement)
set_property(db, "version", len(SCHEMA_STATEMENTS))
)
origin = f"{host}-{slug}"
def database_path() -> pathlib.Path:
# TODO: Determine the name/slug from local state if necessary
return pathlib.Path.home() / "Dropbox" / "cry" / "testing-slug.db"
def connect_database(path: pathlib.Path) -> sqlite3.Connection:
path.parent.mkdir(parents=True, exist_ok=True)
connection = sqlite3.Connection(str(path), autocommit=False)
connection.execute("PRAGMA foreign_keys = ON")
return connection
with open(path, "w", encoding="utf-8") as f:
f.write(origin)
return origin
def setup_database() -> sqlite3.Connection:
db_path = database_path()
db = connect_database(db_path)
ensure_database_schema(db)
def database_path(origin: str) -> pathlib.Path:
# TODO: Determine the name/slug from local state if necessary
return pathlib.Path.home() / "Dropbox" / "cry" / f"{origin}.db"
return db
class Database:
    """Wrapper around the sqlite feed database for one instance.

    `origin` identifies the machine/instance that owns this database file;
    it is written into the properties table and stamped onto loaded feed
    metadata so that merges between instances can break ties.
    """

    # Open sqlite connection. autocommit is off, so writes must happen
    # inside a `with self.db:` transaction block.
    db: sqlite3.Connection
    # This instance's identity string.
    origin: str

    def __init__(self, path: pathlib.Path | str, origin: str):
        """Open the database at *path* for the given *origin*.

        A pathlib.Path gets its parent directories created first; a plain
        str is passed straight through (NOTE(review): presumably to allow
        things like ":memory:" -- confirm with callers).
        """
        if not isinstance(path, str):
            path.parent.mkdir(parents=True, exist_ok=True)
        db = sqlite3.Connection(str(path), autocommit=False)
        # sqlite leaves foreign keys off by default; the schema relies on
        # ON UPDATE/ON DELETE CASCADE, so enable them per-connection.
        db.execute("PRAGMA foreign_keys = ON")
        self.db = db
        self.origin = origin

    @classmethod
    def local(cls, origin: str | None = None) -> "Database":
        """Open (and migrate) this machine's database.

        When *origin* is not supplied it is derived from local state via
        local_origin().
        """
        if origin is None:
            origin = local_origin()
        db = Database(database_path(origin), origin)
        db.ensure_database_schema()
        return db

    def get_property(self, prop: str, default=None) -> typing.Any:
        """Read one value from the properties table, or *default* if missing."""
        cursor = self.db.execute("SELECT value FROM properties WHERE name=?", (prop,))
        result = cursor.fetchone()
        if result is None:
            return default
        return result[0]

    def set_property(self, prop: str, value):
        """Upsert one name/value pair into the properties table.

        Does not commit; run inside a `with self.db:` block (as
        ensure_database_schema does).
        """
        self.db.execute(
            """
            INSERT INTO properties (name, value) VALUES (?, ?)
            ON CONFLICT DO UPDATE SET value=excluded.value
            """,
            (prop, value),
        )

    def ensure_database_schema(self):
        """Create or upgrade the schema, atomically.

        The "version" property records how many scripts from
        SCHEMA_STATEMENTS have already been applied; only the scripts past
        that index are run, then the version (and origin) are re-written.
        """
        with self.db:
            self.db.execute(
                """
                CREATE TABLE IF NOT EXISTS properties (
                    name VARCHAR NOT NULL PRIMARY KEY,
                    value VARCHAR NOT NULL
                )
                """
            )
            version = int(self.get_property("version", 0))
            for script in SCHEMA_STATEMENTS[version:]:
                # A script may hold several statements and execute() runs
                # only one, hence the split. (Assumes no ";" appears inside
                # string literals in the schema scripts.)
                for statement in script.split(";"):
                    try:
                        self.db.execute(statement)
                    except Exception as e:
                        # Wrap so the failing statement text is visible.
                        raise Exception(f"Error executing:\n{statement}") from e
            self.set_property("version", len(SCHEMA_STATEMENTS))
            self.set_property("origin", self.origin)

    def load_feed(self, url: str) -> feed.Feed | None:
        """Load the feed stored under *url*, or None when we don't have it."""
        cursor = self.db.execute(
            """
            SELECT
                last_fetched_ts,
                retry_after_ts,
                status,
                etag,
                modified,
                title,
                link
            FROM feeds
            WHERE url=?
            """,
            [url],
        )
        row = cursor.fetchone()
        if row is None:
            return None
        last_fetched_ts, retry_after_ts, status, etag, modified, title, link = row
        meta = feed.FeedMeta(
            url=url,
            last_fetched_ts=last_fetched_ts,
            retry_after_ts=retry_after_ts,
            status=status,
            etag=etag,
            modified=modified,
            origin=self.origin,
        )
        cursor = self.db.execute(
            """
            SELECT
                id,
                inserted_at,
                title,
                link
            FROM entries
            WHERE feed_url=?
            """,
            [url],
        )
        rows = cursor.fetchall()
        # The comprehension's title/link do NOT clobber the feed-level
        # title/link unpacked above: comprehensions have their own scope.
        entries = [
            feed.Entry(id=id, inserted_at=inserted_at, title=title, link=link)
            for id, inserted_at, title, link in rows
        ]
        return feed.Feed(meta=meta, title=title, link=link, entries=entries)

    def store_feed(self, f: feed.Feed):
        """Write *f* -- metadata row plus all entries -- in one transaction."""
        with self.db:
            self.db.execute(
                """
                INSERT INTO feeds (
                    url,
                    last_fetched_ts,
                    retry_after_ts,
                    status,
                    etag,
                    modified,
                    title,
                    link
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT DO UPDATE
                SET
                    last_fetched_ts=excluded.last_fetched_ts,
                    retry_after_ts=excluded.retry_after_ts,
                    status=excluded.status,
                    etag=excluded.etag,
                    modified=excluded.modified,
                    title=excluded.title,
                    link=excluded.link
                """,
                [
                    f.meta.url,
                    f.meta.last_fetched_ts,
                    f.meta.retry_after_ts,
                    f.meta.status,
                    f.meta.etag,
                    f.meta.modified,
                    f.title,
                    f.link,
                ],
            )
            self.db.executemany(
                """
                INSERT INTO entries (
                    id,
                    inserted_at,
                    feed_url,
                    title,
                    link
                ) VALUES (?, ?, ?, ?, ?)
                ON CONFLICT DO UPDATE
                SET
                    -- NOTE: This is also part of the feed merge algorithm, BUT
                    -- we implement it here because feeds tend to be rolling
                    -- windows over some external content and we don't want
                    -- to read and write the entire feed just to update the
                    -- few new items. But we can't just do ON CONFLICT DO
                    -- NOTHING because we *might* be storing a feed where we
                    -- resolved conflicts with another instance. So we want
                    -- to handle all the cases. (In theory we could make two
                    -- different INSERTs to handle the two cases but that is
                    -- more complexity than it is worth.)
                    inserted_at=MIN(inserted_at, excluded.inserted_at),
                    title=CASE
                        WHEN inserted_at < excluded.inserted_at THEN title
                        ELSE excluded.title
                    END,
                    link=CASE
                        WHEN inserted_at < excluded.inserted_at THEN link
                        ELSE excluded.link
                    END
                """,
                [(e.id, e.inserted_at, f.meta.url, e.title, e.link) for e in f.entries],
            )

View file

@ -5,7 +5,6 @@ import functools
import logging
import time
import typing
import pathlib
import hashlib
import html.parser
import io
@ -15,8 +14,6 @@ import feedparser
import requests
import requests.structures
import database
import opml
LOG = logging.getLogger(__name__)
@ -37,9 +34,10 @@ class FeedMeta:
status: int
etag: str | None
modified: str | None
origin: str
@classmethod
def from_url(cls, url: str) -> "FeedMeta":
def from_url(cls, url: str, origin: str) -> "FeedMeta":
return FeedMeta(
url=url,
last_fetched_ts=0,
@ -47,24 +45,10 @@ class FeedMeta:
status=FEED_STATUS_ALIVE,
etag=None,
modified=None,
origin=origin,
)
@dataclasses.dataclass(frozen=True)
class Entry:
    """A single item within a feed."""

    id: str  # stable identifier; derived from id/link/title/hash fallbacks
    title: str  # display title (already cleaned of markup)
    link: str | None  # target URL, when the entry supplied one
@dataclasses.dataclass(frozen=True)
class Feed:
    """A fetched feed: its fetch metadata plus the parsed content."""

    meta: FeedMeta  # fetch bookkeeping (url, etag, timestamps, ...)
    title: str  # feed-level title
    link: str  # feed-level link (home page)
    entries: list[Entry]  # the feed's items
def the_worst_element_hash(value) -> str:
"""Compute a content hash for the given feed element, to use as an ID.
@ -142,39 +126,6 @@ def clean_text(text: str) -> str:
return MULTI_SPACES.sub(" ", writer.getvalue())
def entry_from_feed(entry: feedparser.FeedParserDict) -> Entry:
    """Convert an entry from feedparser into an Entry by extracting the
    things we care about, fudging things and substituting things as
    necessary.
    """
    # Prefer the title; a missing/empty one falls back to the description.
    title = entry.get("title") or entry.get("description")

    entry_id = entry.get("id")
    link = entry.get("link")

    # An id that looks like a URL can double as the link when the entry
    # did not provide one.
    if entry_id and not link:
        candidate = str(entry_id).lower()
        if candidate.startswith(("http:", "https:")):
            link = candidate

    # Derive an id from whatever is available, in decreasing preference:
    # link, title, publication date, and finally a content hash.
    if not entry_id:
        entry_id = (
            link
            or title
            or entry.get("published")
            or the_worst_element_hash(entry)
        )

    assert isinstance(entry_id, str)
    assert link is None or isinstance(link, str)
    return Entry(id=entry_id, title=clean_text(str(title)), link=link)
async def fetch_feed(
feed: FeedMeta,
) -> typing.Tuple[feedparser.FeedParserDict | None, FeedMeta]:
@ -288,40 +239,156 @@ async def fetch_feed(
return (parsed, feed)
async def main() -> None:
database.setup_database()
@dataclasses.dataclass(frozen=True)
class Entry:
id: str
inserted_at: int
title: str
link: str | None
feeds = [
FeedMeta.from_url(url)
for url in opml.load_opml(pathlib.Path.home() / "Downloads" / "fraidycat.opml")
]
async with asyncio.TaskGroup() as group:
tasks = [group.create_task(fetch_feed(f)) for f in feeds]
results = [t.result() for t in tasks]
@classmethod
def from_parsed(cls, entry: feedparser.FeedParserDict, insert_time: int) -> "Entry":
"""Convert an entry from feedparser into an Entry by extracting the
things we care about, fudging things and substituting things as
necessary.
for d, meta in results:
if d is not None:
title = None
page_url = None
The one thing we need from the outside is the "insert time", which
is *almost* `int(time.time())` but needs a little bit of fudging in
order to ensure that we can keep the items in order when we get a lot
of them all at once.
"""
title = entry.get("title")
if not title:
title = entry.get("description")
if d.feed is not None:
title = d.feed.get("title")
page_url = d.feed.get("link")
id = entry.get("id")
if title is None or title == "":
title = meta.url
if page_url is None:
page_url = meta.url
link = entry.get("link")
if id and not link:
linkid = str(id).lower()
if linkid.startswith("http:") or linkid.startswith("https:"):
link = linkid
print(f"[{title}]({page_url})")
print(f"{meta}")
if link and not id:
id = link
if title and not id:
id = title
if not id:
id = entry.get("published")
if not id:
id = the_worst_element_hash(entry)
entries = [entry_from_feed(e) for e in d.entries]
for entry in entries:
print(f" - {entry.title} ({entry.id})")
print(f" {entry.link}")
print()
assert isinstance(id, str)
assert link is None or isinstance(link, str)
title = clean_text(str(title))
return Entry(id=id, inserted_at=insert_time, title=title, link=link)
if __name__ == "__main__":
asyncio.run(main())
@dataclasses.dataclass(frozen=True)
class Feed:
    """A feed: fetch metadata plus the parsed title, link and entries."""

    meta: FeedMeta
    title: str
    link: str
    entries: list[Entry]

    @classmethod
    def from_parsed(cls, d: feedparser.FeedParserDict, meta: FeedMeta) -> "Feed":
        """Build a Feed from a feedparser result, assigning insert times."""
        title = None
        link = None
        if d.feed is not None:
            title = d.feed.get("title")
            link = d.feed.get("link")
        # Fall back to the feed URL when the feed provides no usable title
        # (None or empty) or no link (None only).
        if title is None or title == "":
            title = meta.url
        if link is None:
            link = meta.url
        # =====================================================================
        # FEED AND ENTRY ORDERING!
        # =====================================================================
        # In many ways this is the most critical part of a feed reader: in
        # what order do we show the items in the feed?
        #
        # RSS is pretty unspecified in general, but also in what the meaning
        # of the order of the entries in the feed actually is. (I can't
        # remember if this is something that Atom specifies, but it doesn't
        # matter because RSS is still really popular, even in the ungodly
        # late year of 2024.)
        #
        # *We* want to show posts in reverse chronological order, of course,
        # but we still have problems. You *cannot* trust the dates and times
        # in the entries. Sure, sure, Atom does a great job of specifying at
        # least three different timestamps in the feed, and they are supposed
        # to have time zones and whatnot. But:
        #
        # a) Any kind of timestamp is optional in RSS, and
        # b) Even if the timestamp is present, it can come in a variety of
        #    formats (which theoretically `feedparser` handles), but
        # c) Even if you can parse the timestamp, many feed implementations
        #    just PUT THE WRONG TIME IN THERE.
        #
        # The only coherent thing to do is to ignore the dates in the feeds
        # and just rely on our own sense of time. This comes with its own
        # problems, of course: our clock can be highly unreliable. But in
        # general it's good enough to work with, and feeds don't update so
        # frequently that we need to worry about most of these problems if we
        # use unix timestamps as our basis.
        #
        # If we just use our own timestamps, then what do we do with feed
        # updates where multiple items are inserted at once? We want to
        # preserve that ordering too! Our hack is to multiply the unix
        # timestamp by 1000, and then use the lower three digits as a
        # sequence number. (Maybe it looks like everything was posted a
        # millisecond apart?) There's a *chance* of conflict if:
        #
        # a) a feed has more than 1000 items in it, and
        # b) we update the feed again less than a second later
        #
        # But given the other rate limiting features in this RSS system (The
        # `retry_after_ts` field, etc.) it's not a very likely thing to
        # happen.
        #
        insert_time = int(time.time()) * 1000
        # Walk the entries back-to-front so the *last* item in the feed gets
        # the smallest sequence offset (i.e. items earlier in the feed count
        # as newer), then flip back to the feed's original order.
        entries = [
            Entry.from_parsed(e, insert_time + i)
            for i, e in enumerate(reversed(d.entries))
        ]
        entries.reverse()
        return Feed(meta=meta, title=title, link=link, entries=entries)
def merge_feeds(a: Feed, b: Feed) -> Feed:
    """Merge two known feeds. There are two conflict resolution policies:

    1. The newer fetch of feed metadata wins.
    2. The older fetch of a feed item wins.

    This means that the merge order between feeds *should* be consistent,
    unless somehow the feeds updated at the exact same time. In that case,
    the feed with the lexicographically smallest origin wins.
    """
    # Entry-level merge: for each id, keep whichever copy was inserted
    # earliest (policy 2).
    results = {e.id: e for e in a.entries}
    for entry in b.entries:
        existing = results.get(entry.id)
        if existing is None or existing.inserted_at > entry.inserted_at:
            results[entry.id] = entry
    # Newest first.
    entries = sorted(results.values(), key=lambda e: e.inserted_at, reverse=True)

    # Metadata merge: the most recently fetched side wins (policy 1); exact
    # ties break deterministically on origin so every instance agrees.
    # (Was: a dead `source_feed = a` pre-assignment before an exhaustive
    # if/elif/else -- removed.)
    if a.meta.last_fetched_ts > b.meta.last_fetched_ts:
        source_feed = a
    elif a.meta.last_fetched_ts < b.meta.last_fetched_ts:
        source_feed = b
    else:
        source_feed = a if a.meta.origin < b.meta.origin else b

    return Feed(
        meta=source_feed.meta,
        title=source_feed.title,
        link=source_feed.link,
        entries=entries,
    )