From 08fe7c1cf7cbfaecacce881c00ba966d45f8aa6e Mon Sep 17 00:00:00 2001
From: John Doty
Date: Fri, 22 Nov 2024 12:09:03 -0800
Subject: [PATCH] Changed my mind about feed times

I hope I haven't broken things, we'll see after a while I guess.
---
 cry/cli.py      |  27 ++++++++++++
 cry/database.py | 108 +++++++++++++++++++++++++++++++++++++++++-------
 cry/feed.py     |  76 +++++++++++++++-------------------
 3 files changed, 153 insertions(+), 58 deletions(-)

diff --git a/cry/cli.py b/cry/cli.py
index 9d91850..e6f9c8e 100644
--- a/cry/cli.py
+++ b/cry/cli.py
@@ -259,6 +259,33 @@ def unsubscribe(url):
         db.update_feed_status(meta, feed.FEED_STATUS_UNSUBSCRIBED)
 
 
+@cli.command(name="fetch")
+@click.argument("url")
+def fetch(url):
+    """Just fetch a feed and display the entries.
+
+    Nothing local is updated.
+    """
+    meta = feed.FeedMeta.from_url(url)
+
+    click.echo(f"Fetching {url}...")
+    d, _ = asyncio.run(feed.fetch_feed(meta))
+
+    if d is None:
+        click.echo("No changes. (?)")
+    elif isinstance(d, str):
+        click.echo(f"WARNING: {url} returned a non-feed result!")
+        click.echo(d)
+    else:
+        click.echo(f"{d.title}")
+        if len(d.entries) > 0:
+            for entry in d.entries:
+                click.echo(f"  {entry.title} ({entry.time_ago()})")
+        else:
+            click.echo(f"  ")
+        click.echo()
+
+
 @cli.command("serve")
 def serve():
     web.serve()
diff --git a/cry/database.py b/cry/database.py
index b9026d2..d3c90cb 100644
--- a/cry/database.py
+++ b/cry/database.py
@@ -91,16 +91,8 @@ SCHEMA_STATEMENTS = [
         UPDATE properties SET value=value + 1 WHERE name='clock';
     END;
 
-    CREATE TRIGGER update_clock_on_entries_update
-    AFTER UPDATE ON entries
-    WHEN (NEW.id IS NOT OLD.id)
-        OR (NEW.inserted_at IS NOT OLD.inserted_at)
-        OR (NEW.feed_url IS NOT OLD.feed_url)
-        OR (NEW.title IS NOT OLD.title)
-        OR (NEW.link IS NOT OLD.link)
-    BEGIN
-        UPDATE properties SET value=value + 1 WHERE name='clock';
-    END;
+    -- Superseded by later definition, no need to re-run this.
+    -- CREATE TRIGGER update_clock_on_entries_update
     """,
     """
     CREATE TABLE sync_status (
 
         clock INT NOT NULL
     );
     """,
+    """
+    ALTER TABLE entries ADD COLUMN posted_at INTEGER;
+
+    DROP TRIGGER IF EXISTS update_clock_on_entries_update;
+    CREATE TRIGGER update_clock_on_entries_update
+    AFTER UPDATE ON entries
+    WHEN (NEW.id IS NOT OLD.id)
+        OR (NEW.inserted_at IS NOT OLD.inserted_at)
+        OR (NEW.posted_at IS NOT OLD.posted_at)
+        OR (NEW.feed_url IS NOT OLD.feed_url)
+        OR (NEW.title IS NOT OLD.title)
+        OR (NEW.link IS NOT OLD.link)
+    BEGIN
+        UPDATE properties SET value=value + 1 WHERE name='clock';
+    END;
+    """,
 ]
 
 
@@ -308,6 +316,7 @@
                     SELECT
                         id,
                         inserted_at,
+                        COALESCE(posted_at, inserted_at) AS posted_at,
                         title,
                         link
                     FROM entries
@@ -322,8 +331,14 @@
                 rows = []
 
             entries = [
-                feed.Entry(id=id, inserted_at=inserted_at, title=title, link=link)
-                for id, inserted_at, title, link in rows
+                feed.Entry(
+                    id=id,
+                    inserted_at=inserted_at,
+                    posted_at=posted_at,
+                    title=title,
+                    link=link,
+                )
+                for id, inserted_at, posted_at, title, link in rows
             ]
             f = feed.Feed(meta=meta, title=title, link=link, entries=entries)
             feeds.append(f)
@@ -362,8 +377,42 @@
         Returns the number of new entries inserted.
         """
         with self.db:
+            # Correct the entries to make sure that we do not hide "new"
+            # entries behind old entries. (This can happen because the
+            # times in feeds are historically untrustworthy.) e.g., what
+            # should we do if we previously saw an entry from "October 3rd"
+            # but suddenly see a *new* entry from "October 1st"? That can't
+            # be right! So we bring every item's posted time to at least the
+            # maximum posted time we previously saw.
+            #
+            # This correction is incorrect in the case of feed sync, so feed
+            # sync can't go through here.
+            #
+            # NOTE: This might seem to bring entries already in the
+            #       database forward to a newer time. BUT! 
When we insert we
+            #       take the *older* time on conflict, so the change we do
+            #       here is undone on insert. Given that I don't want to do
+            #       "new entry" detection here in memory, that seems to be
+            #       OK. The other fix is to actually do "new entry" detection
+            #       in python, and stop relying on insert conflicts. But
+            #       alas, the insert conflict mechanism must still
+            #       exist in order to do the local-state synchronization, so
+            #       we really don't save anything with that.
+            #
+            max_post_time = self._get_max_post_time(f.meta.url) + 1
+            fixed_entries = [
+                feed.Entry(
+                    id=e.id,
+                    inserted_at=e.inserted_at,
+                    posted_at=max(e.posted_at, max_post_time),
+                    title=e.title,
+                    link=e.link,
+                )
+                for e in f.entries
+            ]
+
             self._insert_feed(f.meta, f.title, f.link)
-            return self._insert_entries(f.meta.url, f.entries)
+            return self._insert_entries(f.meta.url, fixed_entries)
 
     def update_feed_status(self, meta: feed.FeedMeta, status: int) -> int:
         with self.db:
@@ -471,6 +520,7 @@
                     SELECT
                         id,
                         inserted_at,
+                        COALESCE(posted_at, inserted_at),
                         title,
                         link
                     FROM entries
             )
             entries_results = entries_cursor.fetchmany()
             while len(entries_results) > 0:
+                # NOTE: It is critical that this does not go
+                #       through the logic in "store_feed" as it can cause
+                #       wildly incorrect times to get propagated. (Although
+                #       maybe corrected on the next sync?) 
self._insert_entries( url, [ feed.Entry( id=id, inserted_at=int(inserted_at), + posted_at=int(posted_at), title=title, link=link, ) - for id, inserted_at, title, link in entries_results + for id, inserted_at, posted_at, title, link in entries_results ], ) entries_results = entries_cursor.fetchmany() @@ -563,6 +618,16 @@ class Database: ], ) + def _get_max_post_time(self, feed_url: str) -> int: + cursor = self.db.execute( + "SELECT MAX(COALESCE(posted_at, inserted_at)) FROM entries WHERE feed_url=?", + [feed_url], + ) + result = cursor.fetchone() + if result is None or result[0] is None: + return 0 + return int(result[0]) + def _insert_entries(self, feed_url: str, entries: list[feed.Entry]) -> int: cursor = self.db.execute( "SELECT COUNT (*) FROM entries WHERE feed_url=?", [feed_url] @@ -574,10 +639,11 @@ class Database: INSERT INTO entries ( id, inserted_at, + posted_at, feed_url, title, link - ) VALUES (?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO UPDATE SET -- NOTE: This is also part of the feed merge algorithm, BUT @@ -590,7 +656,14 @@ class Database: -- to handle all the cases. (In theory we could make two -- different INSERTs to handle the two cases but that is -- more complexity than it is worth.) + -- inserted_at=MIN(inserted_at, excluded.inserted_at), + + -- NOTE: This behavior of MIN() on collision is relied upon to + -- correct another correction we make in store_feed. See + -- the comment there about publish times. 
+ -- + posted_at=COALESCE(MIN(posted_at, excluded.posted_at), MIN(inserted_at, excluded.inserted_at)), title=CASE WHEN inserted_at < excluded.inserted_at THEN title ELSE excluded.title @@ -600,7 +673,10 @@ class Database: ELSE excluded.link END """, - [(e.id, e.inserted_at, feed_url, e.title, e.link) for e in entries], + [ + (e.id, e.inserted_at, e.posted_at, feed_url, e.title, e.link) + for e in entries + ], ) cursor = self.db.execute( diff --git a/cry/feed.py b/cry/feed.py index 7be8cff..7f00fbe 100644 --- a/cry/feed.py +++ b/cry/feed.py @@ -69,7 +69,8 @@ class FeedMeta: @dataclasses.dataclass(frozen=True) class Entry: id: str - inserted_at: int + inserted_at: int # Unix time, but ms, not sec + posted_at: int # Unix time, but ms, not sec title: str link: str | None @@ -105,15 +106,32 @@ class Entry: if not id: id = the_worst_element_hash(entry) + published = entry.get("published_parsed") + if published is None: + published = entry.get("updated_parsed") + if published is not None: + assert isinstance(published, tuple) + # NOTE: Take insert_time if it's smaller; publish time errors generate + # posts from the future. + posted_at = min(int(time.mktime(published) * 1000), insert_time) + else: + posted_at = int(insert_time) + assert isinstance(id, str) assert link is None or isinstance(link, str) title = clean_text(str(title)) - return Entry(id=id, inserted_at=insert_time, title=title, link=link) + return Entry( + id=id, + inserted_at=insert_time, + posted_at=posted_at, + title=title, + link=link, + ) def time_ago(self) -> str: - inserted = int(self.inserted_at / 1000) - seconds = int(time.time()) - inserted + posted = int(self.posted_at / 1000) + seconds = int(time.time()) - posted if seconds <= 90: return f"{seconds}s" minutes = int(seconds / 60) @@ -179,10 +197,10 @@ class Feed: # c) Even if you can parse the timestamp, many feed implementations # just PUT THE WRONG TIME IN THERE. 
# - # The only coherent thing to do is to ignore the dates in the feeds - # and just rely on our own sense of time. This comes with its own - # problems, of course: our clock can be highly unreliable. But in - # general it's good enough to work with, and feeds don't update so + # So we have to account for the fact that the publish time might be + # wildly unreliable, and back it up with our own clock. This comes with + # its own problems, of course: our clock can be highly unreliable. But + # in general it's good enough to work with, and feeds don't update so # frequently that we need to worry about most of these problems if we # use unix timestamps as our basis. # @@ -200,6 +218,13 @@ class Feed: # `retry_after_ts` field, etc.) it's not a very likely thing to # happen. # + # The *other* big source of time instability is that "new" items might + # seem to have been published with a time that is "before" the last + # item we previously saw. (i.e., on the first refresh we see an item + # from October 3rd, then on the next refresh we see an item from October + # 1st.) We don't know anything about historical refreshes here in feed + # land, so that gets corrected in the database. (See store_feed.) + # insert_time = int(time.time()) * 1000 entries = [ Entry.from_parsed(e, insert_time + i) @@ -550,43 +575,10 @@ async def fetch_many( return [t.result() for t in tasks] -def merge_feeds(a: Feed, a_origin: str, b: Feed, b_origin: str) -> Feed: - """Merge two known feeds. There are two conflict resolution policies: - - 1. The newer fetch of feed metadata wins. - 2. The older fetch of a feed item wins. - - This means that the merge order between feeds *should* be consistent, - unless somehow the feeds updated at the exact same time. In that case, - the feed with the lexographically smallest slug wins. 
- """ - results = {e.id: e for e in a.entries} - for entry in b.entries: - existing = results.get(entry.id) - if existing is None or existing.inserted_at > entry.inserted_at: - results[entry.id] = entry - - entries = sorted(results.values(), key=lambda e: e.inserted_at, reverse=True) - source_feed = a - if a.meta.last_fetched_ts > b.meta.last_fetched_ts: - source_feed = a - elif a.meta.last_fetched_ts == b.meta.last_fetched_ts: - source_feed = a if a_origin < b_origin else b - else: - source_feed = b - - return Feed( - meta=source_feed.meta, - title=source_feed.title, - link=source_feed.link, - entries=entries, - ) - - def sort_key(f: Feed) -> int: """A sort key for sorting feeds by recency.""" if len(f.entries) > 0: - return max(e.inserted_at for e in f.entries) + return max(e.posted_at for e in f.entries) return -1