###
# Copyright (c) 2015, butterscotchstallion
# Copyright (c) 2020, oddluck <oddluck@riseup.net>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#   * Redistributions of source code must retain the above copyright notice,
#     this list of conditions, and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright notice,
#     this list of conditions, and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#   * Neither the name of the author of this software nor the name of
#     contributors to this software may be used to endorse or promote products
#     derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###

from supybot.commands import *
import supybot.ircmsgs as ircmsgs
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
import supybot.utils as utils
import supybot.ircdb as ircdb
import supybot.log as log
import re, sys, random, time, json, unicodedata, datetime
from urllib.parse import urlparse, parse_qsl
from bs4 import BeautifulSoup
from jinja2 import Template
import requests

try:
    from supybot.i18n import PluginInternationalization

    _ = PluginInternationalization("SpiffyTitles")
except ImportError:
    # Placeholder that allows the plugin to run on a bot
    # without the i18n module
    _ = lambda x: x


class SpiffyTitles(callbacks.Plugin):
    """Displays link titles when posted in a channel"""

    threaded = True
    callBefore = ["Web"]

    def __init__(self, irc):
        self.__parent = super(SpiffyTitles, self)
        self.__parent.__init__(irc)
        self.link_cache = {}
        self.handlers = {}
        self.timeout = self.registryValue("timeout")
        self.add_handlers()

    def add_handlers(self):
        """
        Adds all handlers
        """
        self.add_youtube_handlers()
        self.add_imdb_handlers()
        self.add_imgur_handlers()
        self.add_coub_handlers()
        self.add_vimeo_handlers()
        self.add_dailymotion_handlers()
        self.add_wikipedia_handlers()
        self.add_reddit_handlers()
        self.add_twitch_handlers()
        self.add_twitter_handlers()

    def add_twitter_handlers(self):
        self.handlers["twitter.com"] = self.handler_twitter
        self.handlers["www.twitter.com"] = self.handler_twitter

    def add_dailymotion_handlers(self):
        self.handlers["www.dailymotion.com"] = self.handler_dailymotion
        self.handlers["dailymotion.com"] = self.handler_dailymotion
        self.handlers["dai.ly"] = self.handler_dailymotion

    def add_vimeo_handlers(self):
        self.handlers["vimeo.com"] = self.handler_vimeo

    def add_coub_handlers(self):
        self.handlers["coub.com"] = self.handler_coub

    def add_wikipedia_handlers(self):
        self.handlers["wikipedia.org"] = self.handler_wikipedia

    def add_reddit_handlers(self):
        self.handlers["reddit.com"] = self.handler_reddit
        self.handlers["www.reddit.com"] = self.handler_reddit

    def add_twitch_handlers(self):
        """
        Enables meta info about Twitch links through the Twitch API
        """
        self.handlers["twitch.tv"] = self.handler_twitch
        self.handlers["www.twitch.tv"] = self.handler_twitch
        self.handlers["go.twitch.tv"] = self.handler_twitch
        self.handlers["clips.twitch.tv"] = self.handler_twitch

    def add_imdb_handlers(self):
        """
        Enables meta info about IMDB links through the OMDB API
        """
        self.handlers["imdb.com"] = self.handler_imdb

    def add_youtube_handlers(self):
        """
        Adds handlers for Youtube videos. The handler is matched based on the
        domain used in the URL.
        """
        self.handlers["youtube.com"] = self.handler_youtube
        self.handlers["youtu.be"] = self.handler_youtube

    def add_imgur_handlers(self):
        # Images mostly
        self.handlers["i.imgur.com"] = self.handler_imgur_image
        # Albums, galleries, etc
        self.handlers["imgur.com"] = self.handler_imgur

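    # Handler dispatch note: get_title_by_url() looks up the exact netloc in
    # self.handlers first, then falls back to the base domain, so subdomains
    # that are not registered above still reach the right handler. For example:
    #   "old.reddit.com" -> no exact key -> base domain "reddit.com" -> handler_reddit
    #   "i.imgur.com"    -> exact key    -> handler_imgur_image
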
    def doPrivmsg(self, irc, msg):
        """
        Observe each channel message and look for links
        """
        channel = msg.args[0]
        message = msg.args[1]
        title = None
        if not irc.isChannel(channel):
            return
        if msg.nick.lower() == irc.nick.lower():
            return
        if callbacks.addressed(irc, msg) and self.registryValue(
            "ignoreAddressed", channel=channel
        ):
            return
        """
        Check if we require a capability to acknowledge this link
        """
        if self.registryValue("requireCapability", channel=channel):
            if not self.user_has_capability(msg):
                return
        """
        Configuration option determines whether we should
        ignore links that appear within an action
        """
        if self.registryValue("ignoreActionLinks", channel=channel) and (
            ircmsgs.isCtcp(msg) or ircmsgs.isAction(msg)
        ):
            return
        if self.message_matches_ignore_pattern(message, channel):
            log.debug(
                "SpiffyTitles: ignoring message due to ignoredMessagePattern match"
            )
            return
        if not self.is_channel_allowed(channel):
            log.debug(
                "SpiffyTitles: not responding to link in %s due to black/white "
                "list restrictions"
                % (channel)
            )
            return
        urls = self.get_urls_from_message(message, channel)
        if not urls:
            return
        for url in urls:
            if url.strip():
                url = self.remove_control_characters(url)
                # Check if the domain is allowed based on white/black list restrictions
                info = urlparse(url)
                domain = info.netloc
                is_ignored = self.is_ignored_domain(domain, channel)
                if is_ignored:
                    log.debug(
                        "SpiffyTitles: URL ignored due to domain blacklist match: %s"
                        % url
                    )
                    return
                is_whitelisted_domain = self.is_whitelisted_domain(domain, channel)
                whitelist_pattern = self.registryValue(
                    "whitelistDomainPattern", channel=channel
                )
                if whitelist_pattern and not is_whitelisted_domain:
                    log.debug(
                        "SpiffyTitles: URL ignored due to domain whitelist mismatch: %s"
                        % url
                    )
                    return
                title = self.get_title_by_url(url, channel, msg.nick)
                if title:
                    ignore_match = self.title_matches_ignore_pattern(title, channel)
                    if ignore_match:
                        return
                    elif not is_ignored:
                        irc.reply(title, prefixNick=False)
                else:
                    if self.registryValue("default.enabled", channel):
                        log.debug("SpiffyTitles: could not get a title for %s" % (url))
                    else:
                        log.debug(
                            "SpiffyTitles: could not get a title for %s but default "
                            "handler is disabled"
                            % (url)
                        )

    def handler_default(self, url, channel):
        """
        Default handler for websites
        """
        if self.registryValue("default.enabled", channel):
            log.debug("SpiffyTitles: calling default handler for %s" % (url))
            default_template = Template(
                self.registryValue("default.template", channel=channel)
            )
            (title, is_redirect) = self.get_source_by_url(url, channel)
            if title:
                title_template = default_template.render(
                    title=title, redirect=is_redirect
                )
                return title_template
            else:
                return
        else:
            log.debug(
                "SpiffyTitles: default handler fired but doing nothing because disabled"
            )

    def get_title_by_url(self, url, channel, origin_nick=None):
        """
        Retrieves the title of a website based on the URL provided
        """
        info = urlparse(url)
        domain = info.netloc
        title = None
        """
        Check if we have this link cached according to the cache lifetime. If so,
        serve the link from the cache instead of calling handlers.
        """
        if self.registryValue("cacheGlobal"):
            channel = "global"
        cached_link = self.get_link_from_cache(url, channel)
        if cached_link:
            title = cached_link["title"]
        else:
            if domain in self.handlers:
                handler = self.handlers[domain]
                title = handler(url, info, channel)
            else:
                base_domain = self.get_base_domain("http://" + domain)
                if base_domain in self.handlers:
                    handler = self.handlers[base_domain]
                    title = handler(url, info, channel)
                else:
                    if self.registryValue("default.enabled", channel):
                        title = self.handler_default(url, channel)
        if title and not cached_link:
            title = self.get_formatted_title(title, channel)
            # Update link cache
            log.debug("SpiffyTitles: caching %s" % (url))
            now = datetime.datetime.now()
            if channel not in self.link_cache:
                self.link_cache[channel] = []
            self.link_cache[channel].append(
                {
                    "url": url,
                    "timestamp": now,
                    "title": title,
                    "from": origin_nick,
                    "channel": channel,
                }
            )
        elif title and cached_link:
            # Re-append the entry that get_link_from_cache() popped
            self.link_cache[channel].append(cached_link)
            log.debug("SpiffyTitles: serving link from cache: %s" % (url))
        return title

    def get_link_from_cache(self, url, channel):
        """
        Looks for a URL in the link cache and returns the cached entry if it's
        not stale according to the configured cache lifetime, or None otherwise.
        If cacheLifetime is 0, the cache is disabled and we can immediately return.
        """
        cache_lifetime_in_seconds = int(self.registryValue("cacheLifetime"))
        if cache_lifetime_in_seconds == 0:
            return
        # No cache yet
        if len(self.link_cache) == 0:
            return
        cached_link = None
        now = datetime.datetime.now()
        stale = False
        seconds = 0
        if channel in self.link_cache:
            for i in range(len(self.link_cache[channel])):
                if self.link_cache[channel][i]["url"] == url:
                    cached_link = self.link_cache[channel].pop(i)
                    break
        # Found link, check timestamp
        if cached_link:
            seconds = (now - cached_link["timestamp"]).total_seconds()
            if seconds >= cache_lifetime_in_seconds:
                stale = True
            if stale:
                log.debug("SpiffyTitles: %s was sent %s seconds ago" % (url, seconds))
            else:
                return cached_link

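    # Cache behavior sketch: a hit younger than cacheLifetime is popped from the
    # channel's list and returned, and get_title_by_url() re-appends it, so
    # repeat links are served without re-fetching. A stale hit stays popped,
    # which effectively evicts it, and the link is then fetched fresh.
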
    def is_channel_allowed(self, channel):
        """
        Checks channel whitelist and blacklist to determine if the current
        channel is allowed to display titles.
        """
        channel = channel.lower()
        is_allowed = False
        white_list = self.filter_empty(self.registryValue("channelWhitelist"))
        black_list = self.filter_empty(self.registryValue("channelBlacklist"))
        white_list_empty = len(white_list) == 0
        black_list_empty = len(black_list) == 0
        # Most basic case: both white and blacklist are empty. Any channel is allowed.
        if white_list_empty and black_list_empty:
            is_allowed = True
        # If there is a white list, blacklist is ignored.
        if white_list:
            is_allowed = channel in white_list
        # Finally, check blacklist
        if not white_list and black_list:
            is_allowed = channel not in black_list
        return is_allowed

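    # Precedence example, assuming these registry values:
    #   channelWhitelist = ["#dev"], channelBlacklist = ["#ops"]
    #     -> titles only in #dev; the blacklist is ignored entirely
    #   channelWhitelist = [], channelBlacklist = ["#ops"]
    #     -> titles everywhere except #ops
    #   both empty -> titles in every channel
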
    def filter_empty(self, input):
        """
        Remove all empty strings from a list and return the rest as a set
        """
        return set([channel for channel in input if len(channel.strip())])

    def is_ignored_domain(self, domain, channel):
        """
        Checks domain against the ignoredDomainPattern regular expression
        """
        pattern = self.registryValue("ignoredDomainPattern", channel=channel)
        if pattern:
            log.debug("SpiffyTitles: matching %s against %s" % (domain, str(pattern)))
            try:
                pattern_search_result = re.search(pattern, domain)
                if pattern_search_result:
                    match = pattern_search_result.group()
                    return match
            except re.error:
                log.error("SpiffyTitles: invalid regular expression: %s" % (pattern))

    def is_whitelisted_domain(self, domain, channel):
        """
        Checks domain against the whitelistDomainPattern regular expression
        """
        pattern = self.registryValue("whitelistDomainPattern", channel=channel)
        if pattern:
            log.debug("SpiffyTitles: matching %s against %s" % (domain, str(pattern)))
            try:
                pattern_search_result = re.search(pattern, domain)
                if pattern_search_result:
                    match = pattern_search_result.group()
                    return match
            except re.error:
                log.error("SpiffyTitles: invalid regular expression: %s" % (pattern))

    def get_formatted_title(self, title, channel):
        """
        Remove cruft from title and apply bold if applicable
        """
        use_bold = self.registryValue("useBold", channel=channel)
        # Collapse runs of whitespace anywhere in the string
        title = re.sub(r"\s+", " ", title)
        if use_bold:
            title = ircutils.bold(title).strip()
        return title

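    # Example: runs of whitespace (including newlines from multi-line <title>
    # tags) collapse to single spaces, e.g. "Some\n   Title" -> "Some Title";
    # with useBold set, the result is wrapped in IRC bold control codes (\x02).
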
    def get_title_from_html(self, html, channel):
        """
        Retrieves value of <title> tag from HTML
        """
        title = None
        soup = BeautifulSoup(html)
        if soup:
            try:
                title = soup.title.string.strip()
            except AttributeError:
                # Page has no <title> tag, or it is empty
                title = self.registryValue("badLinkText", channel=channel)
        return title

    def get_source_by_url(self, url, channel, retries=1):
        """
        Get the HTML of a website based on a URL
        """
        max_retries = self.registryValue("maxRetries")
        if retries >= max_retries:
            log.debug("SpiffyTitles: hit maximum retries for %s" % url)
            return (None, False)
        log.debug("SpiffyTitles: attempt #%s for %s" % (retries, url))
        is_redirect = False
        try:
            headers = self.get_headers(channel)
            log.debug("SpiffyTitles: requesting %s" % (url))
            with requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True,
                stream=True,
            ) as request:
                request.raise_for_status()
                if request.history:
                    # check the top two domain levels
                    link_domain = self.get_base_domain(request.history[0].url)
                    real_domain = self.get_base_domain(request.url)
                    if link_domain != real_domain:
                        is_redirect = True
                        for redir in request.history:
                            log.debug(
                                "SpiffyTitles: Redirect %s from %s"
                                % (redir.status_code, redir.url)
                            )
                        log.debug("SpiffyTitles: Final url %s" % (request.url))
                        info = urlparse(request.url)
                        domain = info.netloc
                        is_ignored = self.is_ignored_domain(domain, channel)
                        if is_ignored:
                            log.debug(
                                "SpiffyTitles: URL ignored due to domain blacklist"
                                " match: %s"
                                % url
                            )
                            # Return a tuple; callers unpack the result
                            return (None, False)
                        whitelist_pattern = self.registryValue(
                            "whitelistDomainPattern", channel=channel
                        )
                        is_whitelisted_domain = self.is_whitelisted_domain(
                            domain, channel
                        )
                        if whitelist_pattern and not is_whitelisted_domain:
                            log.debug(
                                "SpiffyTitles: URL ignored due to domain whitelist"
                                " mismatch: %s"
                                % url
                            )
                            return (None, False)
                        text = self.get_title_by_url(request.url, channel)
                        if text:
                            text = text.lstrip("\x02").lstrip("^").strip()
                        return (text, is_redirect)
                # Check the content type; default to "" if the header is missing
                content_type = (
                    request.headers.get("content-type", "").split(";")[0].strip()
                )
                acceptable_types = self.registryValue("default.mimeTypes")
                log.debug("SpiffyTitles: content type %s" % (content_type))
                if content_type in acceptable_types:
                    text = request.content
                    if text:
                        return (self.get_title_from_html(text, channel), is_redirect)
                    else:
                        log.debug("SpiffyTitles: empty content from %s" % (url))
                else:
                    log.debug(
                        "SpiffyTitles: unacceptable mime type %s for url %s"
                        % (content_type, url)
                    )
                    size = request.headers.get("content-length")
                    if size:
                        size = self.get_readable_file_size(int(size))
                    file_template = self.registryValue(
                        "default.fileTemplate", channel=channel
                    )
                    text = Template(file_template).render(
                        {"type": content_type, "size": size}
                    )
                    return (text, is_redirect)
        except requests.exceptions.MissingSchema:
            url_wschema = "http://%s" % (url)
            log.error("SpiffyTitles missing schema. Retrying with %s" % (url_wschema))
            info = urlparse(url_wschema)
            if self.is_ignored_domain(info.netloc, channel):
                return (None, False)
            else:
                return self.get_source_by_url(url_wschema, channel)
        except requests.exceptions.Timeout as e:
            log.error("SpiffyTitles Timeout: %s" % (str(e)))
            # Return the retry's result instead of discarding it
            return self.get_source_by_url(url, channel, retries + 1)
        except requests.exceptions.ConnectionError as e:
            log.error("SpiffyTitles ConnectionError: %s" % (str(e)))
            return self.get_source_by_url(url, channel, retries + 1)
        except requests.exceptions.HTTPError as e:
            log.error("SpiffyTitles HTTPError: %s" % (str(e)))
            text = self.registryValue("badLinkText", channel=channel)
            return (text, is_redirect)
        except requests.exceptions.InvalidURL as e:
            log.error("SpiffyTitles InvalidURL: %s" % (str(e)))
            text = self.registryValue("badLinkText", channel=channel)
            return (text, is_redirect)
        return (None, False)

    def get_base_domain(self, url):
        """
        Returns the FQDN comprising the top two domain levels
        """
        return ".".join(urlparse(url).netloc.rsplit(".", 2)[-2:])

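    # Example:
    #   get_base_domain("https://clips.twitch.tv/SomeClip") -> "twitch.tv"
    #   get_base_domain("http://example.com/page")          -> "example.com"
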
    def get_headers(self, channel):
        agent = self.get_user_agent()
        language = self.registryValue("default.language", channel)
        headers = {
            "User-Agent": agent,
            "Accept-Language": "{0}".format(language),
        }
        return headers

    def get_user_agent(self):
        """
        Returns a random user agent from the ones available
        """
        agents = self.registryValue("default.userAgents")
        return random.choice(agents)

    def message_matches_ignore_pattern(self, input, channel):
        """
        Checks message against ignoredMessagePattern to determine
        whether the message should be ignored.
        """
        match = False
        pattern = self.registryValue("ignoredMessagePattern")
        if pattern:
            match = re.search(pattern, input)
        return match

    def title_matches_ignore_pattern(self, input, channel):
        """
        Checks message against ignoredTitlePattern to determine
        whether the title should be ignored.
        """
        match = False
        pattern = self.registryValue("ignoredTitlePattern", channel=channel)
        if pattern:
            match = re.search(pattern, input)
            if match:
                log.debug(
                    "SpiffyTitles: title %s matches ignoredTitlePattern for %s"
                    % (input, channel)
                )
        return match

    def get_urls_from_message(self, input, channel):
        """
        Find all strings that look like URLs in the message
        """
        try:
            url_re = self.registryValue("urlRegularExpression", channel)
        except Exception:
            url_re = self.registryValue("urlRegularExpression")
        if not url_re:
            url_re = utils.web._httpUrlRe
        matches = re.findall(url_re, input)
        return matches

    def remove_control_characters(self, s):
        return "".join(ch for ch in s if unicodedata.category(ch)[0] != "C")

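    # Example: Unicode category "C*" covers control/format characters, so IRC
    # formatting codes are stripped while printable text survives:
    #   remove_control_characters("\x02https://example.com\x0f")
    #     -> "https://example.com"
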
    def user_has_capability(self, msg):
        channel = msg.args[0]
        mask = msg.prefix
        required_capability = self.registryValue("requireCapability", channel=channel)
        cap = ircdb.makeChannelCapability(channel, required_capability)
        has_cap = ircdb.checkCapability(mask, cap, ignoreDefaultAllow=True)
        if has_cap:
            log.debug(
                "SpiffyTitles: %s has required capability '%s'"
                % (mask, required_capability)
            )
        else:
            log.debug(
                "SpiffyTitles: %s does NOT have required capability '%s'"
                % (mask, required_capability)
            )
        return has_cap

def get_readable_file_size(self, num, suffix="B"):
|
|
"""
|
|
Returns human readable file size
|
|
"""
|
|
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
|
|
if abs(num) < 1024.0:
|
|
return "%3.1f%s%s" % (num, unit, suffix)
|
|
num /= 1024.0
|
|
return "%.1f%s%s" % (num, "Yi", suffix)
|
|
|
|
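    # Example:
    #   get_readable_file_size(2048)    -> "2.0KiB"
    #   get_readable_file_size(5242880) -> "5.0MiB"
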
    def get_template(self, handler_template, channel):
        """
        Returns the requested template object.
        """
        template = Template(self.registryValue(handler_template, channel=channel))
        return template

    def handler_dailymotion(self, url, info, channel):
        """
        Handles Dailymotion links
        """
        dailymotion_handler_enabled = self.registryValue(
            "dailymotion.enabled", channel=channel
        )
        if not dailymotion_handler_enabled:
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: calling dailymotion handler for %s" % url)
        title = None
        video_id = None
        """ Get video ID """
        if "/video/" in info.path:
            # str.lstrip() strips a set of characters, not a prefix, so split
            # on the "/video/" segment instead to extract the ID reliably
            video_id = info.path.split("/video/")[1].split("_")[0]
        elif "dai.ly" in url:
            video_id = url.split("/")[-1].split("?")[0]
        if not video_id:
            log.error(
                "SpiffyTitles: unable to get dailymotion video_id for URL: %s" % url
            )
            return self.handler_default(url, channel)
        fields = "id,title,owner.screenname,duration,views_total"
        api_url = "https://api.dailymotion.com/video/%s?fields=%s" % (video_id, fields)
        log.debug("SpiffyTitles: looking up dailymotion info: %s", api_url)
        try:
            request = requests.get(api_url, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: dailymotion error: {0}".format(e))
            return self.handler_default(url, channel)
        response = json.loads(request.content.decode())
        if response and "title" in response:
            video = response
            dailymotion_template = Template(
                self.registryValue("dailymotion.template", channel=channel)
            )
            video["views_total"] = "{:,}".format(int(video["views_total"]))
            video["duration"] = self.get_duration_from_seconds(video["duration"])
            video["ownerscreenname"] = video["owner.screenname"]
            title = dailymotion_template.render(video)
        else:
            log.debug(
                "SpiffyTitles: received unexpected payload from video: %s" % api_url
            )
        if not title:
            log.debug("SpiffyTitles: could not get dailymotion info for %s" % url)
            return self.handler_default(url, channel)
        else:
            return title

    def handler_vimeo(self, url, domain, channel):
        """
        Handles Vimeo links
        """
        vimeo_handler_enabled = self.registryValue("vimeo.enabled", channel=channel)
        if not vimeo_handler_enabled:
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: calling vimeo handler for %s" % url)
        title = None
        video_id = None
        """ Get video ID """
        # "(s)?" (rather than "(s)") lets the scheme group match plain http:// too
        result = re.search(r"^(http(s)?://)?(www\.)?(vimeo\.com/)?(\d+)", url)
        if result:
            video_id = result.group(5)
        if not video_id:
            log.error("SpiffyTitles: Failed to get Vimeo video ID from URL: %s" % url)
            return self.handler_default(url, channel)
        api_url = "https://vimeo.com/api/v2/video/%s.json" % video_id
        log.debug("SpiffyTitles: looking up vimeo info: %s", api_url)
        try:
            request = requests.get(api_url, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: vimeo error: {0}".format(e))
            return self.handler_default(url, channel)
        response = json.loads(request.content.decode())
        if response and "title" in response[0]:
            video = response[0]
            vimeo_template = Template(
                self.registryValue("vimeo.template", channel=channel)
            )
            """
            Some videos do not have this information available
            """
            if "stats_number_of_plays" in video:
                int_plays = int(video["stats_number_of_plays"])
                video["stats_number_of_plays"] = "{:,}".format(int_plays)
            else:
                video["stats_number_of_plays"] = 0
            if "stats_number_of_comments" in video:
                int_comments = int(video["stats_number_of_comments"])
                video["stats_number_of_comments"] = "{:,}".format(int_comments)
            else:
                video["stats_number_of_comments"] = 0
            video["duration"] = self.get_duration_from_seconds(video["duration"])
            title = vimeo_template.render(video)
        else:
            log.debug(
                "SpiffyTitles: received unexpected payload from video: %s" % api_url
            )
        if not title:
            log.debug("SpiffyTitles: could not get vimeo info for %s" % url)
            return self.handler_default(url, channel)
        else:
            return title

    def handler_coub(self, url, domain, channel):
        """
        Handles coub.com links
        """
        coub_handler_enabled = self.registryValue("coub.enabled", channel=channel)
        if not coub_handler_enabled:
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: calling coub handler for %s" % url)
        title = None
        video_id = None
        """ Get video ID """
        if "/view/" in url:
            video_id = url.split("/view/")[1]
            """ Remove any query strings """
            if "?" in video_id:
                video_id = video_id.split("?")[0]
        if not video_id:
            log.error("SpiffyTitles: Failed to get Coub video ID for URL: %s" % url)
            return self.handler_default(url, channel)
        api_url = "http://coub.com/api/v2/coubs/%s" % video_id
        try:
            request = requests.get(api_url, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: coub error: {0}".format(e))
            return self.handler_default(url, channel)
        response = json.loads(request.content.decode())
        if response:
            video = response
            coub_template = Template(self.registryValue("coub.template"))
            video["likes_count"] = "{:,}".format(int(video["likes_count"]))
            video["recoubs_count"] = "{:,}".format(int(video["recoubs_count"]))
            video["views_count"] = "{:,}".format(int(video["views_count"]))
            title = coub_template.render(video)
        if not title:
            log.debug("SpiffyTitles: Coub handler failed to get data for URL: %s" % url)
            return self.handler_default(url, channel)
        else:
            return title

    def get_video_id_from_url(self, url, info):
        """
        Get YouTube video ID from URL
        """
        try:
            path = info.path
            domain = info.netloc
            video_id = ""
            if domain == "youtu.be":
                video_id = path.split("/")[1]
            else:
                parsed = parse_qsl(info.query)
                params = dict(parsed)
                if "v" in params:
                    video_id = params["v"]
            if video_id:
                return video_id
            else:
                log.error("SpiffyTitles: error getting video id from %s" % (url))
        except IndexError as e:
            log.error(
                "SpiffyTitles: error getting video id from %s (%s)" % (url, str(e))
            )

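    # Example:
    #   https://youtu.be/dQw4w9WgXcQ          -> path split  -> "dQw4w9WgXcQ"
    #   https://www.youtube.com/watch?v=abc12 -> query param -> "abc12"
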
    def handler_youtube(self, url, domain, channel):
        """
        Uses the Youtube API to provide additional meta data about
        Youtube Video links posted.
        """
        youtube_handler_enabled = self.registryValue("youtube.enabled", channel)
        if not youtube_handler_enabled:
            return self.handler_default(url, channel)
        developer_key = self.registryValue("youtube.developerKey")
        if not developer_key:
            log.info(
                "SpiffyTitles: no Youtube developer key set! Check the documentation "
                "for instructions."
            )
            return None
        log.debug("SpiffyTitles: calling Youtube handler for %s" % (url))
        video_id = self.get_video_id_from_url(url, domain)
        if not video_id:
            log.debug(
                "SpiffyTitles: Failed to get YouTube video ID for URL: {0}".format(url)
            )
            return self.handler_default(url, channel)
        yt_template = Template(self.registryValue("youtube.template", channel))
        title = ""
        options = {
            "part": "snippet,statistics,contentDetails",
            "maxResults": 1,
            "key": developer_key,
            "id": video_id,
        }
        api_url = "https://www.googleapis.com/youtube/v3/videos"
        log.debug("SpiffyTitles: requesting %s" % (api_url))
        try:
            request = requests.get(api_url, params=options, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: YouTube Error: {0}".format(e))
            return self.handler_default(url, channel)
        response = json.loads(request.content.decode())
        if not response or not response.get("items"):
            log.error("SpiffyTitles: Failed to parse YouTube JSON response")
            return self.handler_default(url, channel)
        try:
            items = response["items"]
            video = items[0]
            snippet = video["snippet"]
            title = snippet["title"]
            statistics = video["statistics"]
            view_count = 0
            like_count = 0
            dislike_count = 0
            comment_count = 0
            favorite_count = 0
            if "viewCount" in statistics:
                view_count = "{:,}".format(int(statistics["viewCount"]))
            if "likeCount" in statistics:
                like_count = "{:,}".format(int(statistics["likeCount"]))
            if "dislikeCount" in statistics:
                dislike_count = "{:,}".format(int(statistics["dislikeCount"]))
            if "favoriteCount" in statistics:
                favorite_count = "{:,}".format(int(statistics["favoriteCount"]))
            if "commentCount" in statistics:
                comment_count = "{:,}".format(int(statistics["commentCount"]))
            channel_title = snippet["channelTitle"]
            video_duration = video["contentDetails"]["duration"]
            duration_seconds = self.get_total_seconds_from_duration(video_duration)
            """
            #23 - If duration is zero, then it's a LIVE video
            """
            if duration_seconds > 0:
                duration = self.get_duration_from_seconds(duration_seconds)
            else:
                duration = "LIVE"
            published = snippet["publishedAt"].split("T")[0]
            timestamp = self.get_timestamp_from_youtube_url(url)
            yt_logo = self.get_youtube_logo(channel)
            compiled_template = yt_template.render(
                {
                    "title": title,
                    "duration": duration,
                    "timestamp": timestamp,
                    "view_count": view_count,
                    "like_count": like_count,
                    "dislike_count": dislike_count,
                    "comment_count": comment_count,
                    "favorite_count": favorite_count,
                    "channel_title": channel_title,
                    "published": published,
                    "yt_logo": yt_logo,
                }
            )
            title = compiled_template
        except IndexError as e:
            log.error(
                "SpiffyTitles: IndexError. Youtube API JSON response: %s" % (str(e))
            )
        # If we found a title, return that. Otherwise, use the default handler
        if title:
            return title
        else:
            log.debug("SpiffyTitles: falling back to default handler")
            return self.handler_default(url, channel)

    def get_duration_from_seconds(self, duration_seconds):
        m, s = divmod(duration_seconds, 60)
        h, m = divmod(m, 60)
        duration = "%02d:%02d" % (m, s)
        """ Only include hour if the video is at least 1 hour long """
        if h > 0:
            duration = "%02d:%s" % (h, duration)
        return duration

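    # Example:
    #   get_duration_from_seconds(281)  -> "04:41"
    #   get_duration_from_seconds(3725) -> "01:02:05"
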
    def get_youtube_logo(self, channel):
        use_bold = self.registryValue("useBold", channel)
        if use_bold:
            yt_logo = "{0}\x0F\x02".format(self.registryValue("youtube.logo", channel))
        else:
            yt_logo = "{0}\x0F".format(self.registryValue("youtube.logo", channel))
        return yt_logo

    def get_total_seconds_from_duration(self, input):
        """
        Duration comes in a format like this: PT4M41S which translates to
        4 minutes and 41 seconds. This method returns the total seconds
        so that the duration can be parsed as usual.
        """
        regex = re.compile(
            r"""
               (?P<sign>    -?) P
            (?:(?P<years>  \d+) Y)?
            (?:(?P<months> \d+) M)?
            (?:(?P<days>   \d+) D)?
            (?: T
                (?:(?P<hours>  \d+) H)?
                (?:(?P<minutes>\d+) M)?
                (?:(?P<seconds>\d+) S)?
            )?
            """,
            re.VERBOSE,
        )
        duration = regex.match(input).groupdict(0)
        delta = datetime.timedelta(
            hours=int(duration["hours"]),
            minutes=int(duration["minutes"]),
            seconds=int(duration["seconds"]),
        )
        return delta.total_seconds()

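    # Example, for the ISO 8601 durations the YouTube API returns:
    #   get_total_seconds_from_duration("PT4M41S") -> 281.0
    #   get_total_seconds_from_duration("PT1H2M")  -> 3720.0
    # Note that only the time components feed the timedelta; date components
    # (Y/M/D) are matched but ignored, which is fine for video durations.
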
    def get_timestamp_from_youtube_url(self, url):
        """
        Get YouTube timestamp
        """
        pattern = r"[?&]t=([^&]+)"
        match = re.search(pattern, url)
        if match:
            timestamp = match.group(1).upper()
            try:
                seconds = float(timestamp)
            except ValueError:
                seconds = self.get_total_seconds_from_duration("PT" + timestamp)
            if seconds > 0:
                return self.get_duration_from_seconds(seconds)
        else:
            return ""

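    # Example:
    #   ...watch?v=x&t=90   -> "01:30"
    #   ...watch?v=x&t=1h2m -> "01:02:00"  (parsed via "PT1H2M")
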
    def handler_twitch(self, url, domain, channel):
        """
        Queries the Twitch API for additional information about twitch links.
        This handler is for (www.)twitch.tv
        """
        twitch_handler_enabled = self.registryValue("twitch.enabled", channel=channel)
        if not twitch_handler_enabled:
            return self.handler_default(url, channel)
        twitch_client_id = self.registryValue("twitch.clientID")
        if not twitch_client_id:
            log.error("SpiffyTitles: Please set your Twitch client ID")
            return self.handler_default(url, channel)
        url = url.split("?")[0]
        self.log.debug("SpiffyTitles: calling twitch handler for %s" % (url))
        patterns = {
            "video": {
                "pattern": r"^http(s)?:\/\/(www\.|go\.|player\.)?twitch\.tv\/"
                r"(videos/|\?video=v)(?P<video_id>[0-9]+)",
                "url": "https://api.twitch.tv/helix/videos?id={video_id}",
            },
            "clip": {
                "pattern": r"(http(s)?:\/\/clips\.twitch\.tv\/|http(s)?:\/\/"
                r"www\.twitch\.tv\/([^\/]+)\/clip\/)(?P<clip>.+)$",
                "url": "https://api.twitch.tv/helix/clips?id={clip}",
            },
            "channel": {
                "pattern": r"^http(s)?:\/\/(www\.|go\.)?twitch\.tv\/"
                r"(?P<channel_name>[^\/]+)",
                "url": "https://api.twitch.tv/helix/streams?user_login={channel_name}",
            },
        }
        match = None
        for name in patterns:
            match = re.search(patterns[name]["pattern"], url)
            if match:
                link_type = name
                link_info = match.groupdict()
                data_url = patterns[name]["url"].format(**link_info)
                break
        if not match:
            self.log.debug("SpiffyTitles: twitch - no title found.")
            return self.handler_default(url, channel)
        headers = {"Client-ID": twitch_client_id}
        self.log.debug("SpiffyTitles: twitch - requesting %s" % (data_url))
        try:
            request = requests.get(data_url, timeout=self.timeout, headers=headers)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: Twitch Error: {0}".format(e))
            return self.handler_default(url, channel)
        data = {}
        # Initialize these up front so the checks and template rendering below
        # can't hit a NameError on paths that never set them
        user_data = {}
        description = ""
        reply = None
        try:
            response = json.loads(request.content.decode())
        except ValueError:
            response = None
        if not response:
            log.error("SpiffyTitles: Error parsing Twitch.TV JSON response")
            return self.handler_default(url, channel)
        self.log.debug("SpiffyTitles: twitch - got response:\n%s" % (response))
        if "error" in response:
            log.error("SpiffyTitles: Twitch Error: {0}".format(response["error"]))
            return self.handler_default(url, channel)
        if link_type == "channel":
            self.log.debug("SpiffyTitles: Twitch: link_type is channel")
            if "data" in response and response["data"]:
                self.log.debug("SpiffyTitles: Twitch: Got data[0]")
                data = response["data"][0]
                link_type = "stream"
            else:
                self.log.debug("SpiffyTitles: Twitch: No data[0]")
                data_url = "https://api.twitch.tv/helix/users?login={channel_name}".format(
                    **link_info
                )
                try:
                    request = requests.get(
                        data_url, timeout=self.timeout, headers=headers
                    )
                    request.raise_for_status()
                except (
                    requests.exceptions.RequestException,
                    requests.exceptions.HTTPError,
                ) as e:
                    log.error("SpiffyTitles: Twitch Error: {0}".format(e))
                    return self.handler_default(url, channel)
                try:
                    response = json.loads(request.content.decode())
                    user_data = response["data"][0]
                    display_name = user_data["display_name"]
                    description = user_data["description"]
                    view_count = user_data["view_count"]
                except Exception as e:
                    log.error(
                        "SpiffyTitles: KeyError parsing Twitch.TV JSON response: %s"
                        % (str(e))
                    )
        elif link_type == "video":
            data = response
        elif link_type == "clip":
            data = response
        # A live stream fills data but not user_data (and vice versa for an
        # offline channel), so only bail out when both lookups came back empty
        if not data and not user_data:
            log.error("SpiffyTitles: Twitch: Failed to get data from Twitch API")
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: twitch - Got data '%s'" % (data))
        twitch_template = self.get_template(
            "".join(["twitch.", link_type, "Template"]), channel
        )
        twitch_logo = self.get_twitch_logo(channel)
        if not twitch_template:
            self.log.debug("SpiffyTitles - twitch: bad template for %s" % (link_type))
            log.error("SpiffyTitles: Twitch: Got data, but template was bad")
            return self.handler_default(url, channel)
        if link_type == "stream":
            display_name = data["user_name"]
            game_id = data["game_id"]
            game_name = game_id
            title = data["title"]
            view_count = data["viewer_count"]
            created_at = self._time_created_at(data["started_at"])
            if game_id:
                get_game = requests.get(
                    "https://api.twitch.tv/helix/games?id={}".format(game_id),
                    timeout=self.timeout,
                    headers=headers,
                )
                game_data = json.loads(get_game.content.decode())
                game_name = game_data["data"][0]["name"]
            template_vars = {
                "display_name": display_name,
                "game_name": game_name,
                "title": title,
                "view_count": view_count,
                "description": description,
                "created_at": created_at,
                "twitch_logo": twitch_logo,
            }
            reply = twitch_template.render(template_vars)
        elif link_type == "clip":
            data = response["data"][0]
            display_name = data["broadcaster_name"]
            data_url = "https://api.twitch.tv/helix/users?login={}".format(display_name)
            try:
                request = requests.get(data_url, timeout=self.timeout, headers=headers)
                request.raise_for_status()
            except (
                requests.exceptions.RequestException,
                requests.exceptions.HTTPError,
            ) as e:
                log.error("SpiffyTitles: Twitch Error: {0}".format(e))
                return self.handler_default(url, channel)
            try:
                response = json.loads(request.content.decode())
                user_data = response["data"][0]
                description = user_data["description"]
                game_id = data["game_id"]
                game_name = game_id
                title = data["title"]
                view_count = data["view_count"]
                created_at = self._time_created_at(data["created_at"])
                if game_id:
                    get_game = requests.get(
                        "https://api.twitch.tv/helix/games?id={}".format(game_id),
                        timeout=self.timeout,
                        headers=headers,
                    )
                    game_data = json.loads(get_game.content.decode())
                    game_name = game_data["data"][0]["name"]
                template_vars = {
                    "display_name": display_name,
                    "game_name": game_name,
                    "title": title,
                    "view_count": view_count,
                    "description": description,
                    "created_at": created_at,
                    "twitch_logo": twitch_logo,
                }
                reply = twitch_template.render(template_vars)
            except Exception as e:
                self.log.error(
                    "SpiffyTitles: Error parsing Twitch.TV JSON response: %s" % (str(e))
                )
        elif link_type == "video":
            data = response["data"][0]
            display_name = data["user_name"]
            data_url = "https://api.twitch.tv/helix/users?login={}".format(display_name)
            try:
                request = requests.get(data_url, timeout=self.timeout, headers=headers)
                request.raise_for_status()
            except (
                requests.exceptions.RequestException,
                requests.exceptions.HTTPError,
            ) as e:
                log.error("SpiffyTitles: Twitch Error: {0}".format(e))
                return self.handler_default(url, channel)
            try:
                response = json.loads(request.content.decode())
                user_data = response["data"][0]
                description = user_data["description"]
                title = data["title"]
                view_count = data["view_count"]
                created_at = self._time_created_at(data["created_at"])
                duration = data["duration"]
                template_vars = {
                    "display_name": display_name,
                    "title": title,
                    "view_count": view_count,
                    "created_at": created_at,
                    "description": description,
                    "duration": duration,
                    "twitch_logo": twitch_logo,
                }
                reply = twitch_template.render(template_vars)
            except Exception as e:
                self.log.error(
                    "SpiffyTitles: Error parsing Twitch.TV JSON response: %s" % (str(e))
                )
        else:
            try:
                template_vars = {
                    "display_name": display_name,
                    "description": description,
                    "view_count": view_count,
                    "twitch_logo": twitch_logo,
                }
                reply = twitch_template.render(template_vars)
            except Exception as e:
                self.log.error(
                    "SpiffyTitles: Error parsing Twitch.TV JSON response: %s" % (str(e))
                )
        return reply

    def _time_created_at(self, s):
        """
        Return relative time delta between now and s (dt string).
        """
        try:  # Twitch-style UTC timestamp, e.g. 2012-05-08T10:58:49Z
            ddate = time.strptime(s, "%Y-%m-%dT%H:%M:%SZ")[:-2]
        except ValueError:
            try:  # RFC 2822-style, e.g. Thu, 06 Oct 2011 19:41:12 +0000
                ddate = time.strptime(s, "%a, %d %b %Y %H:%M:%S +0000")[:-2]
            except ValueError:
                return s
        # do the math
        d = datetime.datetime.now() - datetime.datetime(*ddate, tzinfo=None)
        # now parse and return.
        if d.days:
            rel_time = "{:1d}d ago".format(abs(d.days))
        elif d.seconds > 3600:
            rel_time = "{:.1f}h ago".format(round((abs(d.seconds) / 3600), 1))
        elif 60 <= d.seconds < 3600:
            rel_time = "{:.1f}m ago".format(round((abs(d.seconds) / 60), 1))
        else:
            rel_time = "%ss ago" % (abs(d.seconds))
        return rel_time

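    # Example: a Twitch started_at of 90 minutes ago yields "1.5h ago"; an
    # unrecognized timestamp format is returned unchanged.
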
    def get_twitch_logo(self, channel):
        use_bold = self.registryValue("useBold", channel)
        if use_bold:
            twitch_logo = "{0}\x0F\x02".format(
                self.registryValue("twitch.logo", channel)
            )
        else:
            twitch_logo = "{0}\x0F".format(self.registryValue("twitch.logo", channel))
        return twitch_logo

    def handler_imdb(self, url, info, channel):
        """
        Handles imdb.com links, querying the OMDB API for additional info
        Typical IMDB URL: http://www.imdb.com/title/tt2467372/
        """
        if not self.registryValue("imdb.enabled", channel=channel):
            log.debug(
                "SpiffyTitles: IMDB handler disabled. Falling back to default handler."
            )
            return self.handler_default(url, channel)
        apikey = self.registryValue("imdb.omdbAPI")
        result = None
        response = None
        imdb_id = None
        # Don't care about query strings
        if "?" in url:
            url = url.split("?")[0]
        # We can only accommodate a specific format of URL here
        match = re.match(r"https?:\/\/.*\.?imdb\.com\/title\/(tt[0-9]+)\/?.*", url)
        if match:
            imdb_id = match.group(1)
        if not imdb_id:
            log.error("SpiffyTitles: Invalid IMDB URL: %s" % url)
            return self.handler_default(url, channel)
        omdb_url = "http://www.omdbapi.com/"
        options = {"apikey": apikey, "i": imdb_id, "r": "json", "plot": "short"}
        try:
            request = requests.get(omdb_url, params=options, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles OMDB Error: %s" % (str(e)))
            # Without a response there is nothing to parse below
            return self.handler_default(url, channel)
        try:
            response = json.loads(request.content.decode())
            if "Error" in response or response["Response"] != "True":
                response = None
        except Exception:
            log.error(
                "SpiffyTitles: JSON error opening OMDB response: %s"
                % (request.content.decode())
            )
            response = None
        if response:
            imdb_template = Template(self.registryValue("imdb.template"))
            meta = None
            tomato = None
            for rating in response.get("Ratings", []):
                if rating["Source"] == "Rotten Tomatoes":
                    tomato = rating["Value"]
                if rating["Source"] == "Metacritic":
                    meta = "{0}%".format(rating["Value"].split("/")[0])
            template_vars = {
                "title": response.get("Title"),
                "year": response.get("Year"),
                "country": response.get("Country"),
                "director": response.get("Director"),
                "plot": response.get("Plot"),
                "imdb_id": response.get("imdbID"),
                "imdb_rating": response.get("imdbRating"),
                "tomatoMeter": tomato,
                "metascore": meta,
                "released": response.get("Released"),
                "genre": response.get("Genre"),
                "awards": response.get("Awards"),
                "actors": response.get("Actors"),
                "rated": response.get("Rated"),
                "runtime": response.get("Runtime"),
                "writer": response.get("Writer"),
                "votes": response.get("imdbVotes"),
                "website": response.get("Website"),
                "language": response.get("Language"),
                "box_office": response.get("BoxOffice"),
                "production": response.get("Production"),
                "poster": response.get("Poster"),
                "imdb_logo": self.get_imdb_logo(channel),
            }
            result = imdb_template.render(template_vars)
        if result:
            return result
        else:
            log.debug("SpiffyTitles: IMDB handler failed. calling default handler")
            return self.handler_default(url, channel)

    def get_imdb_logo(self, channel):
        use_bold = self.registryValue("useBold", channel)
        if use_bold:
            imdb_logo = "{0}\x0F\x02".format(self.registryValue("imdb.logo", channel))
        else:
            imdb_logo = "{0}\x0F".format(self.registryValue("imdb.logo", channel))
        return imdb_logo

    def handler_wikipedia(self, url, domain, channel):
        """
        Queries wikipedia API for article extracts.
        """
        wikipedia_handler_enabled = self.registryValue(
            "wikipedia.enabled", channel=channel
        )
        if not wikipedia_handler_enabled:
            return self.handler_default(url, channel)
        self.log.debug("SpiffyTitles: calling Wikipedia handler for %s" % (url))
        pattern = r"/(?:w(?:iki))/(?P<page>[^/]+)$"
        info = urlparse(url)
        match = re.search(pattern, info.path)
        if not match:
            self.log.debug("SpiffyTitles: no title found.")
            return self.handler_default(url, channel)
        elif info.fragment and self.registryValue(
            "wikipedia.ignoreSectionLinks", channel=channel
        ):
            self.log.debug("SpiffyTitles: ignoring section link.")
            return self.handler_default(url, channel)
        else:
            page_title = match.groupdict()["page"]
            default_api_params = {
                "format": "json",
                "action": "query",
                "prop": "extracts",
                "exsentences": "2",
                "exlimit": "1",
                "exintro": "",
                "explaintext": "",
            }
            wiki_api_params = self.registryValue("wikipedia.apiParams", channel=channel)
            extra_params = dict(parse_qsl("&".join(wiki_api_params)))
            title_param = {
                self.registryValue("wikipedia.titleParam", channel=channel): page_title
            }
            # merge dicts
            api_params = default_api_params.copy()
            api_params.update(extra_params)
            api_params.update(title_param)
            api_url = "https://%s/w/api.php" % (info.netloc)
            extract = ""
            self.log.debug("SpiffyTitles: requesting %s" % (api_url))
            try:
                request = requests.get(api_url, params=api_params, timeout=self.timeout)
                request.raise_for_status()
            except (
                requests.exceptions.RequestException,
                requests.exceptions.HTTPError,
            ) as e:
                log.error("SpiffyTitles: Wikipedia Error: {0}".format(e))
                return self.handler_default(url, channel)
            response = json.loads(request.content.decode())
            if response:
                try:
                    extract = list(response["query"]["pages"].values())[0]["extract"]
                except KeyError as e:
                    self.log.error(
                        "SpiffyTitles: KeyError. Wikipedia API JSON response: %s"
                        % (str(e))
                    )
            else:
                self.log.error("SpiffyTitles: Error parsing Wikipedia API JSON response")
            if extract:
                if self.registryValue("wikipedia.removeParentheses"):
                    extract = re.sub(r" ?\([^)]*\)", "", extract)
                max_chars = self.registryValue("wikipedia.maxChars", channel=channel)
                if len(extract) > max_chars:
                    extract = (
                        extract[: max_chars - 3].rsplit(" ", 1)[0].rstrip(",.") + "..."
                    )
                extract_template = self.registryValue(
                    "wikipedia.extractTemplate", channel=channel
                )
                wikipedia_template = Template(extract_template)
                return wikipedia_template.render({"extract": extract})
            else:
                self.log.debug("SpiffyTitles: falling back to default handler")
                return self.handler_default(url, channel)

    def handler_reddit(self, url, domain, channel):
        """
        Queries the Reddit JSON API for thread, comment, and user info.
        """
        reddit_handler_enabled = self.registryValue("reddit.enabled", channel=channel)
        if not reddit_handler_enabled:
            return self.handler_default(url, channel)
        self.log.debug("SpiffyTitles: calling reddit handler for %s" % (url))
        patterns = {
            "thread": {
                "pattern": r"^/r/(?P<subreddit>[^/]+)/comments/(?P<thread>[^/]+)"
                r"(?:/[^/]+/?)?$",
                "url": "https://www.reddit.com/r/{subreddit}/comments/{thread}.json",
            },
            "comment": {
                "pattern": r"^/r/(?P<subreddit>[^/]+)/comments/(?P<thread>[^/]+)/"
                r"[^/]+/(?P<comment>\w+/?)$",
                "url": (
                    "https://www.reddit.com/r/{subreddit}/comments/{thread}/x/"
                    "{comment}.json"
                ),
            },
            "user": {
                "pattern": r"^/u(?:ser)?/(?P<user>[^/]+)/?$",
                "url": "https://www.reddit.com/user/{user}/about.json",
            },
        }
        info = urlparse(url)
        for name in patterns:
            match = re.search(patterns[name]["pattern"], info.path)
            if match:
                link_type = name
                link_info = match.groupdict()
                data_url = patterns[name]["url"].format(**link_info)
                break
        if not match:
            self.log.debug("SpiffyTitles: no title found.")
            return self.handler_default(url, channel)
        self.log.debug("SpiffyTitles: requesting %s" % (data_url))
        headers = {"User-Agent": self.get_user_agent()}
        try:
            request = requests.get(data_url, headers=headers, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: Reddit Error: {0}".format(e))
            return self.handler_default(url, channel)
        data = {}
        extract = ""
        response = json.loads(request.content.decode())
        if response:
            try:
                if link_type == "thread":
                    data = response[0]["data"]["children"][0]["data"]
                if link_type == "comment":
                    data = response[1]["data"]["children"][0]["data"]
                    data["title"] = response[0]["data"]["children"][0]["data"]["title"]
                if link_type == "user":
                    data = response["data"]
            except KeyError as e:
                self.log.error(
                    "SpiffyTitles: KeyError parsing Reddit JSON response: %s" % (str(e))
                )
        else:
            self.log.error("SpiffyTitles: Error parsing Reddit JSON response")
        if data:
            today = datetime.datetime.now().date()
            created = datetime.datetime.fromtimestamp(data["created_utc"]).date()
            age_days = (today - created).days
            if age_days == 0:
                age = "today"
            elif age_days == 1:
                age = "yesterday"
            else:
                age = "{}d".format(age_days % 365)
                if age_days > 365:
                    age = "{}y, ".format(age_days // 365) + age
                age = age + " ago"
            if link_type == "thread":
                link_type = "linkThread"
                if data["is_self"]:
                    link_type = "textThread"
                    data["url"] = ""
                    extract = data.get("selftext", "")
            if link_type == "comment":
                extract = data.get("body", "")
            link_type_template = self.registryValue(
                "reddit." + link_type + "Template", channel=channel
            )
            reddit_template = Template(link_type_template)
            template_vars = {
                "id": data.get("id", ""),
                "user": data.get("name", ""),
                "gold": (data.get("is_gold", False) is True),
                "mod": (data.get("is_mod", False) is True),
                "author": data.get("author", ""),
                "subreddit": data.get("subreddit", ""),
                "url": data.get("url", ""),
                "title": data.get("title", ""),
                "domain": data.get("domain", ""),
                "score": data.get("score", 0),
                "percent": "{}%".format(int(data.get("upvote_ratio", 0) * 100)),
                "comments": "{:,}".format(data.get("num_comments", 0)),
                "created": created.strftime("%Y-%m-%d"),
                "age": age,
                "link_karma": "{:,}".format(data.get("link_karma", 0)),
                "comment_karma": "{:,}".format(data.get("comment_karma", 0)),
                "extract": "%%extract%%",
            }
            reply = reddit_template.render(template_vars)
            if extract:
                max_chars = self.registryValue("reddit.maxChars", channel=channel)
                max_extract_chars = max_chars + len("%%extract%%") - len(reply)
                if len(extract) > max_extract_chars:
                    extract = (
                        extract[: max_extract_chars - 3].rsplit(" ", 1)[0].rstrip(",.")
                        + "..."
                    )
            template_vars["extract"] = extract
            reply = reddit_template.render(template_vars)
            return reply
        else:
            self.log.debug("SpiffyTitles: falling back to default handler")
            return self.handler_default(url, channel)

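    # URL-to-API mapping example: Reddit exposes most pages as JSON by
    # appending ".json", which is what the patterns above build, e.g.:
    #   /r/python/comments/abc123/some_title/
    #     -> https://www.reddit.com/r/python/comments/abc123.json
    #   /u/spez -> https://www.reddit.com/user/spez/about.json
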
    def is_valid_imgur_id(self, input):
        """
        Tests if input matches the typical imgur id, which seems to be alphanumeric.
        Images, galleries, and albums all share this format in their identifier.
        """
        match = re.match(r"[a-z0-9]+", input, re.IGNORECASE)
        return match

    def handler_imgur(self, url, info, channel):
        """
        Queries imgur API for additional information about imgur links.
        This handler is for any imgur.com domain.
        """
        is_album = info.path.startswith("/a/") or info.path.startswith("/gallery/")
        result = None
        if is_album:
            result = self.handler_imgur_album(url, info, channel)
        else:
            result = self.handler_imgur_image(url, info, channel)
        return result

    def handler_imgur_album(self, url, info, channel):
        """
        Handles retrieving information about albums from the imgur API.
        imgur provides the following information about albums:
        https://api.imgur.com/models/album
        """
        if not self.registryValue("imgur.enabled", channel=channel):
            return self.handler_default(url, channel)
        client_id = self.registryValue("imgur.clientID")
        if not client_id:
            log.error("SpiffyTitles: imgur client ID not set")
            return self.handler_default(url, channel)
        if info.path.startswith("/a/"):
            album_id = info.path.split("/a/")[1]
        elif info.path.startswith("/gallery/"):
            album_id = info.path.split("/gallery/")[1]
        """ If there is a query string appended, remove it """
        if "?" in album_id:
            album_id = album_id.split("?")[0]
        if not self.is_valid_imgur_id(album_id):
            log.error("SpiffyTitles: Invalid imgur ID")
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: found imgur album id %s" % (album_id))
        headers = {"Authorization": "Client-ID {0}".format(client_id)}
        api_url = "https://api.imgur.com/3/album/{0}".format(album_id)
        try:
            request = requests.get(api_url, headers=headers, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: Imgur Error: {0}".format(e))
            return self.handler_default(url, channel)
        album = json.loads(request.content.decode())
        album = album.get("data")
        if album:
            album_template = self.registryValue("imgur.albumTemplate", channel=channel)
            imgur_album_template = Template(album_template)
            compiled_template = imgur_album_template.render(
                {
                    "title": album.get("title"),
                    "section": album.get("section"),
                    "view_count": "{:,}".format(album["views"]),
                    "image_count": "{:,}".format(album["images_count"]),
                    "nsfw": album.get("nsfw"),
                    "description": album.get("description"),
                }
            )
            return compiled_template
        else:
            log.error("SpiffyTitles: imgur album API returned unexpected results!")
            return self.handler_default(url, channel)

    def handler_imgur_image(self, url, info, channel):
        """
        Handles retrieving information about images from the imgur API.
        Used for both direct images and imgur.com/some_image_id_here type links, as
        they're both single images.
        """
        if not self.registryValue("imgur.enabled", channel=channel):
            return self.handler_default(url, channel)
        client_id = self.registryValue("imgur.clientID")
        if not client_id:
            log.error("SpiffyTitles: imgur client ID not set")
            return self.handler_default(url, channel)
        title = None
        """
        If there is a period in the path, it's a direct link to an image.
        If not, then it's an imgur.com/some_image_id_here type link
        """
        if "." in info.path:
            path = info.path.lstrip("/")
            image_id = path.split(".")[0]
        else:
            image_id = info.path.lstrip("/")
        if not self.is_valid_imgur_id(image_id):
            log.error("SpiffyTitles: Invalid imgur image ID")
            return self.handler_default(url, channel)
        log.debug("SpiffyTitles: found image id %s" % (image_id))
        headers = {"Authorization": "Client-ID {0}".format(client_id)}
        api_url = "https://api.imgur.com/3/image/{0}".format(image_id)
        try:
            request = requests.get(api_url, headers=headers, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: Imgur Error: {0}".format(e))
            return self.handler_default(url, channel)
        image = None
        try:
            image = json.loads(request.content.decode())
            image = image.get("data")
        except (ValueError, AttributeError):
            log.error("SpiffyTitles: Error reading imgur JSON response")
            image = None
        if image:
            channel_template = self.registryValue(
                "imgur.imageTemplate", channel=channel
            )
            imgur_template = Template(channel_template)
            readable_file_size = self.get_readable_file_size(image["size"])
            compiled_template = imgur_template.render(
                {
                    "title": image.get("title"),
                    "type": image.get("type"),
                    "nsfw": image.get("nsfw"),
                    "width": image.get("width"),
                    "height": image.get("height"),
                    "view_count": "{:,}".format(image["views"]),
                    "file_size": readable_file_size,
                    "section": image.get("section"),
                }
            )
            title = compiled_template
        else:
            log.error("SpiffyTitles: imgur API returned unexpected results!")
        if title:
            return title
        else:
            return self.handler_default(url, channel)

    def handler_twitter(self, url, info, channel):
        if not self.registryValue("twitter.enabled", channel):
            return self.handler_default(url, channel)
        api_url = "https://publish.twitter.com/oembed?url={0}&omit_script=True".format(
            url
        )
        try:
            request = requests.get(api_url, timeout=self.timeout)
            request.raise_for_status()
        except (
            requests.exceptions.RequestException,
            requests.exceptions.HTTPError,
        ) as e:
            log.error("SpiffyTitles: Twitter Error: {0}".format(e))
            return self.handler_default(url, channel)
        response = None
        try:
            response = json.loads(request.content.decode())
        except ValueError:
            log.error("SpiffyTitles: Error reading Twitter JSON response")
            return self.handler_default(url, channel)
        results = {}
        soup = BeautifulSoup(response["html"])
        results["text"] = soup.text.replace("—", " - ").strip()
        template = Template(self.registryValue("twitter.template", channel))
        title = template.render(results).strip()
        if title:
            return title
        else:
            return self.handler_default(url, channel)

    def t(self, irc, msg, args, query):
        """
        Retrieves title for a URL on demand
        """
        channel = msg.args[0]
        title = None
        try:
            title = self.get_title_by_url(query, channel)
        except Exception:
            pass
        if title:
            irc.reply(title)
        else:
            return

    t = wrap(t, ["text"])


Class = SpiffyTitles