Compare commits

...

5 Commits

3 changed files with 190 additions and 81 deletions

112
bot.py
View File

@ -1,90 +1,40 @@
#!/usr/bin/python3
import socket, asyncio, time, re, ssl, ircstates
import socket, asyncio, time, re, ssl, ircstates, importlib
from config import *
import shared
Parse = importlib.import_module('parse')
def _send(msg: str, log=True):
s.send(f"{msg}\n".encode('utf-8'))
shared.sock.send(f"{msg}\n".encode('utf-8'))
if log == True: print(f"> {msg}")
def usermodes(chan, user):
if __name__ == '__main__':
srv = ircstates.Server(NETNAME)
shared.sock = socket.socket()
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
if tls == True:
shared.sock = ctx.wrap_socket(shared.sock)
shared.sock.connect((HOST, PORT))
_send(f"NICK {NICK}")
_send(f"USER {NICK} 0 * :{REALNAME}")
try:
return srv.channels[chan].users[user].modes
except:
return []
while True:
recv_data = shared.sock.recv(1024)
recv_lines = srv.recv(recv_data)
def botdesc():
return fileload('desc')
#return "I am a utility bot. Public commands *.echo | Ran by ~julian@~team/~jmjl@~town/~jmjl@~club"
for line in recv_lines:
srv.parse_tokens(line)
def is_admin(source):
return 'julian@envs.net' == source.split('!')[1]
def nslogin():
_send(f"PRIVMSG NickServ@services.tilde.chat :IDENTIFY {NICK} {SERVICES_PW}", log=False)
print("Logged into services")
_send(f"MODE {NICK} +B")
Chan.joins()
class Chan:
def joins():
for i in db['chan'].all():
_send(f"JOIN {','.join([i['name']])}")
def join(channel):
_send(f"JOIN {channel}")
db['chan'].insert(dict(name=channel))
def leave(channel, msg="", kick=False):
if not kick: _send(f"PART {channel} :{msg}")
db['chan'].delete(name=channel)
srv = ircstates.Server(NETNAME)
s = socket.socket()
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
if tls == True:
s = ctx.wrap_socket(s)
s.connect((HOST, PORT))
_send(f"NICK {NICK}")
_send(f"USER {NICK} 0 * :{REALNAME}")
try:
while True:
recv_data = s.recv(1024)
recv_lines = srv.recv(recv_data)
for line in recv_lines:
srv.parse_tokens(line)
if not line.command == "PING": print(f"< {line.format()}")
if line.command == "PING": _send(f"PONG :{line.params[0]}", log=False)
elif (line.source == f"NickServ!{SERVICES_HOSTMASK}" and line.command == 'NOTICE' and line.params == ['util','If you do not change within 1 minute, I will change your nick.']): nslogin()
elif (line.command == "INVITE" and line.params[0] == NICK): Chan.join(line.params[1])
elif line.command == "KICK" and line.params[1] == NICK: Chan.leave(line.params[0], kick=True)
elif line.command == "PRIVMSG" and line.params[1] == "!botlist": _send(f"PRIVMSG {line.source.split('!')[0]} :{botdesc()}")
elif line.command == "PRIVMSG" and line.params[1].startswith('%') and is_admin(line.source):
cmdargs = line.params[1].split(' ')
cmd = ''.join(cmdargs[0].split('%')[1:])
args = cmdargs[1:]
usernick = line.source.split('!')[0]
chan = line.params[0]
if cmd == "mode": _send(f"MODE {chan} {' '.join(args)}")
elif cmd == "quit": _send(f"QUIT :{' '.join(args)}")
elif cmd == "msg": _send(f"PRIVMSG {args[0]} :{' '.join(args[1:])}")
elif cmd == "raw": _send(f"{' '.join(args)}")
elif cmd == "join": Chan.join(args[0])
elif cmd == "part": Chan.leave(args[0])
elif (line.command == "PRIVMSG" and line.params[1].startswith('*.') ):
commandargs = line.params[1].split(' ')
command = ''.join(commandargs[0].split('*.')[1:])
args = commandargs[1:]
usernick = line.source.split('!')[0]
chan = line.params[0]
if command == "help": _send(f"PRIVMSG {chan} :{botdesc()}")
elif command == "echo": _send(f"PRIVMSG {chan} :\x033[Echo]\x0F \x032{usernick}\x0F said: \x039{' '.join(args[0:])}")
elif command == "psa" and args[0].startswith('#'):
if 'o' in usermodes(args[0], usernick) or 'q' in usermodes(args[0], usernick): _send(f"PRIVMSG {args[0]} :\x0307**PSA:\x0F {' '.join(args[1:])}")
else: _send(f"PRIVMSG {chan} :Sorry, you aren't allowed to make public service anouncements.")
elif command == "psa" and not args[0].startswith('#'): _send(f"PRIVMSG {chan} :Sorry, the syntax for PSA is: *.psa <channel> <MESSAGE>")
elif 'o' in usermodes(chan, usernick) or 'q' in usermodes(chan, usernick):
if command == "part": Chan.leave(chan, msg=f"Leaving per request of {usernick}")
if command == "topic": _send(f"TOPIC {chan} :{' '.join(args[0:])}")
except KeyboardInterrupt: _send("QUIT :^C")
if line.command == 'PRIVMSG':
if line.params[1] == '%reload':
if Parse.is_admin(line.source):
oldprs = Parse
try:
prs = importlib.reload(Parse)
Parse = prs
except Exception as e:
_send(f"PRIVMSG {line.source.split('!')[0]} :Error reloading: Exception({e})")
Parse = oldprs
Parse.Parse(_send=_send, srv=srv, line=line)
except KeyboardInterrupt: _send("QUIT :^C")

157
parse.py Normal file
View File

@ -0,0 +1,157 @@
import socket, asyncio, time, re, ircstates, importlib
from config import *
import shared
def _send(msg: str, log=True):
shared.sock.send(f"{msg}\n".encode('utf-8'))
if log == True: print(f"> {msg}")
def usermodes(chan, user, srv):
try: return srv.channels[chan].users[user].modes
except KeyError: return []
#def _send(msg: str, log=True):
#s.send(f"{msg}\n".encode('utf-8'))
#if log == True: print(f"> {msg}")
def botdesc():
return fileload('desc')
def is_admin(source):
#return 'julian@envs.net' == source.split('!')[1]
#db['oper'].insert(dict(hostmask='friend!good@person'))
#db['oper'].delete(hostmask='nick!badowner@bad.opers')
try:
for i in db['oper'].all():
if source == i['hostmask']: return True
else: continue
except Exception as e:
_send(f"PRIVMSG julian :{source} caused {e}")
return False
return False
def nslogin():
_send("CAP REQ :echo-message")
_send("CAP END")
_send(f"PRIVMSG NickServ@services.tilde.chat :IDENTIFY {NICK} {SERVICES_PW}", log=False)
print("Logged into services")
_send(f"MODE {NICK} +B")
Chan.joins()
class Chan:
def joins():
for i in db['chan'].all():
_send(f"JOIN {','.join([i['name']])}")
def join(channel):
_send(f"JOIN {channel}")
db['chan'].insert(dict(name=channel))
def leave(channel, msg="", kick=False):
if not kick: _send(f"PART {channel} :{msg}")
db['chan'].delete(name=channel)
class Command:
def ExplicitCommandParser(line, srv):
commandargs = line.params[1].split(' ')
command = ''.join(commandargs[0].split('*.')[1:])
args = commandargs[1:]
usernick = line.source.split('!')[0]
chan = line.params[0]
if command == "help": _send(f"PRIVMSG {chan} :{botdesc()}")
elif command == "echo": _send(f"PRIVMSG {chan} :\x033[Echo]\x0F \x032{usernick}\x0F said: \x039{' '.join(args[0:])}")
elif command == "psa" and args[0].startswith('#'):
if 'o' in usermodes(args[0], usernick, srv) or 'q' in usermodes(args[0], usernick, srv): _send(f"PRIVMSG {args[0]} :\x0307**PSA:\x0F {' '.join(args[1:])}")
else: _send(f"PRIVMSG {chan} :{usernick}: Sorry, you aren't allowed to make public service anouncements.")
elif command == "psa" and not args[0].startswith('#'): _send(f"PRIVMSG {chan} :Sorry, the syntax for PSA is: *.psa <channel> <MESSAGE>")
elif 'o' in usermodes(chan, usernick, srv) or 'q' in usermodes(chan, usernick, srv):
if command == "part": Chan.leave(chan, msg=f"Leaving per request of {usernick}")
if command == "topic": _send(f"TOPIC {chan} :{' '.join(args[0:])}")
if command.endswith("op"):
cmd = command.split("op")[0]
if cmd == 'de': _send("MODE {} -vhoaq {}".format(chan, 5*f"{usernick} "))
elif command == "op": _send("MODE {} +vhoa {}".format(chan, 4*f"{usernick} "))
elif cmd == "deh": _send("MODE {} -h {}".format(chan, 1*f"{usernick} "))
elif cmd == "dev": _send("MODE {} -v {}".format(chan, 1*f"{usernick} "))
elif cmd == "dea": _send("MODE {} -a {}".format(chan, 1*f"{usernick} "))
elif cmd == "deo": _send("MODE {} -o {}".format(chan, 1*f"{usernick} "))
def AdminCommandParser(line, srv):
cmdargs = line.params[1].split(' ')
cmd = ''.join(cmdargs[0].split('%')[1:])
args = cmdargs[1:]
usernick = line.source.split('!')[0]
chan = line.params[0]
if cmd == "mode": _send(f"MODE {chan} {' '.join(args)}")
elif cmd == "quit": _send(f"QUIT :{' '.join(args)}")
elif cmd == "msg": _send(f"PRIVMSG {args[0]} :{' '.join(args[1:])}")
elif cmd == "raw": _send(f"{' '.join(args)}")
elif cmd == "join": Chan.join(args[0])
elif cmd == "part": Chan.leave(args[0])
elif cmd == "rpart": _send(f"PART {chan} :~Bye")
elif cmd == "rjoin" and len(args) > 0: _send(f"JOIN {args[0]}")
elif cmd == "regdrop":
try:
if len(args) > 0: thatchan = args[0]
else:
thatchan = ["#"]
for i in range(0, 5): thatchan.append(importlib.import_module('random').choice(importlib.import_module('string').ascii_letters))
thatchan = ''.join(thatchan)
_send(f"JOIN {thatchan}")
_send(f"PRIVMSG {chan} :{thatchan}")
_send(f"PRIVMSG ChanServ :REGISTER {thatchan}")
_send(f"PRIVMSG ChanServ :DROP {thatchan} {thatchan}")
if len(args) > 1: db['ops'].insert(dict(mask=line.source, chan=thatchan, delete=True, modes=args[1]))
else: db['ops'].insert(dict(mask=line.source, chan=thatchan, delete=True, modes='o'))
_send(f"INVITE {usernick} {thatchan}")
except Exception as e: print(e)
elif cmd == "oper":
try:
if args[0].startswith('-'):
db['oper'].delete(hostmask=args[0][1:])
_send(f"PRIVMSG {chan} :Hostmask {args[0][1:]} is no longer oper")
elif args[0].startswith('+'):
db['oper'].insert(dict(hostmask=args[0][1:]))
_send(f"PRIVMSG {chan} :Hostmask {args[0][1:]} is now oper")
except Exception as e: _send(f"PRIVMSG {chan} :Exception({e})")
elif cmd == "op" and len(args) > 2:
#db['ops'].insert(dict(mask=line.source, chan=thatchan, delete=True, modes=args[1]))
try:
if args[1].startswith('-'):
db['ops'].delete(chan=args[0], modes=args[1][1:])
_send(f"PRIVMSG {chan} :Mode {args[0]}:-{args[1][1:]} {args[2].split('!')[0]}")
elif args[1].startswith('+'):
if len(args) > 3: db['ops'].insert(dict(chan=args[0], modes=args[1][1:], mask=args[2], delete=True))
else: db['ops'].insert(dict(chan=args[0], modes=args[1][1:], mask=args[2], delete=False))
_send(f"PRIVMSG {chan} :Mode {args[0]}:+{args[1][1:]} {args[2].split('!')[0]}")
except Exception as e: _send(f"PRIVMSG {chan} :Exception({e})")
elif cmd == "eval":
try: _send(f"PRIVMSG {chan} :{eval(' '.join(args))}")
except Exception as e: _send(f"PRIVMSG {chan} :Exception({e})")
def privmsg(line, srv):
if line.params[1].startswith('%') and is_admin(line.source): Command.AdminCommandParser(line, srv)
elif line.params[0] != NICK and line.params[1] == "!botlist": _send(f"PRIVMSG {line.params[0]} :{botdesc()}")
elif line.params[0] == NICK and line.params[1] == "!botlist": _send(f"PRIVMSG {line.source.split('!')[0]} :{botdesc()}")
if line.params[1].startswith('*.'): Command.ExplicitCommandParser(line, srv)
def Parse(_send=None, srv=None, line=None):
try: Parser(_send, srv, line)
except Exception as e: print(e)
def Parser(_send=None, srv=None, line=None):
if not line.command == "PING": print(f"< {line.format()}")
if line.command == "PING":
_send(f"PONG :{line.params[0]}", log=False)
if shared.nick_has_changed: _send(f"NICK {NICK}")
elif line.command == "433": _send(f"NICK {NICK}_"); shared.nick_has_changed = True
elif (line.source == f"NickServ!{SERVICES_HOSTMASK}" and line.command == 'NOTICE' and line.params == ['util','If you do not change within 1 minute, I will change your nick.']): nslogin()
elif (line.command == "INVITE" and line.params[0] == NICK): Chan.join(line.params[1])
elif line.command == "KICK" and line.params[1] == NICK: Chan.leave(line.params[0], kick=True)
elif line.command == "PRIVMSG": Command.privmsg(line, srv)
elif line.command == "JOIN":
try:
for i in db['ops'].all():
if line.command == "JOIN" and line.source == i['mask'] and line.params[0] == i['chan']:
nickspace = f"{i['mask'].split('!')[0]} "
_send(f"MODE {i['chan']} +{i['modes']} {len(i['modes'])*nickspace}")
if i['delete']: db['ops'].delete(mask=i['mask'], chan=i['chan'], modes=i['modes'])
except Exception as e: print(e)

2
shared.py Normal file
View File

@ -0,0 +1,2 @@
socket = ""
nick_has_changed = False