From ecd42ad004eccd1bd635e5735d546057700bca8e Mon Sep 17 00:00:00 2001 From: lod Date: Wed, 20 Nov 2024 17:10:18 +0100 Subject: [PATCH 1/6] use oEmbed to check for title before parsing the page --- plugins/Web/plugin.py | 60 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py index c9d9c069e..790d9cc20 100644 --- a/plugins/Web/plugin.py +++ b/plugins/Web/plugin.py @@ -33,6 +33,7 @@ import re import sys import string import socket +import json import supybot.conf as conf import supybot.utils as utils @@ -144,6 +145,54 @@ class Web(callbacks.PluginRegexp): regexps = ['titleSnarfer'] threaded = True + def __init__(self, irc): + self.__parent = super(Web, self) + self.__parent.__init__(irc) + self.oembed_providers = self._loadOEmbedProviders() + + def _loadOEmbedProviders(self): + """ + Loads the oEmbed providers JSON. + """ + try: + providers_url = "https://oembed.com/providers.json" + response = utils.web.getUrl(providers_url) + return json.loads(response) + except Exception as e: + self.log.debug(f"Failed to load oEmbed providers: {e}") + return [] + + def _getOEmbedEndpoint(self, url): + """ + Finds the appropriate oEmbed endpoint for the given URL based on providers.json. + """ + for provider in self.oembed_providers: + for pattern in provider.get('endpoints', []): + schemes = pattern.get('schemes', []) + endpoint = pattern.get('url', '') + for scheme in schemes: + regex = re.escape(scheme).replace(r'\*', '.*') # Convert wildcard to regex + if re.match(regex, url): + return endpoint + return None + + def getOEmbedTitle(self, url): + """ + Retrieves the oEmbed title using the providers JSON. 
+ """ + try: + oembed_endpoint = self._getOEmbedEndpoint(url) + if not oembed_endpoint: + return None + + oembed_url = f"{oembed_endpoint}?format=json&url={url}" + response = utils.web.getUrl(oembed_url) + oembed_data = json.loads(response) + return oembed_data.get('title') + except Exception as e: + self.log.debug(f"Failed to retrieve oEmbed title: {e}") + return None + def noIgnore(self, irc, msg): return not self.registryValue('checkIgnored', msg.channel, irc.network) @@ -280,10 +329,13 @@ class Web(callbacks.PluginRegexp): if r and r.search(url): self.log.debug('Not titleSnarfing %q.', url) return - r = self.getTitle(irc, url, False, msg) - if not r: - return - (target, title) = r + title = self.getOEmbedTitle(url) + target = url + if not title: + r = self.getTitle(irc, url, False, msg) + if not r: + return + (target, title) = r if title: domain = utils.web.getDomain(target if self.registryValue('snarferShowTargetDomain', From e7f79b5098e59cd9d15ede4e0a28b3c8c5b3f4c7 Mon Sep 17 00:00:00 2001 From: lodriguez Date: Sat, 1 Feb 2025 15:03:13 +0100 Subject: [PATCH 2/6] refactor oEmbed, only download json when needed --- plugins/Web/plugin.py | 90 ++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py index 790d9cc20..d46cf1165 100644 --- a/plugins/Web/plugin.py +++ b/plugins/Web/plugin.py @@ -144,54 +144,22 @@ class Web(callbacks.PluginRegexp): """Add the help for 'help Web' here.""" regexps = ['titleSnarfer'] threaded = True - - def __init__(self, irc): - self.__parent = super(Web, self) - self.__parent.__init__(irc) - self.oembed_providers = self._loadOEmbedProviders() + _oembed_providers = None def _loadOEmbedProviders(self): """ - Loads the oEmbed providers JSON. + Loads the oEmbed providers JSON if not already loaded. + Returns the providers list. 
""" - try: - providers_url = "https://oembed.com/providers.json" - response = utils.web.getUrl(providers_url) - return json.loads(response) - except Exception as e: - self.log.debug(f"Failed to load oEmbed providers: {e}") - return [] - - def _getOEmbedEndpoint(self, url): - """ - Finds the appropriate oEmbed endpoint for the given URL based on providers.json. - """ - for provider in self.oembed_providers: - for pattern in provider.get('endpoints', []): - schemes = pattern.get('schemes', []) - endpoint = pattern.get('url', '') - for scheme in schemes: - regex = re.escape(scheme).replace(r'\*', '.*') # Convert wildcard to regex - if re.match(regex, url): - return endpoint - return None - - def getOEmbedTitle(self, url): - """ - Retrieves the oEmbed title using the providers JSON. - """ - try: - oembed_endpoint = self._getOEmbedEndpoint(url) - if not oembed_endpoint: - return None - - oembed_url = f"{oembed_endpoint}?format=json&url={url}" - response = utils.web.getUrl(oembed_url) - oembed_data = json.loads(response) - return oembed_data.get('title') - except Exception as e: - self.log.debug(f"Failed to retrieve oEmbed title: {e}") - return None + if self._oembed_providers is None: + try: + providers_url = "https://oembed.com/providers.json" + response = utils.web.getUrl(providers_url) + self._oembed_providers = json.loads(response) + except Exception as e: + self.log.debug(f"Failed to load oEmbed providers: {e}") + self._oembed_providers = [] + return self._oembed_providers def noIgnore(self, irc, msg): return not self.registryValue('checkIgnored', msg.channel, irc.network) @@ -313,6 +281,40 @@ class Web(callbacks.PluginRegexp): 'to have no HTML title within the first %S.', url, size) + def _getOEmbedEndpoint(self, url): + """ + Finds the appropriate oEmbed endpoint for the given URL. + First tries the providers registry if enabled, then falls back to + HTML discovery if needed and enabled. 
+ """ + providers = self._loadOEmbedProviders() + for provider in providers: + for pattern in provider.get('endpoints', []): + schemes = pattern.get('schemes', []) + endpoint = pattern.get('url', '') + for scheme in schemes: + regex = re.escape(scheme).replace(r'\*', '.*') + if re.match(regex, url): + return endpoint + return None + + def getOEmbedTitle(self, url): + """ + Retrieves the oEmbed title using the providers JSON. + """ + try: + oembed_endpoint = self._getOEmbedEndpoint(url) + if not oembed_endpoint: + return None + + oembed_url = f"{oembed_endpoint}?format=json&url={url}" + response = utils.web.getUrl(oembed_url) + oembed_data = json.loads(response) + return oembed_data.get('title') + except Exception as e: + self.log.debug(f"Failed to retrieve oEmbed title: {e}") + return None + @fetch_sandbox def titleSnarfer(self, irc, msg, match): channel = msg.channel From eadac11ab6cdc4db95983d6b34db51c8d6f9ba2e Mon Sep 17 00:00:00 2001 From: lodriguez Date: Sat, 1 Feb 2025 15:12:54 +0100 Subject: [PATCH 3/6] add oEmbed discovery --- plugins/Web/plugin.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py index d46cf1165..c6ab208c4 100644 --- a/plugins/Web/plugin.py +++ b/plugins/Web/plugin.py @@ -296,17 +296,30 @@ class Web(callbacks.PluginRegexp): regex = re.escape(scheme).replace(r'\*', '.*') if re.match(regex, url): return endpoint + try: + timeout = self.registryValue('timeout') + response = utils.web.getUrl(url, timeout=timeout) + text = response.decode('utf8', errors='replace') + match = re.search( + r'<link[^>]+?type="application/json\+oembed"[^>]+?href="([^"]+)"', + text, + re.IGNORECASE) + if match: + endpoint = match.group(1) + endpoint = endpoint.split('?')[0] + return endpoint + except Exception as e: + self.log.debug(f"Failed to discover oEmbed endpoint in HTML: {e}") return None def getOEmbedTitle(self, url): """ - Retrieves the oEmbed title using the providers JSON. 
+ Retrieves the oEmbed title. """ try: oembed_endpoint = self._getOEmbedEndpoint(url) if not oembed_endpoint: return None - oembed_url = f"{oembed_endpoint}?format=json&url={url}" response = utils.web.getUrl(oembed_url) oembed_data = json.loads(response) From 427845a358ef3843c33244a86dfbc255baeb45b3 Mon Sep 17 00:00:00 2001 From: lodriguez Date: Sat, 1 Feb 2025 15:29:12 +0100 Subject: [PATCH 4/6] add config options useOembedRegistry and useOembedDiscovery --- plugins/Web/config.py | 8 ++++++++ plugins/Web/plugin.py | 48 ++++++++++++++++++++++--------------------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/plugins/Web/config.py b/plugins/Web/config.py index faacfa8d3..f5610300c 100644 --- a/plugins/Web/config.py +++ b/plugins/Web/config.py @@ -95,4 +95,12 @@ conf.registerGlobalValue(Web.fetch, 'timeout', seconds the bot will wait for the site to respond, when using the 'fetch' command in this plugin. If 0, will use socket.defaulttimeout""")) +conf.registerGlobalValue(Web, 'useOembedRegistry', + registry.Boolean(False, _("""Determines whether the bot will use the + oembed.com providers registry."""))) + +conf.registerGlobalValue(Web, 'useOembedDiscovery', + registry.Boolean(False, _("""Determines whether the bot will use HTML + discovery to find oEmbed endpoints."""))) + # vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py index c6ab208c4..a4b6bc9b0 100644 --- a/plugins/Web/plugin.py +++ b/plugins/Web/plugin.py @@ -287,29 +287,31 @@ class Web(callbacks.PluginRegexp): First tries the providers registry if enabled, then falls back to HTML discovery if needed and enabled. 
""" - providers = self._loadOEmbedProviders() - for provider in providers: - for pattern in provider.get('endpoints', []): - schemes = pattern.get('schemes', []) - endpoint = pattern.get('url', '') - for scheme in schemes: - regex = re.escape(scheme).replace(r'\*', '.*') - if re.match(regex, url): - return endpoint - try: - timeout = self.registryValue('timeout') - response = utils.web.getUrl(url, timeout=timeout) - text = response.decode('utf8', errors='replace') - match = re.search( - r'<link[^>]+?type="application/json\+oembed"[^>]+?href="([^"]+)"', - text, - re.IGNORECASE) - if match: - endpoint = match.group(1) - endpoint = endpoint.split('?')[0] - return endpoint - except Exception as e: - self.log.debug(f"Failed to discover oEmbed endpoint in HTML: {e}") + if self.registryValue('useOembedRegistry'): + providers = self._loadOEmbedProviders() + for provider in providers: + for pattern in provider.get('endpoints', []): + schemes = pattern.get('schemes', []) + endpoint = pattern.get('url', '') + for scheme in schemes: + regex = re.escape(scheme).replace(r'\*', '.*') + if re.match(regex, url): + return endpoint + if self.registryValue('useOembedDiscovery'): + try: + timeout = self.registryValue('timeout') + response = utils.web.getUrl(url, timeout=timeout) + text = response.decode('utf8', errors='replace') + match = re.search( + r'<link[^>]+?type="application/json\+oembed"[^>]+?href="([^"]+)"', + text, + re.IGNORECASE) + if match: + endpoint = match.group(1) + endpoint = endpoint.split('?')[0] + return endpoint + except Exception as e: + self.log.debug(f"Failed to discover oEmbed endpoint in HTML: {e}") return None def getOEmbedTitle(self, url): From c1ceb7712382f7691fc84eca63bbaca32eb99a62 Mon Sep 17 00:00:00 2001 From: lodriguez Date: Sat, 1 Feb 2025 16:26:17 +0100 Subject: [PATCH 5/6] add oEmbed to title function --- plugins/Web/plugin.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py index 
a4b6bc9b0..6f4e296b3 100644 --- a/plugins/Web/plugin.py +++ b/plugins/Web/plugin.py @@ -489,10 +489,13 @@ class Web(callbacks.PluginRegexp): if not self._checkURLWhitelist(url): irc.error("This url is not on the whitelist.") return - r = self.getTitle(irc, url, True, msg) - if not r: - return - (target, title) = r + title = self.getOEmbedTitle(url) + target = url + if not title: + r = self.getTitle(irc, url, True, msg) + if not r: + return + (target, title) = r if title: if not [y for x,y in optlist if x == 'no-filter']: for i in range(1, 4): From 1a92dcd73f8cb888b06b94f44c7df9ca571ce19f Mon Sep 17 00:00:00 2001 From: lodriguez Date: Sat, 1 Feb 2025 16:41:11 +0100 Subject: [PATCH 6/6] add tests (result from flickr is different to the html-title-tag) --- plugins/Web/test.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/plugins/Web/test.py b/plugins/Web/test.py index b35ceb5e7..bd847587b 100644 --- a/plugins/Web/test.py +++ b/plugins/Web/test.py @@ -179,6 +179,31 @@ class WebTestCase(ChannelPluginTestCase): conf.supybot.plugins.Web.urlWhitelist.set('') conf.supybot.plugins.Web.fetch.maximum.set(fm) + def testtitleOembedRegistry(self): + try: + conf.supybot.plugins.Web.useOembedRegistry.setValue(True) + self.assertResponse( + 'title https://www.flickr.com/photos/bees/2362225867/', + 'Bacon Lollys') + finally: + conf.supybot.plugins.Web.useOembedRegistry.setValue(False) + + def testtitleOembedDiscovery(self): + try: + conf.supybot.plugins.Web.useOembedDiscovery.setValue(True) + self.assertResponse( + 'title https://flickr.com/photos/bees/2362225867/', + 'Bacon Lollys') + finally: + conf.supybot.plugins.Web.useOembedDiscovery.setValue(False) + + def testtitleOembedError(self): + try: + conf.supybot.plugins.Web.useOembedDiscovery.setValue(True) + self.assertError('title https://nonexistent.example.com/post/123') + finally: + conf.supybot.plugins.Web.useOembedDiscovery.setValue(False) + def testNonSnarfingRegexpConfigurable(self): 
self.assertSnarfNoResponse('http://foo.bar.baz/', 2) try: