Add 'Tweety/' from commit '9b7d3f9b4f4d54f2569f4ea9f375d92d154dcb2c'

git-subtree-dir: Tweety
git-subtree-mainline: 63c4552f426bd660b2844bee4791f6674f27b1ed
git-subtree-split: 9b7d3f9b4f4d54f2569f4ea9f375d92d154dcb2c
This commit is contained in:
oddluck 2019-03-05 23:21:31 -05:00
commit 547e7ff9cb
10 changed files with 948 additions and 0 deletions

27
Tweety/.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
*.py[co]
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
#Translations
*.mo
#Mr Developer
.mr.developer.cfg

19
Tweety/.travis.yml Normal file
View File

@ -0,0 +1,19 @@
language: python
python:
- "2.7"
- pypy
# command to install dependencies,
install:
- pip install -vr requirements.txt||true
# command to run tests, e.g. python setup.py test
script:
- echo $TRAVIS_PYTHON_VERSION
- cd .. && supybot-test Tweety
notifications:
email: false
irc:
channels:
- "irc.efnet.net#supybot"
matrix:
fast_finish: true

20
Tweety/LICENSE.txt Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2014 spline
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

111
Tweety/README.md Normal file
View File

@ -0,0 +1,111 @@
[![Build Status](https://travis-ci.org/reticulatingspline/Tweety.svg?branch=master)](https://travis-ci.org/reticulatingspline/Tweety)
# Limnoria plugin for Twitter.
## Introduction
This began with [Hoaas](https://github.com/Hoaas) making a slimmed down version of
ProgVal's [Twitter plugin](https://github.com/ProgVal/Supybot-Plugins/Twitter). He
was just interested in reading Tweets and showing information about the account, not
having any write cabaility. I started adding features to it and had to do an entire
rewrite after Twitter introduced the v1.1 API.
This plugin is able to display information on accounts, display specific tweets, search for tweets, and display trends.
If you are looking for anything outside of this, I suggest you do not run this plugin and instead install
ProgVal's version that I linked to above.
## Install
You will need a working Limnoria bot on Python 2.7 for this to work.
Go into your Limnoria plugin dir, usually ~/supybot/plugins and run:
```
git clone https://github.com/reticulatingspline/Tweety
```
To install additional requirements, run:
```
pip install -r requirements.txt
```
Next, load the plugin:
```
/msg bot load Tweety
```
[Fetch the API keys for Twitter](http://dev.twitter.com) by signing up (free).
Create an application. Fill out the requested information. Name does not matter
but the name of the application must be unique. Default is read-only, which is fine.
Once complete, they'll issue you 4 different "strings" that you need to input
into the bot, matching up with the config variable names.
```
/msg <bot> config plugins.Tweety.consumerKey xxxxx
/msg <bot> config plugins.Tweety.consumerSecret xxxxx
/msg <bot> config plugins.Tweety.accessKey xxxxx
/msg <bot> config plugins.Tweety.accessSecret xxxxx
```
Now, reload the bot and you should be good to go:
```
/msg bot reload Tweety
```
Optional: There are some config variables that can be set for the bot. They mainly control output stuff.
```
/msg bot config search Tweety
```
## Example Usage
```
<me> trends
<bot> Top 10 Twitter Trends in United States :: #BeforeIDieIWantTo | #ThingsIMissAboutMyChildhood | Happy Memorial Day | #RG13 | #USA | #america | BBQ | WWII | God Bless | Facebook
<me> tsearch news
<bot> @ray_gallego (Ray Gallego): http://t.co/ftNbDEzXaR (Researchers say Western IQs dropped 14 points over last century) (14s ago)
<bot> @surfing93 (emilyhenderson): @MariaaEveline Hay here is the Crestillion Interview. http://t.co/CEiDpboeMX (15s ago)
<me> twitter --num 3 @ESPNStatsInfo
<bot> @ESPNStatsInfo (ESPN Stats & Info): In 1st-round win vs Daniel Brands, Rafael Nadal lost 19 games. He lost a total of 19 games in the 1st 4 rounds at last year's French Open. (30m ago)
<bot> @ESPNStatsInfo (ESPN Stats & Info): Key stats from Miami's win yesterday. Haslem's jump shot, LeBron's post-up and more: http://t.co/a4CcUnKJMi (53m ago)
<bot> @ESPNStatsInfo (ESPN Stats & Info): Heat avoid losing consecutive games. They haven't lost 2 straight in more than 5 months (January 8-10) (1h ago)
```
## Extras
Want the bot to function like others do parsing out Twitter links and displaying? (Thanks to Hoaas)
```
<@snackle> https://twitter.com/JSportsnet/status/348114324004413440
<@milo> @JSportsnet (John Shannon): Am told that Tippett's new deal is for 5 years, and he's "committed to the franchise where ever it ends up". (44m ago)
```
```
<@Hoaas> Should work on links to profiles aswell: https://twitter.com/EricFrancis
<@Bunisher> @EricFrancis (Eric Francis): HNIC-turned-Sportsnet analyst, Calgary Sun columnist...
```
Load the messageparser plugin:
```
/msg <bot> load MessageParser
/msg <bot> messageparser add global "https?://twitter\.com/([^ \t/]+)(?:$|[ \t])" "Tweety twitter --info $1"
/msg <bot> messageparser add global "https?://twitter\.com/([A-Za-z0-9_]+)/status/([0-9]+)" "Tweety twitter --id $2"
```
## About
All of my plugins are free and open source. When I first started out, one of the main reasons I was
able to learn was due to other code out there. If you find a bug or would like an improvement, feel
free to give me a message on IRC or fork and submit a pull request. Many hours do go into each plugin,
so, if you're feeling generous, I do accept donations via Amazon or browse my [wish list](http://amzn.com/w/380JKXY7P5IKE).
I'm always looking for work, so if you are in need of a custom feature, plugin or something bigger, contact me via GitHub or IRC.

41
Tweety/__init__.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2013, spline
###
"""
Add a description of the plugin (to be presented to the user inside the wizard)
here. This should describe *what* the plugin does.
"""
import supybot
import supybot.world as world
# Use this for the version of this plugin. You may wish to put a CVS keyword
# in here if you're keeping the plugin in CVS or some similar system.
__version__ = ""
# XXX Replace this with an appropriate author or supybot.Author instance.
__author__ = supybot.Author('reticulatingspline', 'spline', 'spline')
# This is a dictionary mapping supybot.Author instances to lists of
# contributions.
__contributors__ = {}
# This is a url where the most recent plugin package can be downloaded.
__url__ = 'http://github.com/reticulatingspline/Supybot-Tweety'
import config
import plugin
reload(plugin) # In case we're being reloaded.
reload(config)
# Add more reloads here if you add third-party modules and want them to be
# reloaded when this plugin is reloaded. Don't forget to import them as well!
if world.testing:
import test
Class = plugin.Class
configure = config.configure
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=250:

34
Tweety/config.py Normal file
View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2013, spline
###
import supybot.conf as conf
import supybot.registry as registry
def configure(advanced):
# This will be called by supybot to configure this module. advanced is
# a bool that specifies whether the user identified himself as an advanced
# user or not. You should effect your configuration by manipulating the
# registry as appropriate.
from supybot.questions import expect, anything, something, yn
conf.registerPlugin('Tweety', True)
Tweety = conf.registerPlugin('Tweety')
conf.registerGlobalValue(Tweety,'consumerKey',registry.String('', """The consumer key of the application."""))
conf.registerGlobalValue(Tweety,'consumerSecret',registry.String('', """The consumer secret of the application.""", private=True))
conf.registerGlobalValue(Tweety,'accessKey',registry.String('', """The Twitter Access Token key for the bot's account"""))
conf.registerGlobalValue(Tweety,'accessSecret',registry.String('', """The Twitter Access Token secret for the bot's account""", private=True))
conf.registerChannelValue(Tweety,'hideRealName',registry.Boolean(False, """Do not show real name when displaying tweets."""))
conf.registerChannelValue(Tweety,'addShortUrl',registry.Boolean(False, """Whether or not to add a short URL to the tweets."""))
conf.registerChannelValue(Tweety,'woeid',registry.Integer(1, """Where On Earth ID. World Wide is 1. USA is 23424977."""))
conf.registerChannelValue(Tweety,'defaultSearchResults',registry.Integer(3, """Default number of results to return on searches."""))
conf.registerChannelValue(Tweety,'maxSearchResults',registry.Integer(10, """Maximum number of results to return on searches"""))
conf.registerChannelValue(Tweety,'defaultResults',registry.Integer(1, """Default number of results to return on timelines."""))
conf.registerChannelValue(Tweety,'maxResults',registry.Integer(10, """Maximum number of results to return on timelines."""))
conf.registerChannelValue(Tweety,'outputColorTweets',registry.Boolean(False, """When outputting Tweets, display them with some color."""))
conf.registerChannelValue(Tweety,'hideHashtagsTrends',registry.Boolean(False, """When displaying trends, should we display #hashtags? Default is no."""))
conf.registerChannelValue(Tweety,'requireVoiceOrAbove',registry.Boolean(False, """Only allows a user with voice or above on a channel to use commands."""))
conf.registerChannelValue(Tweety,'colorTweetURLs',registry.Boolean(False, """Try and color URLs (red) in Tweets?"""))
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=250:

1
Tweety/local/__init__.py Normal file
View File

@ -0,0 +1 @@
# Stub so local is a module, used for third-party modules

663
Tweety/plugin.py Normal file
View File

@ -0,0 +1,663 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2013, spline
###
# my libs
import urllib.request, urllib.error, urllib.parse
import json
import requests
import urllib
# libraries for time_created_at
import time
from datetime import datetime
# for unescape
import re
import html.entities
# oauthtwitter
import oauth2 as oauth
# supybot libs
import supybot.utils as utils
from supybot.commands import *
import supybot.plugins as plugins
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
from bs4 import BeautifulSoup
class OAuthApi:
"""OAuth class to work with Twitter v1.1 API."""
def __init__(self, consumer_key, consumer_secret, token, token_secret):
token = oauth.Token(token, token_secret)
self._Consumer = oauth.Consumer(consumer_key, consumer_secret)
self._signature_method = oauth.SignatureMethod_HMAC_SHA1()
self._access_token = token
def _FetchUrl(self,url, parameters=None):
"""Fetch a URL with oAuth. Returns a string containing the body of the response."""
extra_params = {}
if parameters:
extra_params.update(parameters)
req = self._makeOAuthRequest(url, params=extra_params)
opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=0))
url = req.to_url()
url_data = opener.open(url)
opener.close()
return url_data
def _makeOAuthRequest(self, url, token=None, params=None):
"""Make a OAuth request from url and parameters. Returns oAuth object."""
oauth_base_params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time())
}
if params:
params.update(oauth_base_params)
else:
params = oauth_base_params
if not token:
token = self._access_token
request = oauth.Request(method="GET", url=url, parameters=params)
request.sign_request(self._signature_method, self._Consumer, token)
return request
def ApiCall(self, call, parameters={}):
"""Calls the twitter API with 'call' and returns the twitter object (JSON)."""
try:
data = self._FetchUrl("https://api.twitter.com/1.1/" + call + ".json", parameters)
except urllib.error.HTTPError as e: # http error code.
return e.code
except urllib.error.URLError as e: # http "reason"
return e.reason
else: # return data if good.
return data
class Tweety(callbacks.Plugin):
"""Public Twitter class for working with the API."""
threaded = True
def __init__(self, irc):
self.__parent = super(Tweety, self)
self.__parent.__init__(irc)
self.twitterApi = False
if not self.twitterApi:
self._checkAuthorization()
def _httpget(self, url, h=None, d=None, l=False):
"""General HTTP resource fetcher. Pass headers via h, data via d, and to log via l."""
try:
if h and d:
page = utils.web.getUrl(url, headers=h, data=d)
else:
h = {"User-Agent":"Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:17.0) Gecko/20100101 Firefox/17.0"}
page = utils.web.getUrl(url, headers=h)
try:
page = page.decode()
except:
page = page.decode('iso-8859-1')
return page
except utils.web.Error as e:
self.log.error("ERROR opening {0} message: {1}".format(url, e))
return None
def _shortenUrl(self, url):
"""Shortens a long URL into a short one."""
api_key = self.registryValue('bitlyKey')
url_enc = urllib.parse.quote_plus(url)
api_url = 'https://api-ssl.bitly.com/v3/shorten?access_token={}&longUrl={}&format=json'
try:
data = requests.get(api_url.format(api_key, url_enc)).json()
url2 = data['data'].get('url')
if url2.strip():
return url2.strip()
else:
return url
except:
self.log.error("ERROR: Failed shortening url: {0}".format(longurl))
return url
def _checkAuthorization(self):
""" Check if we have our keys and can auth."""
if not self.twitterApi: # if not set, try and auth.
failTest = False # first check that we have all 4 keys.
for checkKey in ('consumerKey', 'consumerSecret', 'accessKey', 'accessSecret'):
try: # try to see if each key is set.
testKey = self.registryValue(checkKey)
except: # a key is not set, break and error.
self.log.debug("Failed checking keys. We're missing the config value for: {0}. Please set this and try again.".format(checkKey))
failTest = True
break
# if any missing, throw an error and keep twitterApi=False
if failTest:
self.log.error('Failed getting keys. You must set all 4 keys in config variables and reload plugin.')
return False
# We have all 4 keys. Now lets see if they are valid by calling verify_credentials in the API.
self.log.info("Got all 4 keys. Now trying to auth up with Twitter.")
twitterApi = OAuthApi(self.registryValue('consumerKey'), self.registryValue('consumerSecret'), self.registryValue('accessKey'), self.registryValue('accessSecret'))
data = twitterApi.ApiCall('account/verify_credentials')
# check the response. if we can load json, it means we're authenticated. else, return response.
try: # if we pass, response is validated. set self.twitterApi w/object.
json.loads(data.read().decode())
self.log.info("I have successfully authorized and logged in to Twitter using your credentials.")
self.twitterApi = OAuthApi(self.registryValue('consumerKey'), self.registryValue('consumerSecret'), self.registryValue('accessKey'), self.registryValue('accessSecret'))
except: # response failed. Return what we got back.
self.log.error("ERROR: I could not log in using your credentials. Message: {0}".format(data))
return False
else: # if we're already validated, pass.
pass
########################
# COLOR AND FORMATTING #
########################
def _red(self, string):
"""Returns a red string."""
return ircutils.mircColor(string, 'red')
def _blue(self, string):
"""Returns a blue string."""
return ircutils.mircColor(string, 'blue')
def _bold(self, string):
"""Returns a bold string."""
return ircutils.bold(string)
def _ul(self, string):
"""Returns an underline string."""
return ircutils.underline(string)
def _bu(self, string):
"""Returns a bold/underline string."""
return ircutils.bold(ircutils.underline(string))
######################
# INTERNAL FUNCTIONS #
######################
def _unescape(self, text):
"""Created by Fredrik Lundh (http://effbot.org/zone/re-sub.htm#unescape-html)"""
# quick dump \n and \r, usually coming from bots that autopost html.
text = text.replace('\n', ' ').replace('\r', ' ')
# now the actual unescape.
def fixup(m):
text = m.group(0)
if text[:2] == "&#":
# character reference
try:
if text[:3] == "&#x":
return chr(int(text[3:-1], 16))
else:
return chr(int(text[2:-1]))
except (ValueError, OverflowError):
pass
else:
# named entity
try:
text = chr(html.entities.name2codepoint[text[1:-1]])
except KeyError:
pass
return text # leave as is
return re.sub("&#?\w+;", fixup, text)
def _time_created_at(self, s):
"""
Return relative time delta between now and s (dt string).
"""
try: # timeline's created_at Tue May 08 10:58:49 +0000 2012
ddate = time.strptime(s, "%a %b %d %H:%M:%S +0000 %Y")[:-2]
except ValueError:
try: # search's created_at Thu, 06 Oct 2011 19:41:12 +0000
ddate = time.strptime(s, "%a, %d %b %Y %H:%M:%S +0000")[:-2]
except ValueError:
return s
# do the math
d = datetime.utcnow() - datetime(*ddate, tzinfo=None)
# now parse and return.
if d.days:
rel_time = "{:1d}d ago".format(abs(d.days))
elif d.seconds > 3600:
rel_time = "{:.1f}h ago".format(round((abs(d.seconds) / 3600),1))
elif 60 <= d.seconds < 3600:
rel_time = "{:.1f}m ago".format(round((abs(d.seconds) / 60),1))
else:
rel_time = "%ss ago" % (abs(d.seconds))
return rel_time
def _outputTweet(self, irc, msg, nick, name, verified, text, time, tweetid):
"""
Constructs string to output for Tweet. Used for tsearch and twitter.
"""
# build output string.
if self.registryValue('outputColorTweets', msg.args[0]):
ret = "@{0}".format(self._ul(self._blue(nick)))
else: # bold otherwise.
ret = "@{0}".format(self._bu(nick))
if verified:
string = self._bold(ircutils.mircColor("", 'white', 'blue'))
ret += "{}".format(string)
# show real name in tweet output?
if not self.registryValue('hideRealName', msg.args[0]):
ret += " ({0})".format(name)
# add in the end with the text + tape.
if self.registryValue('colorTweetURLs', msg.args[0]): # color urls.
text = re.sub(r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)', self._red(r'\1'), text)
ret += ": {0} ({1})".format(text, self._bold(time))
else: # only bold time. no text color.
ret += ": {0} ({1})".format(text, self._bold(time))
# short url the link to the tweet?
if self.registryValue('addShortUrl', msg.args[0]):
url = self._createShortUrl(nick, tweetid)
if url: # if we got a url back.
ret += " {0}".format(url)
# now return.
return ret
def _createShortUrl(self, nick, tweetid):
"""Shortens a tweet into a short one."""
api_key = self.registryValue('bitlyKey')
longurl = "https://twitter.com/%s/status/%s" % (nick, tweetid)
api_url = 'https://api-ssl.bitly.com/v3/shorten?access_token={}&longUrl={}&format=json'
try:
data = requests.get(api_url.format(api_key, longurl)).json()
url2 = data['data'].get('url')
if url2.strip():
return url2.strip()
else:
return longurl
except:
self.log.error("ERROR: Failed shortening url: {0}".format(longurl))
return longurl
def _woeid_lookup(self, lookup):
"""<location>
Use Yahoo's API to look-up a WOEID.
"""
query = "SELECT * FROM geo.places WHERE text='%s'" % lookup
params = {"q": query,
"format":"json",
"diagnostics":"false",
"env":"store://datatables.org/alltableswithkeys" }
# everything in try/except block incase it breaks.
try:
data = requests.get('http://woeid.rosselliot.co.nz/lookup/{0}'.format(lookup))
if not data: # http fetch breaks.
irc.reply("ERROR")
return
soup = BeautifulSoup(data.text)
woeid = soup.find("td", class_='woeid').getText()
return woeid
except Exception as err:
self.log.error("ERROR: Failed looking up WOEID for '{0}' :: {1}".format(lookup, err))
return None
####################
# PUBLIC FUNCTIONS #
####################
def woeidlookup(self, irc, msg, args, lookup):
"""<location>
Search Yahoo's WOEID DB for a location. Useful for the trends variable.
Ex: London or Boston
"""
woeid = self._woeid_lookup(lookup)
if woeid:
irc.reply("WOEID: {0} for '{1}'".format(self._bold(woeid), lookup))
else:
irc.reply("ERROR: Something broke trying to find a WOEID for '{0}'".format(lookup))
woeidlookup = wrap(woeidlookup, ['text'])
def ratelimits(self, irc, msg, args):
"""
Display current rate limits for your twitter API account.
"""
# before we do anything, make sure we have a twitterApi object.
if not self.twitterApi:
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
return
# make API call.
data = self.twitterApi.ApiCall('application/rate_limit_status', parameters={'resources':'trends,search,statuses,users'})
try:
data = json.loads(data.read().decode())
except:
irc.reply("ERROR: Failed to lookup ratelimit data: {0}".format(data))
return
# parse data;
data = data.get('resources')
if not data: # simple check if we have part of the json dict.
irc.reply("ERROR: Failed to fetch application rate limit status. Something could be wrong with Twitter.")
self.log.error("ERROR: fetching rate limit data. '{0}'".format(data))
return
# dict of resources we want and how to parse. key=human name, values are for the json dict.
resources = {'trends':['trends', '/trends/place'],
'tsearch':['search', '/search/tweets'],
'twitter --id':['statuses', '/statuses/show/:id'],
'twitter --info':['users', '/users/show/:id'],
'twitter timeline':['statuses', '/statuses/user_timeline'] }
# now iterate through dict above.
for resource in resources:
rdict = resources[resource] # get value.
endpoint = data.get(rdict[0]).get(rdict[1]) # value[0], value[1]
minutes = "%sm%ss" % divmod(int(endpoint['reset'])-int(time.time()), 60) # math.
output = "Reset in: {0} Remaining: {1}".format(minutes, endpoint['remaining'])
irc.reply("{0} :: {1}".format(self._bold(resource), output))
ratelimits = wrap(ratelimits)
def trends(self, irc, msg, args, getopts, optwoeid):
"""[--exclude] [location]
Returns the Top 10 Twitter trends for a specific location. Use optional argument location for trends.
Defaults to worldwide and can be set via config variable.
Use --exclude to not include #hashtags in trends data.
Ex: Boston or --exclude London
"""
# enforce +voice or above to use command?
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
if ircutils.isChannel(msg.args[0]): # are we in a channel?
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
irc.error("ERROR: You have to be at least voiced to use the trends command in {0}.".format(msg.args[0]))
return
# before we do anything, make sure we have a twitterApi object.
if not self.twitterApi:
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
return
# default arguments.
args = {'id': self.registryValue('woeid', msg.args[0]),
'exclude': self.registryValue('hideHashtagsTrends', msg.args[0])}
# handle input.
if getopts:
for (key, value) in getopts:
if key == 'exclude': # remove hashtags from trends.
args['exclude'] = 'hashtags'
# work with woeid. 1 is world, the default. can be set via input or via config.
if optwoeid: # if we have an input location, lookup the woeid.
if optwoeid.lower().startswith('world'): # looking for worldwide or some variation. (bypass)
args['id'] = 1 # "World Wide" is worldwide (odd bug) = 1.
else: # looking for something else.
woeid = self._woeid_lookup(optwoeid) # yahoo search for woeid.
if woeid: # if we get a returned value, set it. otherwise default value.
args['id'] = woeid
else: # location not found.
irc.reply("ERROR: I could not lookup location: {0}. Try a different location.".format(optwoeid))
return
# now build our API call
data = self.twitterApi.ApiCall('trends/place', parameters=args)
try:
data = json.loads(data.read().decode())
except:
irc.reply("ERROR: failed to lookup trends on Twitter: {0}".format(data))
return
# now, before processing, check for errors:
if 'errors' in data:
if data['errors'][0]['code'] == 34: # 34 means location not found.
irc.reply("ERROR: I do not have any trends for: {0}".format(optwoeid))
return
else: # just return the message.
errmsg = data['errors'][0]
irc.reply("ERROR: Could not load trends. ({0} {1})".format(errmsg['code'], errmsg['message']))
return
# if no error here, we found trends. prepare string and output.
location = data[0]['locations'][0]['name']
ttrends = " | ".join([trend['name'] for trend in data[0]['trends']])
irc.reply("Top 10 Twitter Trends in {0} :: {1}".format(self._bold(location), ttrends))
trends = wrap(trends, [getopts({'exclude':''}), optional('text')])
def tsearch(self, irc, msg, args, optlist, optterm):
"""[--num number] [--searchtype mixed,recent,popular] [--lang xx] <term>
Searches Twitter for the <term> and returns the most recent results.
--num is number of results. (1-10)
--searchtype being recent, popular or mixed. Popular is the default.
Ex: --num 3 breaking news
"""
# enforce +voice or above to use command?
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
if ircutils.isChannel(msg.args[0]): # are we in a channel?
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
irc.error("ERROR: You have to be at least voiced to use the tsearch command in {0}.".format(msg.args[0]))
return
# before we do anything, make sure we have a twitterApi object.
if not self.twitterApi:
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
return
# default arguments.
tsearchArgs = {'include_entities':'false',
'tweet_mode': 'extended',
'count': self.registryValue('defaultSearchResults', msg.args[0]),
'lang':'en',
'q':utils.web.urlquote(optterm)}
# check input.
if optlist:
for (key, value) in optlist:
if key == 'num': # --num
maxresults = self.registryValue('maxSearchResults', msg.args[0])
if not (1 <= value <= maxresults): # make sure it's between what we should output.
irc.reply("ERROR: '{0}' is not a valid number of tweets. Range is between 1 and {1}.".format(value, maxresults))
return
else: # change number to output.
tsearchArgs['count'] = value
if key == 'searchtype': # getopts limits us here.
tsearchArgs['result_type'] = value # limited by getopts to valid values.
if key == 'lang': # lang . Uses ISO-639 codes like 'en' http://en.wikipedia.org/wiki/ISO_639-1
tsearchArgs['lang'] = value
# now build our API call.
data = self.twitterApi.ApiCall('search/tweets', parameters=tsearchArgs)
try:
data = json.loads(data.read().decode())
except:
irc.reply("ERROR: Something went wrong trying to search Twitter. ({0})".format(data))
return
# check the return data.
results = data.get('statuses') # data returned as a dict.
if not results or len(results) == 0: # found nothing or length 0.
irc.reply("ERROR: No Twitter Search results found for '{0}'".format(optterm))
return
else: # we found something.
for result in results[0:int(tsearchArgs['count'])]: # iterate over each.
nick = self._unescape(result['user'].get('screen_name'))
name = self._unescape(result["user"].get('name'))
verified = result['user'].get('verified')
text = self._unescape(result.get('full_text')) or self._unescape(result.get('text'))
date = self._time_created_at(result.get('created_at'))
tweetid = result.get('id_str')
# build output string and output.
output = self._outputTweet(irc, msg, nick, name, verified, text, date, tweetid)
irc.reply(output)
tsearch = wrap(tsearch, [getopts({'num':('int'),
'searchtype':('literal', ('popular', 'mixed', 'recent')),
'lang':('somethingWithoutSpaces')}),
('text')])
def twitter(self, irc, msg, args, optlist, optnick, opturl):
"""[--noreply] [--nort] [--num number] <nick> | [--id id] | [--info nick]
Returns last tweet or 'number' tweets (max 10). Shows all tweets, including rt and reply.
To not display replies or RT's, use --noreply or --nort, respectively.
Or returns specific tweet with --id 'tweet#'.
Or returns information on user with --info 'name'.
Ex: --info @cnn OR --id 337197009729622016 OR --number 3 @drudge
"""
# enforce +voice or above to use command?
if self.registryValue('requireVoiceOrAbove', msg.args[0]): # should we check?
if ircutils.isChannel(msg.args[0]): # are we in a channel?
if not irc.state.channels[msg.args[0]].isVoicePlus(msg.nick): # are they + or @?
irc.error("ERROR: You have to be at least voiced to use the twitter command in {0}.".format(msg.args[0]))
return
# before we do anything, make sure we have a twitterApi object.
if not self.twitterApi:
irc.reply("ERROR: Twitter is not authorized. Please check logs before running this command.")
return
# now begin
optnick = optnick.replace('@','') # strip @ from input if given.
# default options.
args = {'id': False,
'nort': False,
'noreply': False,
'url': False,
'num': self.registryValue('defaultResults', msg.args[0]),
'info': False}
# handle input optlist.
if optlist:
for (key, value) in optlist:
if key == 'id':
args['id'] = True
if key == 'url':
args['url'] = True
if key == 'nort':
args['nort'] = True
if key == 'noreply':
args['noreply'] = True
if key == 'num':
maxresults = self.registryValue('maxResults', msg.args[0])
if not (1 <= value <= maxresults): # make sure it's between what we should output.
irc.reply("ERROR: '{0}' is not a valid number of tweets. Range is between 1 and {1}.".format(value, maxresults))
return
else: # number is valid so return this.
args['num'] = value
if key == 'info':
args['info'] = True
# handle the three different rest api endpoint urls + twitterArgs dict for options.
if args['id']: # -id #.
apiUrl = 'statuses/show'
twitterArgs = {'id': optnick, 'include_entities':'false', 'tweet_mode': 'extended'}
elif args['info']: # --info.
apiUrl = 'users/show'
twitterArgs = {'screen_name': optnick, 'include_entities':'false'}
else: # if not an --id or --info, we're printing from their timeline.
apiUrl = 'statuses/user_timeline'
twitterArgs = {'screen_name': optnick, 'count': args['num'], 'tweet_mode': 'extended'}
if args['nort']: # show retweets?
twitterArgs['include_rts'] = 'false'
else: # default is to show retweets.
twitterArgs['include_rts'] = 'true'
if args['noreply']: # show replies?
twitterArgs['exclude_replies'] = 'true'
else: # default is to NOT exclude replies.
twitterArgs['exclude_replies'] = 'false'
# call the Twitter API with our data.
data = self.twitterApi.ApiCall(apiUrl, parameters=twitterArgs)
try:
data = json.loads(data.read().decode())
except:
irc.reply("ERROR: Failed to lookup Twitter for '{0}' ({1}) ".format(optnick, data))
return
# before anything, check for errors. errmsg is conditional.
if 'errors' in data:
if data['errors'][0]['code'] == 34: # not found.
if args['id']: # --id #. # is not found.
errmsg = "ERROR: Tweet ID '{0}' not found.".format(optnick)
else: # --info <user> or twitter <user> not found.
errmsg = "ERROR: Twitter user '{0}' not found.".format(optnick)
irc.reply(errmsg) # print the error and exit.
return
else: # errmsg is not 34. just return it.
errmsg = data['errors'][0]
irc.reply("ERROR: {0} {1}".format(errmsg['code'], errmsg['message']))
return
# no errors, so we process data conditionally.
if args['id']: # If --id was given for a single tweet.
url = ''
if opturl:
url = ' - {}'.format(self._shortenUrl(opturl))
text = self._unescape(data.get('full_text')) or self._unescape(data.get('text'))
nick = self._unescape(data["user"].get('screen_name'))
name = self._unescape(data["user"].get('name'))
verified = data["user"].get('verified')
relativeTime = self._time_created_at(data.get('created_at'))
tweetid = data.get('id')
# prepare string to output and send to irc.
output = self._outputTweet(irc, msg, nick, name, verified, text, relativeTime, tweetid)
output += url
irc.reply(output)
return
elif args['info']: # --info to return info on a Twitter user.
location = data.get('location')
followers = data.get('followers_count')
friends = data.get('friends_count')
description = self._unescape(data.get('description'))
screen_name = self._unescape(data.get('screen_name'))
created_at = data.get('created_at')
statuses_count = data.get('statuses_count')
protected = data.get('protected')
name = self._unescape(data.get('name'))
url = data.get('url')
# build output string conditionally. build string conditionally.
ret = self._bu("@{0}".format(screen_name))
ret += " ({0})".format(name)
if protected: # is the account protected/locked?
ret += " [{0}]:".format(self._bu('LOCKED'))
else: # open.
ret += ":"
if url: # do they have a url?
ret += " {0}".format(self._ul(url))
if description: # a description?
ret += " {0}".format(self._unescape(description))
ret += " [{0} friends,".format(self._bold(friends))
ret += " {0} tweets,".format(self._bold(statuses_count))
ret += " {0} followers,".format(self._bold(followers))
ret += " signup: {0}".format(self._bold(self._time_created_at(created_at)))
if location: # do we have location?
ret += " Location: {0}]".format(self._bold(location))
else: # nope.
ret += "]"
# finally, output.
irc.reply(ret)
return
else: # this will display tweets/a user's timeline. can be n+1 tweets.
if len(data) == 0: # no tweets found.
irc.reply("ERROR: '{0}' has not tweeted yet.".format(optnick))
return
for tweet in data: # n+1 tweets found. iterate through each tweet.
text = self._unescape(tweet.get('full_text')) or self._unescape(tweet.get('text'))
nick = self._unescape(tweet["user"].get('screen_name'))
name = self._unescape(tweet["user"].get('name'))
verified = tweet['user'].get('verified')
tweetid = tweet.get('id')
relativeTime = self._time_created_at(tweet.get('created_at'))
# prepare string to output and send to irc.
output = self._outputTweet(irc, msg, nick, name, verified, text, relativeTime, tweetid)
irc.reply(output)
twitter = wrap(twitter, [getopts({'noreply':'',
'nort':'',
'info':'',
'id':'',
'url':'',
'num':('int')}), ('somethingWithoutSpaces'), optional('somethingWithoutSpaces')])
Class = Tweety
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=279:

3
Tweety/requirements.txt Normal file
View File

@ -0,0 +1,3 @@
oauth2
requests
bs4

29
Tweety/test.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright (c) 2013-2014, spline
###
from supybot.test import *
import os
class TweetyTestCase(PluginTestCase):
plugins = ('Tweety',)
def setUp(self):
PluginTestCase.setUp(self)
# get our variables via the secure environment.
consumerKey = os.environ.get('consumerKey')
consumerSecret = os.environ.get('consumerSecret')
accessKey = os.environ.get('accessKey')
accessSecret = os.environ.get('accessSecret')
# now set them.
conf.supybot.plugins.Tweety.consumerKey.setValue(consumerKey)
conf.supybot.plugins.Tweety.consumerSecret.setValue(consumerSecret)
conf.supybot.plugins.Tweety.accessKey.setValue(accessKey)
conf.supybot.plugins.Tweety.accessSecret.setValue(accessSecret)
def testTweety(self):
self.assertSnarfResponse('reload Tweety', 'The operation succeeded.')
self.assertRegexp('trends', 'Top 10 Twitter Trends')
self.assertRegexp('twitter --info CNN', 'CNN')
self.assertRegexp('twitter CNN', 'CNN')
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: