mirror of
https://github.com/oddluck/limnoria-plugins.git
synced 2025-04-26 04:51:09 -05:00
729 lines
34 KiB
Python
729 lines
34 KiB
Python
###
|
|
# Copyright (c) 2014, spline
|
|
# Copyright (c) 2020, oddluck <oddluck@riseup.net>
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions, and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions, and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# * Neither the name of the author of this software nor the name of
|
|
# contributors to this software may be used to endorse or promote products
|
|
# derived from this software without specific prior written consent.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
###
|
|
|
|
# my libs
|
|
import urllib.request, urllib.error, urllib.parse
|
|
import json
|
|
import requests
|
|
import urllib
|
|
# libraries for time_created_at
|
|
import time
|
|
from datetime import datetime
|
|
# for unescape
|
|
import re
|
|
import html.entities
|
|
# oauthtwitter
|
|
import oauth2 as oauth
|
|
# supybot libs
|
|
import supybot.utils as utils
|
|
from supybot.commands import *
|
|
import supybot.plugins as plugins
|
|
import supybot.ircutils as ircutils
|
|
import supybot.callbacks as callbacks
|
|
import supybot.log as log
|
|
from bs4 import BeautifulSoup
|
|
import os
|
|
|
|
class OAuthApi:
|
|
"""OAuth class to work with Twitter v1.1 API."""
|
|
|
|
def __init__(self, consumer_key, consumer_secret, token, token_secret):
|
|
token = oauth.Token(token, token_secret)
|
|
self._Consumer = oauth.Consumer(consumer_key, consumer_secret)
|
|
self._signature_method = oauth.SignatureMethod_HMAC_SHA1()
|
|
self._access_token = token
|
|
|
|
def _FetchUrl(self,url, parameters=None):
|
|
"""Fetch a URL with oAuth. Returns a string containing the body of the response."""
|
|
|
|
extra_params = {}
|
|
if parameters:
|
|
extra_params.update(parameters)
|
|
|
|
req = self._makeOAuthRequest(url, params=extra_params)
|
|
opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=0))
|
|
url = req.to_url()
|
|
url_data = opener.open(url)
|
|
opener.close()
|
|
return url_data
|
|
|
|
def _makeOAuthRequest(self, url, token=None, params=None):
|
|
"""Make a OAuth request from url and parameters. Returns oAuth object."""
|
|
|
|
oauth_base_params = {
|
|
'oauth_version': "1.0",
|
|
'oauth_nonce': oauth.generate_nonce(),
|
|
'oauth_timestamp': int(time.time())
|
|
}
|
|
|
|
if params:
|
|
params.update(oauth_base_params)
|
|
else:
|
|
params = oauth_base_params
|
|
|
|
if not token:
|
|
token = self._access_token
|
|
request = oauth.Request(method="GET", url=url, parameters=params)
|
|
request.sign_request(self._signature_method, self._Consumer, token)
|
|
return request
|
|
|
|
def ApiCall(self, call, parameters={}):
|
|
"""Calls the twitter API with 'call' and returns the twitter object (JSON)."""
|
|
|
|
try:
|
|
data = self._FetchUrl("https://api.twitter.com/1.1/" + call + ".json", parameters)
|
|
except urllib.error.HTTPError as e: # http error code.
|
|
return e.code
|
|
except urllib.error.URLError as e: # http "reason"
|
|
return e.reason
|
|
else: # return data if good.
|
|
return data
|
|
|
|
|
|
class Tweety(callbacks.Plugin):
|
|
"""Public Twitter class for working with the API."""
|
|
threaded = True
|
|
|
|
def __init__(self, irc):
|
|
self.__parent = super(Tweety, self)
|
|
self.__parent.__init__(irc)
|
|
self.twitterApi = False
|
|
self.since_id = {}
|
|
if not self.twitterApi:
|
|
self._checkAuthorization()
|
|
|
|
def _httpget(self, url, h=None, d=None, l=False):
|
|
"""General HTTP resource fetcher. Pass headers via h, data via d, and to log via l."""
|
|
|
|
try:
|
|
if h and d:
|
|
page = utils.web.getUrl(url, headers=h, data=d)
|
|
else:
|
|
h = {"User-Agent":"Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:17.0) Gecko/20100101 Firefox/17.0"}
|
|
page = utils.web.getUrl(url, headers=h)
|
|
try:
|
|
page = page.decode()
|
|
except:
|
|
page = page.decode('iso-8859-1')
|
|
return page
|
|
except utils.web.Error as e:
|
|
self.log.error("ERROR opening {0} message: {1}".format(url, e))
|
|
return None
|
|
|
|
|
|
def _shortenUrl(self, url):
|
|
"""Shortens a long URL into a short one."""
|
|
|
|
api_key = self.registryValue('bitlyKey')
|
|
url_enc = urllib.parse.quote_plus(url)
|
|
api_url = 'https://api-ssl.bitly.com/v3/shorten?access_token={}&longUrl={}&format=json'
|
|
|
|
try:
|
|
data = requests.get(api_url.format(api_key, url_enc), timeout=10)
|
|
data = json.loads(data.content)
|
|
url2 = data['data'].get('url')
|
|
if url2.strip():
|
|
return url2.strip()
|
|
else:
|
|
return url
|
|
except:
|
|
self.log.error("ERROR: Failed shortening url: {0}".format(longurl))
|
|
return url
|
|
|
|
def _checkAuthorization(self):
|
|
""" Check if we have our keys and can auth."""
|
|
|
|
if not self.twitterApi: # if not set, try and auth.
|
|
failTest = False # first check that we have all 4 keys.
|
|
for checkKey in ('consumerKey', 'consumerSecret', 'accessKey', 'accessSecret'):
|
|
try: # try to see if each key is set.
|
|
testKey = self.registryValue(checkKey)
|
|
except: # a key is not set, break and error.
|
|
self.log.debug("Failed checking keys. We're missing the config value for: {0}. Please set this and try again.".format(checkKey))
|
|
failTest = True
|
|
break
|
|
# if any missing, throw an error and keep twitterApi=False
|
|
if failTest:
|
|
self.log.error('Failed getting keys. You must set all 4 keys in config variables and reload plugin.')
|
|
return False
|
|
# We have all 4 keys. Now lets see if they are valid by calling verify_credentials in the API.
|
|
self.log.info("Got all 4 keys. Now trying to auth up with Twitter.")
|
|
twitterApi = OAuthApi(self.registryValue('consumerKey'), self.registryValue('consumerSecret'), self.registryValue('accessKey'), self.registryValue('accessSecret'))
|
|
data = twitterApi.ApiCall('account/verify_credentials')
|
|
# check the response. if we can load json, it means we're authenticated. else, return response.
|
|
try: # if we pass, response is validated. set self.twitterApi w/object.
|
|
json.loads(data.read())
|
|
self.log.info("I have successfully authorized and logged in to Twitter using your credentials.")
|
|
self.twitterApi = OAuthApi(self.registryValue('consumerKey'), self.registryValue('consumerSecret'), self.registryValue('accessKey'), self.registryValue('accessSecret'))
|
|
except: # response failed. Return what we got back.
|
|
self.log.error("ERROR: I could not log in using your credentials. Message: {0}".format(data))
|
|
return False
|
|
else: # if we're already validated, pass.
|
|
pass
|
|
|
|
########################
|
|
# COLOR AND FORMATTING #
|
|
########################
|
|
|
|
def _red(self, string):
|
|
"""Returns a red string."""
|
|
return ircutils.mircColor(string, 'red')
|
|
|
|
def _blue(self, string):
|
|
"""Returns a blue string."""
|
|
return ircutils.mircColor(string, 'blue')
|
|
|
|
def _bold(self, string):
|
|
"""Returns a bold string."""
|
|
return ircutils.bold(string)
|
|
|
|
def _ul(self, string):
|
|
"""Returns an underline string."""
|
|
return ircutils.underline(string)
|
|
|
|
def _bu(self, string):
|
|
"""Returns a bold/underline string."""
|
|
return ircutils.bold(ircutils.underline(string))
|
|
|
|
######################
|
|
# INTERNAL FUNCTIONS #
|
|
######################
|
|
|
|
def _unescape(self, text):
|
|
"""Created by Fredrik Lundh (http://effbot.org/zone/re-sub.htm#unescape-html)"""
|
|
|
|
# quick dump \n and \r, usually coming from bots that autopost html.
|
|
text = text.replace('\n', ' ').replace('\r', ' ')
|
|
# now the actual unescape.
|
|
def fixup(m):
|
|
text = m.group(0)
|
|
if text[:2] == "&#":
|
|
# character reference
|
|
try:
|
|
if text[:3] == "&#x":
|
|
return chr(int(text[3:-1], 16))
|
|
else:
|
|
return chr(int(text[2:-1]))
|
|
except (ValueError, OverflowError):
|
|
pass
|
|
else:
|
|
# named entity
|
|
try:
|
|
text = chr(html.entities.name2codepoint[text[1:-1]])
|
|
except KeyError:
|
|
pass
|
|
return text # leave as is
|
|
return re.sub("&#?\w+;", fixup, text)
|
|
|
|
def _time_created_at(self, s):
|
|
"""
|
|
Return relative time delta between now and s (dt string).
|
|
"""
|
|
|
|
try: # timeline's created_at Tue May 08 10:58:49 +0000 2012
|
|
ddate = time.strptime(s, "%a %b %d %H:%M:%S +0000 %Y")[:-2]
|
|
except ValueError:
|
|
try: # search's created_at Thu, 06 Oct 2011 19:41:12 +0000
|
|
ddate = time.strptime(s, "%a, %d %b %Y %H:%M:%S +0000")[:-2]
|
|
except ValueError:
|
|
return s
|
|
# do the math
|
|
d = datetime.utcnow() - datetime(*ddate, tzinfo=None)
|
|
# now parse and return.
|
|
if d.days:
|
|
rel_time = "{:1d}d ago".format(abs(d.days))
|
|
elif d.seconds > 3600:
|
|
rel_time = "{:.1f}h ago".format(round((abs(d.seconds) / 3600),1))
|
|
elif 60 <= d.seconds < 3600:
|
|
rel_time = "{:.1f}m ago".format(round((abs(d.seconds) / 60),1))
|
|
else:
|
|
rel_time = "%ss ago" % (abs(d.seconds))
|
|
return rel_time
|
|
|
|
def _outputTweet(self, irc, msg, nick, name, verified, text, time, tweetid):
|
|
"""
|
|
Constructs string to output for Tweet. Used for tsearch and twitter.
|
|
"""
|
|
|
|
# build output string.
|
|
if self.registryValue('outputColorTweets', msg.args[0]):
|
|
ret = "@{0}".format(self._ul(self._blue(nick)))
|
|
else: # bold otherwise.
|
|
ret = "@{0}".format(self._bu(nick))
|
|
if verified:
|
|
string = self._bold(ircutils.mircColor("", 'white', 'blue'))
|
|
ret += "{}".format(string)
|
|
# show real name in tweet output?
|
|
if not self.registryValue('hideRealName', msg.args[0]):
|
|
ret += " ({0})".format(name)
|
|
# add in the end with the text + tape.
|
|
if self.registryValue('colorTweetURLs', msg.args[0]): # color urls.
|
|
text = re.sub(r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)', self._red(r'\1'), text)
|
|
ret += ": {0} ({1})".format(text, self._bold(time))
|
|
else: # only bold time. no text color.
|
|
ret += ": {0} ({1})".format(text, self._bold(time))
|
|
# short url the link to the tweet?
|
|
if self.registryValue('addShortUrl', msg.args[0]):
|
|
url = self._createShortUrl(nick, tweetid)
|
|
if url: # if we got a url back.
|
|
ret += " {0}".format(url)
|
|
# now return.
|
|
return ret
|
|
|
|
def _createShortUrl(self, nick, tweetid):
|
|
"""Shortens a tweet into a short one."""
|
|
|
|
api_key = self.registryValue('bitlyKey')
|
|
longurl = "https://twitter.com/%s/status/%s" % (nick, tweetid)
|
|
api_url = 'https://api-ssl.bitly.com/v3/shorten?access_token={}&longUrl={}&format=json'
|
|
|
|
try:
|
|
data = requests.get(api_url.format(api_key, longurl), timeout=10)
|
|
data = json.loads(data.content)
|
|
url2 = data['data'].get('url')
|
|
if url2.strip():
|
|
return url2.strip()
|
|
else:
|
|
return longurl
|
|
except:
|
|
self.log.error("ERROR: Failed shortening url: {0}".format(longurl))
|
|
return longurl
|
|
|
|
def _woeid_lookup(self, lookup):
|
|
"""<location>
|
|
Use Yahoo's API to look-up a WOEID.
|
|
"""
|
|
try:
|
|
data = self.twitterApi.ApiCall('trends/available')
|
|
data = json.loads(data.read())
|
|
except:
|
|
data = None
|
|
log.debug('Tweety: error retrieving data from Trends API')
|
|
if not data:
|
|
log.debug("No location results for {0}".format(lookup))
|
|
return
|
|
return next((item["woeid"] for item in data if lookup.lower() in item["name"].lower()), None)
|
|
|
|
####################
|
|
# PUBLIC FUNCTIONS #
|
|
####################
|
|
|
|
def woeidlookup(self, irc, msg, args, lookup):
|
|
"""<location>
|
|
Search Yahoo's WOEID DB for a location. Useful for the trends variable.
|
|
Ex: London or Boston
|
|
"""
|
|
|
|
woeid = self._woeid_lookup(lookup)
|
|
if woeid:
|
|
irc.reply("WOEID: {0} for '{1}'".format(self._bold(woeid), lookup))
|
|
else:
|
|
irc.reply("ERROR: Something broke trying to find a WOEID for '{0}'".format(lookup))
|
|
|
|
woeidlookup = wrap(woeidlookup, ['text'])
|
|
|
|
def ratelimits(self, irc, msg, args):
|
|
"""
|
|
Display current rate limits for your twitter API account.
|
|
"""
|
|
|
|
# before we do anything, make sure we have a twitterApi object.
|
|
if not self.twitterApi:
|
|
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
|
|
return
|
|
# make API call.
|
|
data = self.twitterApi.ApiCall('application/rate_limit_status', parameters={'resources':'trends,search,statuses,users'})
|
|
try:
|
|
data = json.loads(data.read())
|
|
except:
|
|
irc.reply("ERROR: Failed to lookup ratelimit data: {0}".format(data))
|
|
return
|
|
# parse data;
|
|
data = data.get('resources')
|
|
if not data: # simple check if we have part of the json dict.
|
|
irc.reply("ERROR: Failed to fetch application rate limit status. Something could be wrong with Twitter.")
|
|
self.log.error("ERROR: fetching rate limit data. '{0}'".format(data))
|
|
return
|
|
# dict of resources we want and how to parse. key=human name, values are for the json dict.
|
|
resources = {'trends':['trends', '/trends/place'],
|
|
'tsearch':['search', '/search/tweets'],
|
|
'twitter --id':['statuses', '/statuses/show/:id'],
|
|
'twitter --info':['users', '/users/show/:id'],
|
|
'twitter timeline':['statuses', '/statuses/user_timeline'] }
|
|
# now iterate through dict above.
|
|
for resource in resources:
|
|
rdict = resources[resource] # get value.
|
|
endpoint = data.get(rdict[0]).get(rdict[1]) # value[0], value[1]
|
|
minutes = "%sm%ss" % divmod(int(endpoint['reset'])-int(time.time()), 60) # math.
|
|
output = "Reset in: {0} Remaining: {1}".format(minutes, endpoint['remaining'])
|
|
irc.reply("{0} :: {1}".format(self._bold(resource), output))
|
|
|
|
ratelimits = wrap(ratelimits)
|
|
|
|
def trends(self, irc, msg, args, getopts, optwoeid):
|
|
"""[--exclude] [location]
|
|
|
|
Returns the Top 10 Twitter trends for a specific location. Use optional argument location for trends.
|
|
Defaults to worldwide and can be set via config variable.
|
|
Use --exclude to not include #hashtags in trends data.
|
|
Ex: Boston or --exclude London
|
|
"""
|
|
|
|
# enforce +voice or above to use command?
|
|
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
|
|
if ircutils.isChannel(msg.args[0]): # are we in a channel?
|
|
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
|
|
irc.error("ERROR: You have to be at least voiced to use the trends command in {0}.".format(msg.args[0]))
|
|
return
|
|
|
|
# before we do anything, make sure we have a twitterApi object.
|
|
if not self.twitterApi:
|
|
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
|
|
return
|
|
|
|
# default arguments.
|
|
args = {'id': self.registryValue('woeid', msg.args[0]),
|
|
'exclude': self.registryValue('hideHashtagsTrends', msg.args[0])}
|
|
# handle input.
|
|
if getopts:
|
|
for (key, value) in getopts:
|
|
if key == 'exclude': # remove hashtags from trends.
|
|
args['exclude'] = 'hashtags'
|
|
# work with woeid. 1 is world, the default. can be set via input or via config.
|
|
if optwoeid: # if we have an input location, lookup the woeid.
|
|
if optwoeid.lower().startswith('world'): # looking for worldwide or some variation. (bypass)
|
|
args['id'] = 1 # "World Wide" is worldwide (odd bug) = 1.
|
|
else: # looking for something else.
|
|
woeid = self._woeid_lookup(optwoeid) # yahoo search for woeid.
|
|
if woeid: # if we get a returned value, set it. otherwise default value.
|
|
args['id'] = woeid
|
|
else: # location not found.
|
|
irc.reply("ERROR: I could not lookup location: {0}. Try a different location.".format(optwoeid))
|
|
return
|
|
# now build our API call
|
|
data = self.twitterApi.ApiCall('trends/place', parameters=args)
|
|
try:
|
|
data = json.loads(data.read())
|
|
except:
|
|
irc.reply("ERROR: failed to lookup trends on Twitter: {0}".format(data))
|
|
return
|
|
# now, before processing, check for errors:
|
|
if 'errors' in data:
|
|
if data['errors'][0]['code'] == 34: # 34 means location not found.
|
|
irc.reply("ERROR: I do not have any trends for: {0}".format(optwoeid))
|
|
return
|
|
else: # just return the message.
|
|
errmsg = data['errors'][0]
|
|
irc.reply("ERROR: Could not load trends. ({0} {1})".format(errmsg['code'], errmsg['message']))
|
|
return
|
|
# if no error here, we found trends. prepare string and output.
|
|
location = data[0]['locations'][0]['name']
|
|
ttrends = " | ".join([trend['name'] for trend in data[0]['trends']])
|
|
irc.reply("Top 10 Twitter Trends in {0} :: {1}".format(self._bold(location), ttrends))
|
|
|
|
trends = wrap(trends, [getopts({'exclude':''}), optional('text')])
|
|
|
|
def tsearch(self, irc, msg, args, optlist, optterm):
|
|
"""[--num number] [--searchtype mixed,recent,popular] [--lang xx] [--new] <term>
|
|
|
|
Searches Twitter for the <term> and returns the most recent results.
|
|
--num is number of results. (1-10)
|
|
--searchtype being recent, popular or mixed. Popular is the default.
|
|
Ex: --num 3 breaking news
|
|
"""
|
|
|
|
# enforce +voice or above to use command?
|
|
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
|
|
if ircutils.isChannel(msg.args[0]): # are we in a channel?
|
|
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
|
|
irc.error("ERROR: You have to be at least voiced to use the tsearch command in {0}.".format(msg.args[0]))
|
|
return
|
|
|
|
# before we do anything, make sure we have a twitterApi object.
|
|
if not self.twitterApi:
|
|
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
|
|
return
|
|
|
|
self.since_id.setdefault(msg.channel, {})
|
|
self.since_id[msg.channel].setdefault('{0}'.format(optterm), None)
|
|
new = False
|
|
# default arguments.
|
|
tsearchArgs = {'include_entities':'false',
|
|
'tweet_mode': 'extended',
|
|
'count': self.registryValue('defaultSearchResults', msg.args[0]),
|
|
'lang':'en',
|
|
'q':utils.web.urlquote(optterm)}
|
|
# check input.
|
|
if optlist:
|
|
for (key, value) in optlist:
|
|
if key == 'num': # --num
|
|
maxresults = self.registryValue('maxSearchResults', msg.args[0])
|
|
if not (1 <= value <= maxresults): # make sure it's between what we should output.
|
|
irc.reply("ERROR: '{0}' is not a valid number of tweets. Range is between 1 and {1}.".format(value, maxresults))
|
|
return
|
|
else: # change number to output.
|
|
tsearchArgs['count'] = value
|
|
if key == 'searchtype': # getopts limits us here.
|
|
tsearchArgs['result_type'] = value # limited by getopts to valid values.
|
|
if key == 'lang': # lang . Uses ISO-639 codes like 'en' http://en.wikipedia.org/wiki/ISO_639-1
|
|
tsearchArgs['lang'] = value
|
|
if key == 'new' and self.since_id[msg.channel]['{0}'.format(optterm)]:
|
|
new = True
|
|
tsearchArgs['since_id'] = self.since_id[msg.channel]['{0}'.format(optterm)]
|
|
# now build our API call.
|
|
data = self.twitterApi.ApiCall('search/tweets', parameters=tsearchArgs)
|
|
try:
|
|
data = json.loads(data.read())
|
|
except:
|
|
irc.reply("ERROR: Something went wrong trying to search Twitter. ({0})".format(data))
|
|
return
|
|
# check the return data.
|
|
results = data.get('statuses') # data returned as a dict.
|
|
if not results or len(results) == 0: # found nothing or length 0.
|
|
if not new:
|
|
irc.reply("ERROR: No Twitter Search results found for '{0}'".format(optterm))
|
|
return
|
|
else: # we found something.
|
|
self.since_id[msg.channel]['{0}'.format(optterm)] = results[0].get('id')
|
|
for result in results[0:int(tsearchArgs['count'])]: # iterate over each.
|
|
nick = self._unescape(result['user'].get('screen_name'))
|
|
name = self._unescape(result["user"].get('name'))
|
|
verified = result['user'].get('verified')
|
|
text = self._unescape(result.get('full_text')) or self._unescape(result.get('text'))
|
|
date = self._time_created_at(result.get('created_at'))
|
|
tweetid = result.get('id_str')
|
|
# build output string and output.
|
|
output = self._outputTweet(irc, msg, nick, name, verified, text, date, tweetid)
|
|
irc.reply(output)
|
|
|
|
tsearch = wrap(tsearch, [getopts({'num':('int'),
|
|
'searchtype':('literal', ('popular', 'mixed', 'recent')),
|
|
'lang':('somethingWithoutSpaces'),
|
|
'new':''}),
|
|
('text')])
|
|
|
|
def twitter(self, irc, msg, args, optlist, optnick, opturl):
|
|
"""[--noreply] [--nort] [--new] [--num number] <nick> | [--id id] | [--info nick]
|
|
|
|
Returns last tweet or 'number' tweets (max 10). Shows all tweets, including rt and reply.
|
|
To not display replies or RT's, use --noreply or --nort, respectively.
|
|
Return new tweets since the last time you checked in channel with --new
|
|
Or returns specific tweet with --id 'tweet#'.
|
|
Or returns information on user with --info 'name'.
|
|
Ex: --info @cnn OR --id 337197009729622016 OR --number 3 @drudge
|
|
"""
|
|
|
|
self.since_id.setdefault(msg.channel, {})
|
|
self.since_id[msg.channel].setdefault('{0}'.format(optnick), None)
|
|
# enforce +voice or above to use command?
|
|
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
|
|
if ircutils.isChannel(msg.args[0]): # are we in a channel?
|
|
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
|
|
irc.error("ERROR: You have to be at least voiced to use the twitter command in {0}.".format(msg.args[0]))
|
|
return
|
|
|
|
# before we do anything, make sure we have a twitterApi object.
|
|
if not self.twitterApi:
|
|
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
|
|
return
|
|
|
|
# now begin
|
|
optnick = optnick.replace('@','') # strip @ from input if given.
|
|
# default options.
|
|
args = {'id': False,
|
|
'nort': False,
|
|
'noreply': False,
|
|
'url': False,
|
|
'new': False,
|
|
'num': self.registryValue('defaultResults', msg.args[0]),
|
|
'info': False}
|
|
# handle input optlist.
|
|
if optlist:
|
|
for (key, value) in optlist:
|
|
if key == 'id':
|
|
args['id'] = True
|
|
if key == 'url':
|
|
args['url'] = True
|
|
if key == 'nort':
|
|
args['nort'] = True
|
|
if key == 'noreply':
|
|
args['noreply'] = True
|
|
if key == 'new':
|
|
args['new'] = True
|
|
if key == 'num':
|
|
maxresults = self.registryValue('maxResults', msg.args[0])
|
|
if not (1 <= value <= maxresults): # make sure it's between what we should output.
|
|
irc.reply("ERROR: '{0}' is not a valid number of tweets. Range is between 1 and {1}.".format(value, maxresults))
|
|
return
|
|
else: # number is valid so return this.
|
|
args['num'] = value
|
|
if key == 'info':
|
|
args['info'] = True
|
|
# handle the four different rest api endpoint urls + twitterArgs dict for options.
|
|
if args['id']: # -id #.
|
|
apiUrl = 'statuses/show'
|
|
twitterArgs = {'id': optnick, 'include_entities':'false', 'tweet_mode': 'extended'}
|
|
elif args['info']: # --info.
|
|
apiUrl = 'users/show'
|
|
twitterArgs = {'screen_name': optnick, 'include_entities':'false'}
|
|
elif args['new']: # --new.
|
|
apiUrl = 'statuses/user_timeline'
|
|
if self.since_id[msg.channel]['{0}'.format(optnick)]:
|
|
twitterArgs = {'screen_name': optnick, 'since_id':self.since_id[msg.channel]['{0}'.format(optnick)], 'count': args['num'], 'tweet_mode': 'extended'}
|
|
if args['nort']: # show retweets?
|
|
twitterArgs['include_rts'] = 'false'
|
|
else: # default is to show retweets.
|
|
twitterArgs['include_rts'] = 'true'
|
|
if args['noreply']: # show replies?
|
|
twitterArgs['exclude_replies'] = 'true'
|
|
else: # default is to NOT exclude replies.
|
|
twitterArgs['exclude_replies'] = 'false'
|
|
else:
|
|
twitterArgs = {'screen_name': optnick, 'count': args['num'], 'tweet_mode': 'extended'}
|
|
if args['nort']: # show retweets?
|
|
twitterArgs['include_rts'] = 'false'
|
|
else: # default is to show retweets.
|
|
twitterArgs['include_rts'] = 'true'
|
|
if args['noreply']: # show replies?
|
|
twitterArgs['exclude_replies'] = 'true'
|
|
else: # default is to NOT exclude replies.
|
|
twitterArgs['exclude_replies'] = 'false'
|
|
else: # if not an --id --info, or --new we're printing from their timeline.
|
|
apiUrl = 'statuses/user_timeline'
|
|
twitterArgs = {'screen_name': optnick, 'count': args['num'], 'tweet_mode': 'extended'}
|
|
if args['nort']: # show retweets?
|
|
twitterArgs['include_rts'] = 'false'
|
|
else: # default is to show retweets.
|
|
twitterArgs['include_rts'] = 'true'
|
|
if args['noreply']: # show replies?
|
|
twitterArgs['exclude_replies'] = 'true'
|
|
else: # default is to NOT exclude replies.
|
|
twitterArgs['exclude_replies'] = 'false'
|
|
# call the Twitter API with our data.
|
|
data = self.twitterApi.ApiCall(apiUrl, parameters=twitterArgs)
|
|
try:
|
|
data = json.loads(data.read())
|
|
except:
|
|
irc.reply("ERROR: Failed to lookup Twitter for '{0}' ({1}) ".format(optnick, data))
|
|
return
|
|
# before anything, check for errors. errmsg is conditional.
|
|
if 'errors' in data:
|
|
if data['errors'][0]['code'] == 34: # not found.
|
|
if args['id']: # --id #. # is not found.
|
|
errmsg = "ERROR: Tweet ID '{0}' not found.".format(optnick)
|
|
else: # --info <user> or twitter <user> not found.
|
|
errmsg = "ERROR: Twitter user '{0}' not found.".format(optnick)
|
|
irc.reply(errmsg) # print the error and exit.
|
|
return
|
|
else: # errmsg is not 34. just return it.
|
|
errmsg = data['errors'][0]
|
|
irc.reply("ERROR: {0} {1}".format(errmsg['code'], errmsg['message']))
|
|
return
|
|
# no errors, so we process data conditionally.
|
|
if args['id']: # If --id was given for a single tweet.
|
|
url = ''
|
|
if opturl:
|
|
url = ' - {}'.format(self._shortenUrl(opturl))
|
|
text = self._unescape(data.get('full_text')) or self._unescape(data.get('text'))
|
|
nick = self._unescape(data["user"].get('screen_name'))
|
|
name = self._unescape(data["user"].get('name'))
|
|
verified = data["user"].get('verified')
|
|
relativeTime = self._time_created_at(data.get('created_at'))
|
|
tweetid = data.get('id')
|
|
# prepare string to output and send to irc.
|
|
output = self._outputTweet(irc, msg, nick, name, verified, text, relativeTime, tweetid)
|
|
output += url
|
|
irc.reply(output)
|
|
return
|
|
elif args['info']: # --info to return info on a Twitter user.
|
|
location = data.get('location')
|
|
followers = data.get('followers_count')
|
|
friends = data.get('friends_count')
|
|
description = self._unescape(data.get('description'))
|
|
screen_name = self._unescape(data.get('screen_name'))
|
|
created_at = data.get('created_at')
|
|
statuses_count = data.get('statuses_count')
|
|
protected = data.get('protected')
|
|
name = self._unescape(data.get('name'))
|
|
url = data.get('url')
|
|
# build output string conditionally. build string conditionally.
|
|
ret = self._bu("@{0}".format(screen_name))
|
|
ret += " ({0})".format(name)
|
|
if protected: # is the account protected/locked?
|
|
ret += " [{0}]:".format(self._bu('LOCKED'))
|
|
else: # open.
|
|
ret += ":"
|
|
if url: # do they have a url?
|
|
ret += " {0}".format(self._ul(url))
|
|
if description: # a description?
|
|
ret += " {0}".format(self._unescape(description))
|
|
ret += " [{0} friends,".format(self._bold(friends))
|
|
ret += " {0} tweets,".format(self._bold(statuses_count))
|
|
ret += " {0} followers,".format(self._bold(followers))
|
|
ret += " signup: {0}".format(self._bold(self._time_created_at(created_at)))
|
|
if location: # do we have location?
|
|
ret += " Location: {0}]".format(self._bold(location))
|
|
else: # nope.
|
|
ret += "]"
|
|
# finally, output.
|
|
irc.reply(ret)
|
|
return
|
|
else: # this will display tweets/a user's timeline. can be n+1 tweets.
|
|
if len(data) == 0: # no tweets found.
|
|
if args['new']:
|
|
return
|
|
else:
|
|
irc.reply("ERROR: '{0}' has not tweeted yet.".format(optnick))
|
|
return
|
|
self.since_id[msg.channel]['{0}'.format(optnick)] = data[0].get('id')
|
|
for tweet in data: # n+1 tweets found. iterate through each tweet.
|
|
text = self._unescape(tweet.get('full_text')) or self._unescape(tweet.get('text'))
|
|
nick = self._unescape(tweet["user"].get('screen_name'))
|
|
name = self._unescape(tweet["user"].get('name'))
|
|
verified = tweet['user'].get('verified')
|
|
tweetid = tweet.get('id')
|
|
relativeTime = self._time_created_at(tweet.get('created_at'))
|
|
# prepare string to output and send to irc.
|
|
output = self._outputTweet(irc, msg, nick, name, verified, text, relativeTime, tweetid)
|
|
irc.reply(output)
|
|
|
|
twitter = wrap(twitter, [getopts({'noreply':'',
|
|
'nort':'',
|
|
'info':'',
|
|
'id':'',
|
|
'url':'',
|
|
'new':'',
|
|
'num':('int')}), ('somethingWithoutSpaces'), optional('somethingWithoutSpaces')])
|
|
|
|
Class = Tweety
|
|
|
|
|
|
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=279:
|