Move stuff over from previous incarnation into new frame

Thanks to Simon Wilson for the layout
This commit is contained in:
John Doty 2024-07-08 07:32:37 +09:00
commit 6be6afdbc3
15 changed files with 799 additions and 0 deletions

0
cry/__init__.py Normal file
View file

4
cry/__main__.py Normal file
View file

@@ -0,0 +1,4 @@
# Package entry point: `python -m cry` delegates to the click command group.
from .cli import cli

if __name__ == "__main__":
    cli()

21
cry/cli.py Normal file
View file

@@ -0,0 +1,21 @@
import click
# Root command group; click renders the docstring below as the CLI help text.
@click.group()
@click.version_option()
def cli():
    "Command line feed reader"
# Placeholder command demonstrating argument/option wiring; the CLI name is
# "command" (from the decorator), not the function name.
@cli.command(name="command")
@click.argument(
    "example"
)
@click.option(
    "-o",
    "--option",
    help="An example option",
)
def first_command(example, option):
    "Command description goes here"
    click.echo("Here is some output")

85
cry/database.py Normal file
View file

@@ -0,0 +1,85 @@
import pathlib
import sqlite3
import typing
def get_property(db: sqlite3.Connection, prop: str, default=None) -> typing.Any:
    """Look up a single value from the properties table.

    Returns `default` when the property has never been stored.
    """
    row = db.execute(
        "SELECT value FROM properties WHERE name=?", (prop,)
    ).fetchone()
    return default if row is None else row[0]
def set_property(db: sqlite3.Connection, prop: str, value):
    """Store the named property, overwriting any existing value (upsert)."""
    upsert = """
        INSERT INTO properties (name, value) VALUES (?, ?)
        ON CONFLICT DO UPDATE SET value=excluded.value
        """
    db.execute(upsert, (prop, value))
SCHEMA_STATEMENTS = [
"""
CREATE TABLE feeds (
url VARCHAR NOT NULL PRIMARY KEY,
last_fetched_ts INTEGER NOT NULL,
retry_after_ts INTEGER NOT NULL,
status INTEGER NOT NULL,
etag VARCHAR,
modified VARCHAR,
title VARCHAR,
link VARCHAR
);
CREATE TABLE entries(
id VARCHAR NOT NULL PRIMARY KEY,
inserted_at INTEGER NOT NULL,
feed_url VARCHAR,
title VARCHAR,
link VARCHAR,
FOREIGN KEY feed_url REFERENCES feeds(url)
ON UPDATE CASCADE
ON DELETE CASCADE
);
"""
]
def ensure_database_schema(db: sqlite3.Connection):
    """Bring the database up to the latest schema version.

    The properties table records how many migration scripts have already
    been applied; every script past that point is run, in order, inside a
    single transaction.
    """
    with db:
        db.execute(
            """
            CREATE TABLE IF NOT EXISTS properties (
                name VARCHAR NOT NULL PRIMARY KEY,
                value VARCHAR NOT NULL
            )
            """
        )
        applied = int(get_property(db, "version", 0))
        for script in SCHEMA_STATEMENTS[applied:]:
            for statement in script.split(";"):
                db.execute(statement)
        set_property(db, "version", len(SCHEMA_STATEMENTS))
def database_path() -> pathlib.Path:
    """Return the on-disk location of the feed database."""
    # TODO: Determine the name/slug from local state if necessary
    base = pathlib.Path.home() / "Dropbox" / "cry"
    return base / "testing-slug.db"
def connect_database(path: pathlib.Path) -> sqlite3.Connection:
path.parent.mkdir(parents=True, exist_ok=True)
connection = sqlite3.Connection(str(path), autocommit=False)
connection.execute("PRAGMA foreign_keys = ON")
return connection
def setup_database() -> sqlite3.Connection:
    """Connect to the default database and make sure its schema is current."""
    connection = connect_database(database_path())
    ensure_database_schema(connection)
    return connection

327
cry/feed.py Normal file
View file

@@ -0,0 +1,327 @@
# I guess this is it.
import asyncio
import dataclasses
import functools
import logging
import time
import typing
import pathlib
import hashlib
import html.parser
import io
import re
import feedparser
import requests
import requests.structures
import database
import opml
LOG = logging.getLogger(__name__)

# Feed lifecycle states, stored in the feeds.status column.
FEED_STATUS_DEAD = 0  # fetch_feed returns immediately and never fetches these
FEED_STATUS_ALIVE = 1
FEED_STATUS_MISSING = 2  # NOTE(review): never assigned in this file — confirm intended use

# TODO: Consider configuration here.
# Module-wide session so all fetches share one connection pool.
http = requests.Session()
@dataclasses.dataclass(frozen=True)
class FeedMeta:
url: str
last_fetched_ts: int
retry_after_ts: int
status: int
etag: str | None
modified: str | None
@classmethod
def from_url(cls, url: str) -> "FeedMeta":
return FeedMeta(
url=url,
last_fetched_ts=0,
retry_after_ts=0,
status=FEED_STATUS_ALIVE,
etag=None,
modified=None,
)
@dataclasses.dataclass(frozen=True)
class Entry:
    """A single feed item, reduced to what the reader cares about."""

    id: str  # stable identifier; entry_from_feed synthesizes one when absent
    title: str  # plain text (HTML stripped via clean_text)
    link: str | None  # entry URL, when the feed provided one
@dataclasses.dataclass(frozen=True)
class Feed:
    """A fetched feed: its persistent metadata plus parsed content.

    NOTE(review): not constructed anywhere in this file — verify against
    callers elsewhere before relying on field semantics.
    """

    meta: FeedMeta
    title: str  # display title of the feed
    link: str  # presumably the feed's home-page link — confirm with callers
    entries: list[Entry]
def the_worst_element_hash(value) -> str:
    """Compute a content hash for the given feed element, to use as an ID.

    The hash must be as stable as we can make it, but obviously there are
    things we cannot control. If we've gotten here then the feed author has
    already failed us and there's little we can do. This is already *known
    to be wrong.*
    """

    def fold(item, digest):
        # Every branch frames its content with matching open/close markers
        # so that differently-nested structures cannot collide.
        if isinstance(item, feedparser.FeedParserDict):
            digest.update(b"dict")
            for key in sorted(item.keys()):
                digest.update(b"key::")
                digest.update(key.encode("utf-8"))
                digest.update(b"value::")
                fold(item[key], digest)
            digest.update(b"tcid")
        elif isinstance(item, str):
            digest.update(b"str")
            digest.update(item.encode("utf-8"))
            digest.update(b"rts")
        elif isinstance(item, list):
            digest.update(b"list")
            for element in item:
                fold(element, digest)
            digest.update(b"tsil")
        elif isinstance(item, tuple):
            digest.update(b"tuple")
            for element in item:
                fold(element, digest)
            digest.update(b"elput")

    digest = hashlib.sha256(usedforsecurity=False)
    fold(value, digest)
    return digest.hexdigest()
BLANK_TAGS = {"p", "br", "li", "div", "img"}
MULTI_SPACES = re.compile(r"\s+")
def clean_text(text: str) -> str:
"""Sometimes text is HTML and otherwise ugly. This reduces it to
something pretty to display. Strips tags, puts blank space in between
elements that should generate blank space, and then collapses blank
spaces down to one.
"""
class Cleaner(html.parser.HTMLParser):
def __init__(self, writer):
super().__init__()
self.writer = writer
def handle_data(self, data: str) -> None:
self.writer.write(data)
def handle_startendtag(
self, tag: str, attrs: list[tuple[str, str | None]]
) -> None:
del attrs
if tag.lower() in BLANK_TAGS:
self.writer.write(" ")
def handle_starttag(
self, tag: str, attrs: list[tuple[str, str | None]]
) -> None:
del attrs
if tag.lower() in BLANK_TAGS:
self.writer.write(" ")
writer = io.StringIO()
cleaner = Cleaner(writer)
cleaner.feed(text)
return MULTI_SPACES.sub(" ", writer.getvalue())
def entry_from_feed(entry: feedparser.FeedParserDict) -> Entry:
    """Convert an entry from feedparser into an Entry by extracting the
    things we care about, fudging things and substituting things as
    necessary.
    """
    title = entry.get("title")
    if not title:
        title = entry.get("description")

    entry_id = entry.get("id")  # renamed: `id` shadowed the builtin
    link = entry.get("link")

    # Some feeds put the entry's URL in the id field; use it as the link.
    if entry_id and not link:
        candidate = str(entry_id)
        # BUG FIX: compare the prefix case-insensitively but keep the
        # original string — URL paths are case-sensitive, and the old code
        # stored the fully lowercased id as the link.
        if candidate.lower().startswith(("http:", "https:")):
            link = candidate

    # Fall back through progressively worse id choices.
    if link and not entry_id:
        entry_id = link
    if title and not entry_id:
        entry_id = title
    if not entry_id:
        entry_id = entry.get("published")
    if not entry_id:
        entry_id = the_worst_element_hash(entry)

    assert isinstance(entry_id, str)
    assert link is None or isinstance(link, str)

    # BUG FIX: str(None) used to give entries with neither a title nor a
    # description the literal title "None"; use an empty title instead.
    title = clean_text(str(title)) if title is not None else ""
    return Entry(id=entry_id, title=title, link=link)
async def fetch_feed(
    feed: FeedMeta,
) -> typing.Tuple[feedparser.FeedParserDict | None, FeedMeta]:
    """Potentially fetch the feed described by `feed`, returning a parsed feed
    (if possible and necessary) and an updated FeedMeta.

    This function can fail to return a parsed feed under a number of
    circumstances. Among them:

    - It's too soon to be checking this feed again.
    - The feed has been failing for a while and we've decided it is dead.
    - The server told us it was dead.
    - We checked the server and it told us our cache was good.
    - We tried to contact the server, but a networking error happened.

    Regardless, the new FeedMeta has the latest state of the feed.
    """
    # Dead feeds are never contacted again; the caller just gets old meta.
    if feed.status == FEED_STATUS_DEAD:
        return (None, feed)
    # Honor the retry-after time recorded from the previous attempt.
    if time.time() < feed.retry_after_ts:
        LOG.info(f"{feed.url} will not be pulled until {feed.retry_after_ts}")
        return (None, feed)
    # We waffle back and forth about using feedreader's HTTP support vs
    # calling requests ourselves. We have decided to use requests manually at
    # this time because it make it much much easier to figure out whether or
    # not a request has succeeded. (The straw was handling timeouts and
    # understanding whether `bozo_exception` was a transport failure or not.)
    headers = {"user-agent": "cry-reader v0.0"}
    # Conditional-request validators from the last successful fetch.
    if feed.etag:
        headers["if-none-match"] = feed.etag
    if feed.modified:
        headers["if-modified-since"] = feed.modified
    LOG.info(f"{feed.url} fetching...")
    try:
        # Run the blocking requests call on the default executor so it does
        # not stall the event loop.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(http.get, feed.url, headers=headers),
        )
        LOG.info(f"{feed.url} fetched with status: {response.status_code}")
        failed = response.status_code >= 400
    except Exception as e:
        LOG.error(f"{feed.url} error fetching: {e}")
        failed = True
        response = None
    # Now, there are a number of things to consider in the response that
    # we need to consider in updating our permanent record.
    if response is not None and response.status_code == 410:
        # Permanently gone, really stop asking.
        LOG.error(f"{feed.url} permanently gone")
        return (None, dataclasses.replace(feed, status=FEED_STATUS_DEAD))
    if failed and time.time() > feed.last_fetched_ts + (7 * 24 * 60 * 60):
        # If we've been failing to fetch the feed for more than a week then
        # consider us dead, we must be doing something wrong.
        LOG.error(f"{feed.url} failed for too long, giving up")
        return (None, dataclasses.replace(feed, status=FEED_STATUS_DEAD))
    if response and response.is_permanent_redirect:
        # Permanent redirect, update the stored URL, but mark this as a
        # successful fetch.
        #
        # TODO: Is this actually the right URL to store? We need the last
        #       permanently redirected URL, not just whatever the last thing
        #       is... e.g. imagine a permanent followed by a temporary
        #       redirect, then what?
        assert response.url is not None
        feed = dataclasses.replace(feed, url=response.url)
    # NOTE: We might still be in a failure state here. But success or fail,
    #       the server might have told us when to next retry, so make a note
    #       of it.
    retry_delta = None
    if response is not None:
        try:
            # NOTE(review): Retry-After may also be an HTTP-date rather than
            # delta-seconds; that form fails int() and is treated as no hint.
            retry_delta = int(response.headers.get("retry-after", "nope"))
        except Exception:
            pass
    if retry_delta is None:
        retry_delta = 60 * 60  # 1 hour default
    feed = dataclasses.replace(feed, retry_after_ts=int(time.time()) + retry_delta)
    # We've done everything we can on a failure, bail if we've got an error.
    if failed:
        LOG.info(f"{feed.url} failed at the network level")
        return (None, feed)
    assert response is not None
    # Record our successful fetch now, to reset the failure timer above.
    feed = dataclasses.replace(feed, last_fetched_ts=int(time.time()))
    # We can *still* be successful but like, no changes.
    if response.status_code != 200:
        LOG.info(f"{feed.url} had no changes")
        return (None, feed)
    # Remember the validators so the next fetch can be conditional.
    feed = dataclasses.replace(
        feed,
        etag=response.headers.get("etag"),
        modified=response.headers.get("last-modified"),
    )
    parsed = feedparser.parse(response.content, response_headers=response.headers)
    return (parsed, feed)
async def main() -> None:
    """Development driver: fetch every feed listed in a fraidycat OPML
    export concurrently, then dump the results to stdout.
    """
    database.setup_database()
    export = pathlib.Path.home() / "Downloads" / "fraidycat.opml"
    feeds = [FeedMeta.from_url(url) for url in opml.load_opml(export)]
    async with asyncio.TaskGroup() as group:
        tasks = [group.create_task(fetch_feed(f)) for f in feeds]
    # The TaskGroup has exited, so every task is done and .result() is safe.
    for task in tasks:
        parsed, meta = task.result()
        if parsed is None:
            continue
        title = None
        page_url = None
        if parsed.feed is not None:
            title = parsed.feed.get("title")
            page_url = parsed.feed.get("link")
        if title is None or title == "":
            title = meta.url
        if page_url is None:
            page_url = meta.url
        print(f"[{title}]({page_url})")
        print(f"{meta}")
        for entry in (entry_from_feed(e) for e in parsed.entries):
            print(f" - {entry.title} ({entry.id})")
            print(f" {entry.link}")
        print()


if __name__ == "__main__":
    asyncio.run(main())

12
cry/opml.py Normal file
View file

@@ -0,0 +1,12 @@
import pathlib
import xml.etree.ElementTree
def parse_opml(opml: str) -> list[str]:
    """Extract every feed URL (xmlUrl attribute) from an OPML document."""
    root = xml.etree.ElementTree.fromstring(opml)
    urls = []
    for outline in root.iterfind(".//*[@xmlUrl]"):
        urls.append(outline.attrib["xmlUrl"])
    return urls
def load_opml(path: pathlib.Path) -> list[str]:
    """Read the OPML file at `path` and return the feed URLs it lists."""
    return parse_opml(path.read_text(encoding="utf-8"))