Remove the feedfinder import

We have our own version now. The only difference is that it doesn't respect
robots.txt. I think this might be OK?
This commit is contained in:
John Doty 2024-07-27 09:53:40 -07:00
parent ed2587816c
commit eab6cf609d
2 changed files with 14 additions and 300 deletions

View file

@ -5,7 +5,6 @@ import logging
import click
from . import feed
from . import feedfinder
from . import database
from . import opml
from . import web
@ -38,9 +37,17 @@ def cli(verbose):
def search(url):
"Search an URL for feeds."
# TODO: Rewrite to use our new one
feeds = feedfinder.find_feeds(url)
for feed in feeds:
click.echo(feed)
feeds = asyncio.run(feed.feed_search(url))
if len(feeds) == 0:
click.echo(f"No feeds found for {url}")
return 1
max_url = max(len(f.meta.url) for f in feeds)
max_title = max(len(f.title) for f in feeds)
for f in feeds:
click.echo(
f"{f.meta.url:{max_url}} {f.title:{max_title}} ({len(f.entries)} entries)"
)
click.echo(f"Found {len(feeds)} feeds")
@ -218,6 +225,9 @@ def list_feeds(pattern):
"""
db = database.Database.local()
feeds = db.load_all(feed_limit=0, pattern=pattern)
if len(feeds) == 0:
click.echo("Not subscribed to any feeds.")
return 0
max_title = max(len(f.title) for f in feeds)
max_url = max(len(f.meta.url) for f in feeds)

View file

@ -1,296 +0,0 @@
"""feedfinder: Find the Web feed for a Web page
Based on http://www.aaronsw.com/2002/feedfinder/
Rewritten by John Doty for Python 3 and the cry aggregator, but the basic
frame remains. The big thing *this* does is also return the FeedMeta when it
has found feeds, instead of just URLs. This is more useful for the rest of
processing.
"""
import logging
import re
import sys
import typing
import urllib
import urllib.parse
import urllib.request
import urllib.robotparser
import requests
from . import feed
LOG = logging.getLogger(__name__)
class URLGatekeeper:
    """A class to track robots.txt rules across multiple servers.

    Fetched robots.txt files are parsed into RobotFileParser objects and
    cached per-domain, so each server is only asked for its rules once
    per process.
    """

    def __init__(self):
        self.rpcache = {}  # a dictionary of RobotFileParser objects, by domain
        self.agent = "cry/0.9"  # plain literal; the f-string had no placeholders
        self.session = requests.Session()
        self.session.headers["user-agent"] = self.agent
        LOG.debug(f"User agent: {self.agent}")

    def _getrp(self, url):
        """Return the (possibly cached) RobotFileParser for url's domain."""
        protocol, domain = urllib.parse.urlparse(url)[:2]
        if domain in self.rpcache:
            return self.rpcache[domain]
        baseurl = "%s://%s" % (protocol, domain)
        robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")
        rp = urllib.robotparser.RobotFileParser(robotsurl)
        try:
            response = self.session.get(robotsurl)
            lines = response.text.splitlines()
            rp.parse(lines)
        except Exception:
            # Best effort: an unreachable or broken robots.txt means
            # "allow everything" (an empty RobotFileParser permits all).
            # Was a bare `except:`, which also swallowed KeyboardInterrupt.
            pass
        self.rpcache[domain] = rp
        return rp

    def can_fetch(self, url):
        """Return True if robots.txt for url's domain permits fetching url."""
        rp = self._getrp(url)
        allow = rp.can_fetch(self.agent, url)
        LOG.debug(f"gatekeeper of {url} says {allow}")
        return allow

    def get(self, url, check=True):
        """Fetch `url` and return its body text, or "" on any failure.

        When `check` is True the fetch is skipped (returning "") if
        robots.txt disallows it.
        """
        if check and not self.can_fetch(url):
            return ""
        try:
            return self.session.get(url, timeout=10).text
        except Exception:
            # Deliberate best-effort: callers treat "" as "no data".
            # Was a bare `except:`; narrowed so ^C still propagates.
            return ""
# Module-level singleton: one shared robots.txt cache for the whole process.
_gatekeeper = URLGatekeeper()

# NOTE(review): mid-file import; html.parser is only used by HtmlBasedParser
# below, but PEP 8 would put this at the top of the file with the others.
import html.parser
class HtmlBasedParser(html.parser.HTMLParser):
    """Collect feed-related links from an HTML document.

    After feeding HTML to an instance, `link_links` holds the resolved
    hrefs of <link rel="alternate"> tags whose type is one of the known
    feed MIME types, and `a_links` holds the resolved hrefs of all <a>
    tags. A <base href> tag updates the URI later links resolve against.
    """

    FEED_TYPES = (
        "application/rss+xml",
        "text/xml",
        "application/atom+xml",
        "application/x.atom+xml",
        "application/x-atom+xml",
    )

    link_links: list[str]
    a_links: list[str]

    def __init__(self, baseuri):
        super().__init__()
        self.baseuri = baseuri
        self.link_links = []
        self.a_links = []

    def handle_starttag(self, tag, attrs):
        # Dispatch the handful of tags we care about; ignore the rest.
        attr_map = dict(attrs)
        handler = {
            "base": self.do_base,
            "link": self.do_link,
            "a": self.do_a,
        }.get(tag)
        if handler is not None:
            handler(attr_map)

    def do_base(self, attrs):
        # <base href="..."> rebases every subsequently-resolved link.
        new_base = attrs.get("href")
        if new_base is not None:
            self.baseuri = new_base

    def do_link(self, attrs):
        # Only <link rel="...alternate..." type="<feed mime>" href=...> counts.
        rel = attrs.get("rel")
        if rel is None or "alternate" not in rel.split():
            return
        if attrs.get("type", "").lower() not in self.FEED_TYPES:
            return
        target = attrs.get("href")
        if target is not None:
            self.link_links.append(urllib.parse.urljoin(self.baseuri, target))

    def do_a(self, attrs):
        # Every anchor with an href is recorded, resolved against baseuri.
        target = attrs.get("href")
        if target is not None:
            self.a_links.append(urllib.parse.urljoin(self.baseuri, target))
def makeFullURI(uri: str) -> str:
    """Normalize `uri` to a full http(s) URI.

    Surrounding whitespace is stripped, a feed:// scheme is rewritten to
    http://, and a bare host gets an http:// prefix.
    """
    uri = uri.strip()
    if uri.startswith("feed://"):
        return "http://" + uri[len("feed://"):]
    if uri.startswith(("http://", "https://")):
        return uri
    return "http://%s" % uri
def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]:
    """Partition `links` into (local, remote) lists.

    A link is local when it starts with `baseuri` (compared
    case-insensitively); everything else is remote. Relative order
    within each list is preserved.
    """
    prefix = baseuri.lower()
    local = [link for link in links if link.lower().startswith(prefix)]
    remote = [link for link in links if not link.lower().startswith(prefix)]
    return local, remote
def is_feed_link(link: str) -> bool:
    """Return True if the link's extension suggests a feed document."""
    # endswith accepts a tuple of candidates, replacing the `or` chain.
    return link.lower().endswith((".rss", ".rdf", ".xml", ".atom"))
def is_XML_related_link(link: str) -> bool:
    """Return True if the link mentions any feed-ish keyword anywhere in it."""
    lowered = link.lower()
    return any(word in lowered for word in ("rss", "rdf", "xml", "atom"))
r_brokenRedirect = re.compile("<newLocation[^>]*>(.*?)</newLocation>", re.S)
def try_broken_redirect(data) -> str | None:
"""See if the content is a 'broken redirect'.
This is in the code taken from aaronsw and I don't know what, if anything,
ever generated this.
"""
if "<newLocation" in data:
newuris = r_brokenRedirect.findall(data)
if newuris:
return newuris[0].strip()
def could_be_feed_data(data: str) -> bool:
    """Heuristically decide whether `data` looks like feed XML.

    Anything containing an <html tag is ruled out immediately; otherwise
    the presence of an <rss, <rdf, or <feed tag marks it as a probable feed.
    """
    lowered = data.lower()
    if "<html" in lowered:
        return False
    return any(tag in lowered for tag in ("<rss", "<rdf", "<feed"))
def is_feed(uri: str) -> bool:
    """Fetch `uri` (robots permitting) and check whether it looks like a feed.

    Non-http(s) URIs are rejected without a fetch.
    """
    LOG.debug(f"seeing if {uri} is a feed")
    scheme = urllib.parse.urlparse(uri)[0]
    if scheme not in ("http", "https"):
        return False
    body = _gatekeeper.get(uri)
    return could_be_feed_data(body)
def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]:
    """Find feeds for the given URI.

    How it works:
    1. If the URI points to a feed, it is simply returned; otherwise
       the page is downloaded and the real fun begins.
    2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)
    3. <A> links to feeds on the same server ending in ".rss", ".rdf", ".xml",
       or ".atom"
    4. <A> links to feeds on the same server containing "rss", "rdf", "xml",
       or "atom"
    5. <A> links to feeds on external servers ending in ".rss", ".rdf", ".xml",
       or ".atom"
    6. <A> links to feeds on external servers containing "rss", "rdf", "xml",
       or "atom"
    7. Try some guesses about common places for feeds. (index.xml, atom.xml,
       etc.)

    (At every step, feeds are minimally verified to make sure they are really
    feeds.)

    If `all` is True then return all possible feeds, kinda sorta ordered in
    terms of goodness. Otherwise, we stop as soon as one of the above steps
    finds a likely feed.

    `_recurs` is internal: the list of URIs already visited, used to break
    broken-redirect cycles.
    """
    if _recurs is None:
        _recurs = [uri]
    fulluri = makeFullURI(uri)
    try:
        data = _gatekeeper.get(fulluri, check=False)
    except Exception:
        # Any fetch failure means no feeds. Was a bare `except:`.
        return []
    # is this already a feed?
    if could_be_feed_data(data):
        return [fulluri]
    newuri = try_broken_redirect(data)
    if newuri and newuri not in _recurs:
        _recurs.append(newuri)
        # BUG FIX: this recursion previously called the undefined name
        # `feeds` (a leftover from the original aaronsw function name),
        # raising NameError whenever a broken redirect was followed.
        return find_feeds(newuri, all=all, _recurs=_recurs)
    # nope, it's a page, try LINK tags first
    parser = HtmlBasedParser(fulluri)
    parser.feed(data)
    outfeeds = [link for link in parser.link_links if is_feed(link)]
    LOG.info(f"found {len(outfeeds)} through LINK tags")
    if all or len(outfeeds) == 0:
        # no LINK tags, look for regular <A> links that point to feeds
        if not all:
            LOG.info("no LINK tags, looking at A tags")
        local_links, remote_links = classify_links(parser.a_links, fulluri)
        # look for obvious feed links on the same server
        outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links)))
        if all or len(outfeeds) == 0:
            # look harder for feed links on the same server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links)))
        if all or len(outfeeds) == 0:
            # look for obvious feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links)))
        if all or len(outfeeds) == 0:
            # look harder for feed links on another server
            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links)))
        if all or len(outfeeds) == 0:
            LOG.debug("no A tags, guessing")
            suffixes = [  # filenames used by popular software:
                "atom.xml",  # blogger, TypePad
                "index.atom",  # MT, apparently
                "index.rdf",  # MT
                "rss.xml",  # Dave Winer/Manila
                "index.xml",  # MT
                "index.rss",  # Slash
            ]
            outfeeds.extend(
                filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes])
            )
    return list(set(outfeeds))