diff --git a/cry/cli.py b/cry/cli.py
index d08e001..b3e9415 100644
--- a/cry/cli.py
+++ b/cry/cli.py
@@ -8,6 +8,7 @@ import logging
import click
from . import feed
+from . import feedfinder
from . import database
from . import opml
@@ -34,6 +35,16 @@ def cli(verbose):
logging.basicConfig(level=level)
+@cli.command(name="search")
+@click.argument("url")
+def search(url):
+    "Search a URL for feeds."
+    feeds = feedfinder.find_feeds(url)
+    # Don't call the loop variable `feed`: that would shadow the
+    # `feed` module imported at the top of this file.
+    for found in feeds:
+        click.echo(found)
+    click.echo(f"Found {len(feeds)} feeds")
+
+
@cli.command(name="subscribe")
@click.argument("url")
def subscribe(url):
diff --git a/cry/feedfinder.py b/cry/feedfinder.py
new file mode 100644
index 0000000..aa21a3f
--- /dev/null
+++ b/cry/feedfinder.py
@@ -0,0 +1,350 @@
+"""feedfinder: Find the Web feed for a Web page
+
+Based on http://www.aaronsw.com/2002/feedfinder/
+
+Kinda rewritten by John Doty for Python 3 and the cry aggregator, but the
+basic frame remains.
+"""
+
+import html.parser
+import logging
+import re
+import sys
+import typing
+import urllib
+import urllib.parse
+import urllib.request
+import urllib.robotparser
+
+import requests
+
+
+LOG = logging.getLogger(__name__)
+
+
+class URLGatekeeper:
+    """Track robots.txt rules across multiple servers.
+
+    Fetches and caches one RobotFileParser per domain, and funnels all
+    HTTP traffic through a single requests.Session carrying our user
+    agent header.
+    """
+
+    def __init__(self):
+        # One RobotFileParser per domain (netloc), filled lazily.
+        self.rpcache = {}
+        self.agent = "cry/0.9"
+        self.session = requests.Session()
+        self.session.headers["user-agent"] = self.agent
+        LOG.debug(f"User agent: {self.agent}")
+
+    def _getrp(self, url):
+        """Return the robots.txt parser for `url`'s domain, caching it."""
+        protocol, domain = urllib.parse.urlparse(url)[:2]
+        if domain in self.rpcache:
+            return self.rpcache[domain]
+        baseurl = "%s://%s" % (protocol, domain)
+        robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")
+
+        rp = urllib.robotparser.RobotFileParser(robotsurl)
+        try:
+            response = self.session.get(robotsurl)
+            lines = response.text.splitlines()
+            rp.parse(lines)
+        except requests.RequestException:
+            # Best effort: if robots.txt is unreachable, the empty parser
+            # allows everything (same behavior as before, but no longer
+            # swallows KeyboardInterrupt/SystemExit via a bare except).
+            pass
+        self.rpcache[domain] = rp
+        return rp
+
+    def can_fetch(self, url):
+        """Return True if robots.txt permits us to fetch `url`."""
+        rp = self._getrp(url)
+        allow = rp.can_fetch(self.agent, url)
+        LOG.debug(f"gatekeeper of {url} says {allow}")
+        return allow
+
+    def get(self, url, check=True):
+        """Fetch `url` and return its text; "" on error or robots denial.
+
+        Pass check=False to skip the robots.txt consultation.
+        """
+        if check and not self.can_fetch(url):
+            return ""
+        try:
+            return self.session.get(url, timeout=10).text
+        except requests.RequestException:
+            return ""
+
+
+# Single shared gatekeeper so robots.txt results are cached per process.
+# (`import html.parser` used to sit here; it now lives with the other
+# imports at the top of the file.)
+_gatekeeper = URLGatekeeper()
+
+
+class HtmlBasedParser(html.parser.HTMLParser):
+    """Collect feed candidates from an HTML page.
+
+    LINK tags with rel=alternate and a feed MIME type land in
+    `link_links`; every A tag href lands in `a_links`. All hrefs are
+    resolved against the page's base URI (updated by any BASE tag).
+    """
+
+    FEED_TYPES = (
+        "application/rss+xml",
+        "text/xml",
+        "application/atom+xml",
+        "application/x.atom+xml",
+        "application/x-atom+xml",
+    )
+
+    link_links: list[str]
+    a_links: list[str]
+
+    def __init__(self, baseuri):
+        super().__init__()
+        self.baseuri = baseuri
+        self.link_links = []
+        self.a_links = []
+
+    def handle_starttag(self, tag, attrs):
+        attributes = dict(attrs)
+        if tag == "base":
+            self.do_base(attributes)
+        elif tag == "link":
+            self.do_link(attributes)
+        elif tag == "a":
+            self.do_a(attributes)
+
+    def do_base(self, attrs):
+        """A BASE tag rebinds the URI relative links resolve against."""
+        base = attrs.get("href")
+        if base is not None:
+            self.baseuri = base
+
+    def do_link(self, attrs):
+        """Record a LINK tag that advertises an alternate feed."""
+        rel = attrs.get("rel")
+        if rel is None or "alternate" not in rel.split():
+            return
+        if attrs.get("type", "").lower() not in self.FEED_TYPES:
+            return
+        href = attrs.get("href")
+        if href is not None:
+            self.link_links.append(urllib.parse.urljoin(self.baseuri, href))
+
+    def do_a(self, attrs):
+        """Record every A tag href, resolved against the base URI."""
+        href = attrs.get("href")
+        if href is not None:
+            self.a_links.append(urllib.parse.urljoin(self.baseuri, href))
+
+
+def makeFullURI(uri):
+    """Normalize a user-entered URI.
+
+    feed:// URIs are rewritten to http://, and bare host names get an
+    http:// prefix; http(s) URIs pass through untouched.
+    """
+    uri = uri.strip()
+    if uri.startswith("feed://"):
+        uri = "http://" + uri[len("feed://"):]
+    if uri.startswith(("http://", "https://")):
+        return uri
+    return "http://%s" % uri
+
+
+def classify_links(links, baseuri) -> tuple[list[str], list[str]]:
+    """Split the links into two sets: local (which start with baseuri) and
+    remote (which don't). The comparison is case-insensitive.
+
+    Uses the builtin `tuple[...]` generic for consistency with the
+    `list[str]` / `str | None` annotations used elsewhere in this file.
+    """
+    baseuri = baseuri.lower()
+
+    local, remote = [], []
+    for link in links:
+        if link.lower().startswith(baseuri):
+            local.append(link)
+        else:
+            remote.append(link)
+
+    return local, remote
+
+
+def is_feed_link(link: str) -> bool:
+    """Return True if the link's suffix suggests it is a feed."""
+    # str.endswith accepts a tuple of suffixes: one call, one pass.
+    return link.lower().endswith((".rss", ".rdf", ".xml", ".atom"))
+
+
+def is_XML_related_link(link: str) -> bool:
+    """Return True if the link mentions a feed-ish token anywhere in it."""
+    lowered = link.lower()
+    return any(token in lowered for token in ("rss", "rdf", "xml", "atom"))
+
+
+r_brokenRedirect = re.compile("]*>(.*?)", re.S)
+
+
+def try_broken_redirect(data) -> str | None:
+    """See if the content is a 'broken redirect'.
+
+    This is in the code taken from aaronsw and I don't know what, if anything,
+    ever generated this.
+
+    NOTE(review): this span of the patch was mangled (angle-bracket spans
+    stripped, collapsing two functions into one line); both bodies below
+    are reconstructed from the original feedfinder (tryBrokenRedirect /
+    couldBeFeedData) — confirm against upstream.
+    """
+    if "<newLocation" in data:
+        newuris = r_brokenRedirect.findall(data)
+        if newuris:
+            return newuris[0].strip()
+    return None
+
+
+def could_be_feed_data(data: str) -> bool:
+    """See if the data might be a feed.
+
+    Anything mentioning <html is assumed to be a page, not a feed;
+    otherwise any <rss, <rdf or <feed occurrence counts as feed-ish.
+    """
+    data = data.lower()
+    if data.count("<html"):
+        return False
+    return data.count("<rss") + data.count("<rdf") + data.count("<feed") > 0
+
+
+def is_feed(uri: str) -> bool:
+    """See if the data at `uri` might be a feed."""
+    LOG.debug(f"seeing if {uri} is a feed")
+    # Only http(s) URIs are worth fetching.
+    scheme = urllib.parse.urlparse(uri).scheme
+    if scheme not in ("http", "https"):
+        return False
+    return could_be_feed_data(_gatekeeper.get(uri))
+
+
+def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]:
+    """Find feeds for the given URI.
+
+    How it works:
+
+    1. If the URI points to a feed, it is simply returned; otherwise
+    the page is downloaded and the real fun begins.
+
+    2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)
+
+    3. links to feeds on the same server ending in ".rss", ".rdf", ".xml",
+    or ".atom"
+
+    4. links to feeds on the same server containing "rss", "rdf", "xml", or
+    "atom"
+
+    5. links to feeds on external servers ending in ".rss", ".rdf", ".xml",
+    or ".atom"
+
+    6. links to feeds on external servers containing "rss", "rdf", "xml", or
+    "atom"
+
+    7. Try some guesses about common places for feeds. (index.xml, atom.xml,
+    etc.)
+
+    (At every step, feeds are minimally verified to make sure they are really
+    feeds.)
+
+    If `all` is True then return all possible feeds, kinda sorta ordered in
+    terms of goodness. Otherwise, we stop as soon as one of the above steps
+    finds a likely feed.
+
+    `_recurs` is internal: the URIs already visited, used to break
+    redirect loops.
+    """
+    if _recurs is None:
+        _recurs = [uri]
+    fulluri = makeFullURI(uri)
+    try:
+        data = _gatekeeper.get(fulluri, check=False)
+    except Exception:
+        # Defensive: _gatekeeper.get already returns "" on request errors.
+        return []
+
+    # is this already a feed?
+    if could_be_feed_data(data):
+        return [fulluri]
+
+    newuri = try_broken_redirect(data)
+    if newuri and newuri not in _recurs:
+        _recurs.append(newuri)
+        # BUG FIX: this previously called the undefined name `feeds`,
+        # which raised NameError whenever a broken redirect was followed.
+        return find_feeds(newuri, all=all, _recurs=_recurs)
+
+    # nope, it's a page, try LINK tags first
+    parser = HtmlBasedParser(fulluri)
+    parser.feed(data)
+
+    outfeeds = [link for link in parser.link_links if is_feed(link)]
+    LOG.info(f"found {len(outfeeds)} through LINK tags")
+
+    if all or len(outfeeds) == 0:
+        # no LINK tags, look for regular links that point to feeds
+        if not all:
+            LOG.info("no LINK tags, looking at A tags")
+
+        local_links, remote_links = classify_links(parser.a_links, fulluri)
+
+        # look for obvious feed links on the same server
+        outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links)))
+        if all or len(outfeeds) == 0:
+            # look harder for feed links on the same server
+            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links)))
+
+        if all or len(outfeeds) == 0:
+            # look for obvious feed links on another server
+            outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links)))
+
+        if all or len(outfeeds) == 0:
+            # look harder for feed links on another server
+            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links)))
+
+    if all or len(outfeeds) == 0:
+        LOG.debug("no A tags, guessing")
+        suffixes = [  # filenames used by popular software:
+            "atom.xml",  # blogger, TypePad
+            "index.atom",  # MT, apparently
+            "index.rdf",  # MT
+            "rss.xml",  # Dave Winer/Manila
+            "index.xml",  # MT
+            "index.rss",  # Slash
+        ]
+        outfeeds.extend(
+            filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes])
+        )
+
+    # Dedupe while preserving discovery order; list(set(...)) scrambled
+    # the "kinda sorta ordered" ranking promised in the docstring.
+    return list(dict.fromkeys(outfeeds))
+
+
+##### test harness ######
+
+
+def test():
+ uri = "http://diveintomark.org/tests/client/autodiscovery/html4-001.html"
+ failed = []
+ count = 0
+ while 1:
+ data = _gatekeeper.get(uri)
+ if data.find("Atom autodiscovery test") == -1:
+ break
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ count += 1
+ links = getLinks(data, uri)
+ if not links:
+ print(f"\n*** FAILED *** {uri} could not find link")
+ failed.append(uri)
+ elif len(links) > 1:
+ print(f"\n*** FAILED *** {uri} found too many links")
+ failed.append(uri)
+ else:
+ atomdata = requests.get(links[0]).text
+ if atomdata.find('