Compare commits

...

2 commits

Author SHA1 Message Date
c6bd8d8556 Start reconcile 2024-07-22 16:20:22 -07:00
5a834d5d26 Clocks 2024-07-22 15:58:30 -07:00
3 changed files with 263 additions and 36 deletions

View file

@ -248,3 +248,35 @@ def unsubscribe(url):
@cli.command("serve")
def serve():
    """Launch the web frontend."""
    web.serve()
@cli.command("reconcile")
def reconcile():
    """Best-effort reconciliation of the local database against peers.

    Scans the shared databases directory; for each foreign database at the
    same schema version, compares its clock against the clock we recorded
    at the last reconcile. The actual merge is not implemented yet.
    """
    local_db = database.Database.local()
    local_version = local_db.get_property("version", 0)
    for p in database.databases_directory().glob("*.db"):
        if not p.is_file():
            continue
        # Keep load failures separate from reconcile failures so the error
        # message reports what actually went wrong.
        try:
            other_db = database.Database.from_file(p)
        except Exception as e:
            click.echo(f"Error loading {p}: {e}")
            continue
        try:
            if local_db.origin == other_db.origin:
                continue  # Our own database; nothing to reconcile.
            other_version = other_db.get_property("version", 0)
            if other_version != local_version:
                click.echo(
                    f"{other_db.origin}: Not reconciling version {other_version} against {local_version}"
                )
                continue
            other_clock = other_db.get_clock()
            reconciled_clock = local_db.get_reconcile_clock(other_db.origin)
            if other_clock == reconciled_clock:
                continue  # No changes since the last reconcile.
            # TODO: RECONCILE FOR REALS
        except Exception as e:
            click.echo(f"Error reconciling {p}: {e}")
        finally:
            # Database keeps an sqlite3.Connection in .db; close it so a
            # directory scan does not accumulate open file handles.
            other_db.db.close()

View file

@ -45,6 +45,66 @@ SCHEMA_STATEMENTS = [
ELSE status
END
""",
# The "clock" is a number that increments as we make changes. We use this
# to do reconciliation, and track which versions of other databases we
# have reconciled already.
"""
INSERT INTO properties (name, value) VALUES ('clock', 1);
CREATE TRIGGER update_clock_on_feed_insert
AFTER INSERT ON feeds
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
CREATE TRIGGER update_clock_on_feed_delete
AFTER DELETE ON feeds
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
CREATE TRIGGER update_clock_on_feed_update
AFTER UPDATE ON feeds
WHEN (NEW.last_fetched_ts IS NOT OLD.last_fetched_ts)
OR (NEW.retry_after_ts IS NOT OLD.retry_after_ts)
OR (NEW.status IS NOT OLD.status)
OR (NEW.etag IS NOT OLD.etag)
OR (NEW.modified IS NOT OLD.modified)
OR (NEW.title IS NOT OLD.title)
OR (NEW.link IS NOT OLD.link)
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
CREATE TRIGGER update_clock_on_entries_insert
AFTER INSERT ON entries
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
CREATE TRIGGER update_clock_on_entries_delete
AFTER DELETE ON entries
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
CREATE TRIGGER update_clock_on_entries_update
AFTER UPDATE ON entries
WHEN (NEW.id IS NOT OLD.id)
OR (NEW.inserted_at IS NOT OLD.inserted_at)
OR (NEW.feed_url IS NOT OLD.feed_url)
OR (NEW.title IS NOT OLD.title)
OR (NEW.link IS NOT OLD.link)
BEGIN
UPDATE properties SET value=value + 1 WHERE name='clock';
END;
""",
"""
CREATE TABLE reconcile_status (
origin VARCHAR NOT NULL PRIMARY KEY,
clock INT NOT NULL
);
""",
]
@ -75,8 +135,12 @@ def local_origin(path: pathlib.Path | None = None) -> str:
return origin
def databases_directory() -> pathlib.Path:
    """Directory that holds every synced database file."""
    return pathlib.Path.home() / "Dropbox" / "cry"


def database_path(origin: str) -> pathlib.Path:
    """Path of the database file for *origin* inside the shared directory."""
    # Build on databases_directory() so the location is defined in exactly
    # one place (the earlier hard-coded duplicate return was unreachable).
    return databases_directory() / f"{origin}.db"
# TODO: Refactor into:
@ -87,13 +151,18 @@ class Database:
db: sqlite3.Connection
origin: str
def __init__(self, path: pathlib.Path | str, origin: str):
def __init__(self, path: pathlib.Path | str, origin: str, readonly: bool = False):
uri = False
if not isinstance(path, str):
path.parent.mkdir(parents=True, exist_ok=True)
path = f"file:{str(path)}"
uri = True
if readonly:
path = f"{path}?mode=ro"
# Enable autocommit as a separate step so that I can enable foreign
# keys cleanly. (Can't enable foreign keys in a transaction.)
db = sqlite3.Connection(str(path))
db = sqlite3.connect(str(path), uri=uri)
db.execute("PRAGMA foreign_keys = ON")
db.autocommit = False
@ -111,6 +180,16 @@ class Database:
db = Database(database_path(origin), origin)
db.ensure_database_schema()
db.set_property("origin", origin)
return db
@classmethod
def from_file(cls, path: pathlib.Path) -> "Database":
    """Open an existing database file read-only and resolve its origin.

    Uses cls (not a hard-coded Database) so subclasses construct their own
    type. readonly=True maps to SQLite's mode=ro, so the file on disk is
    never modified by this call.

    Raises:
        ValueError: if the file has no stored "origin" property.
    """
    db = cls(path, "", readonly=True)
    origin = db.get_property("origin")
    if origin is None:
        raise ValueError("No origin!")
    db.origin = str(origin)
    return db
def get_property(self, prop: str, default=None) -> typing.Any:
@ -121,6 +200,9 @@ class Database:
with self.db:
return self._set_property(prop, value)
def get_clock(self) -> int:
    """Return the database's change clock, falling back to 0 when unset."""
    raw = self.get_property("clock", 0)
    return int(raw)
def ensure_database_schema(self):
with self.db:
self.db.execute(
@ -133,11 +215,10 @@ class Database:
)
version = int(self._get_property("version", 0))
for script in SCHEMA_STATEMENTS[version:]:
for statement in script.split(";"):
try:
self.db.execute(statement)
except Exception as e:
raise Exception(f"Error executing:\n{statement}") from e
try:
self.db.executescript(script)
except Exception as e:
raise Exception(f"Error executing:\n{script}") from e
self._set_property("version", len(SCHEMA_STATEMENTS))
self._set_property("origin", self.origin)
@ -248,33 +329,7 @@ class Database:
def load_meta(self, url: str) -> feed.FeedMeta | None:
    """Return the stored fetch metadata for the feed at *url*, or None.

    Delegates to _load_meta, which holds the single copy of the query, so
    the duplicated inline SELECT (dead code after the delegating return)
    is gone and other callers can reuse it inside an open transaction.
    """
    with self.db:
        return self._load_meta(url)
def update_meta(self, f: feed.FeedMeta):
with self.db:
@ -475,3 +530,32 @@ class Database:
[status, new_ts, meta.url],
)
return cursor.rowcount
def _load_meta(self, url: str) -> feed.FeedMeta | None:
    """Query feed metadata for *url*; the caller manages the transaction."""
    row = self.db.execute(
        """
        SELECT
        last_fetched_ts,
        retry_after_ts,
        status,
        etag,
        modified
        FROM feeds
        WHERE url=?
        """,
        [url],
    ).fetchone()
    if row is None:
        return None
    fetched_ts, retry_ts, status, etag, modified = row
    return feed.FeedMeta(
        url=url,
        last_fetched_ts=fetched_ts,
        retry_after_ts=retry_ts,
        status=status,
        etag=etag,
        modified=modified,
    )

View file

@ -69,6 +69,18 @@ def test_database_prop_get_set():
assert db.get_property("foo") == val
def test_database_prop_get_set_clock():
    # Property reads and writes must not advance the change clock: the
    # schema's triggers only fire on feeds/entries mutations.
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    baseline = db.get_clock()
    db.get_property("foo")
    assert db.get_clock() == baseline
    db.set_property("foo", random_slug())
    assert db.get_clock() == baseline
REF_TIME = int(time.time())
FEED = feed.Feed(
meta=feed.FeedMeta(
@ -121,6 +133,17 @@ def test_database_store_feed_dups():
assert new_entries == 0
def test_database_store_feed_dups_clock():
    """Storing an identical feed twice must leave the clock untouched."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    before = db.get_clock()
    db.store_feed(FEED)
    assert db.get_clock() == before, "No change, no change!"
def test_database_store_feed_fetch_meta():
db = database.Database(":memory:", random_slug())
db.ensure_database_schema()
@ -150,12 +173,26 @@ def test_database_store_feed_fetch_all_dups():
assert all_feeds == [FEED]
def test_database_store_feed_reads_no_clock():
    """Read-only operations must never advance the change clock."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    snapshot = db.get_clock()
    reads = (
        lambda: db.load_meta(FEED.meta.url),
        db.load_all,
        db.load_all_meta,
    )
    for read in reads:
        read()
        assert db.get_clock() == snapshot
def test_database_store_feed_fetch_pattern_miss():
    """A pattern matching no feed returns an empty result set."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    # The unused `expected` slice copied from the pattern-hit test was
    # dropped: nothing matches "no_existo", so the limit is irrelevant.
    all_feeds = db.load_all(feed_limit=13, pattern="no_existo")
    assert all_feeds == []
@ -205,6 +242,31 @@ def test_database_store_with_update():
assert all_feeds == [updated_feed]
def test_database_store_with_update_clock():
    """Re-storing a feed with changed metadata must advance the clock."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    before = db.get_clock()
    changed_meta = dataclasses.replace(
        FEED.meta,
        last_fetched_ts=FEED.meta.last_fetched_ts + 10,
        retry_after_ts=FEED.meta.retry_after_ts + 20,
        # status left unchanged here; status changes are exercised by
        # test_database_update_feed_status_clock.
        etag=None,
        modified=random_slug(),
    )
    updated_feed = dataclasses.replace(
        FEED,
        meta=changed_meta,
        title=FEED.title + " (updated)",
        link=FEED.link + "/updated",
    )
    db.store_feed(updated_feed)
    assert db.get_clock() != before
def test_database_store_with_older_entries():
db = database.Database(":memory:", random_slug())
db.ensure_database_schema()
@ -252,6 +314,26 @@ def test_database_store_update_meta():
assert db.load_all_meta()[0] == new_meta
def test_database_store_update_meta_clock():
    """update_meta with changed fields must advance the change clock.

    Renamed with the `_clock` suffix used by the other clock tests: the
    previous definition duplicated the name of the test directly above it,
    which made pytest silently shadow (never run) that earlier test.
    """
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    old_clock = db.get_clock()
    new_meta = dataclasses.replace(
        FEED.meta,
        last_fetched_ts=FEED.meta.last_fetched_ts + 10,
        # Derive from retry_after_ts, not last_fetched_ts (copy-paste slip;
        # matches test_database_store_with_update_clock).
        retry_after_ts=FEED.meta.retry_after_ts + 20,
        status=feed.FEED_STATUS_DEAD,
        etag=random_slug(),
        modified=random_slug(),
    )
    db.update_meta(new_meta)
    assert db.get_clock() != old_clock
def test_database_update_feed_status():
db = database.Database(":memory:", random_slug())
db.ensure_database_schema()
@ -267,6 +349,21 @@ def test_database_update_feed_status():
assert db.load_all_meta()[0].status == feed.FEED_STATUS_UNSUBSCRIBED
def test_database_update_feed_status_clock():
    """Changing a feed's status must advance the change clock."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    before = db.get_clock()
    db.update_feed_status(FEED.meta, feed.FEED_STATUS_UNSUBSCRIBED)
    assert db.get_clock() != before
def test_database_redirect_clean():
db = database.Database(":memory:", random_slug())
db.ensure_database_schema()
@ -302,3 +399,17 @@ def test_database_redirect_with_merge():
old_dead_meta = dataclasses.replace(FEED.meta, status=feed.FEED_STATUS_UNSUBSCRIBED)
assert db.load_all_meta() == [old_dead_meta, expected_meta]
assert db.load_all(feed_limit=9999) == [expected_feed]
def test_database_redirect_clock():
    """Redirecting a feed to a new URL must advance the change clock."""
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    db.store_feed(FEED)
    before = db.get_clock()
    target = f"http://example.com/redirect/{random_slug()}"
    db.redirect_feed(FEED.meta.url, target)
    assert db.get_clock() != before