"""Tests for cry.database: schema setup, properties, and feed storage/retrieval."""

import dataclasses
import pathlib
import random
import string
import tempfile
import time

from cry import database
from cry import feed


def random_slug() -> str:
    """Return an 8-character random alphanumeric slug for unique test values."""
    return "".join(
        random.choices(
            string.ascii_uppercase + string.ascii_lowercase + string.digits,
            k=8,
        )
    )


def _fresh_db() -> database.Database:
    """Create an in-memory Database with the schema applied.

    Extracted helper: this pair of calls was duplicated in every
    storage-related test below.
    """
    db = database.Database(":memory:", random_slug())
    db.ensure_database_schema()
    return db


def test_database_origin_path():
    """origin_path() must always yield a non-None path."""
    op = database.origin_path()
    assert op is not None


def test_database_local_origin():
    """local_origin() creates the origin file and returns a non-empty value."""
    with tempfile.TemporaryDirectory() as op:
        origin_file = pathlib.Path(op) / "origin"
        assert not origin_file.exists()
        origin = database.local_origin(origin_file)
        assert origin_file.exists()
        assert len(origin) > 0


def test_database_local_origin_repeatable():
    """Repeated local_origin() calls on the same file return the same value."""
    with tempfile.TemporaryDirectory() as op:
        origin_file = pathlib.Path(op) / "origin"
        a = database.local_origin(origin_file)
        b = database.local_origin(origin_file)
        assert len(a) > 0
        assert a == b


def test_database_origin_in_path():
    """database_path() embeds the origin slug in the resulting path."""
    slug = random_slug()
    p = database.database_path(slug)
    assert slug in str(p)


def test_database_schema():
    """Schema 'version' property equals the number of schema statements applied."""
    db = _fresh_db()
    c = db.db.execute("SELECT value FROM properties WHERE name = 'version'")
    row = c.fetchone()
    assert int(row[0]) == len(database.SCHEMA_STATEMENTS)


def test_database_prop_get_set():
    """Properties are None when unset and round-trip after set_property()."""
    db = _fresh_db()
    assert db.get_property("foo") is None
    val = random_slug()
    db.set_property("foo", val)
    assert db.get_property("foo") == val


# Shared reference feed used by the storage tests below. Entries are built
# newest-last (range counts down) with millisecond-offset insertion times.
REF_TIME = int(time.time())
FEED = feed.Feed(
    meta=feed.FeedMeta(
        url="http://example.com/test/feed",
        last_fetched_ts=REF_TIME,
        retry_after_ts=REF_TIME,
        status=feed.FEED_STATUS_ALIVE,
        etag=random_slug(),
        modified=random_slug(),
    ),
    title="Test Feed",
    link="http://example.com/test",
    entries=[
        feed.Entry(
            id=random_slug(),
            inserted_at=(REF_TIME * 1000) + index,
            title=f"Entry {index}",
            link=f"http://example.com/test/a{index}",
        )
        for index in range(100, 0, -1)
    ],
)


def test_database_load_store_meta():
    """A fresh database has no feed metadata."""
    db = _fresh_db()
    metas = db.load_all_meta()
    assert metas == []


def test_database_store_feed():
    """Storing a feed makes its metadata loadable by URL."""
    db = _fresh_db()
    db.store_feed(FEED)
    loaded_meta = db.load_meta(FEED.meta.url)
    assert loaded_meta == FEED.meta


def test_database_store_feed_dups():
    """Re-storing the same feed inserts no duplicate entries."""
    db = _fresh_db()
    count = db.store_feed(FEED)
    assert count == len(FEED.entries)
    new_entries = db.store_feed(FEED)
    assert new_entries == 0


def test_database_store_feed_fetch_meta():
    """load_all_meta() returns the stored feed's metadata."""
    db = _fresh_db()
    db.store_feed(FEED)
    meta = db.load_all_meta()
    assert meta == [FEED.meta]


def test_database_store_feed_fetch_all():
    """load_all() honors feed_limit, returning only the newest entries."""
    db = _fresh_db()
    db.store_feed(FEED)
    expected = dataclasses.replace(FEED, entries=FEED.entries[:13])
    all_feeds = db.load_all(feed_limit=13)
    assert all_feeds == [expected]


def test_database_store_feed_fetch_all_dups():
    """Duplicate store_feed() calls do not change what load_all() returns."""
    db = _fresh_db()
    db.store_feed(FEED)
    db.store_feed(FEED)
    all_feeds = db.load_all(feed_limit=10000)
    assert all_feeds == [FEED]


def test_database_store_feed_fetch_pattern_miss():
    """A pattern matching nothing yields an empty result."""
    db = _fresh_db()
    db.store_feed(FEED)
    # NOTE(review): the original also built an unused `expected` feed here;
    # removed as dead code.
    all_feeds = db.load_all(feed_limit=13, pattern="no_existo")
    assert all_feeds == []


def test_database_store_feed_fetch_pattern_url():
    """A pattern matching the feed link returns the (limited) feed."""
    db = _fresh_db()
    db.store_feed(FEED)
    expected = dataclasses.replace(FEED, entries=FEED.entries[:13])
    all_feeds = db.load_all(feed_limit=13, pattern=FEED.link)
    assert all_feeds == [expected]


def test_database_store_feed_fetch_pattern_name():
    """A pattern matching the feed title returns the (limited) feed."""
    db = _fresh_db()
    db.store_feed(FEED)
    expected = dataclasses.replace(FEED, entries=FEED.entries[:13])
    all_feeds = db.load_all(feed_limit=13, pattern=FEED.title)
    assert all_feeds == [expected]


def test_database_store_with_update():
    """Storing an updated feed replaces the meta, title, and link."""
    db = _fresh_db()
    db.store_feed(FEED)
    updated_feed = dataclasses.replace(
        FEED,
        meta=dataclasses.replace(
            FEED.meta,
            last_fetched_ts=FEED.meta.last_fetched_ts + 10,
            retry_after_ts=FEED.meta.retry_after_ts + 20,
            # status=feed.FEED_STATUS_UNSUBSCRIBED,
            etag=None,
            modified=random_slug(),
        ),
        title=FEED.title + " (updated)",
        link=FEED.link + "/updated",
    )
    db.store_feed(updated_feed)
    all_feeds = db.load_all(feed_limit=100)
    assert all_feeds == [updated_feed]


def test_database_store_with_older_entries():
    """An entry with the same id but an older timestamp overwrites the stored one."""
    db = _fresh_db()
    db.store_feed(FEED)
    old_entry = FEED.entries[0]
    older_entry = dataclasses.replace(
        old_entry,
        inserted_at=old_entry.inserted_at - 10,
        title=old_entry.title + " (older)",
        link=old_entry.link + "/older",
    )
    updated_feed = dataclasses.replace(FEED, entries=[older_entry])
    db.store_feed(updated_feed)
    all_feeds = db.load_all(feed_limit=100)
    found_entries = list(
        filter(
            lambda e: e.id == older_entry.id,
            all_feeds[0].entries,
        )
    )
    assert found_entries == [older_entry]


def test_database_store_update_meta():
    """update_meta() replaces stored feed metadata wholesale."""
    db = _fresh_db()
    db.store_feed(FEED)
    new_meta = dataclasses.replace(
        FEED.meta,
        last_fetched_ts=FEED.meta.last_fetched_ts + 10,
        retry_after_ts=FEED.meta.last_fetched_ts + 20,
        status=feed.FEED_STATUS_DEAD,
        etag=random_slug(),
        modified=random_slug(),
    )
    db.update_meta(new_meta)
    assert db.load_all_meta()[0] == new_meta


def test_database_update_feed_status():
    """update_feed_status() changes only the feed's status field."""
    db = _fresh_db()
    db.store_feed(FEED)
    assert db.load_all_meta()[0].status != feed.FEED_STATUS_UNSUBSCRIBED
    db.update_feed_status(
        FEED.meta,
        feed.FEED_STATUS_UNSUBSCRIBED,
    )
    assert db.load_all_meta()[0].status == feed.FEED_STATUS_UNSUBSCRIBED


def test_database_redirect_clean():
    """Redirecting to a fresh URL rewrites the feed's URL in place."""
    db = _fresh_db()
    db.store_feed(FEED)
    new_url = f"http://example.com/redirect/{random_slug()}"
    db.redirect_feed(FEED.meta.url, new_url)
    expected_meta = dataclasses.replace(FEED.meta, url=new_url)
    assert db.load_all_meta() == [expected_meta]
    expected_feed = dataclasses.replace(FEED, meta=expected_meta)
    assert db.load_all(feed_limit=9999) == [expected_feed]


def test_database_redirect_with_merge():
    """Redirecting onto an existing feed unsubscribes the old one and keeps the target."""
    db = _fresh_db()
    db.store_feed(FEED)
    new_url = f"http://example.com/redirect/{random_slug()}"
    expected_meta = dataclasses.replace(FEED.meta, url=new_url)
    expected_feed = dataclasses.replace(FEED, meta=expected_meta)
    db.store_feed(expected_feed)
    # NOTE: This is flaky because the time might shift on me.
    db.redirect_feed(FEED.meta.url, new_url)
    old_dead_meta = dataclasses.replace(FEED.meta, status=feed.FEED_STATUS_UNSUBSCRIBED)
    assert db.load_all_meta() == [old_dead_meta, expected_meta]
    assert db.load_all(feed_limit=9999) == [expected_feed]