From 474e8656e270a15d9e3494ea80cdf3765df7bf51 Mon Sep 17 00:00:00 2001 From: John Doty Date: Sun, 14 Jul 2024 20:55:49 +0900 Subject: [PATCH] Feed finder (basic) --- cry/cli.py | 11 ++ cry/feedfinder.py | 350 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 361 insertions(+) create mode 100644 cry/feedfinder.py diff --git a/cry/cli.py b/cry/cli.py index d08e001..b3e9415 100644 --- a/cry/cli.py +++ b/cry/cli.py @@ -8,6 +8,7 @@ import logging import click from . import feed +from . import feedfinder from . import database from . import opml @@ -34,6 +35,16 @@ def cli(verbose): logging.basicConfig(level=level) +@cli.command(name="search") +@click.argument("url") +def search(url): + "Search an URL for feeds." + feeds = feedfinder.find_feeds(url) + for feed in feeds: + click.echo(feed) + click.echo(f"Found {len(feeds)} feeds") + + @cli.command(name="subscribe") @click.argument("url") def subscribe(url): diff --git a/cry/feedfinder.py b/cry/feedfinder.py new file mode 100644 index 0000000..aa21a3f --- /dev/null +++ b/cry/feedfinder.py @@ -0,0 +1,350 @@ +"""feedfinder: Find the Web feed for a Web page + +Based on http://www.aaronsw.com/2002/feedfinder/ + +Kinda rewritten by John Doty for the Python3 and the cry aggregator, but the +basic frame remains. 
+""" + +import logging +import re +import sys +import typing +import urllib +import urllib.parse +import urllib.request +import urllib.robotparser + +import requests + + +LOG = logging.getLogger(__name__) + + +class URLGatekeeper: + """a class to track robots.txt rules across multiple servers""" + + def __init__(self): + self.rpcache = {} # a dictionary of RobotFileParser objects, by domain + self.agent = f"cry/0.9" + self.session = requests.Session() + self.session.headers["user-agent"] = self.agent + LOG.debug(f"User agent: {self.agent}") + + def _getrp(self, url): + protocol, domain = urllib.parse.urlparse(url)[:2] + if domain in self.rpcache: + return self.rpcache[domain] + baseurl = "%s://%s" % (protocol, domain) + robotsurl = urllib.parse.urljoin(baseurl, "robots.txt") + + rp = urllib.robotparser.RobotFileParser(robotsurl) + try: + response = self.session.get(robotsurl) + lines = response.text.splitlines() + rp.parse(lines) + except: + pass + self.rpcache[domain] = rp + return rp + + def can_fetch(self, url): + rp = self._getrp(url) + allow = rp.can_fetch(self.agent, url) + LOG.debug(f"gatekeeper of {url} says {allow}") + return allow + + def get(self, url, check=True): + if check and not self.can_fetch(url): + return "" + try: + return self.session.get(url, timeout=10).text + except: + return "" + + +_gatekeeper = URLGatekeeper() + +import html.parser + + +class HtmlBasedParser(html.parser.HTMLParser): + FEED_TYPES = ( + "application/rss+xml", + "text/xml", + "application/atom+xml", + "application/x.atom+xml", + "application/x-atom+xml", + ) + + link_links: list[str] + a_links: list[str] + + def __init__(self, baseuri): + super().__init__() + self.baseuri = baseuri + self.link_links = [] + self.a_links = [] + + def handle_starttag(self, tag, attrs): + attrs = {k: v for k, v in attrs} + if tag == "base": + self.do_base(attrs) + elif tag == "link": + self.do_link(attrs) + elif tag == "a": + self.do_a(attrs) + + def do_base(self, attrs): + base = 
attrs.get("href") + if base is not None: + self.baseuri = base + + def do_link(self, attrs): + rel = attrs.get("rel") + if rel is None: + return + + if "alternate" not in rel.split(): + return + + if attrs.get("type", "").lower() not in self.FEED_TYPES: + return + + href = attrs.get("href") + if href is None: + return + + self.link_links.append(urllib.parse.urljoin(self.baseuri, href)) + + def do_a(self, attrs): + href = attrs.get("href") + if href is None: + return + + self.a_links.append(urllib.parse.urljoin(self.baseuri, href)) + + + def makeFullURI(uri): + uri = uri.strip() + if uri.startswith("feed://"): + uri = "http://" + uri.split("feed://", 1).pop() + for x in ["http", "https"]: + if uri.startswith("%s://" % x): + return uri + return "http://%s" % uri + + + def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]: + """Split the links into two sets: local (which start with baseuri) and + remote (which don't). + """ + baseuri = baseuri.lower() + + local, remote = [], [] + for link in links: + if link.lower().startswith(baseuri): + local.append(link) + else: + remote.append(link) + + return local, remote + + + def is_feed_link(link: str) -> bool: + """Return True if the link seems to be a feed link, or False otherwise.""" + link = link.lower() + return ( + link.endswith(".rss") + or link.endswith(".rdf") + or link.endswith(".xml") + or link.endswith(".atom") + ) + + + def is_XML_related_link(link: str) -> bool: + link = link.lower() + return "rss" in link or "rdf" in link or "xml" in link or "atom" in link + + + r_brokenRedirect = re.compile(r"<newLocation[^>]*>(.*?)</newLocation>", re.S) + + + def try_broken_redirect(data) -> str | None: + """See if the content is a 'broken redirect'. + + This is in the code taken from aaronsw and I don't know what, if anything, + ever generated this. 
+ """ + if "<newLocation" in data: + newuris = r_brokenRedirect.findall(data) + if newuris: + return newuris[0].strip() + return None + + + def could_be_feed_data(data: str) -> bool: + """See if the data might be a feed.""" + data = data.lower() + if data.count("<html"): + return False + return data.count("<rss") + data.count("<rdf") + data.count("<feed") > 0 + + + def is_feed(uri: str) -> bool: + """See if the data at `uri` might be a feed.""" + LOG.debug(f"seeing if {uri} is a feed") + protocol = urllib.parse.urlparse(uri) + if protocol[0] not in ("http", "https"): + return False + data = _gatekeeper.get(uri) + return could_be_feed_data(data) + + + def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]: + """Find feeds for the given URI. + + How it works: + 0. + + 1. If the URI points to a feed, it is simply returned; otherwise + the page is downloaded and the real fun begins. + + 2. Feeds pointed to by LINK tags in the header of the page (autodiscovery) + + 3. links to feeds on the same server ending in ".rss", ".rdf", ".xml", + or ".atom" + + 4. links to feeds on the same server containing "rss", "rdf", "xml", or + "atom" + + 5. links to feeds on external servers ending in ".rss", ".rdf", ".xml", + or ".atom" + + 6. links to feeds on external servers containing "rss", "rdf", "xml", or + "atom" + + 7. Try some guesses about common places for feeds. (index.xml, atom.xml, + etc.) + + (At every step, feeds are minimally verified to make sure they are really + feeds.) + + If `all` is True then return all possible feeds, kinda sorta ordered in + terms of goodness. Otherwise, we stop as soon as one of the above steps + finds a likely feed. + """ + if _recurs is None: + _recurs = [uri] + fulluri = makeFullURI(uri) + try: + data = _gatekeeper.get(fulluri, check=False) + except: + return [] + + # is this already a feed? 
+ if could_be_feed_data(data): + return [fulluri] + + newuri = try_broken_redirect(data) + if newuri and newuri not in _recurs: + _recurs.append(newuri) + return find_feeds(newuri, all=all, _recurs=_recurs) + + # nope, it's a page, try LINK tags first + parser = HtmlBasedParser(fulluri) + parser.feed(data) + + outfeeds = [link for link in parser.link_links if is_feed(link)] + LOG.info(f"found {len(outfeeds)} through LINK tags") + + if all or len(outfeeds) == 0: + # no LINK tags, look for regular links that point to feeds + if not all: + LOG.info("no LINK tags, looking at A tags") + + local_links, remote_links = classify_links(parser.a_links, fulluri) + + # look for obvious feed links on the same server + outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links))) + if all or len(outfeeds) == 0: + # look harder for feed links on the same server + outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links))) + + if all or len(outfeeds) == 0: + # look for obvious feed links on another server + outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links))) + + if all or len(outfeeds) == 0: + # look harder for feed links on another server + outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links))) + + if all or len(outfeeds) == 0: + LOG.debug("no A tags, guessing") + suffixes = [ # filenames used by popular software: + "atom.xml", # blogger, TypePad + "index.atom", # MT, apparently + "index.rdf", # MT + "rss.xml", # Dave Winer/Manila + "index.xml", # MT + "index.rss", # Slash + ] + outfeeds.extend( + filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes]) + ) + + return list(set(outfeeds)) + + + ##### test harness ###### + + + def test(): + uri = "http://diveintomark.org/tests/client/autodiscovery/html4-001.html" + failed = [] + count = 0 + while 1: + data = _gatekeeper.get(uri) + if data.find("Atom autodiscovery test") == -1: + break + sys.stdout.write(".") + sys.stdout.flush() + count += 1 + links = getLinks(data, 
uri) + if not links: + print(f"\n*** FAILED *** {uri} could not find link") + failed.append(uri) + elif len(links) > 1: + print(f"\n*** FAILED *** {uri} found too many links") + failed.append(uri) + else: + atomdata = requests.get(links[0]).text + if atomdata.find('