From eab6cf609d5b7a9b9a675ca177f73b93bf90574a Mon Sep 17 00:00:00 2001
From: John Doty
Date: Sat, 27 Jul 2024 09:53:40 -0700
Subject: [PATCH] Remove the feedfinder import

We have our own version now. The only difference is it doesn't respect
robots.txt. I think this might be OK?
---
 cry/cli.py        |  18 ++-
 cry/feedfinder.py | 296 ----------------------------------------------
 2 files changed, 14 insertions(+), 300 deletions(-)
 delete mode 100644 cry/feedfinder.py

diff --git a/cry/cli.py b/cry/cli.py
index 2158738..50d5e5a 100644
--- a/cry/cli.py
+++ b/cry/cli.py
@@ -5,7 +5,6 @@ import logging
 import click
 
 from . import feed
-from . import feedfinder
 from . import database
 from . import opml
 from . import web
@@ -38,9 +37,17 @@ def cli(verbose):
 def search(url):
     "Search an URL for feeds."
     # TODO: Rewrite to use our new one
-    feeds = feedfinder.find_feeds(url)
-    for feed in feeds:
-        click.echo(feed)
+    feeds = asyncio.run(feed.feed_search(url))
+    if len(feeds) == 0:
+        click.echo(f"No feeds found for {url}")
+        return 1
+
+    max_url = max(len(f.meta.url) for f in feeds)
+    max_title = max(len(f.title) for f in feeds)
+    for f in feeds:
+        click.echo(
+            f"{f.meta.url:{max_url}} {f.title:{max_title}} ({len(f.entries)} entries)"
+        )
 
     click.echo(f"Found {len(feeds)} feeds")
 
@@ -218,6 +225,9 @@ def list_feeds(pattern):
     """
     db = database.Database.local()
     feeds = db.load_all(feed_limit=0, pattern=pattern)
+    if len(feeds) == 0:
+        click.echo("Not subscribed to any feeds.")
+        return 0
 
     max_title = max(len(f.title) for f in feeds)
     max_url = max(len(f.meta.url) for f in feeds)

diff --git a/cry/feedfinder.py b/cry/feedfinder.py
deleted file mode 100644
index bc2f14e..0000000
--- a/cry/feedfinder.py
+++ /dev/null
@@ -1,296 +0,0 @@
-"""feedfinder: Find the Web feed for a Web page
-
-Based on http://www.aaronsw.com/2002/feedfinder/
-
-Rewritten by John Doty for Python 3 and the cry aggregator, but the basic
-frame remains. The big thing *this* does is also return the FeedMeta when it
-has found feeds, instead of just URLs. This is more useful for the rest of
-processing.
-"""
-
-import logging
-import re
-import sys
-import typing
-import urllib
-import urllib.parse
-import urllib.request
-import urllib.robotparser
-
-import requests
-
-from . import feed
-
-LOG = logging.getLogger(__name__)
-
-
-class URLGatekeeper:
-    """a class to track robots.txt rules across multiple servers"""
-
-    def __init__(self):
-        self.rpcache = {}  # a dictionary of RobotFileParser objects, by domain
-        self.agent = f"cry/0.9"
-        self.session = requests.Session()
-        self.session.headers["user-agent"] = self.agent
-        LOG.debug(f"User agent: {self.agent}")
-
-    def _getrp(self, url):
-        protocol, domain = urllib.parse.urlparse(url)[:2]
-        if domain in self.rpcache:
-            return self.rpcache[domain]
-        baseurl = "%s://%s" % (protocol, domain)
-        robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")
-
-        rp = urllib.robotparser.RobotFileParser(robotsurl)
-        try:
-            response = self.session.get(robotsurl)
-            lines = response.text.splitlines()
-            rp.parse(lines)
-        except:
-            pass
-        self.rpcache[domain] = rp
-        return rp
-
-    def can_fetch(self, url):
-        rp = self._getrp(url)
-        allow = rp.can_fetch(self.agent, url)
-        LOG.debug(f"gatekeeper of {url} says {allow}")
-        return allow
-
-    def get(self, url, check=True):
-        if check and not self.can_fetch(url):
-            return ""
-        try:
-            return self.session.get(url, timeout=10).text
-        except:
-            return ""
-
-
-_gatekeeper = URLGatekeeper()
-
-import html.parser
-
-
-class HtmlBasedParser(html.parser.HTMLParser):
-    FEED_TYPES = (
-        "application/rss+xml",
-        "text/xml",
-        "application/atom+xml",
-        "application/x.atom+xml",
-        "application/x-atom+xml",
-    )
-
-    link_links: list[str]
-    a_links: list[str]
-
-    def __init__(self, baseuri):
-        super().__init__()
-        self.baseuri = baseuri
-        self.link_links = []
-        self.a_links = []
-
-    def handle_starttag(self, tag, attrs):
-        attrs = {k: v for k, v in attrs}
-        if tag == "base":
-            self.do_base(attrs)
-        elif tag == "link":
-            self.do_link(attrs)
-        elif tag == "a":
-            self.do_a(attrs)
-
-    def do_base(self, attrs):
-        base = attrs.get("href")
-        if base is not None:
-            self.baseuri = base
-
-    def do_link(self, attrs):
-        rel = attrs.get("rel")
-        if rel is None:
-            return
-
-        if "alternate" not in rel.split():
-            return
-
-        if attrs.get("type", "").lower() not in self.FEED_TYPES:
-            return
-
-        href = attrs.get("href")
-        if href is None:
-            return
-
-        self.link_links.append(urllib.parse.urljoin(self.baseuri, href))
-
-    def do_a(self, attrs):
-        href = attrs.get("href")
-        if href is None:
-            return
-
-        self.a_links.append(urllib.parse.urljoin(self.baseuri, href))
-
-
-def makeFullURI(uri: str) -> str:
-    uri = uri.strip()
-    if uri.startswith("feed://"):
-        uri = "http://" + uri.split("feed://", 1).pop()
-    for x in ["http", "https"]:
-        if uri.startswith("%s://" % x):
-            return uri
-    return "http://%s" % uri
-
-
-def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]:
-    """Split the links into two sets: local (which start with baseuri) and
-    remote (which don't).
-    """
-    baseuri = baseuri.lower()
-
-    local, remote = [], []
-    for link in links:
-        if link.lower().startswith(baseuri):
-            local.append(link)
-        else:
-            remote.append(link)
-
-    return local, remote
-
-
-def is_feed_link(link: str) -> bool:
-    """Return True if the link seems to be a feed link, or False otherwise."""
-    link = link.lower()
-    return (
-        link.endswith(".rss")
-        or link.endswith(".rdf")
-        or link.endswith(".xml")
-        or link.endswith(".atom")
-    )
-
-
-def is_XML_related_link(link: str) -> bool:
-    link = link.lower()
-    return "rss" in link or "rdf" in link or "xml" in link or "atom" in link
-
-
-r_brokenRedirect = re.compile("<newLocation[^>]*>(.*?)</newLocation>", re.S)
-
-
-def try_broken_redirect(data) -> str | None:
-    """See if the content is a 'broken redirect'.
-
-    This is in the code taken from aaronsw and I don't know what, if anything,
-    ever generated this.
-    """
-    if "<newLocation" in data:
-        match = r_brokenRedirect.search(data)
-        if match:
-            return match.group(1).strip()
-    return None
-
-
-def could_be_feed_data(data) -> bool:
-    """See if the data might be a feed."""
-    data = data.lower()
-    if data.count("<html"):
-        return False
-    return data.count("<rss") + data.count("<rdf") + data.count("<feed") > 0
-
-
-def is_feed(uri: str) -> bool:
-    """See if the data at `uri` might be a feed."""
-    LOG.debug(f"seeing if {uri} is a feed")
-    protocol = urllib.parse.urlparse(uri)
-    if protocol[0] not in ("http", "https"):
-        return False
-    data = _gatekeeper.get(uri)
-    return could_be_feed_data(data)
-
-
-def find_feeds(uri: str, all: bool = False, _recurs=None) -> list[str]:
-    """Find feeds for the given URI.
-
-    How it works:
-    1. If the URI points to a feed, it is simply returned; otherwise
-       the page is downloaded and the real fun begins.
-
-    2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)
-
-    3. links to feeds on the same server ending in ".rss", ".rdf", ".xml",
-       or ".atom"
-
-    4. links to feeds on the same server containing "rss", "rdf", "xml", or
-       "atom"
-
-    5. links to feeds on external servers ending in ".rss", ".rdf", ".xml",
-       or ".atom"
-
-    6. links to feeds on external servers containing "rss", "rdf", "xml", or
-       "atom"
-
-    7. Try some guesses about common places for feeds. (index.xml, atom.xml,
-       etc.)
-
-    (At every step, feeds are minimally verified to make sure they are really
-    feeds.)
-
-    If `all` is True then return all possible feeds, kinda sorta ordered in
-    terms of goodness. Otherwise, we stop as soon as one of the above steps
-    finds a likely feed.
-    """
-    if _recurs is None:
-        _recurs = [uri]
-    fulluri = makeFullURI(uri)
-    try:
-        data = _gatekeeper.get(fulluri, check=False)
-    except:
-        return []
-
-    # is this already a feed?
-    if could_be_feed_data(data):
-        return [fulluri]
-
-    newuri = try_broken_redirect(data)
-    if newuri and newuri not in _recurs:
-        _recurs.append(newuri)
-        return find_feeds(newuri, all=all, _recurs=_recurs)
-
-    # nope, it's a page, try LINK tags first
-    parser = HtmlBasedParser(fulluri)
-    parser.feed(data)
-
-    outfeeds = [link for link in parser.link_links if is_feed(link)]
-    LOG.info(f"found {len(outfeeds)} through LINK tags")
-
-    if all or len(outfeeds) == 0:
-        # no LINK tags, look for regular links that point to feeds
-        if not all:
-            LOG.info("no LINK tags, looking at A tags")
-
-        local_links, remote_links = classify_links(parser.a_links, fulluri)
-
-        # look for obvious feed links on the same server
-        outfeeds.extend(filter(is_feed, filter(is_feed_link, local_links)))
-        if all or len(outfeeds) == 0:
-            # look harder for feed links on the same server
-            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, local_links)))
-
-        if all or len(outfeeds) == 0:
-            # look for obvious feed links on another server
-            outfeeds.extend(filter(is_feed, filter(is_feed_link, remote_links)))
-
-        if all or len(outfeeds) == 0:
-            # look harder for feed links on another server
-            outfeeds.extend(filter(is_feed, filter(is_XML_related_link, remote_links)))
-
-    if all or len(outfeeds) == 0:
-        LOG.debug("no A tags, guessing")
-        suffixes = [  # filenames used by popular software:
-            "atom.xml",  # blogger, TypePad
-            "index.atom",  # MT, apparently
-            "index.rdf",  # MT
-            "rss.xml",  # Dave Winer/Manila
-            "index.xml",  # MT
-            "index.rss",  # Slash
-        ]
-        outfeeds.extend(
-            filter(is_feed, [urllib.parse.urljoin(fulluri, x) for x in suffixes])
-        )
-
-    return list(set(outfeeds))
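
For reference, a minimal runnable sketch of the column-aligned output the reworked
`search` command produces. DemoMeta and DemoFeed below are hypothetical stand-ins
for cry's real FeedMeta and Feed objects, which this patch only shows being
consumed via f.meta.url, f.title, and f.entries (the real lookup goes through
asyncio.run(feed.feed_search(url))):

# Sketch only: DemoMeta/DemoFeed stand in for cry's real FeedMeta/Feed classes,
# which are not defined in this patch.
from dataclasses import dataclass, field


@dataclass
class DemoMeta:
    url: str


@dataclass
class DemoFeed:
    meta: DemoMeta
    title: str
    entries: list = field(default_factory=list)


def format_feed_lines(feeds: list[DemoFeed]) -> list[str]:
    """Pad URL and title columns to the widest entry, as the new search command does."""
    if not feeds:
        return []
    max_url = max(len(f.meta.url) for f in feeds)
    max_title = max(len(f.title) for f in feeds)
    return [
        f"{f.meta.url:{max_url}} {f.title:{max_title}} ({len(f.entries)} entries)"
        for f in feeds
    ]


if __name__ == "__main__":
    demo = [
        DemoFeed(DemoMeta("https://example.com/index.xml"), "Example Blog", [1, 2, 3]),
        DemoFeed(DemoMeta("https://example.com/comments.xml"), "Comments", []),
    ]
    for line in format_feed_lines(demo):
        print(line)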