295 lines
8.4 KiB
Python
295 lines
8.4 KiB
Python
# https://simonwillison.net/2023/Sep/30/cli-tools-python/
|
|
import asyncio
|
|
import html
|
|
import http.server
|
|
import io
|
|
import logging
|
|
|
|
import click
|
|
|
|
from . import feed
|
|
from . import feedfinder
|
|
from . import database
|
|
from . import opml
|
|
|
|
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
@click.group()
@click.version_option()
@click.option(
    "-v",
    "--verbose",
    count=True,
    help="Increase the verbosity of the output. This option can be specified multiple times.",
)
def cli(verbose):
    """Command line feed reader"""
    # `verbose` is the number of times -v was given. Map it onto a logging
    # threshold: none -> warnings only, once -> info, twice or more -> debug.
    thresholds = (logging.WARN, logging.INFO, logging.DEBUG)
    position = max(0, min(verbose, len(thresholds) - 1))
    logging.basicConfig(level=thresholds[position])
|
|
|
|
|
|
@cli.command(name="search")
@click.argument("url")
def search(url):
    "Search an URL for feeds."
    # TODO: Rewrite to use our new one
    found = feedfinder.find_feeds(url)

    # The loop variable is deliberately not called `feed`: that name is the
    # sibling module imported at the top of this file, and shadowing it here
    # would break any later use of the module inside this function.
    for candidate in found:
        click.echo(candidate)

    click.echo(f"Found {len(found)} feeds")
|
|
|
|
|
|
@cli.command(name="subscribe")
@click.argument("url")
@click.option("--literal/--no-literal", "-l/-L", default=False)
def subscribe(url, literal):
    "Subscribe to a feed at the specified URL."
    # Without --literal the URL is searched for feeds and exactly one match is
    # required; with --literal the URL itself is fetched as the feed.
    #
    # Failures leave through Context.exit(1). When click runs a command as a
    # program it discards the callback's return value, so a plain `return 1`
    # would still produce exit status 0.
    ctx = click.get_current_context()
    db = database.Database.local()

    if not literal:
        click.echo(f"Searching for feeds for {url} ...")
        feeds = asyncio.run(feed.feed_search(url, db.origin))
        if not feeds:
            click.echo(f"Unable to find a suitable feed for {url}")
            ctx.exit(1)

        if len(feeds) > 1:
            # If we found more than one feed then we will try to see what the
            # individual feeds are.
            click.echo(f"Found {len(feeds)} feeds:")

            # Column widths so titles and URLs line up in the listing.
            max_title = max(len(f.title) for f in feeds)
            max_url = max(len(f.meta.url) for f in feeds)

            feeds.sort(key=lambda f: f.title)

            for f in feeds:
                click.echo(f"{f.title:{max_title}} {f.meta.url:{max_url}}")

            click.echo(
                "\nRun `subscribe` again with the URL of the feed you want to subscribe to."
            )
            ctx.exit(1)

        result = feeds[0]
        click.echo(f"Identified {result.meta.url} as a feed for {url}")
    else:
        click.echo(f"Fetching {url} ...")
        meta = feed.FeedMeta.from_url(url, db.origin)
        d, meta = asyncio.run(feed.fetch_feed(meta))
        if d is None:
            click.echo(f"Unable to fetch {url}")
            ctx.exit(1)

        # A string result is treated as "fetched something, but not a feed".
        if isinstance(d, str):
            click.echo(f"{url} does not seem to be a feed")
            ctx.exit(1)

        result = d

    # Check to see if this URL is already in the database.
    existing = db.load_feed(result.meta.url)
    if existing is not None:
        click.echo(f"This feed already exists (as {result.meta.url})")
        ctx.exit(1)

    db.store_feed(result)
    click.echo(f"Subscribed to {result.meta.url}")
|
|
|
|
|
|
@cli.command(name="import")
@click.argument("opml_file", type=click.File("r", encoding="utf-8"))
def import_opml(opml_file):
    "Import the specified OPML file."
    # Every URL parsed from the OPML document is fetched, and each one that
    # yields a feed not already in the database is stored. Failures are
    # skipped rather than aborting the whole import.
    db = database.Database.local()
    urls = opml.parse_opml(opml_file.read())
    metas = [feed.FeedMeta.from_url(url, db.origin) for url in urls]

    click.echo(f"Fetching {len(urls)} feeds ...")
    results = asyncio.run(feed.fetch_many(metas))

    subscribed = 0
    # Results are paired with the requested URLs by position, as the original
    # index lookup did. `meta.url` is reported separately below because it
    # may differ from the URL that was requested.
    for url, (d, meta) in zip(urls, results):
        if d is None:
            # Logger.warn is a deprecated alias; use Logger.warning.
            LOG.warning("Unable to fetch %s, skipping...", url)
            continue

        # A string result is treated as "fetched something, but not a feed".
        if isinstance(d, str):
            click.echo(f"{url} does not seem to be a feed, skipping...")
            continue

        existing = db.load_feed(meta.url)
        if existing is not None:
            LOG.info("%s already exists (as %s)", url, meta.url)
            continue

        db.store_feed(d)
        subscribed += 1

    click.echo(f"Subscribed to {subscribed} new feeds")
|
|
|
|
|
|
@cli.command(name="refresh")
@click.argument("url", required=False, default=None)
def refresh(url):
    """Refresh one or more feeds.

    If a URL is specified, refresh that URL. Otherwise, refresh all subscribed
    feeds.
    """

    db = database.Database.local()
    if url:
        f = db.load_feed(url)
        if f is None:
            click.echo(f"Not subscribed to {url}")
            # click discards the callback's return value when run as a
            # program, so a non-zero status must go through Context.exit().
            click.get_current_context().exit(1)
        feeds = [f.meta]
    else:
        feeds = db.load_all_meta()

    click.echo(f"Refreshing {len(feeds)} feed(s)...")
    results = asyncio.run(feed.fetch_many(feeds))

    new_count = 0
    for d, meta in results:
        if d is None:
            # Nothing new.
            db.update_meta(meta)
        elif isinstance(d, str):
            # A string result is treated as "fetched, but not a feed".
            click.echo(f"WARNING: {meta.url} returned a non-feed result!")
        else:
            # New items, possibly! store_feed's return value is added to the
            # running total that is reported as new entries.
            new_count += db.store_feed(d)

    click.echo(f"Fetched {new_count} new entries.")
|
|
|
|
|
|
@cli.command(name="show")
@click.argument("pattern", required=False, default="")
@click.option(
    "--count",
    "-c",
    type=int,
    default=10,
    show_default=True,
    help="Show at most this many entries from each feed.",
)
def show(pattern, count):
    """Show feeds and entries.

    If a pattern is supplied, then filter the feeds to urls or titles that
    match the pattern. Otherwise, just show everything.
    """
    store = database.Database.local()
    matched = store.load_all(feed_limit=count, pattern=pattern or "")

    # Feeds are printed in descending feed.sort_key order, each followed by
    # its entry titles (or a placeholder) and a blank separator line.
    for current in sorted(matched, key=feed.sort_key, reverse=True):
        click.echo(f"{current.title}")
        if len(current.entries) == 0:
            click.echo(" <No Entries>")
        else:
            for item in current.entries:
                click.echo(f" {item.title}")
        click.echo()
|
|
|
|
|
|
@cli.command("list")
@click.argument("pattern", required=False, default="")
def list_feeds(pattern):
    """List subscribed feeds.

    If a pattern is supplied, then filter the feeds to urls or titles that
    match the pattern. Otherwise, just show everything.
    """
    db = database.Database.local()
    feeds = db.load_all(feed_limit=0, pattern=pattern)

    # Column widths for aligned output. `default=0` keeps max() from raising
    # ValueError when there are no feeds (nothing subscribed, or the pattern
    # matched nothing); the loop below then simply prints nothing.
    max_title = max((len(f.title) for f in feeds), default=0)
    max_url = max((len(f.meta.url) for f in feeds), default=0)

    feeds.sort(key=lambda f: f.title)

    for f in feeds:
        click.echo(f"{f.title:{max_title}} {f.meta.url:{max_url}}")
|
|
|
|
|
|
@cli.command("unsubscribe")
@click.argument("url")
def unsubscribe(url):
    """Unsubscribe from the specified feed.

    (If you need to find the URL for the feed to unsubscribe from, use the
    `list` command.)
    """
    db = database.Database.local()
    count = db.set_feed_status(url, feed.FEED_STATUS_UNSUBSCRIBED)
    if count == 0:
        # A zero result from set_feed_status is reported as "not subscribed".
        click.echo(f"Not subscribed to feed {url}")
        # click discards the callback's return value when run as a program,
        # so a non-zero status must go through Context.exit().
        click.get_current_context().exit(1)
|
|
|
|
|
|
@cli.command("serve")
def serve():
    """Serve an HTML overview of the subscribed feeds on port 8000."""

    class Handler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            # Every GET, whatever the path, renders the same overview page.
            # The database is opened per request and released as soon as the
            # feeds have been loaded.
            db = database.Database.local()
            feeds = db.load_all(feed_limit=10)
            del db

            feeds.sort(key=feed.sort_key, reverse=True)

            buffer = io.StringIO()
            buffer.write(
                "\n"
                "<!doctype html>\n"
                "<head>\n"
                '<meta charset="utf8">\n'
                "<title>Subscribed Feeds</title>\n"
                "</head>\n"
                "<h1>Feeds</h1>\n"
            )
            for f in feeds:
                # Titles and links come from fetched feeds, so both are
                # escaped before being placed in markup. html.escape also
                # escapes quotes by default, which keeps a link from breaking
                # out of the href attribute.
                feed_title = html.escape(f.title)
                feed_link = html.escape(str(f.link))
                if len(f.entries) > 0:
                    ago = f" ({f.entries[0].time_ago()})"
                else:
                    ago = ""
                buffer.write(f'<h2><a href="{feed_link}">{feed_title}</a>{ago}</h2>')
                buffer.write("<div>")
                if len(f.entries) > 0:
                    for entry in f.entries:
                        title = html.escape(entry.title)
                        link = html.escape(str(entry.link))
                        buffer.write(
                            f'<span class="entry">• <a href="{link}">{title}</a> ({entry.time_ago()})</span> '
                        )
                else:
                    buffer.write("<i>No entries...</i>")
                buffer.write("</div>")

            # Encode first: content-length must be the byte count, not the
            # character count.
            response = buffer.getvalue().encode("utf-8")

            self.send_response(200)
            self.send_header("content-type", "text/html")
            self.send_header("content-length", str(len(response)))
            self.end_headers()
            self.wfile.write(response)

    # An empty host string binds to all interfaces; the echoed URL is just
    # the convenient local one.
    with http.server.HTTPServer(("", 8000), Handler) as server:
        click.echo("Serving at http://127.0.0.1:8000/")
        server.serve_forever()
|