#!/usr/bin/env python # this file is under the WTFPLv2 [http://sam.zoy.org/wtfpl] # v1: 2010/05/23 # Author: Tila # You need a configuration file: ~/.fschfsch.py. Config example: # --- # host = 'irc.freenode.net' # port = 7000 # ssl = True # nickname = 'botnickname' # password = 'secret' # channels = ['##fschfsch', '#channel2', '#channel3'] # texts = {'help': 'I am fschfsch, a robot snake that evals python code', # 'sandbox': "I am powered by setrlimit and pysandbox [http://github.com/haypo/pysandbox], I don't fear you"} # --- ''' fschfsch is a Python-evaluating bot. fschfsch is pronounced "fssshh! fssshh!". ''' IN_MAXLEN = 300 # bytes OUT_MAXLEN = 300 # bytes TIMEOUT = 3 # seconds EVAL_MAXTIMESECONDS = TIMEOUT EVAL_MAXMEMORYBYTES = 10 * 1024 * 1024 # 10 MiB try: import sandbox as S except ImportError: print 'You need pysandbox in order to run fschfsch [http://github.com/haypo/pysandbox].' raise try: import twisted except ImportError: print 'You need twisted in order to run fschfsch.' raise from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet import ssl, reactor from twisted.words.im.ircsupport import IRCProto from twisted.words.protocols.irc import IRCClient # other imports import re import sys import os import resource as R import select import signal import time import threading import random def createSandboxConfig(): cfg = S.SandboxConfig( 'stdout', 'stderr', 'regex', 'unicodedata', # flow wants u'\{ATOM SYMBOL}' :-) 'future', 'code', 'time', 'datetime', 'math', 'itertools', 'random', 'encodings', ) cfg.allowModule('sys', 'version', 'hexversion', 'version_info') return cfg def _evalPython(line, locals): locals = dict(locals) try: if "\n" in line: raise SyntaxError() code = compile(line, "", "single") except SyntaxError: code = compile(line, "", "exec") exec code in locals def evalPython(line, locals=None): sandbox = S.Sandbox(config=createSandboxConfig()) if locals is not None: locals = dict(locals) else: locals = dict() try: sandbox.call(_evalPython, line, locals) except BaseException, e: print 'Error: [%s] %s' % (e.__class__.__name__, str(e)) except: print 'Error: ' sys.stdout.flush() def childProcess(line, w, locals): # reseed after a fork to avoid generating the same sequence for each child random.seed() sys.stdout = sys.stderr = os.fdopen(w, 'w') R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS)) R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES)) R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks evalPython(line, locals) def handleChild(childpid, r): txt = '' if any(select.select([r], [], [], TIMEOUT)): txt = os.read(r, OUT_MAXLEN + 1) os.close(r) if OUT_MAXLEN < len(txt): txt = txt[:OUT_MAXLEN] + '...' n = 0 while n < 6: pid, status = os.waitpid(childpid, os.WNOHANG) if pid: break time.sleep(.5) n += 1 if not pid: os.kill(childpid, signal.SIGKILL) return 'Timeout' elif os.WIFEXITED(status): txts = txt.rstrip().split('\n') if len(txts) > 1: txt = txts[0].rstrip() + ' [+ %d line(s)]' % (len(txts) - 1) else: txt = txts[0].rstrip() return 'Output: ' + txt elif os.WIFSIGNALED(status): return 'Killed' class EvalJob(threading.Thread): def __init__(self, line, irc, channel): super(EvalJob, self).__init__() self.line = line self.irc = irc self.channel = channel def run(self): output = self.handle_line(self.line) reactor.callFromThread(self.irc.say, self.channel, output) self.irc.executionLock.release() def handle_line(self, line): if IN_MAXLEN < len(line): return '(command is too long: %s bytes, the maximum is %s)' % (len(line), IN_MAXLEN) print("Process %s" % repr(line)) r, w = os.pipe() childpid = os.fork() if not childpid: os.close(r) childProcess(line, w, self.irc.factory.morevars) os._exit(0) else: os.close(w) result = handleChild(childpid, r) print("=> %s" % repr(result)) return result class EvalBot(IRCClient): versionName = 'fschfsch' versionNum = '0.1' #~ def __init__(self, *a, **k): def connectionMade(self): self.nickname = self.factory.nick self.password = self.factory.password self.talkre = re.compile('^%s[>:,] (.*)$' % self.nickname) self.executionLock = threading.Semaphore() self.pingSelfId = None IRCClient.connectionMade(self) def signedOn(self): self.pingSelfId = reactor.callLater(180, self.pingSelf) for chan in self.factory.channels: self.join(chan) def pingSelf(self): # used to avoid some timeouts where fschfsch does not reconnect self.ping(self.nickname) self.pingSelfId = reactor.callLater(180, self.pingSelf) def privmsg(self, user, channel, message): if self.pingSelfId is not None: self.pingSelfId.reset(180) if user.startswith('haypo') and message.startswith('exit'): os._exit(0) if not channel: return if not message.startswith(self.nickname): return if not self.talkre.match(message): return if not self.executionLock.acquire(blocking=False): return pyline = self.talkre.match(message).group(1) pyline = pyline.replace(' $$ ', '\n') self.handleThread = EvalJob(pyline, self, channel) self.handleThread.start() class MyFactory(ReconnectingClientFactory): def __init__(self, **kw): for k in kw: if k in ('nick', 'password', 'channels', 'morevars'): setattr(self, k, kw[k]) protocol = EvalBot def check_output(expr, expected, locals=None): from cStringIO import StringIO original_stdout = sys.stdout try: output = StringIO() sys.stdout = output evalPython(expr, locals) stdout = output.getvalue() assert stdout == expected, "%r != %r" % (stdout, expected) finally: sys.stdout = original_stdout def runTests(): # single check_output('1+1', '2\n') check_output('1; 2', '1\n2\n') check_output( # written in a single line "prime=lambda n,i=2:" "False if n%i==0 else prime(n,i+1) if i*i