diff --git a/cry/web.py b/cry/web.py index 3dd8dba..7a5fea5 100644 --- a/cry/web.py +++ b/cry/web.py @@ -5,7 +5,9 @@ import html import http.server import io import time +import traceback import threading +import urllib.parse from . import database from . import feed @@ -109,13 +111,6 @@ class AcquiredDeadlineCondition: return True -class Closed: - pass - - -CLOSED = Closed() - - @dataclasses.dataclass class Event: event: str | None @@ -140,6 +135,15 @@ class EventChannel: self.closed = True lock.signal() + def log(self, line: str): + self.event("log", line) + + def status(self, status: str): + self.event("status", status) + + def redirect(self, url: str): + self.event("redirect", url) + def event(self, event: str, data: str | None = None): deadline = time.time() + 30 with self.condition.acquire(deadline) as lock: @@ -202,16 +206,17 @@ class RefreshTask: sink = self.sink try: db = database.Database.local() - sink.event("status", "Synchronizing state...") + sink.status("Synchronizing state...") database.sync(db) - sink.event("status", "Loading subscriptions...") + sink.status("Loading subscriptions...") metas = db.load_all_meta() - sink.event("status", "Refreshing subscriptions...") + sink.status("Refreshing subscriptions...") asyncio.run(self._refresh_all(db, metas)) - sink.event("status", "Done") + sink.status("Done") + sink.redirect("/") finally: sink.close() @@ -222,40 +227,102 @@ class RefreshTask: async def _refresh_meta(self, db: database.Database, meta: feed.FeedMeta): sink = self.sink - sink.event("log", f"[{meta.url}] Fetching...") + sink.log(f"[{meta.url}] Fetching...") d = None try: d, meta = await feed.fetch_feed(meta) if d is None: - sink.event("log", f"[{meta.url}] No updates") + sink.log(f"[{meta.url}] No updates") db.update_meta(meta) elif isinstance(d, str): - sink.event( - "log", + sink.log( f"[{meta.url}] WARNING: returned a non-feed result!", ) else: new_count = db.store_feed(d) - sink.event( - "log", + sink.log( f"[{meta.url}] {new_count} new items\n", ) except Exception as e: - sink.event("log", f"[{meta.url}] Error refressing feed: {e}") + sink.log(f"[{meta.url}] Error refressing feed: {e}") REFRESH_TASK: RefreshTask | None = None +class SubscribeTask: + url: str + sink: EventChannel + thread: threading.Thread + + def __init__(self, url: str): + self.url = url + + self.sink = EventChannel() + self.thread = threading.Thread(target=self._task_thread) + self.thread.daemon = True + self.thread.start() + + @property + def closed(self): + return self.sink.closed + + def _task_thread(self): + sink = self.sink + url = self.url + + try: + db = database.Database.local() + sink.status("Synchronizing state...") + database.sync(db) + + sink.status("Searching for feeds...") + feeds = asyncio.run(feed.feed_search(url)) + if len(feeds) == 0: + sink.status(f"Unable to find a suitable feed for {url}") + return + + if len(feeds) > 1: + candidates = [(("t", f.title), ("u", f.meta.url)) for f in feeds] + qs = urllib.parse.urlencode([e for c in candidates for e in c]) + sink.redirect(f"/subscribe-choose?{qs}") + return + + result = feeds[0] + sink.log(f"Identified {result.meta.url} as a feed for {url}") + + existing = db.load_meta(result.meta.url) + if existing is not None: + sink.log(f"This feed already exists (as {result.meta.url})") + sink.status("Already subscribed") + return + + db.store_feed(result) + sink.status("Done") + sink.redirect("/") + + finally: + sink.close() + + +SUBSCRIBE_TASK: SubscribeTask | None = None + + class Handler(http.server.BaseHTTPRequestHandler): def do_GET(self): if self.path == "/": return self.serve_feeds() elif self.path == "/refresh-status": - return self.serve_refresh_status() - elif self.path == "/events/refresh": + return self.serve_status() + elif self.path == "/subscribe-status": + return self.serve_status() + elif self.path == "/refresh-status/events": return self.serve_events(REFRESH_TASK) + elif self.path == "/subscribe-status/events": + return self.serve_events(SUBSCRIBE_TASK) + elif self.path.startswith("/subscribe-choose?"): + return self.serve_subscribe_choose() else: self.send_error(404) @@ -263,6 +330,8 @@ class Handler(http.server.BaseHTTPRequestHandler): print(f"{self.path}") if self.path == "/refresh": self.do_refresh() + if self.path == "/subscribe": + self.do_subscribe() else: self.send_error(400) @@ -275,7 +344,27 @@ class Handler(http.server.BaseHTTPRequestHandler): self.send_header("Location", "/refresh-status") self.end_headers() - def serve_events(self, task: RefreshTask | None): + def do_subscribe(self): + # pull url from form body + try: + content_length = int(self.headers.get("content-length", "0")) + content_str = self.rfile.read(content_length).decode("utf-8") + params = urllib.parse.parse_qs(content_str) + url = params["url"][0] + except Exception as e: + tb = "\n".join(traceback.format_exception(e)) + self.send_error(400, explain=tb) + return + + global SUBSCRIBE_TASK + if SUBSCRIBE_TASK is None or SUBSCRIBE_TASK.closed: + SUBSCRIBE_TASK = SubscribeTask(url) + + self.send_response(303) + self.send_header("Location", "/subscribe-status") + self.end_headers() + + def serve_events(self, task: RefreshTask | SubscribeTask | None): if task is None or task.closed: self.send_response(204) self.end_headers() @@ -321,7 +410,7 @@ class Handler(http.server.BaseHTTPRequestHandler): self.wfile.write(b"event: closed\ndata\n\n") self.wfile.flush() - def serve_refresh_status(self): + def serve_status(self): global REFRESH_TASK buffer = io.StringIO() @@ -329,7 +418,7 @@ class Handler(http.server.BaseHTTPRequestHandler): """
-More than one feed was found. Choose the feed to subscribe to.
+| Title | URL | + |
|---|