diff --git a/plugins/Web/config.py b/plugins/Web/config.py
index 47be13519..71af827b4 100644
--- a/plugins/Web/config.py
+++ b/plugins/Web/config.py
@@ -96,4 +96,12 @@ conf.registerGlobalValue(Web.fetch, 'timeout',
     seconds the bot will wait for the site to respond, when using the 'fetch'
     command in this plugin. If 0, will use socket.defaulttimeout"""))
 
+conf.registerGlobalValue(Web, 'useOembedRegistry',
+    registry.Boolean(False, _("""Determines whether the bot will use the
+    oembed.com providers registry.""")))
+
+conf.registerGlobalValue(Web, 'useOembedDiscovery',
+    registry.Boolean(False, _("""Determines whether the bot will use HTML
+    discovery to find oEmbed endpoints.""")))
+
 # vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
diff --git a/plugins/Web/plugin.py b/plugins/Web/plugin.py
index 0af8a5d5f..5844250f1 100644
--- a/plugins/Web/plugin.py
+++ b/plugins/Web/plugin.py
@@ -33,6 +33,8 @@ import re
 import sys
 import string
 import socket
+import json
+import urllib.parse
 
 import supybot.conf as conf
 import supybot.utils as utils
@@ -143,7 +145,23 @@ class Web(callbacks.PluginRegexp):
     """Add the help for 'help Web' here."""
     regexps = ['titleSnarfer']
     threaded = True
-    
+    _oembed_providers = None
+
+    def _loadOEmbedProviders(self):
+        """
+        Loads the oEmbed providers JSON if not already loaded.
+        Returns the providers list.
+        """
+        if self._oembed_providers is None:
+            try:
+                providers_url = "https://oembed.com/providers.json"
+                response = utils.web.getUrl(providers_url)
+                self._oembed_providers = json.loads(response)
+            except Exception as e:
+                self.log.debug(f"Failed to load oEmbed providers: {e}")
+                self._oembed_providers = []
+        return self._oembed_providers
+
     def noIgnore(self, irc, msg):
         return not self.registryValue('checkIgnored', msg.channel, irc.network)
 
@@ -264,6 +282,65 @@ class Web(callbacks.PluginRegexp):
                               'to have no HTML title within the first %S.',
                               url, size)
 
+    def _getOEmbedEndpoint(self, url):
+        """
+        Finds the appropriate oEmbed endpoint for the given URL.
+        First tries the providers registry if enabled, then falls back to
+        HTML discovery if needed and enabled.
+        Returns the endpoint URL, or None if no endpoint was found.
+        """
+        if self.registryValue('useOembedRegistry'):
+            providers = self._loadOEmbedProviders()
+            for provider in providers:
+                for pattern in provider.get('endpoints', []):
+                    schemes = pattern.get('schemes', [])
+                    # Registry endpoint URLs may contain a '{format}'
+                    # placeholder; we always request JSON.
+                    endpoint = pattern.get('url', '').replace(
+                        '{format}', 'json')
+                    for scheme in schemes:
+                        regex = re.escape(scheme).replace(r'\*', '.*')
+                        if re.match(regex, url):
+                            return endpoint
+        if self.registryValue('useOembedDiscovery'):
+            try:
+                timeout = self.registryValue('timeout')
+                response = utils.web.getUrl(url, timeout=timeout)
+                text = response.decode('utf8', errors='replace')
+                match = re.search(
+                    r'<link[^>]+?type="application/json\+oembed"[^>]+?href="([^"]+)"',
+                    text,
+                    re.IGNORECASE)
+                if match:
+                    endpoint = match.group(1)
+                    # Drop the advertised query string; getOEmbedTitle
+                    # builds its own.
+                    endpoint = endpoint.split('?')[0]
+                    return endpoint
+            except Exception as e:
+                self.log.debug(f"Failed to discover oEmbed endpoint in HTML: {e}")
+        return None
+
+    def getOEmbedTitle(self, url):
+        """
+        Retrieves the oEmbed title.
+        Returns None if no endpoint is found or the request fails.
+        """
+        try:
+            oembed_endpoint = self._getOEmbedEndpoint(url)
+            if not oembed_endpoint:
+                return None
+            # The target URL must be percent-encoded, otherwise its own
+            # query string or fragment corrupts the oEmbed request.
+            query = urllib.parse.urlencode({'format': 'json', 'url': url})
+            oembed_url = f"{oembed_endpoint}?{query}"
+            response = utils.web.getUrl(oembed_url)
+            oembed_data = json.loads(response)
+            return oembed_data.get('title')
+        except Exception as e:
+            self.log.debug(f"Failed to retrieve oEmbed title: {e}")
+            return None
+
     @fetch_sandbox
     def titleSnarfer(self, irc, msg, match):
         channel = msg.channel
@@ -280,10 +357,13 @@ class Web(callbacks.PluginRegexp):
             if r and r.search(url):
                 self.log.debug('Not titleSnarfing %q.', url)
                 return
-            r = self.getTitle(irc, url, False, msg)
-            if not r:
-                return
-            (target, title) = r
+            title = self.getOEmbedTitle(url)
+            target = url
+            if not title:
+                r = self.getTitle(irc, url, False, msg)
+                if not r:
+                    return
+                (target, title) = r
             if title:
                 domain = utils.web.getDomain(target
                         if self.registryValue('snarferShowTargetDomain',
@@ -422,10 +502,13 @@ class Web(callbacks.PluginRegexp):
         if not self._checkURLWhitelist(irc, msg, url):
             irc.error("This url is not on the whitelist.")
             return
-        r = self.getTitle(irc, url, True, msg)
-        if not r:
-            return
-        (target, title) = r
+        title = self.getOEmbedTitle(url)
+        target = url
+        if not title:
+            r = self.getTitle(irc, url, True, msg)
+            if not r:
+                return
+            (target, title) = r
         if title:
             if not [y for x,y in optlist if x == 'no-filter']:
                 for i in range(1, 4):
diff --git a/plugins/Web/test.py b/plugins/Web/test.py
index b35ceb5e7..bd847587b 100644
--- a/plugins/Web/test.py
+++ b/plugins/Web/test.py
@@ -179,6 +179,31 @@ class WebTestCase(ChannelPluginTestCase):
             conf.supybot.plugins.Web.urlWhitelist.set('')
             conf.supybot.plugins.Web.fetch.maximum.set(fm)
 
+        def testtitleOembedRegistry(self):
+            try:
+                conf.supybot.plugins.Web.useOembedRegistry.setValue(True)
+                self.assertResponse(
+                    'title https://www.flickr.com/photos/bees/2362225867/',
+                    'Bacon Lollys')
+            finally:
+                conf.supybot.plugins.Web.useOembedRegistry.setValue(False)
+
+        def testtitleOembedDiscovery(self):
+            try:
+                conf.supybot.plugins.Web.useOembedDiscovery.setValue(True)
+                self.assertResponse(
+                    'title https://flickr.com/photos/bees/2362225867/',
+                    'Bacon Lollys')
+            finally:
+                conf.supybot.plugins.Web.useOembedDiscovery.setValue(False)
+
+        def testtitleOembedError(self):
+            try:
+                conf.supybot.plugins.Web.useOembedDiscovery.setValue(True)
+                self.assertError('title https://nonexistent.example.com/post/123')
+            finally:
+                conf.supybot.plugins.Web.useOembedDiscovery.setValue(False)
+
     def testNonSnarfingRegexpConfigurable(self):
         self.assertSnarfNoResponse('http://foo.bar.baz/', 2)
         try: