cry/cry/feed.py
John Doty 08fe7c1cf7 Changed my mind about feed times
I hope I haven't broken things, we'll see after a while I guess.
2024-11-22 12:09:03 -08:00

772 lines
25 KiB
Python

# I guess this is it.
import asyncio
import dataclasses
import functools
import hashlib
import html.parser
import io
import logging
import re
import time
import typing
import urllib.parse
import urllib.robotparser
import feedparser
import requests
import requests.structures
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Sent as the User-Agent header on every HTTP request this module makes.
USER_AGENT = "cry-reader v0.0"
# Feed lifecycle states, stored in FeedMeta.status.
FEED_STATUS_ALIVE = 0
FEED_STATUS_DEAD = 1
FEED_STATUS_UNSUBSCRIBED = 2
# TODO: Consider configuration here.
# Shared HTTP session for feed fetches, so connections get reused.
http = requests.Session()
@dataclasses.dataclass(frozen=True)
class FeedMeta:
    """Everything we remember about a feed URL between fetches.

    Immutable; new state is recorded with dataclasses.replace.
    """

    url: str
    last_fetched_ts: int  # unix seconds of the last successful fetch (0 = never)
    retry_after_ts: int  # unix seconds before which we should not fetch again
    status: int  # one of the FEED_STATUS_* constants
    etag: str | None  # HTTP caching: the last etag the server sent us
    modified: str | None  # HTTP caching: the last "last-modified" value

    @classmethod
    def from_url(cls, url: str) -> "FeedMeta":
        """Make a brand-new record for a URL we have never fetched."""
        return cls(
            url=url,
            last_fetched_ts=0,
            retry_after_ts=0,
            status=FEED_STATUS_ALIVE,
            etag=None,
            modified=None,
        )

    def should_fetch(self, now) -> bool:
        """Decide whether it is OK to hit the network for this feed at `now`."""
        if self.status != FEED_STATUS_ALIVE:
            LOG.info(f"{self.url} is dead or unsubscribed")
            return False
        if now < self.retry_after_ts:
            when = time.strftime(
                "%Y-%m-%d %H:%M:%S %z", time.localtime(self.retry_after_ts)
            )
            LOG.info(f"{self.url} will not be pulled until {when}")
            return False
        return True
@dataclasses.dataclass(frozen=True)
class Entry:
    """A single item from a feed, normalized for display and storage."""

    id: str
    inserted_at: int  # Unix time, but ms, not sec
    posted_at: int  # Unix time, but ms, not sec
    title: str
    link: str | None

    @classmethod
    def from_parsed(cls, entry: feedparser.FeedParserDict, insert_time: int) -> "Entry":
        """Build an Entry out of a raw feedparser entry.

        Real-world feeds omit or mangle nearly every field, so this is a
        cascade of fallbacks: the title falls back to the description, the id
        falls back to the link, then the title, then the publish date, and
        finally to a content hash of the whole element.

        `insert_time` comes from the caller: it is *almost*
        `int(time.time())` (in milliseconds) but is fudged slightly so that a
        batch of entries inserted at once keeps a stable relative order.
        """
        title = entry.get("title") or entry.get("description")
        entry_id = entry.get("id")
        link = entry.get("link")
        # Some feeds put a usable URL in the id field; recover it as a link.
        if entry_id and not link:
            candidate = str(entry_id).lower()
            if candidate.startswith(("http:", "https:")):
                link = candidate
        # Cascade through progressively worse identifier choices.
        entry_id = (
            entry_id
            or link
            or title
            or entry.get("published")
            or the_worst_element_hash(entry)
        )
        stamp = entry.get("published_parsed")
        if stamp is None:
            stamp = entry.get("updated_parsed")
        if stamp is None:
            posted_at = int(insert_time)
        else:
            assert isinstance(stamp, tuple)
            # Clamp to insert_time: feeds with broken clocks would otherwise
            # generate posts from the future.
            posted_at = min(int(time.mktime(stamp) * 1000), insert_time)
        assert isinstance(entry_id, str)
        assert link is None or isinstance(link, str)
        return Entry(
            id=entry_id,
            inserted_at=insert_time,
            posted_at=posted_at,
            title=clean_text(str(title)),
            link=link,
        )

    def time_ago(self) -> str:
        """Render this post's age as a compact string like "5h" or "2d"."""
        posted = int(self.posted_at / 1000)
        seconds = int(time.time()) - posted
        if seconds <= 90:
            return f"{seconds}s"
        minutes = seconds // 60
        if minutes <= 90:
            return f"{minutes}m"
        hours = minutes // 60
        if hours < 24:
            return f"{hours}h"
        days = hours // 24
        if days <= 7:
            return f"{days}d"
        weeks = days // 7
        if weeks < 52:
            return f"{weeks}w"
        return f"{weeks // 52}y"
@dataclasses.dataclass(frozen=True)
class Feed:
    """A fetched feed: its bookkeeping metadata, display fields, and entries."""

    meta: FeedMeta
    # Display title and "home" link; both fall back to the feed URL when the
    # parsed feed does not provide them (see from_parsed).
    title: str
    link: str
    entries: list[Entry]

    @classmethod
    def from_parsed(cls, d: feedparser.FeedParserDict, meta: FeedMeta) -> "Feed":
        """Convert a whole feedparser result into a Feed.

        Fills in the title and link from the parsed feed where possible,
        falling back to the URL in `meta`, and assigns every entry an
        insertion timestamp that preserves the feed's own ordering (see the
        long note below).
        """
        title = None
        link = None
        if d.feed is not None:
            assert not isinstance(d.feed, list)
            title = d.feed.get("title")
            link = d.feed.get("link")
        if title is None or title == "":
            title = meta.url
        if link is None:
            link = meta.url
        # =====================================================================
        # FEED AND ENTRY ORDERING!
        # =====================================================================
        # In many ways this is the most critical part of a feed reader: in
        # what order do we show the items in the feed?
        #
        # RSS is pretty unspecified in general, but also in what the meaning
        # of the order of the entries in the feed actually is. (I can't
        # remember if this is something that Atom specifies but it doesn't
        # matter because RSS is still really popular, even in the ungodly
        # late year of 2024.)
        #
        # *We* want to show posts in reverse chronological order, of course,
        # but we still have problems. You *cannot* trust the dates and times
        # in the entries. Sure, sure, Atom does a great job of specifying at
        # least three different timestamps in the feed, and they are supposed
        # to have time zones and whatnot. But:
        #
        #  a) Any kind of timestamp is optional in RSS, and
        #  b) Even if the timestamp is present, it can come in a variety of
        #     formats (which theoretically `feedparser` handles), but
        #  c) Even if you can parse the timestamp, many feed implementations
        #     just PUT THE WRONG TIME IN THERE.
        #
        # So we have to account for the fact that the publish time might be
        # wildly unreliable, and back it up with our own clock. This comes with
        # its own problems, of course: our clock can be highly unreliable. But
        # in general it's good enough to work with, and feeds don't update so
        # frequently that we need to worry about most of these problems if we
        # use unix timestamps as our basis.
        #
        # If we just use our own timestamps, then what do we do with feed
        # updates where multiple items are inserted at once? We want to
        # preserve that ordering too! Our hack is to multiply the unix
        # timestamp by 1000, and then use the lower three digits as a sequence
        # number. (Maybe it looks like everything was posted a millisecond
        # apart?) There's a *chance* of conflict if:
        #
        #  a) a feed has more than 1000 items in it, and
        #  b) we update the feed again less than a second later
        #
        # But given the other rate limiting features in this RSS system (The
        # `retry_after_ts` field, etc.) it's not a very likely thing to
        # happen.
        #
        # The *other* big source of time instability is that "new" items might
        # seem to have been published with a time that is "before" the last
        # item we previously saw. (i.e., on the first refresh we see an item
        # from October 3rd, then on the next refresh we see an item from October
        # 1st.) We don't know anything about historical refreshes here in feed
        # land, so that gets corrected in the database. (See store_feed.)
        #
        insert_time = int(time.time()) * 1000
        # Walk the entries back-to-front so that later feed positions get
        # lower sequence numbers, then flip back to the feed's own order.
        entries = [
            Entry.from_parsed(e, insert_time + i)
            for i, e in enumerate(reversed(d.entries))
        ]
        entries.reverse()
        return Feed(meta=meta, title=title, link=link, entries=entries)
def the_worst_element_hash(value) -> str:
    """Compute a content hash for the given feed element, to use as an ID.

    Last-resort identifier for entries that carry no id, link, title, or
    date. The hash must be as stable as we can make it, but if we've gotten
    here the feed author has already failed us; this is already *known to be
    wrong.*
    """

    def visit(node, digest):
        # Each type is framed with distinct begin/end markers so that
        # structurally different values cannot collide.
        if isinstance(node, feedparser.FeedParserDict):
            digest.update(b"dict")
            for key in sorted(node.keys()):
                digest.update(b"key::")
                digest.update(key.encode("utf-8"))
                digest.update(b"value::")
                visit(node[key], digest)
            digest.update(b"tcid")
        elif isinstance(node, str):
            digest.update(b"str")
            digest.update(node.encode("utf-8"))
            digest.update(b"rts")
        elif isinstance(node, list):
            digest.update(b"list")
            for element in node:
                visit(element, digest)
            digest.update(b"tsil")
        elif isinstance(node, tuple):
            digest.update(b"tuple")
            for element in node:
                visit(element, digest)
            digest.update(b"elput")

    digest = hashlib.sha256(usedforsecurity=False)
    visit(value, digest)
    return digest.hexdigest()
BLANK_TAGS = {"p", "br", "li", "div", "img"}
MULTI_SPACES = re.compile(r"\s+")
def clean_text(text: str) -> str:
"""Sometimes text is HTML and otherwise ugly. This reduces it to
something pretty to display. Strips tags, puts blank space in between
elements that should generate blank space, and then collapses blank
spaces down to one.
"""
class Cleaner(html.parser.HTMLParser):
def __init__(self, writer):
super().__init__()
self.writer = writer
def handle_data(self, data: str) -> None:
self.writer.write(data)
def handle_startendtag(
self, tag: str, attrs: list[tuple[str, str | None]]
) -> None:
del attrs
if tag.lower() in BLANK_TAGS:
self.writer.write(" ")
def handle_starttag(
self, tag: str, attrs: list[tuple[str, str | None]]
) -> None:
del attrs
if tag.lower() in BLANK_TAGS:
self.writer.write(" ")
writer = io.StringIO()
cleaner = Cleaner(writer)
cleaner.feed(text)
cleaner.close()
return MULTI_SPACES.sub(" ", writer.getvalue())
def could_be_feed_data(data: str) -> bool:
    """Quick sniff test: does this text look like feed XML rather than HTML?"""
    lowered = data.lower()
    if "<html" in lowered:
        return False
    return any(marker in lowered for marker in ("<rss", "<rdf", "<feed"))
class Guardian:
    """A keeper of robots.txt files.

    Fetches and caches one RobotFileParser per site. Concurrent callers for
    the same site coordinate through a per-site asyncio.Lock so the file is
    fetched only once.
    """

    # Keyed by the site's robots.txt URL. A value is either the finished
    # parser, or an asyncio.Lock while some task is busy fetching it.
    permissions: dict[str, urllib.robotparser.RobotFileParser | asyncio.Lock]

    def __init__(self):
        self.permissions = {}

    async def get_robots_parser(self, url: str) -> urllib.robotparser.RobotFileParser:
        """Fetch the robots parser for the given URL. Only do it once per site."""
        url = urllib.parse.urljoin(url, "/robots.txt")
        parser = self.permissions.get(url)
        if parser is None:
            parser = asyncio.Lock()
            self.permissions[url] = parser
        if isinstance(parser, urllib.robotparser.RobotFileParser):
            return parser
        assert isinstance(parser, asyncio.Lock)
        async with parser:
            # Double-check: another task may have finished the fetch while we
            # were waiting on the lock.
            parser = self.permissions.get(url)
            if isinstance(parser, urllib.robotparser.RobotFileParser):
                return parser
            LOG.debug(f"{url} Fetching robots.txt...")
            headers = {"user-agent": USER_AGENT}
            event_loop = asyncio.get_running_loop()
            # requests is blocking; run it on the default executor. The
            # timeout keeps one hung server from wedging an executor thread
            # forever (a timeout raises, the lock stays cached, and the next
            # caller will retry the fetch).
            response = await event_loop.run_in_executor(
                None,
                functools.partial(
                    requests.get,
                    url,
                    headers=headers,
                    timeout=30,
                ),
            )
            parser = urllib.robotparser.RobotFileParser(url)
            if response.status_code in (401, 403):
                LOG.debug(
                    f"{url} Server says {response.status_code}, assuming we can't fetch anything"
                )
                parser.disallow_all = True  # type: ignore
            elif response.status_code >= 400 and response.status_code < 500:
                LOG.debug(
                    f"{url} Server says {response.status_code}, assume we have free rein"
                )
                parser.allow_all = True  # type: ignore
            elif response.status_code >= 300:
                # NOTE(review): requests follows redirects by default, so a
                # residual 3xx here should be rare; raise_for_status does not
                # raise for 3xx, in which case an empty (deny-by-default)
                # parser gets cached below.
                response.raise_for_status()
            else:
                text = await event_loop.run_in_executor(None, lambda: response.text)
                parser.parse(text.splitlines())
            self.permissions[url] = parser
            return parser

    async def can_fetch(self, url: str) -> bool:
        """Returns true if we are allowed to fetch the given URL."""
        # Look, opinions differ on whether feed readers are supposed to be
        # considered robots. I added robots.txt support for feeds based on
        # the example of the feed finder python code but on reflection it
        # does not do what I want it to do and the world seems to suggest
        # that RSS readers should ignore it. (i.e., jwz blocks robots from
        # accessing the RSS feed, so.)
        #
        # I'm leaving this code here so that I can resurrect it later if
        # necessary.
        #
        # parser = await self.get_robots_parser(url)
        # return parser.can_fetch(USER_AGENT, url)
        del url
        return True

    async def crawl_delay(self, url: str) -> int | None:
        """Returns the number of seconds we should wait before fetching again.

        Returns None when robots.txt has no guidance or cannot be fetched.
        """
        try:
            parser = await self.get_robots_parser(url)
            result = parser.crawl_delay(USER_AGENT)
            # BUG FIX: RobotFileParser.crawl_delay returns an int (or None),
            # never a str. The old code only handled the str case, so every
            # real crawl-delay value was silently dropped and this always
            # returned None. Coerce whatever comes back.
            if result is None:
                return None
            try:
                return int(result)
            except (TypeError, ValueError):
                return None
        except Exception as e:
            LOG.error(f"Error fetching crawl delay for {url}: {e}")
            return None
GUARDIAN = Guardian()
async def fetch_feed(meta: FeedMeta) -> typing.Tuple[None | Feed | str, FeedMeta]:
    """Fetch a feed from the internet. `meta` is a FeedMeta that has all the
    details about what happened the last time we went to do a fetch, caching
    information and whatnot.

    The return value is a little funky. It returns a 2-tuple, where the first
    element is one of:

    - None, if we could not fetch anything
    - A Feed, if we fetched something and it seemed to be a feed
    - A string, if we fetched something but it was not a feed

    The second element is a FeedMeta that describes the URL. It might be the
    same as the FeedMeta that was provided, but it might not be:

    - The etag might have been updated if the server sent us an etag
    - The modified value might have been updated if the server sent us a
      new value
    - The URL might have been updated if we followed a permanent redirect

    Just to be safe, callers should use the new FeedMeta in place of the
    argument for everything after calling this function.
    """
    if not meta.should_fetch(time.time()):
        return (None, meta)

    # Conditional-request headers so unchanged feeds can answer 304.
    headers = {"user-agent": USER_AGENT}
    if meta.etag:
        headers["if-none-match"] = meta.etag
    if meta.modified:
        headers["if-modified-since"] = meta.modified

    # We waffle back and forth about using feedparser's HTTP support vs
    # calling requests ourselves. We have decided to use requests manually at
    # this time because it makes it much much easier to figure out whether or
    # not a request has succeeded. (The straw was handling timeouts and
    # understanding whether `bozo_exception` was a transport failure or not.)
    if await GUARDIAN.can_fetch(meta.url):
        try:
            loop = asyncio.get_running_loop()
            # requests is blocking; run it on the default executor so we do
            # not stall the event loop.
            response = await loop.run_in_executor(
                None,
                functools.partial(http.get, meta.url, headers=headers),
            )
            LOG.info(f"{meta.url} fetched with status: {response.status_code}")
            failed = response.status_code >= 400
        except Exception as e:
            LOG.error(f"{meta.url} error fetching: {e}")
            failed = True
            response = None
    else:
        LOG.error(f"{meta.url} Guardian says we cannot fetch")
        failed = True
        response = None

    # Now, there are a number of things to consider in the response that
    # we need to consider in updating our permanent record.
    if response is not None and response.status_code == 410:
        # Permanently gone, really stop asking.
        LOG.error(f"{meta.url} permanently gone")
        return (None, dataclasses.replace(meta, status=FEED_STATUS_DEAD))
    if failed and time.time() > meta.last_fetched_ts + (7 * 24 * 60 * 60):
        # If we've been failing to fetch the feed for more than a week then
        # consider us dead, we must be doing something wrong.
        LOG.error(f"{meta.url} failed for too long, giving up")
        return (None, dataclasses.replace(meta, status=FEED_STATUS_DEAD))

    # BUG FIX: this used to be `if response:`, but a requests Response is
    # truthy only when the status code is below 400 -- so the redirect
    # bookkeeping below was silently skipped for 4xx/5xx responses. What we
    # actually mean is "did we get a response at all".
    if response is not None:
        # Check for permanent redirects and handle them properly. Note that
        # requests is kinda dumb when it comes to permanent redirects: we
        # have to slog through the history itself when it comes to the
        # redirects, and we have to note the URL of the request *after* the
        # permanent redirect in order to get the right one.
        #
        new_url = None
        history = list(response.history)
        history.append(response)
        history.reverse()
        last_url = response.url
        for h in history:
            if h.is_permanent_redirect:
                new_url = last_url
                break
            last_url = h.url
        if new_url is not None:
            LOG.info(f"{meta.url} permanently redirected to {new_url}")
            meta = dataclasses.replace(meta, url=new_url)
        # TODO: Handle that bogus non-HTTP redirect that feedfinder uses.

    # NOTE: We might still be in a failure state here. But success or fail,
    #       the server might have told us when to next retry, so make a note
    #       of it. The server might also have given us updated caching
    #       information (even on failure!) and so let's also make a note of that.
    retry_delta = None
    etag = meta.etag
    modified = meta.modified
    if response is not None:
        etag = response.headers.get("etag", meta.etag)
        modified = response.headers.get("last-modified", meta.modified)
        try:
            # "nope" forces the int() to fail when the header is absent.
            retry_delta = int(response.headers.get("retry-after", "nope"))
        except Exception:
            # Retry-after can also be an HTTP date; we deliberately ignore
            # that form and fall through to the defaults below.
            pass
    if retry_delta is None:
        # See if robots.txt has any guidance for us.
        retry_delta = await GUARDIAN.crawl_delay(meta.url)
    if retry_delta is None:
        if failed:
            retry_delta = 1 * 60  # Retry again in a minute
        else:
            retry_delta = 60 * 60  # 1 hour default
    meta = dataclasses.replace(
        meta,
        retry_after_ts=int(time.time()) + retry_delta,
        etag=etag,
        modified=modified,
    )

    # We've done everything we can on a failure, bail if we've got an error.
    if failed:
        LOG.info(f"{meta.url} failed at the network level")
        return (None, meta)
    assert response is not None

    # Record our successful fetch now, to reset the failure timer above.
    meta = dataclasses.replace(meta, last_fetched_ts=int(time.time()))

    # We can *still* be successful but like, no changes. (e.g. a 304 answer
    # to our conditional request headers.)
    if response.status_code != 200:
        LOG.info(f"{meta.url} had no changes")
        return (None, meta)

    # Does this seem to be a feed? Or not?
    if could_be_feed_data(response.text):
        parsed = feedparser.parse(response.content, response_headers=response.headers)
        return (Feed.from_parsed(parsed, meta), meta)

    # No this is not a feed, just return the content out for further
    # processing.
    return (response.text, meta)
async def fetch_many(
    metas: list[FeedMeta],
) -> list[typing.Tuple[Feed | str | None, FeedMeta]]:
    """Fetch every feed concurrently; results come back in input order."""
    async with asyncio.TaskGroup() as group:
        pending = [group.create_task(fetch_feed(meta)) for meta in metas]
    return [task.result() for task in pending]
def sort_key(f: Feed) -> int:
    """A sort key for sorting feeds by recency (newest entry wins)."""
    if not f.entries:
        return -1
    return max(entry.posted_at for entry in f.entries)
class FeedSearchParser(html.parser.HTMLParser):
    """An HTML parser that collects candidate feed URLs from a page.

    `link_links` gathers <link rel="alternate"> targets with a feed-ish MIME
    type; `a_links` gathers every <a href> on the page. All URLs are resolved
    against the page's base URI (which a <base> tag may change mid-parse).
    """

    FEED_TYPES = (
        "application/rss+xml",
        "text/xml",
        "application/atom+xml",
        "application/x.atom+xml",
        "application/x-atom+xml",
    )
    link_links: list[str]
    a_links: list[str]

    def __init__(self, baseuri):
        super().__init__()
        self.baseuri = baseuri
        self.link_links = []
        self.a_links = []

    def handle_starttag(self, tag, attrs):
        attributes = dict(attrs)
        if tag == "base":
            self.do_base(attributes)
        elif tag == "link":
            self.do_link(attributes)
        elif tag == "a":
            self.do_a(attributes)

    def do_base(self, attrs):
        # <base href> changes how all later relative URLs resolve.
        new_base = attrs.get("href")
        if new_base is not None:
            self.baseuri = new_base

    def do_link(self, attrs):
        rel = attrs.get("rel")
        if rel is None or "alternate" not in rel.split():
            return
        if attrs.get("type", "").lower() not in self.FEED_TYPES:
            return
        href = attrs.get("href")
        if href is not None:
            self.link_links.append(urllib.parse.urljoin(self.baseuri, href))

    def do_a(self, attrs):
        href = attrs.get("href")
        if href is not None:
            self.a_links.append(urllib.parse.urljoin(self.baseuri, href))
def massage_url(uri: str) -> str:
    """Normalize a user-entered URL: trim whitespace, turn feed:// into
    http://, and default to http:// when no scheme was given.
    """
    uri = uri.strip()
    if uri.startswith("feed://"):
        uri = "http://" + uri[len("feed://"):]
    if uri.startswith(("http://", "https://")):
        return uri
    return "http://%s" % uri
def classify_links(links, baseuri) -> typing.Tuple[list[str], list[str]]:
    """Split the links into two sets: local (which start with baseuri) and
    remote (which don't). The comparison is case-insensitive.
    """
    prefix = baseuri.lower()
    local: set[str] = set()
    remote: set[str] = set()
    for link in links:
        bucket = local if link.lower().startswith(prefix) else remote
        bucket.add(link)
    return list(local), list(remote)
def is_feed_link(link: str) -> bool:
    """Return True if the link seems to be a feed link, or False otherwise."""
    return link.lower().endswith((".rss", ".rdf", ".xml", ".atom"))
def is_XML_related_link(link: str) -> bool:
    """Looser sniff than is_feed_link: does the URL mention anything feed-ish
    anywhere in it?
    """
    lowered = link.lower()
    return any(token in lowered for token in ("rss", "rdf", "xml", "atom", "feed"))
async def check_feed(url: str) -> Feed | None:
    """Probe a single URL; return its Feed if it turns out to be one,
    otherwise None.
    """
    LOG.debug(f"Checking {url}: checking...")
    result, _ = await fetch_feed(FeedMeta.from_url(url))
    if not isinstance(result, Feed):
        LOG.debug(f"Checking {url}: is not a feed")
        return None
    LOG.debug(f"Checking {url}: is a feed")
    return result
async def check_links(links: typing.Iterable[str]) -> list[Feed]:
    """Probe all the links concurrently and keep only the ones that turned
    out to hold feeds. Unfetchable or feed-less links simply drop out, so an
    empty list is a perfectly normal result.
    """
    async with asyncio.TaskGroup() as group:
        probes = [group.create_task(check_feed(url)) for url in links]
    return [probe.result() for probe in probes if probe.result() is not None]
async def feed_search(uri: str) -> list[Feed]:
    """Find feeds at or near the given URI.

    Fetch the URI itself first; if it is already a feed we are done.
    Otherwise treat the result as HTML and hunt for candidates:
    <link rel="alternate"> tags first, then <a> links that smell like feeds
    (on-site before off-site, strict matches before loose ones), and finally
    a handful of well-known feed paths.
    """
    meta = FeedMeta.from_url(massage_url(uri))
    result, meta = await fetch_feed(meta)
    if result is None:
        return []
    if isinstance(result, Feed):
        return [result]

    # OK it was not a feed, let's try all our searching games.
    finder = FeedSearchParser(meta.url)
    finder.feed(result)
    LOG.debug("Checking links...")
    outfeeds = await check_links(finder.link_links)
    if outfeeds:
        return outfeeds

    LOG.debug("No links, checking A tags...")
    local_links, remote_links = classify_links(finder.a_links, meta.url)
    for link in local_links:
        LOG.debug(f" LOCAL {link}")
    for link in remote_links:
        LOG.debug(f" REMOTE {link}")
    # Most-promising batches first: strict feed-looking names before anything
    # merely XML-ish, local links before remote ones.
    candidate_batches = [
        filter(is_feed_link, local_links),
        filter(is_XML_related_link, local_links),
        filter(is_feed_link, remote_links),
        filter(is_XML_related_link, remote_links),
    ]
    for batch in candidate_batches:
        outfeeds = await check_links(batch)
        if outfeeds:
            return outfeeds

    LOG.debug("no A tags, guessing")
    suffixes = [  # filenames used by popular software:
        "atom.xml",  # blogger, TypePad
        "index.atom",  # MT, apparently
        "index.rdf",  # MT
        "rss.xml",  # Dave Winer/Manila
        "index.xml",  # MT
        "index.rss",  # Slash
        "feed",  # catandgirl.com and sometimes others.
    ]
    return await check_links([urllib.parse.urljoin(meta.url, s) for s in suffixes])