import os, re, time, base64
import supybot.utils as utils
import http.server, sqlite3
host = 'http://domain.tld'
port = 80
standalone = True
webpath = '/bantracker'
username = 'username'
password = 'password'
filename = '/home/botaccount/data/networkname/ChanTracker.db'
channels = [] # empty to allow view of all channels recorded, otherwise restrict the views to channels
# httpd server address
if not standalone:
servaddr = '127.0.0.1'
else:
servaddr = ''
# usage python server.py
auth = '%s:%s' % (username,password)
base64string = base64.b64encode(auth.encode('utf-8')).decode('utf-8')
def weblink():
weblink = host
if standalone:
weblink += ':%s' % port
else:
weblink += webpath
weblink += '/?hash=%s' % base64string
return weblink
def htmlEscape(text):
return text.replace('&','&').replace('<','<').replace('>','>').replace('"','"')
class BanTracker(http.server.BaseHTTPRequestHandler):
if not standalone:
def log_request(self, *args):
pass # disable logging
def do_GET(self):
self.page(self.path)
def page(self, query):
def write(subtitle, body):
page = [
'', '', '
',
'BanTracker%s' % (' » %s' % subtitle if subtitle else ''),
'',
'',
'', ''
] + body + ['', '']
self.send_response(200)
self.send_header("Content-Type", "text/html")
full = '\n'.join(page)
print('HTML lines %s' % len(full))
self.send_header("Content-Length", len(full))
self.end_headers()
self.wfile.write(full.encode('utf-8'))
if standalone:
h = '%s:%s/' % (host,port)
else:
h = '%s/' % webpath
if not query:
return
if query.startswith('/?username='):
query = query.replace('/?','')
a = query.split('&')
u = p = None
for item in a:
aa = item.split('=')
if aa[0] == 'username':
u = aa[1]
if aa[0] == 'password':
p = aa[1]
if u and p:
auth = '%s:%s' % (u,p)
raw = base64.b64encode(auth.encode('utf-8')).decode('utf-8')
if raw != base64string:
query = ''
else:
query = '/?hash=%s' % base64string
if not query.startswith('/?hash='):
subtitle = ''
body = [
''
]
write(subtitle, body)
return
query = query.replace('%3D','=')
query = query.replace('/?hash=%s' % base64string,'')
query = query.lstrip('&')
q = '?hash=%s' % base64string
query = utils.web.urlunquote(query)
subtitle = ''
body = [
'
',
'
',
''
]
if not query:
write(subtitle, body)
return
print(query)
subtitle = query
db = self._getbandb()
c = db.cursor()
ar = []
if query.startswith('id='):
search = query.split('=')[1]
si = int(search)
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE id=?""",(si,))
r = c.fetchall()
if len(r):
ban = r[0]
(bid,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by) = ban
if not channels or channel in channels:
body.extend([
'
' % (h,q,utils.web.urlencode({'removed_by':removed_by}),removed_by)])
c.execute("""SELECT full,log FROM nicks WHERE ban_id=?""",(bid,))
r = c.fetchall()
if len(r):
body.append('
Logs
')
for (full,log) in r:
body.append('
for %s
' % full)
if log != '':
body.append('
')
for line in log.split('\n'):
if line != '':
body.append('
%s
' % htmlEscape(line))
body.append('
')
c.execute("""SELECT oper,at,comment FROM comments WHERE ban_id=?""",(bid,))
r = c.fetchall()
if len(r):
body.extend(['
Comments
', '
'])
for (oper,at,com) in r:
s = time.strftime('%Y-%m-%d %H:%M:%S GMT',time.gmtime(float(at)))
body.append('
%s by %s: %s
' % (s,oper,htmlEscape(com)))
body.append('
')
c.close()
write(subtitle, body)
return
elif query.startswith('channel='):
search = '#'+query.split('=')[1]
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE channel=? ORDER BY id DESC""",(search,))
r = c.fetchall()
if len(r):
ar.extend(r)
elif query.startswith('removed_by='):
search = query.split('=')[1]
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE removed_by=? ORDER BY id DESC""",(search,))
r = c.fetchall()
if len(r):
ar.extend(r)
elif query.startswith('oper='):
search = query.split('=')[1]
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE oper=? ORDER BY id DESC""",(search,))
r = c.fetchall()
if len(r):
ar.extend(r)
elif query.startswith('mask='):
search = query.split('=')[1]
sg = '*%s*' % search
sl = '%%%s%%' % search
c.execute("""SELECT ban_id,full FROM nicks WHERE full GLOB ? OR full LIKE ? OR log GLOB ? OR log LIKE ? ORDER BY ban_id DESC""",(sg,sl,sg,sl))
r = c.fetchall()
L = []
a = {}
if len(r):
d = []
for (bid,full) in r:
if bid not in d:
d.append(bid)
for bid in d:
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE id=?""",(bid,))
r = c.fetchall()
if len(r):
for ban in r:
a[ban[0]] = ban
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE mask GLOB ? OR mask LIKE ? ORDER BY id DESC""",(sg,sl))
r = c.fetchall()
if len(r):
for ban in r:
a[ban[0]] = ban
if len(a):
ar = []
for ban in list(a.keys()):
ar.append(a[ban])
ar.sort(key=lambda x: x[0], reverse=True)
elif query.startswith('search='):
search = query.split('=')[1]
search = search.replace('+','*')
print(search)
if search:
if not re.match(r'^[0-9]+$', search):
sg = '*%s*' % search
sl = '%%%s%%' % search
si = None
c.execute("""SELECT ban_id,full FROM nicks WHERE full GLOB ? OR full LIKE ? OR log GLOB ? OR log LIKE ? ORDER BY ban_id DESC""",(sg,sl,sg,sl))
r = c.fetchall()
else:
si = int(search)
r = []
L = []
a = {}
if len(r):
d = []
for (bid,full) in r:
if bid not in d:
d.append(bid)
for bid in d:
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE id=?""",(bid,))
r = c.fetchall()
if len(r):
for ban in r:
a[ban[0]] = ban
if not si:
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE mask GLOB ? OR mask LIKE ? OR channel GLOB ? OR channel LIKE ? OR oper GLOB ? OR oper LIKE ? ORDER BY id DESC""",(sg,sl,sg,sl,sg,sl))
else:
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE id=?""",(si,))
r = c.fetchall()
if len(r):
for ban in r:
a[ban[0]] = ban
if not si:
c.execute("""SELECT ban_id, comment FROM comments WHERE comment GLOB ? OR comment LIKE ? ORDER BY ban_id DESC""",(sg,sl))
r = c.fetchall()
else:
r = []
if len(r):
d = []
for (bid,full) in r:
d.append(bid)
for bid in d:
if bid not in a:
c.execute("""SELECT id,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by FROM bans WHERE id=?""",(bid,))
r = c.fetchall()
if len(r):
for ban in r:
a[ban[0]] = ban
if len(a):
ar = []
for ban in list(a.keys()):
ar.append(a[ban])
ar.sort(key=lambda x: x[0], reverse=True)
if len(ar):
print('Found %s results' % len(ar))
body.extend([
'
Results %s
' % search,
'
',
'
ID
Channel
Operator
Type
Mask
Begin date
End date
Removed
Removed by
',
''
])
for ban in ar:
(bid,channel,oper,kind,mask,begin_at,end_at,removed_at,removed_by) = ban
if not channels or channel in channels:
s = time.strftime('%Y-%m-%d %H:%M:%S GMT',time.gmtime(float(begin_at)))
body.extend([
'
')
c.close()
write(subtitle, body)
def _getbandb(self):
if os.path.exists(filename):
db = sqlite3.connect(filename,timeout=10)
return db
db = sqlite3.connect(filename)
c = db.cursor()
c.execute("""CREATE TABLE bans (
id INTEGER PRIMARY KEY,
channel VARCHAR(100) NOT NULL,
oper VARCHAR(1000) NOT NULL,
kind VARCHAR(1) NOT NULL,
mask VARCHAR(1000) NOT NULL,
begin_at TIMESTAMP NOT NULL,
end_at TIMESTAMP NOT NULL,
removed_at TIMESTAMP,
removed_by VARCHAR(1000)
)""")
c.execute("""CREATE TABLE nicks (
ban_id INTEGER,
ban VARCHAR(1000) NOT NULL,
full VARCHAR(1000) NOT NULL,
log TEXT NOT NULL
)""")
c.execute("""CREATE TABLE comments (
ban_id INTEGER,
oper VARCHAR(1000) NOT NULL,
at TIMESTAMP NOT NULL,
comment TEXT NOT NULL
)""")
db.commit()
return db
def httpd(handler_class=BanTracker, server_address=(servaddr, port)):
srvr = http.server.HTTPServer(server_address, handler_class)
srvr.serve_forever()
if __name__ == "__main__":
httpd()