Feed finder (basic)
This commit is contained in:
parent
33b998236b
commit
474e8656e2
2 changed files with 361 additions and 0 deletions
11
cry/cli.py
11
cry/cli.py
|
|
@ -8,6 +8,7 @@ import logging
|
|||
import click
|
||||
|
||||
from . import feed
|
||||
from . import feedfinder
|
||||
from . import database
|
||||
from . import opml
|
||||
|
||||
|
|
@ -34,6 +35,16 @@ def cli(verbose):
|
|||
logging.basicConfig(level=level)
|
||||
|
||||
|
||||
@cli.command(name="search")
@click.argument("url")
def search(url):
    "Search an URL for feeds."
    feeds = feedfinder.find_feeds(url)
    # BUG FIX: the loop variable was named `feed`, shadowing the
    # `feed` module imported at the top of this file.
    for found in feeds:
        click.echo(found)
    click.echo(f"Found {len(feeds)} feeds")
|
||||
|
||||
|
||||
@cli.command(name="subscribe")
|
||||
@click.argument("url")
|
||||
def subscribe(url):
|
||||
|
|
|
|||
350
cry/feedfinder.py
Normal file
350
cry/feedfinder.py
Normal file
|
|
@ -0,0 +1,350 @@
|
|||
"""feedfinder: Find the Web feed for a Web page
|
||||
|
||||
Based on http://www.aaronsw.com/2002/feedfinder/
|
||||
|
||||
Kinda rewritten by John Doty for the Python3 and the cry aggregator, but the
|
||||
basic frame remains.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import typing
|
||||
import urllib
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import urllib.robotparser
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class URLGatekeeper:
    """A class to track robots.txt rules across multiple servers.

    Each domain's robots.txt is fetched and parsed once, then cached,
    so repeated ``can_fetch``/``get`` calls against a host are cheap.
    """

    def __init__(self):
        # Dictionary of RobotFileParser objects, keyed by domain.
        self.rpcache = {}
        # BUG FIX: was an f-string with no placeholders.
        self.agent = "cry/0.9"
        self.session = requests.Session()
        self.session.headers["user-agent"] = self.agent
        LOG.debug(f"User agent: {self.agent}")

    def _getrp(self, url):
        """Return the (possibly cached) RobotFileParser for `url`'s domain."""
        protocol, domain = urllib.parse.urlparse(url)[:2]
        if domain in self.rpcache:
            return self.rpcache[domain]
        baseurl = "%s://%s" % (protocol, domain)
        robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")

        rp = urllib.robotparser.RobotFileParser(robotsurl)
        try:
            response = self.session.get(robotsurl)
            rp.parse(response.text.splitlines())
        except Exception:
            # BUG FIX: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit.  An unreachable or unparsable
            # robots.txt leaves an empty parser, which allows everything.
            LOG.debug(f"could not fetch {robotsurl}", exc_info=True)
        self.rpcache[domain] = rp
        return rp

    def can_fetch(self, url):
        """Return True if robots.txt allows our agent to fetch `url`."""
        rp = self._getrp(url)
        allow = rp.can_fetch(self.agent, url)
        LOG.debug(f"gatekeeper of {url} says {allow}")
        return allow

    def get(self, url, check=True):
        """Fetch `url` and return its body text, or "" on any failure.

        When `check` is True, robots.txt is consulted first and "" is
        returned if fetching is disallowed.
        """
        if check and not self.can_fetch(url):
            return ""
        try:
            return self.session.get(url, timeout=10).text
        except Exception:
            # BUG FIX: narrowed from a bare `except:`.  Best-effort
            # semantics are preserved: any fetch failure yields "".
            return ""
|
||||
|
||||
|
||||
_gatekeeper = URLGatekeeper()
|
||||
|
||||
import html.parser
|
||||
|
||||
|
||||
class HtmlBasedParser(html.parser.HTMLParser):
    """Collect feed-related links from an HTML page.

    After feeding a document, `link_links` holds the feed URLs announced
    via ``<link rel="alternate" type=...>`` autodiscovery tags, and
    `a_links` holds the targets of every ``<a href=...>`` tag, both
    resolved against the page's base URI.
    """

    # MIME types that mark a <link rel="alternate"> as a feed.
    FEED_TYPES = (
        "application/rss+xml",
        "text/xml",
        "application/atom+xml",
        "application/x.atom+xml",
        "application/x-atom+xml",
    )

    link_links: list[str]
    a_links: list[str]

    def __init__(self, baseuri):
        super().__init__()
        self.baseuri = baseuri
        self.link_links = []
        self.a_links = []

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "base":
            self.do_base(attrs)
        elif tag == "link":
            self.do_link(attrs)
        elif tag == "a":
            self.do_a(attrs)

    def do_base(self, attrs):
        # <base href=...> changes the URI later links resolve against.
        base = attrs.get("href")
        if base is not None:
            self.baseuri = base

    def do_link(self, attrs):
        rel = attrs.get("rel")
        if rel is None:
            return

        if "alternate" not in rel.split():
            return

        # BUG FIX: html.parser yields ("type", None) for a valueless
        # `type` attribute; attrs.get("type", "") returned None and
        # .lower() raised AttributeError.  Coerce None to "".
        if (attrs.get("type") or "").lower() not in self.FEED_TYPES:
            return

        href = attrs.get("href")
        if href is None:
            return

        self.link_links.append(urllib.parse.urljoin(self.baseuri, href))

    def do_a(self, attrs):
        href = attrs.get("href")
        if href is None:
            return

        self.a_links.append(urllib.parse.urljoin(self.baseuri, href))
|
||||
|
||||
|
||||
def makeFullURI(uri):
    """Normalize `uri` into a full http(s) URI.

    Whitespace is trimmed, a feed:// scheme is rewritten to http://,
    and a scheme-less host/path gets http:// prepended.
    """
    uri = uri.strip()
    if uri.startswith("feed://"):
        uri = "http://" + uri[len("feed://"):]
    if uri.startswith("http://") or uri.startswith("https://"):
        return uri
    return "http://" + uri
|
||||
|
||||
|
||||
def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]:
    """Split the links into two sets: local (which start with baseuri) and
    remote (which don't).  Matching is case-insensitive.
    """
    prefix = baseuri.lower()
    local = [link for link in links if link.lower().startswith(prefix)]
    remote = [link for link in links if not link.lower().startswith(prefix)]
    return local, remote
|
||||
|
||||
|
||||
def is_feed_link(link: str) -> bool:
    """Return True if the link seems to be a feed link, or False otherwise."""
    # str.endswith accepts a tuple of suffixes, replacing the `or` chain.
    return link.lower().endswith((".rss", ".rdf", ".xml", ".atom"))
|
||||
|
||||
|
||||
def is_XML_related_link(link: str) -> bool:
    """Return True if the link merely mentions an XML/feed-ish token."""
    lowered = link.lower()
    return any(token in lowered for token in ("rss", "rdf", "xml", "atom"))
|
||||
|
||||
|
||||
r_brokenRedirect = re.compile("<newLocation[^>]*>(.*?)</newLocation>", re.S)
|
||||
|
||||
|
||||
def try_broken_redirect(data) -> str | None:
|
||||
"""See if the content is a 'broken redirect'.
|
||||
|
||||
This is in the code taken from aaronsw and I don't know what, if anything,
|
||||
ever generated this.
|
||||
"""
|
||||
if "<newLocation" in data:
|
||||
newuris = r_brokenRedirect.findall(data)
|
||||
if newuris:
|
||||
return newuris[0].strip()
|
||||
|
||||
|
||||
def could_be_feed_data(data: str) -> bool:
    """See if the data might be a feed (and is not an HTML page)."""
    lowered = data.lower()
    if "<html" in lowered:
        return False
    return any(tag in lowered for tag in ("<rss", "<rdf", "<feed"))
|
||||
|
||||
|
||||
def is_feed(uri: str) -> bool:
    """See if the data at `uri` might be a feed."""
    LOG.debug(f"seeing if {uri} is a feed")
    scheme = urllib.parse.urlparse(uri)[0]
    # Only fetch over http(s); anything else cannot be a feed for us.
    if scheme not in ("http", "https"):
        return False
    return could_be_feed_data(_gatekeeper.get(uri))
|
||||
|
||||
|
||||
def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]:
    """Find feeds for the given URI.

    How it works:

    1. If the URI points to a feed, it is simply returned; otherwise
       the page is downloaded and the real fun begins.

    2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)

    3. <A> links to feeds on the same server ending in ".rss", ".rdf", ".xml",
       or ".atom"

    4. <A> links to feeds on the same server containing "rss", "rdf", "xml", or
       "atom"

    5. <A> links to feeds on external servers ending in ".rss", ".rdf", ".xml",
       or ".atom"

    6. <A> links to feeds on external servers containing "rss", "rdf", "xml", or
       "atom"

    7. Try some guesses about common places for feeds. (index.xml, atom.xml,
       etc.)

    (At every step, feeds are minimally verified to make sure they are really
    feeds.)

    If `all` is True then return all possible feeds, kinda sorta ordered in
    terms of goodness. Otherwise, we stop as soon as one of the above steps
    finds a likely feed.

    `_recurs` is internal: the list of URIs already visited, used to stop
    broken-redirect loops.
    """
    if _recurs is None:
        _recurs = [uri]
    fulluri = makeFullURI(uri)
    try:
        data = _gatekeeper.get(fulluri, check=False)
    except Exception:
        # BUG FIX: narrowed from a bare `except:`.
        return []

    # is this already a feed?
    if could_be_feed_data(data):
        return [fulluri]

    # Follow an old-style "broken redirect" at most once per URI.
    newuri = try_broken_redirect(data)
    if newuri and newuri not in _recurs:
        _recurs.append(newuri)
        # BUG FIX: this previously called the undefined name `feeds(...)`,
        # raising NameError whenever a broken redirect was encountered.
        return find_feeds(newuri, all=all, _recurs=_recurs)

    # nope, it's a page, try LINK tags first
    parser = HtmlBasedParser(fulluri)
    parser.feed(data)

    outfeeds = [link for link in parser.link_links if is_feed(link)]
    LOG.info(f"found {len(outfeeds)} through LINK tags")

    if all or len(outfeeds) == 0:
        # no LINK tags, look for regular <A> links that point to feeds
        if not all:
            LOG.info("no LINK tags, looking at A tags")

        local_links, remote_links = classify_links(parser.a_links, fulluri)

        # look for obvious feed links on the same server
        outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links)))
        if all or len(outfeeds) == 0:
            # look harder for feed links on the same server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links)))

        if all or len(outfeeds) == 0:
            # look for obvious feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links)))

        if all or len(outfeeds) == 0:
            # look harder for feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links)))

    if all or len(outfeeds) == 0:
        LOG.debug("no A tags, guessing")
        suffixes = [  # filenames used by popular software:
            "atom.xml",  # blogger, TypePad
            "index.atom",  # MT, apparently
            "index.rdf",  # MT
            "rss.xml",  # Dave Winer/Manila
            "index.xml",  # MT
            "index.rss",  # Slash
        ]
        outfeeds.extend(
            filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes])
        )

    # Deduplicate; note this does not preserve discovery order.
    return list(set(outfeeds))
|
||||
|
||||
|
||||
##### test harness ######
|
||||
|
||||
|
||||
def test():
    """Walk the diveintomark autodiscovery test suite over the network.

    Each page must yield exactly one LINK-tag feed whose feed content
    links back to the page; failures are reported and counted.
    """
    uri = "http://diveintomark.org/tests/client/autodiscovery/html4-001.html"
    failed = []
    count = 0
    while 1:
        data = _gatekeeper.get(uri)
        if data.find("Atom autodiscovery test") == -1:
            break
        sys.stdout.write(".")
        sys.stdout.flush()
        count += 1
        # BUG FIX: this called `getLinks`, a leftover from the aaronsw
        # original that is not defined in this module.  Use the module's
        # own autodiscovery parser to extract the LINK-tag feed URLs.
        parser = HtmlBasedParser(uri)
        parser.feed(data)
        links = parser.link_links
        if not links:
            print(f"\n*** FAILED *** {uri} could not find link")
            failed.append(uri)
        elif len(links) > 1:
            print(f"\n*** FAILED *** {uri} found too many links")
            failed.append(uri)
        else:
            atomdata = requests.get(links[0]).text
            if atomdata.find('<link rel="alternate"') == -1:
                print(f"\n*** FAILED *** {uri} retrieved something that is not a feed")
                failed.append(uri)
            else:
                backlink = atomdata.split('href="').pop().split('"')[0]
                if backlink != uri:
                    print(f"\n*** FAILED *** {uri} retrieved wrong feed")
                    failed.append(uri)
        if data.find('<link rel="next" href="') == -1:
            break
        uri = urllib.parse.urljoin(
            uri, data.split('<link rel="next" href="').pop().split('"')[0]
        )
    print()
    print(f"{count} tests executed, {len(failed)} failed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    args = sys.argv[1:]
    if args and args[0] == "--debug":
        # BUG FIX: --debug previously set a dead `_debug` variable; wire
        # it to logging so the module's LOG.debug output is visible.
        logging.basicConfig(level=logging.DEBUG)
        args.pop(0)
    if args:
        uri = args[0]
    else:
        uri = "http://diveintomark.org/"
    if uri == "test":
        test()
    else:
        # BUG FIX: `getFeeds` was never defined in this module; the
        # public entry point is find_feeds.
        print("\n".join(find_feeds(uri)))
|
||||
Loading…
Add table
Add a link
Reference in a new issue