cry/cry/feedfinder.py
John Doty cd20db0c4c Subscribe now searches
Rewrite feed finder again to not multi-fetch when not needed
2024-07-15 17:23:24 +09:00

296 lines
8.4 KiB
Python

"""feedfinder: Find the Web feed for a Web page
Based on http://www.aaronsw.com/2002/feedfinder/
Rewritten by John Doty for Python 3 and the cry aggregator, but the basic
frame remains. The big thing *this* does is also return the FeedMeta when it
has found feeds, instead of just URLs. This is more useful for the rest of
processing.
"""
import logging
import re
import sys
import typing
import urllib
import urllib.parse
import urllib.request
import urllib.robotparser
import requests
from . import feed
LOG = logging.getLogger(__name__)
class URLGatekeeper:
    """Track and enforce robots.txt rules across multiple servers.

    Each domain's robots.txt is fetched at most once and cached in
    `rpcache`.  The instance also owns the shared requests.Session used
    for all HTTP traffic, with the crawler's user-agent installed.
    """

    def __init__(self):
        # Cache of RobotFileParser objects, keyed by domain.
        self.rpcache = {}
        self.agent = "cry/0.9"  # plain string; it has no interpolation
        self.session = requests.Session()
        self.session.headers["user-agent"] = self.agent
        LOG.debug(f"User agent: {self.agent}")

    def _getrp(self, url):
        """Return the (cached) RobotFileParser for `url`'s domain."""
        protocol, domain = urllib.parse.urlparse(url)[:2]
        if domain in self.rpcache:
            return self.rpcache[domain]
        baseurl = "%s://%s" % (protocol, domain)
        robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")
        rp = urllib.robotparser.RobotFileParser(robotsurl)
        try:
            response = self.session.get(robotsurl)
            rp.parse(response.text.splitlines())
        except Exception:
            # Best-effort: an unreachable/unparsable robots.txt is treated
            # as empty (RobotFileParser then allows everything), but log it
            # rather than silently swallowing the error.
            LOG.debug(f"could not fetch robots.txt from {robotsurl}", exc_info=True)
        self.rpcache[domain] = rp
        return rp

    def can_fetch(self, url):
        """Return True if robots.txt for `url`'s domain permits fetching it."""
        rp = self._getrp(url)
        allow = rp.can_fetch(self.agent, url)
        LOG.debug(f"gatekeeper of {url} says {allow}")
        return allow

    def get(self, url, check=True):
        """Fetch `url` and return its text, or "" on error or robots denial.

        Pass check=False to bypass the robots.txt check.
        """
        if check and not self.can_fetch(url):
            return ""
        try:
            return self.session.get(url, timeout=10).text
        except Exception:
            LOG.debug(f"failed to fetch {url}", exc_info=True)
            return ""
_gatekeeper = URLGatekeeper()
import html.parser


class HtmlBasedParser(html.parser.HTMLParser):
    """Collect feed-autodiscovery <link> URLs and all <a> URLs from a page.

    After feeding HTML, `link_links` holds the hrefs of
    <link rel="alternate" type="...feed type..."> tags and `a_links` holds
    every <a href=...>, both resolved against the (possibly <base>-updated)
    base URI.
    """

    # MIME types that mark a <link rel="alternate"> as a feed.
    FEED_TYPES = (
        "application/rss+xml",
        "text/xml",
        "application/atom+xml",
        "application/x.atom+xml",
        "application/x-atom+xml",
    )

    link_links: list[str]
    a_links: list[str]

    def __init__(self, baseuri: str):
        super().__init__()
        self.baseuri = baseuri
        self.link_links = []
        self.a_links = []

    def handle_starttag(self, tag, attrs):
        # html.parser yields attrs as (name, value) pairs; the value is
        # None for valueless attributes (e.g. <link type>).
        attrs = dict(attrs)
        if tag == "base":
            self.do_base(attrs)
        elif tag == "link":
            self.do_link(attrs)
        elif tag == "a":
            self.do_a(attrs)

    def do_base(self, attrs):
        """<base href=...> changes the URI later links resolve against."""
        base = attrs.get("href")
        if base is not None:
            self.baseuri = base

    def do_link(self, attrs):
        """Record <link rel="alternate"> tags whose type is a known feed type."""
        rel = attrs.get("rel")
        if rel is None:
            return
        if "alternate" not in rel.split():
            return
        # `or ""` guards a valueless type attribute: the parser reports it
        # as None, so the previous .get("type", "") still yielded None and
        # crashed on .lower() with AttributeError.
        if (attrs.get("type") or "").lower() not in self.FEED_TYPES:
            return
        href = attrs.get("href")
        if href is None:
            return
        self.link_links.append(urllib.parse.urljoin(self.baseuri, href))

    def do_a(self, attrs):
        """Record every <a href=...>, resolved against the base URI."""
        href = attrs.get("href")
        if href is None:
            return
        self.a_links.append(urllib.parse.urljoin(self.baseuri, href))
def makeFullURI(uri: str) -> str:
    """Normalize `uri` to an absolute http(s) URI.

    A feed:// scheme is rewritten to http://, and a bare host/path with no
    recognized scheme gets an http:// prefix.
    """
    uri = uri.strip()
    if uri.startswith("feed://"):
        uri = "http://" + uri[len("feed://"):]
    if uri.startswith(("http://", "https://")):
        return uri
    return "http://%s" % uri
def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]:
    """Partition `links` into (local, remote).

    A link is "local" when it starts with `baseuri` (case-insensitively);
    everything else is "remote".  Original ordering is preserved within
    each bucket.
    """
    prefix = baseuri.lower()
    buckets: dict[bool, list[str]] = {True: [], False: []}
    for link in links:
        buckets[link.lower().startswith(prefix)].append(link)
    return buckets[True], buckets[False]
def is_feed_link(link: str) -> bool:
    """Return True if the link seems to be a feed link, or False otherwise."""
    # endswith accepts a tuple of suffixes; check them all in one call.
    return link.lower().endswith((".rss", ".rdf", ".xml", ".atom"))
def is_XML_related_link(link: str) -> bool:
    """Return True if the link merely mentions a feed/XML-ish token anywhere."""
    lowered = link.lower()
    return any(token in lowered for token in ("rss", "rdf", "xml", "atom"))
r_brokenRedirect = re.compile("<newLocation[^>]*>(.*?)</newLocation>", re.S)
def try_broken_redirect(data) -> str | None:
"""See if the content is a 'broken redirect'.
This is in the code taken from aaronsw and I don't know what, if anything,
ever generated this.
"""
if "<newLocation" in data:
newuris = r_brokenRedirect.findall(data)
if newuris:
return newuris[0].strip()
def could_be_feed_data(data: str) -> bool:
    """Heuristic: True if `data` looks like feed XML rather than an HTML page."""
    lowered = data.lower()
    # Anything containing an <html tag is a web page, never a feed.
    if "<html" in lowered:
        return False
    return any(tag in lowered for tag in ("<rss", "<rdf", "<feed"))
def is_feed(uri: str) -> bool:
    """Fetch `uri` (robots.txt permitting) and check if it looks like a feed."""
    LOG.debug(f"seeing if {uri} is a feed")
    scheme = urllib.parse.urlparse(uri).scheme
    if scheme not in ("http", "https"):
        return False
    return could_be_feed_data(_gatekeeper.get(uri))
def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]:
    """Find feeds for the given URI.

    How it works:
    1. If the URI points to a feed, it is simply returned; otherwise
       the page is downloaded and the real fun begins.
    2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)
    3. <A> links to feeds on the same server ending in ".rss", ".rdf", ".xml",
       or ".atom"
    4. <A> links to feeds on the same server containing "rss", "rdf", "xml", or
       "atom"
    5. <A> links to feeds on external servers ending in ".rss", ".rdf", ".xml",
       or ".atom"
    6. <A> links to feeds on external servers containing "rss", "rdf", "xml", or
       "atom"
    7. Try some guesses about common places for feeds. (index.xml, atom.xml,
       etc.)

    (At every step, feeds are minimally verified to make sure they are really
    feeds.)

    If `all` is True then return all possible feeds, kinda sorta ordered in
    terms of goodness. Otherwise, we stop as soon as one of the above steps
    finds a likely feed.

    NOTE: `all` shadows the builtin, but it is part of the public keyword
    interface, so the name is kept.  `_recurs` tracks already-visited URIs
    to break broken-redirect loops.
    """
    if _recurs is None:
        _recurs = [uri]
    fulluri = makeFullURI(uri)
    try:
        data = _gatekeeper.get(fulluri, check=False)
    except Exception:
        # Gatekeeper already returns "" on fetch errors; this is a last-ditch
        # guard so the finder never propagates an exception to callers.
        return []
    # is this already a feed?
    if could_be_feed_data(data):
        return [fulluri]
    newuri = try_broken_redirect(data)
    if newuri and newuri not in _recurs:
        _recurs.append(newuri)
        # BUG FIX: this previously called the undefined name `feeds`, which
        # raised NameError whenever a broken redirect was actually followed.
        return find_feeds(newuri, all=all, _recurs=_recurs)
    # nope, it's a page, try LINK tags first
    parser = HtmlBasedParser(fulluri)
    parser.feed(data)
    outfeeds = [link for link in parser.link_links if is_feed(link)]
    LOG.info(f"found {len(outfeeds)} through LINK tags")
    if all or len(outfeeds) == 0:
        # no LINK tags, look for regular <A> links that point to feeds
        if not all:
            LOG.info("no LINK tags, looking at A tags")
        local_links, remote_links = classify_links(parser.a_links, fulluri)
        # look for obvious feed links on the same server
        outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links)))
        if all or len(outfeeds) == 0:
            # look harder for feed links on the same server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links)))
        if all or len(outfeeds) == 0:
            # look for obvious feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links)))
        if all or len(outfeeds) == 0:
            # look harder for feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links)))
    if all or len(outfeeds) == 0:
        LOG.debug("no A tags, guessing")
        suffixes = [  # filenames used by popular software:
            "atom.xml",  # blogger, TypePad
            "index.atom",  # MT, apparently
            "index.rdf",  # MT
            "rss.xml",  # Dave Winer/Manila
            "index.xml",  # MT
            "index.rss",  # Slash
        ]
        outfeeds.extend(
            filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes])
        )
    # Deduplicate while preserving discovery order: list(set(...)) discarded
    # the "kinda sorta ordered in terms of goodness" promise above.
    return list(dict.fromkeys(outfeeds))