From c4a25c1683702b09ddaa4143251c91d712e8b201 Mon Sep 17 00:00:00 2001 From: John Doty Date: Thu, 2 Jan 2025 08:15:05 -0800 Subject: [PATCH] More work on post times --- cry/database.py | 21 ++++++++++++++----- cry/feed.py | 54 ++++++++++++++++++++++++++++++++----------------- cry/web.py | 19 ++++++++++++----- 3 files changed, 66 insertions(+), 28 deletions(-) diff --git a/cry/database.py b/cry/database.py index 3ef2a6d..c94f1e1 100644 --- a/cry/database.py +++ b/cry/database.py @@ -549,6 +549,11 @@ class Database: ) entries_results = entries_cursor.fetchmany() + def get_max_post_time(self) -> int: + """Get the latest any entry was recorded.""" + with self.db: + return self._get_max_post_time(feed_url=None) + def _get_property(self, prop: str, default=None) -> typing.Any: cursor = self.db.execute("SELECT value FROM properties WHERE name=?", (prop,)) result = cursor.fetchone() @@ -618,11 +623,17 @@ class Database: ], ) - def _get_max_post_time(self, feed_url: str) -> int: - cursor = self.db.execute( - "SELECT MAX(COALESCE(posted_at, inserted_at)) FROM entries WHERE feed_url=?", - [feed_url], - ) + def _get_max_post_time(self, feed_url: str | None) -> int: + if feed_url: + cursor = self.db.execute( + "SELECT MAX(COALESCE(posted_at, inserted_at)) FROM entries WHERE feed_url=?", + [feed_url], + ) + else: + cursor = self.db.execute( + "SELECT MAX(COALESCE(posted_at, inserted_at)) FROM entries" + ) + result = cursor.fetchone() if result is None or result[0] is None: return 0 diff --git a/cry/feed.py b/cry/feed.py index c965b64..af48578 100644 --- a/cry/feed.py +++ b/cry/feed.py @@ -77,15 +77,24 @@ class Entry: link: str | None @classmethod - def from_parsed(cls, entry: feedparser.FeedParserDict, insert_time: int) -> "Entry": + def from_parsed( + cls, + entry: feedparser.FeedParserDict, + insert_time: int, + last_post_time: int, + ) -> "Entry": """Convert an entry from feedparser into an Entry by extracting the things we care about, fudging things and substituting things 
as necessary. - The one thing we need from the outside is the "insert time", which - is *almost* `int(time.time())` but needs a little bit of fudging in - order to ensure that we can keep the items in order when we get a lot - of them all at once. + We also need to correct the post's timestamps, possibly, and to do + that we have two times from the outside: insert_time and last_post_time. + insert_time is a monotonic clock that mostly measures "now", and + last_post_time is mostly the time of the latest post in the feed. + + (The reason for "mostly" above is that they're actually ticked + forward a little in order to make sure that entries without + timestamps maintain their feed order.) """ title = entry.get("title") if not title: @@ -114,10 +123,11 @@ class Entry: if published is not None: assert isinstance(published, tuple) - # NOTE: Take insert_time if it's smaller; publish time errors generate - # posts from the future. + # NOTE: Take last_post_time if it's bigger, so that we don't get + # new entries from the past, and insert_time if it's smaller; + # publish time errors generate posts from the future. pub_time = int(calendar.timegm(published) * 1000) - posted_at = min(pub_time, insert_time) + posted_at = min(max(pub_time, last_post_time), insert_time) else: posted_at = insert_time @@ -166,7 +176,12 @@ class Feed: entries: list[Entry] @classmethod - def from_parsed(cls, d: feedparser.FeedParserDict, meta: FeedMeta) -> "Feed": + def from_parsed( + cls, + d: feedparser.FeedParserDict, + meta: FeedMeta, + last_post_time: int, + ) -> "Feed": title = None link = None @@ -225,16 +240,16 @@ class Feed: # `retry_after_ts` field, etc.) it's not a very likely thing to # happen. # - # The *other* big source of time instability is that "new" items might - # seem to have been published with a time that is "before" the last - # item we previously saw. 
(i.e., on the first refresh we see an item - # from October 3rd, then on the next refresh we see an item from October - # 1st.) We don't know anything about historical refreshes here in feed - # land, so that gets corrected in the database. (See store_feed.) + # The *other* big source of time instability is that "new" items + # might seem to have been published with a time that is "before" the + # last item we previously saw. (i.e., on the first refresh we see an + # item from October 3rd, then on the next refresh we see an item from + # October 1st.) That value comes in as "last_post_time", and if we see + # a time less than that it is bumped up to last_post_time + i. # insert_time = int(time.time()) * 1000 entries = [ - Entry.from_parsed(e, insert_time + i) + Entry.from_parsed(e, insert_time + i, last_post_time + i) for i, e in enumerate(reversed(d.entries)) ] entries.reverse() @@ -420,7 +435,10 @@ class Guardian: GUARDIAN = Guardian() -async def fetch_feed(meta: FeedMeta) -> typing.Tuple[None | Feed | str, FeedMeta]: +async def fetch_feed( + meta: FeedMeta, + last_post_time: int = 0, +) -> typing.Tuple[None | Feed | str, FeedMeta]: """Fetch a feed from the internet. `meta` is a FeedMeta that has all the details about what happened the last time we went to do a fetch, caching information and whatnot. @@ -567,7 +585,7 @@ async def fetch_feed(meta: FeedMeta) -> typing.Tuple[None | Feed | str, FeedMeta # Does this seem to be a feed? Or not? if could_be_feed_data(response.text): parsed = feedparser.parse(response.content, response_headers=response.headers) - return (Feed.from_parsed(parsed, meta), meta) + return (Feed.from_parsed(parsed, meta, last_post_time), meta) # No this is not a feed, just return the content out for further # processing.
diff --git a/cry/web.py b/cry/web.py index c177e13..38774ae 100644 --- a/cry/web.py +++ b/cry/web.py @@ -253,11 +253,15 @@ def background_task( def refresh_feeds(sink: EventChannel): """Refresh all the subscribed feeds.""" - async def _refresh_meta(db: database.Database, meta: feed.FeedMeta): + async def _refresh_meta( + db: database.Database, + meta: feed.FeedMeta, + last_post_time: int, + ): sink.log(f"[{meta.url}] Fetching...") d = None try: - d, meta = await feed.fetch_feed(meta) + d, meta = await feed.fetch_feed(meta, last_post_time) if d is None: sink.log(f"[{meta.url}] No updates") db.update_meta(meta) @@ -274,10 +278,14 @@ def refresh_feeds(sink: EventChannel): except Exception as e: sink.log(f"[{meta.url}] Error refressing feed: {e}") - async def _refresh_all(db: database.Database, metas: list[feed.FeedMeta]): + async def _refresh_all( + db: database.Database, + metas: list[feed.FeedMeta], + last_post_time: int, + ): async with asyncio.TaskGroup() as group: for meta in metas: - group.create_task(_refresh_meta(db, meta)) + group.create_task(_refresh_meta(db, meta, last_post_time)) db = database.Database.local() sink.status("Synchronizing state...") @@ -285,9 +293,10 @@ def refresh_feeds(sink: EventChannel): sink.status("Loading subscriptions...") metas = db.load_all_meta() + last_post_time = db.get_max_post_time() sink.status("Refreshing subscriptions...") - asyncio.run(_refresh_all(db, metas)) + asyncio.run(_refresh_all(db, metas, last_post_time)) sink.status("Done") sink.redirect("/")