mirror of
https://github.com/progval/Limnoria.git
synced 2025-04-26 04:51:06 -05:00
Merge 1a92dcd73f8cb888b06b94f44c7df9ca571ce19f into c81ff286975701ae78246cd8f24284ca3aeac86d
This commit is contained in:
commit
6b0eb6972f
@ -96,4 +96,12 @@ conf.registerGlobalValue(Web.fetch, 'timeout',
|
|||||||
seconds the bot will wait for the site to respond, when using the 'fetch'
|
seconds the bot will wait for the site to respond, when using the 'fetch'
|
||||||
command in this plugin. If 0, will use socket.defaulttimeout"""))
|
command in this plugin. If 0, will use socket.defaulttimeout"""))
|
||||||
|
|
||||||
|
# oEmbed title lookup settings. Both values are registered with False as
# their initial value.
conf.registerGlobalValue(
    Web,
    'useOembedRegistry',
    registry.Boolean(
        False,
        _("""Determines whether the bot will use the
    oembed.com providers registry."""),
    ),
)

conf.registerGlobalValue(
    Web,
    'useOembedDiscovery',
    registry.Boolean(
        False,
        _("""Determines whether the bot will use HTML
    discovery to find oEmbed endpoints."""),
    ),
)
|
||||||
|
|
||||||
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
|
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
|
||||||
|
@ -33,6 +33,7 @@ import re
|
|||||||
import sys
|
import sys
|
||||||
import string
|
import string
|
||||||
import socket
|
import socket
|
||||||
|
import json
|
||||||
|
|
||||||
import supybot.conf as conf
|
import supybot.conf as conf
|
||||||
import supybot.utils as utils
|
import supybot.utils as utils
|
||||||
@ -143,6 +144,22 @@ class Web(callbacks.PluginRegexp):
|
|||||||
"""Add the help for 'help Web' here."""
|
"""Add the help for 'help Web' here."""
|
||||||
regexps = ['titleSnarfer']
|
regexps = ['titleSnarfer']
|
||||||
threaded = True
|
threaded = True
|
||||||
|
_oembed_providers = None
|
||||||
|
|
||||||
|
def _loadOEmbedProviders(self):
|
||||||
|
"""
|
||||||
|
Loads the oEmbed providers JSON if not already loaded.
|
||||||
|
Returns the providers list.
|
||||||
|
"""
|
||||||
|
if self._oembed_providers is None:
|
||||||
|
try:
|
||||||
|
providers_url = "https://oembed.com/providers.json"
|
||||||
|
response = utils.web.getUrl(providers_url)
|
||||||
|
self._oembed_providers = json.loads(response)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.debug(f"Failed to load oEmbed providers: {e}")
|
||||||
|
self._oembed_providers = []
|
||||||
|
return self._oembed_providers
|
||||||
|
|
||||||
def noIgnore(self, irc, msg):
|
def noIgnore(self, irc, msg):
|
||||||
return not self.registryValue('checkIgnored', msg.channel, irc.network)
|
return not self.registryValue('checkIgnored', msg.channel, irc.network)
|
||||||
@ -264,6 +281,55 @@ class Web(callbacks.PluginRegexp):
|
|||||||
'to have no HTML title within the first %S.',
|
'to have no HTML title within the first %S.',
|
||||||
url, size)
|
url, size)
|
||||||
|
|
||||||
|
def _getOEmbedEndpoint(self, url):
|
||||||
|
"""
|
||||||
|
Finds the appropriate oEmbed endpoint for the given URL.
|
||||||
|
First tries the providers registry if enabled, then falls back to
|
||||||
|
HTML discovery if needed and enabled.
|
||||||
|
"""
|
||||||
|
if self.registryValue('useOembedRegistry'):
|
||||||
|
providers = self._loadOEmbedProviders()
|
||||||
|
for provider in providers:
|
||||||
|
for pattern in provider.get('endpoints', []):
|
||||||
|
schemes = pattern.get('schemes', [])
|
||||||
|
endpoint = pattern.get('url', '')
|
||||||
|
for scheme in schemes:
|
||||||
|
regex = re.escape(scheme).replace(r'\*', '.*')
|
||||||
|
if re.match(regex, url):
|
||||||
|
return endpoint
|
||||||
|
if self.registryValue('useOembedDiscovery'):
|
||||||
|
try:
|
||||||
|
timeout = self.registryValue('timeout')
|
||||||
|
response = utils.web.getUrl(url, timeout=timeout)
|
||||||
|
text = response.decode('utf8', errors='replace')
|
||||||
|
match = re.search(
|
||||||
|
r'<link[^>]+?type="application/json\+oembed"[^>]+?href="([^"]+)"',
|
||||||
|
text,
|
||||||
|
re.IGNORECASE)
|
||||||
|
if match:
|
||||||
|
endpoint = match.group(1)
|
||||||
|
endpoint = endpoint.split('?')[0]
|
||||||
|
return endpoint
|
||||||
|
except Exception as e:
|
||||||
|
self.log.debug(f"Failed to discover oEmbed endpoint in HTML: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def getOEmbedTitle(self, url):
    """
    Retrieves the oEmbed title.

    Looks up the oEmbed endpoint for ``url``, requests the JSON
    representation from it and returns the value of its 'title' key.
    Returns None if no endpoint is found, if the response has no title,
    or if anything goes wrong while fetching or parsing; failures are
    logged at debug level rather than raised.
    """
    # Function-scope import: only this method needs urlencode.
    from urllib.parse import urlencode
    try:
        oembed_endpoint = self._getOEmbedEndpoint(url)
        if not oembed_endpoint:
            return None
        # Percent-encode the target URL: it may itself contain '&', '?'
        # or '#', which would otherwise corrupt the query string.
        query = urlencode({'format': 'json', 'url': url})
        # An endpoint may already carry a query string; extend it rather
        # than adding a second '?'.
        separator = '&' if '?' in oembed_endpoint else '?'
        oembed_url = oembed_endpoint + separator + query
        # Use the same 'timeout' setting as the HTML discovery fetch.
        timeout = self.registryValue('timeout')
        response = utils.web.getUrl(oembed_url, timeout=timeout)
        oembed_data = json.loads(response)
        return oembed_data.get('title')
    except Exception as e:
        # Best effort: callers fall back to other ways of getting a title.
        self.log.debug('Failed to retrieve oEmbed title: %s', e)
        return None
|
||||||
|
|
||||||
@fetch_sandbox
|
@fetch_sandbox
|
||||||
def titleSnarfer(self, irc, msg, match):
|
def titleSnarfer(self, irc, msg, match):
|
||||||
channel = msg.channel
|
channel = msg.channel
|
||||||
@ -280,6 +346,9 @@ class Web(callbacks.PluginRegexp):
|
|||||||
if r and r.search(url):
|
if r and r.search(url):
|
||||||
self.log.debug('Not titleSnarfing %q.', url)
|
self.log.debug('Not titleSnarfing %q.', url)
|
||||||
return
|
return
|
||||||
|
title = self.getOEmbedTitle(url)
|
||||||
|
target = url
|
||||||
|
if not title:
|
||||||
r = self.getTitle(irc, url, False, msg)
|
r = self.getTitle(irc, url, False, msg)
|
||||||
if not r:
|
if not r:
|
||||||
return
|
return
|
||||||
@ -422,6 +491,9 @@ class Web(callbacks.PluginRegexp):
|
|||||||
if not self._checkURLWhitelist(irc, msg, url):
|
if not self._checkURLWhitelist(irc, msg, url):
|
||||||
irc.error("This url is not on the whitelist.")
|
irc.error("This url is not on the whitelist.")
|
||||||
return
|
return
|
||||||
|
title = self.getOEmbedTitle(url)
|
||||||
|
target = url
|
||||||
|
if not title:
|
||||||
r = self.getTitle(irc, url, True, msg)
|
r = self.getTitle(irc, url, True, msg)
|
||||||
if not r:
|
if not r:
|
||||||
return
|
return
|
||||||
|
@ -179,6 +179,31 @@ class WebTestCase(ChannelPluginTestCase):
|
|||||||
conf.supybot.plugins.Web.urlWhitelist.set('')
|
conf.supybot.plugins.Web.urlWhitelist.set('')
|
||||||
conf.supybot.plugins.Web.fetch.maximum.set(fm)
|
conf.supybot.plugins.Web.fetch.maximum.set(fm)
|
||||||
|
|
||||||
|
def testtitleOembedRegistry(self):
    # With the providers registry switched on, 'title' must answer with
    # the expected title for a Flickr photo page.
    setting = conf.supybot.plugins.Web.useOembedRegistry
    query = 'title https://www.flickr.com/photos/bees/2362225867/'
    try:
        setting.setValue(True)
        self.assertResponse(query, 'Bacon Lollys')
    finally:
        # Switch the setting back off whatever the outcome.
        setting.setValue(False)
|
||||||
|
|
||||||
|
def testtitleOembedDiscovery(self):
    # With HTML discovery switched on (registry left as is), 'title'
    # must answer with the expected title for a Flickr photo page.
    setting = conf.supybot.plugins.Web.useOembedDiscovery
    query = 'title https://flickr.com/photos/bees/2362225867/'
    try:
        setting.setValue(True)
        self.assertResponse(query, 'Bacon Lollys')
    finally:
        # Switch the setting back off whatever the outcome.
        setting.setValue(False)
|
||||||
|
|
||||||
|
def testtitleOembedError(self):
    # With HTML discovery switched on, 'title' on this URL must still
    # end in an error reply.
    setting = conf.supybot.plugins.Web.useOembedDiscovery
    query = 'title https://nonexistent.example.com/post/123'
    try:
        setting.setValue(True)
        self.assertError(query)
    finally:
        # Switch the setting back off whatever the outcome.
        setting.setValue(False)
|
||||||
|
|
||||||
def testNonSnarfingRegexpConfigurable(self):
|
def testNonSnarfingRegexpConfigurable(self):
|
||||||
self.assertSnarfNoResponse('http://foo.bar.baz/', 2)
|
self.assertSnarfNoResponse('http://foo.bar.baz/', 2)
|
||||||
try:
|
try:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user