119 lines
3 KiB
Python
119 lines
3 KiB
Python
# https://simonwillison.net/2023/Sep/30/cli-tools-python/
|
|
import asyncio
|
|
import logging
|
|
|
|
import click
|
|
|
|
from . import feed
|
|
from . import database
|
|
from . import opml
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Root command group for the feed reader. The repeatable -v/--verbose flag
# selects the log level for the whole process:
#   (none) -> WARNING, -v -> INFO, -vv or more -> DEBUG.
# NOTE: the docstring below is shown by click as the --help text, so it is
# kept short and user-facing.
@click.group()
@click.version_option()
@click.option("-v", "--verbose", count=True)
def cli(verbose):
    "Command line feed reader"
    # Start at the quietest level and raise verbosity one step per -v.
    log_level = logging.WARNING
    if verbose > 0:
        log_level = logging.INFO
    if verbose > 1:
        log_level = logging.DEBUG

    logging.basicConfig(level=log_level)
|
|
|
|
|
|
# `subscribe URL`: fetch the feed at URL and store it in the local database.
# Exits with status 1 if the feed cannot be fetched or is already stored.
# NOTE: the docstring below is shown by click as the --help text.
@cli.command(name="subscribe")
@click.argument("url")
def subscribe(url):
    "Subscribe to a feed at the specified URL."

    db = database.Database.local()

    click.echo(f"Fetching {url} ...")
    meta = feed.FeedMeta.from_url(url, db.origin)
    # fetch_feed hands back a (parsed, meta) pair; the returned meta replaces
    # the one built from the raw URL and is used for everything below.
    d, meta = asyncio.run(feed.fetch_feed(meta))
    if d is None:
        click.echo(f"Unable to fetch {url}")
        # A plain `return 1` is ignored by click in standalone mode (the
        # process would still exit 0), so signal the failure explicitly.
        raise click.exceptions.Exit(1)

    # Check to see if this URL is already in the database. The lookup uses
    # meta.url (from the fetch result), not the URL typed by the user --
    # presumably these can differ, e.g. after redirects; confirm in feed.py.
    existing = db.load_feed(meta.url)
    if existing is not None:
        click.echo(f"This feed already exists (as {meta.url})")
        raise click.exceptions.Exit(1)

    f = feed.Feed.from_parsed(d, meta)
    db.store_feed(f)

    click.echo(f"Subscribed to {meta.url}")
|
|
|
|
|
|
# `import OPML_FILE`: subscribe to every feed URL listed in an OPML file.
# Feeds that cannot be fetched or that are already stored are skipped (and
# logged); the number of newly stored feeds is printed at the end.
# NOTE: the docstring below is shown by click as the --help text.
@cli.command(name="import")
@click.argument("opml_file", type=click.File("r", encoding="utf-8"))
def import_opml(opml_file):
    "Import the specified OPML file."

    db = database.Database.local()
    urls = opml.parse_opml(opml_file.read())
    metas = [feed.FeedMeta.from_url(url, db.origin) for url in urls]

    click.echo(f"Fetching {len(urls)} feeds ...")
    results = asyncio.run(feed.fetch_many(metas))

    subscribed = 0
    # Results are paired with the requested URLs by position -- this assumes
    # fetch_many returns one (parsed, meta) result per input, in input order
    # (TODO confirm against feed.fetch_many).
    for url, (d, meta) in zip(urls, results):
        if d is None:
            # Logger.warn is a deprecated alias; use warning() with lazy args.
            LOG.warning("Unable to fetch %s, skipping...", url)
            continue

        # Duplicate check uses the URL from the fetch result, which may
        # differ from the one listed in the OPML file.
        existing = db.load_feed(meta.url)
        if existing is not None:
            LOG.info("%s already exists (as %s)", url, meta.url)
            continue

        f = feed.Feed.from_parsed(d, meta)
        db.store_feed(f)
        subscribed += 1

    click.echo(f"Subscribed to {subscribed} new feeds")
|
|
|
|
|
|
# `refresh [URL]`: re-fetch one subscribed feed, or all of them when no URL
# is given, and store the results. Exits with status 1 when a URL is given
# that is not in the database.
# NOTE: the docstring below is shown by click as the --help text, so its
# wording is left exactly as-is.
@cli.command(name="refresh")
@click.argument("url", required=False, default=None)
def refresh(url):
    """Refresh one or more feeds.

    If a URL is specified, refresh that URL. Otherwise, refresh all subscribed
    feeds.
    """

    db = database.Database.local()
    if url:
        f = db.load_feed(url)
        if f is None:
            click.echo(f"Not subscribed to {url}")
            # A plain `return 1` is ignored by click in standalone mode (the
            # process would still exit 0), so signal the failure explicitly.
            raise click.exceptions.Exit(1)
        feeds = [f.meta]
    else:
        feeds = db.load_all_meta()

    click.echo(f"Refreshing {len(feeds)} feed(s)...")
    results = asyncio.run(feed.fetch_many(feeds))

    new_count = 0
    for d, meta in results:
        if d is None:
            # Nothing new: only the feed's metadata is written back.
            db.update_meta(meta)
        else:
            # New items, possibly! store_feed's return value is summed as the
            # count of new entries (presumably the number it inserted --
            # confirm in database.py).
            f = feed.Feed.from_parsed(d, meta)
            new_count += db.store_feed(f)

    click.echo(f"Fetched {new_count} new entries.")
|