From 5a834d5d26b77b6fb990af7111ad1e3205eeb69c Mon Sep 17 00:00:00 2001 From: John Doty Date: Mon, 22 Jul 2024 15:58:30 -0700 Subject: [PATCH 1/2] Clocks --- cry/database.py | 123 ++++++++++++++++++++++++++++++----------- tests/test_database.py | 113 ++++++++++++++++++++++++++++++++++++- 2 files changed, 203 insertions(+), 33 deletions(-) diff --git a/cry/database.py b/cry/database.py index a73e971..4e5a934 100644 --- a/cry/database.py +++ b/cry/database.py @@ -45,6 +45,60 @@ SCHEMA_STATEMENTS = [ ELSE status END """, + # The "clock" is a number that increments as we make changes. We use this + # to do reconciliation, and track which versions of other databases we + # have reconciled already. + """ + INSERT INTO properties (name, value) VALUES ('clock', 1); + + CREATE TRIGGER update_clock_on_feed_insert + AFTER INSERT ON feeds + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + + CREATE TRIGGER update_clock_on_feed_delete + AFTER DELETE ON feeds + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + + CREATE TRIGGER update_clock_on_feed_update + AFTER UPDATE ON feeds + WHEN (NEW.last_fetched_ts IS NOT OLD.last_fetched_ts) + OR (NEW.retry_after_ts IS NOT OLD.retry_after_ts) + OR (NEW.status IS NOT OLD.status) + OR (NEW.etag IS NOT OLD.etag) + OR (NEW.modified IS NOT OLD.modified) + OR (NEW.title IS NOT OLD.title) + OR (NEW.link IS NOT OLD.link) + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + + CREATE TRIGGER update_clock_on_entries_insert + AFTER INSERT ON entries + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + + CREATE TRIGGER update_clock_on_entries_delete + AFTER DELETE ON entries + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + + CREATE TRIGGER update_clock_on_entries_update + AFTER UPDATE ON entries + WHEN (NEW.id IS NOT OLD.id) + OR (NEW.inserted_at IS NOT OLD.inserted_at) + OR (NEW.feed_url IS NOT OLD.feed_url) + OR (NEW.title IS 
NOT OLD.title) + OR (NEW.link IS NOT OLD.link) + BEGIN + UPDATE properties SET value=value + 1 WHERE name='clock'; + END; + """, ] @@ -121,6 +175,9 @@ class Database: with self.db: return self._set_property(prop, value) + def get_clock(self) -> int: + return int(self.get_property("clock", 0)) + def ensure_database_schema(self): with self.db: self.db.execute( @@ -133,11 +190,10 @@ class Database: ) version = int(self._get_property("version", 0)) for script in SCHEMA_STATEMENTS[version:]: - for statement in script.split(";"): - try: - self.db.execute(statement) - except Exception as e: - raise Exception(f"Error executing:\n{statement}") from e + try: + self.db.executescript(script) + except Exception as e: + raise Exception(f"Error executing:\n{script}") from e self._set_property("version", len(SCHEMA_STATEMENTS)) self._set_property("origin", self.origin) @@ -248,33 +304,7 @@ class Database: def load_meta(self, url: str) -> feed.FeedMeta | None: with self.db: - cursor = self.db.execute( - """ - SELECT - last_fetched_ts, - retry_after_ts, - status, - etag, - modified - FROM feeds - WHERE url=? - """, - [url], - ) - - row = cursor.fetchone() - if row is None: - return None - - last_fetched_ts, retry_after_ts, status, etag, modified = row - return feed.FeedMeta( - url=url, - last_fetched_ts=last_fetched_ts, - retry_after_ts=retry_after_ts, - status=status, - etag=etag, - modified=modified, - ) + return self._load_meta(url) def update_meta(self, f: feed.FeedMeta): with self.db: @@ -475,3 +505,32 @@ class Database: [status, new_ts, meta.url], ) return cursor.rowcount + + def _load_meta(self, url: str) -> feed.FeedMeta | None: + cursor = self.db.execute( + """ + SELECT + last_fetched_ts, + retry_after_ts, + status, + etag, + modified + FROM feeds + WHERE url=? 
+ """, + [url], + ) + + row = cursor.fetchone() + if row is None: + return None + + last_fetched_ts, retry_after_ts, status, etag, modified = row + return feed.FeedMeta( + url=url, + last_fetched_ts=last_fetched_ts, + retry_after_ts=retry_after_ts, + status=status, + etag=etag, + modified=modified, + ) diff --git a/tests/test_database.py b/tests/test_database.py index 511b760..50a245d 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -69,6 +69,18 @@ def test_database_prop_get_set(): assert db.get_property("foo") == val +def test_database_prop_get_set_clock(): + db = database.Database(":memory:", random_slug()) + db.ensure_database_schema() + + clock_a = db.get_clock() + db.get_property("foo") + assert db.get_clock() == clock_a + val = random_slug() + db.set_property("foo", val) + assert db.get_clock() == clock_a + + REF_TIME = int(time.time()) FEED = feed.Feed( meta=feed.FeedMeta( @@ -121,6 +133,17 @@ def test_database_store_feed_dups(): assert new_entries == 0 +def test_database_store_feed_dups_clock(): + db = database.Database(":memory:", random_slug()) + db.ensure_database_schema() + + db.store_feed(FEED) + old_clock = db.get_clock() + + db.store_feed(FEED) + assert db.get_clock() == old_clock, "No change, no change!" 
+
+
 def test_database_store_feed_fetch_meta():
     db = database.Database(":memory:", random_slug())
     db.ensure_database_schema()
@@ -150,12 +173,26 @@ def test_database_store_feed_fetch_all_dups():
     assert all_feeds == [FEED]
 
 
+def test_database_store_feed_reads_no_clock():
+    db = database.Database(":memory:", random_slug())
+    db.ensure_database_schema()
+
+    db.store_feed(FEED)
+    old_clock = db.get_clock()
+
+    db.load_meta(FEED.meta.url)
+    assert db.get_clock() == old_clock
+    db.load_all()
+    assert db.get_clock() == old_clock
+    db.load_all_meta()
+    assert db.get_clock() == old_clock
+
+
 def test_database_store_feed_fetch_pattern_miss():
     db = database.Database(":memory:", random_slug())
     db.ensure_database_schema()
 
     db.store_feed(FEED)
-    expected = dataclasses.replace(FEED, entries=FEED.entries[:13])
 
     all_feeds = db.load_all(feed_limit=13, pattern="no_existo")
     assert all_feeds == []
@@ -205,6 +242,31 @@ def test_database_store_with_update():
     assert all_feeds == [updated_feed]
 
 
+def test_database_store_with_update_clock():
+    db = database.Database(":memory:", random_slug())
+    db.ensure_database_schema()
+
+    db.store_feed(FEED)
+    old_clock = db.get_clock()
+
+    updated_feed = dataclasses.replace(
+        FEED,
+        meta=dataclasses.replace(
+            FEED.meta,
+            last_fetched_ts=FEED.meta.last_fetched_ts + 10,
+            retry_after_ts=FEED.meta.retry_after_ts + 20,
+            # status=feed.FEED_STATUS_UNSUBSCRIBED,
+            etag=None,
+            modified=random_slug(),
+        ),
+        title=FEED.title + " (updated)",
+        link=FEED.link + "/updated",
+    )
+    db.store_feed(updated_feed)
+
+    assert db.get_clock() != old_clock
+
+
 def test_database_store_with_older_entries():
     db = database.Database(":memory:", random_slug())
     db.ensure_database_schema()
@@ -252,6 +314,26 @@ def test_database_store_update_meta():
     assert db.load_all_meta()[0] == new_meta
 
 
+def test_database_store_update_meta_clock():
+    db = database.Database(":memory:", random_slug())
+    db.ensure_database_schema()
+
+    db.store_feed(FEED)
+    old_clock = db.get_clock()
+
+    new_meta = 
dataclasses.replace( + FEED.meta, + last_fetched_ts=FEED.meta.last_fetched_ts + 10, + retry_after_ts=FEED.meta.last_fetched_ts + 20, + status=feed.FEED_STATUS_DEAD, + etag=random_slug(), + modified=random_slug(), + ) + + db.update_meta(new_meta) + assert db.get_clock() != old_clock + + def test_database_update_feed_status(): db = database.Database(":memory:", random_slug()) db.ensure_database_schema() @@ -267,6 +349,21 @@ def test_database_update_feed_status(): assert db.load_all_meta()[0].status == feed.FEED_STATUS_UNSUBSCRIBED +def test_database_update_feed_status_clock(): + db = database.Database(":memory:", random_slug()) + db.ensure_database_schema() + + db.store_feed(FEED) + old_clock = db.get_clock() + + db.update_feed_status( + FEED.meta, + feed.FEED_STATUS_UNSUBSCRIBED, + ) + + assert db.get_clock() != old_clock + + def test_database_redirect_clean(): db = database.Database(":memory:", random_slug()) db.ensure_database_schema() @@ -302,3 +399,17 @@ def test_database_redirect_with_merge(): old_dead_meta = dataclasses.replace(FEED.meta, status=feed.FEED_STATUS_UNSUBSCRIBED) assert db.load_all_meta() == [old_dead_meta, expected_meta] assert db.load_all(feed_limit=9999) == [expected_feed] + + +def test_database_redirect_clock(): + db = database.Database(":memory:", random_slug()) + db.ensure_database_schema() + + db.store_feed(FEED) + + old_clock = db.get_clock() + + new_url = f"http://example.com/redirect/{random_slug()}" + db.redirect_feed(FEED.meta.url, new_url) + + assert db.get_clock() != old_clock From c6bd8d8556ebee910bb6afe7bdedb05fe81446ac Mon Sep 17 00:00:00 2001 From: John Doty Date: Mon, 22 Jul 2024 16:20:22 -0700 Subject: [PATCH 2/2] Start reconcile --- cry/cli.py | 32 ++++++++++++++++++++++++++++++++ cry/database.py | 31 ++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/cry/cli.py b/cry/cli.py index 6bb988b..9a93c30 100644 --- a/cry/cli.py +++ b/cry/cli.py @@ -248,3 +248,35 @@ def unsubscribe(url): 
@cli.command("serve") def serve(): web.serve() + + +@cli.command("reconcile") +def reconcile(): + local_db = database.Database.local() + local_version = local_db.get_property("version", 0) + for p in database.databases_directory().glob("*.db"): + if not p.is_file(): + continue + + try: + other_db = database.Database.from_file(p) + if local_db.origin == other_db.origin: + continue + + other_version = other_db.get_property("version", 0) + if other_version != local_version: + click.echo( + f"{other_db.origin}: Not reconciling version {other_version} against {local_version}" + ) + continue + + # TODO: GET CLOCK OF BOTH. + other_clock = other_db.get_clock() + reconciled_clock = local_db.get_reconcile_clock(other_db.origin) + if other_clock == reconciled_clock: + continue + + # TODO: RECONCILE FOR REALS + + except Exception as e: + click.echo(f"Error loading {p}: {e}") diff --git a/cry/database.py b/cry/database.py index 4e5a934..138a831 100644 --- a/cry/database.py +++ b/cry/database.py @@ -99,6 +99,12 @@ SCHEMA_STATEMENTS = [ UPDATE properties SET value=value + 1 WHERE name='clock'; END; """, + """ + CREATE TABLE reconcile_status ( + origin VARCHAR NOT NULL PRIMARY KEY, + clock INT NOT NULL + ); + """, ] @@ -129,8 +135,12 @@ def local_origin(path: pathlib.Path | None = None) -> str: return origin +def databases_directory() -> pathlib.Path: + return pathlib.Path.home() / "Dropbox" / "cry" + + def database_path(origin: str) -> pathlib.Path: - return pathlib.Path.home() / "Dropbox" / "cry" / f"{origin}.db" + return databases_directory() / f"{origin}.db" # TODO: Refactor into: @@ -141,13 +151,18 @@ class Database: db: sqlite3.Connection origin: str - def __init__(self, path: pathlib.Path | str, origin: str): + def __init__(self, path: pathlib.Path | str, origin: str, readonly: bool = False): + uri = False if not isinstance(path, str): path.parent.mkdir(parents=True, exist_ok=True) + path = f"file:{str(path)}" + uri = True + if readonly: + path = f"{path}?mode=ro" # Enable 
autocommit as a separate step so that I can enable foreign # keys cleanly. (Can't enable foreign keys in a transaction.) - db = sqlite3.Connection(str(path)) + db = sqlite3.connect(str(path), uri=uri) db.execute("PRAGMA foreign_keys = ON") db.autocommit = False @@ -165,6 +180,16 @@ class Database: db = Database(database_path(origin), origin) db.ensure_database_schema() + db.set_property("origin", origin) + return db + + @classmethod + def from_file(cls, path: pathlib.Path) -> "Database": + db = Database(path, "", readonly=True) + origin = db.get_property("origin") + if origin is None: + raise Exception("No origin!") + db.origin = str(origin) return db def get_property(self, prop: str, default=None) -> typing.Any: