Now reloadeable.
This commit is contained in:
parent
f46f98bfe9
commit
4f299caaa9
112
bot.py
112
bot.py
|
@ -1,90 +1,40 @@
|
|||
#!/usr/bin/python3
|
||||
import socket, asyncio, time, re, ssl, ircstates
|
||||
import socket, asyncio, time, re, ssl, ircstates, importlib
|
||||
from config import *
|
||||
import shared
|
||||
|
||||
|
||||
Parse = importlib.import_module('parse')
|
||||
def _send(msg: str, log=True):
|
||||
s.send(f"{msg}\n".encode('utf-8'))
|
||||
shared.sock.send(f"{msg}\n".encode('utf-8'))
|
||||
if log == True: print(f"> {msg}")
|
||||
|
||||
def usermodes(chan, user):
|
||||
if __name__ == '__main__':
|
||||
srv = ircstates.Server(NETNAME)
|
||||
shared.sock = socket.socket()
|
||||
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
||||
if tls == True:
|
||||
shared.sock = ctx.wrap_socket(s)
|
||||
shared.sock.connect((HOST, PORT))
|
||||
_send(f"NICK {NICK}")
|
||||
_send(f"USER {NICK} 0 * :{REALNAME}")
|
||||
|
||||
try:
|
||||
return srv.channels[chan].users[user].modes
|
||||
except:
|
||||
return []
|
||||
while True:
|
||||
recv_data = shared.sock.recv(1024)
|
||||
recv_lines = srv.recv(recv_data)
|
||||
|
||||
def botdesc():
|
||||
return fileload('desc')
|
||||
#return "I am a utility bot. Public commands *.echo | Ran by ~julian@~team/~jmjl@~town/~jmjl@~club"
|
||||
for line in recv_lines:
|
||||
srv.parse_tokens(line)
|
||||
|
||||
def is_admin(source):
|
||||
return 'julian@envs.net' == source.split('!')[1]
|
||||
|
||||
def nslogin():
|
||||
_send(f"PRIVMSG NickServ@services.tilde.chat :IDENTIFY {NICK} {SERVICES_PW}", log=False)
|
||||
print("Logged into services")
|
||||
_send(f"MODE {NICK} +B")
|
||||
Chan.joins()
|
||||
|
||||
class Chan:
|
||||
def joins():
|
||||
for i in db['chan'].all():
|
||||
_send(f"JOIN {','.join([i['name']])}")
|
||||
def join(channel):
|
||||
_send(f"JOIN {channel}")
|
||||
db['chan'].insert(dict(name=channel))
|
||||
def leave(channel, msg="", kick=False):
|
||||
if not kick: _send(f"PART {channel} :{msg}")
|
||||
db['chan'].delete(name=channel)
|
||||
|
||||
srv = ircstates.Server(NETNAME)
|
||||
s = socket.socket()
|
||||
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
||||
if tls == True:
|
||||
s = ctx.wrap_socket(s)
|
||||
s.connect((HOST, PORT))
|
||||
_send(f"NICK {NICK}")
|
||||
_send(f"USER {NICK} 0 * :{REALNAME}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
recv_data = s.recv(1024)
|
||||
recv_lines = srv.recv(recv_data)
|
||||
|
||||
for line in recv_lines:
|
||||
srv.parse_tokens(line)
|
||||
|
||||
if not line.command == "PING": print(f"< {line.format()}")
|
||||
if line.command == "PING": _send(f"PONG :{line.params[0]}", log=False)
|
||||
elif (line.source == f"NickServ!{SERVICES_HOSTMASK}" and line.command == 'NOTICE' and line.params == ['util','If you do not change within 1 minute, I will change your nick.']): nslogin()
|
||||
elif (line.command == "INVITE" and line.params[0] == NICK): Chan.join(line.params[1])
|
||||
elif line.command == "KICK" and line.params[1] == NICK: Chan.leave(line.params[0], kick=True)
|
||||
elif line.command == "PRIVMSG" and line.params[1] == "!botlist": _send(f"PRIVMSG {line.source.split('!')[0]} :{botdesc()}")
|
||||
elif line.command == "PRIVMSG" and line.params[1].startswith('%') and is_admin(line.source):
|
||||
cmdargs = line.params[1].split(' ')
|
||||
cmd = ''.join(cmdargs[0].split('%')[1:])
|
||||
args = cmdargs[1:]
|
||||
usernick = line.source.split('!')[0]
|
||||
chan = line.params[0]
|
||||
if cmd == "mode": _send(f"MODE {chan} {' '.join(args)}")
|
||||
elif cmd == "quit": _send(f"QUIT :{' '.join(args)}")
|
||||
elif cmd == "msg": _send(f"PRIVMSG {args[0]} :{' '.join(args[1:])}")
|
||||
elif cmd == "raw": _send(f"{' '.join(args)}")
|
||||
elif cmd == "join": Chan.join(args[0])
|
||||
elif cmd == "part": Chan.leave(args[0])
|
||||
elif (line.command == "PRIVMSG" and line.params[1].startswith('*.') ):
|
||||
commandargs = line.params[1].split(' ')
|
||||
command = ''.join(commandargs[0].split('*.')[1:])
|
||||
args = commandargs[1:]
|
||||
usernick = line.source.split('!')[0]
|
||||
chan = line.params[0]
|
||||
if command == "help": _send(f"PRIVMSG {chan} :{botdesc()}")
|
||||
elif command == "echo": _send(f"PRIVMSG {chan} :\x033[Echo]\x0F \x032{usernick}\x0F said: \x039{' '.join(args[0:])}")
|
||||
elif command == "psa" and args[0].startswith('#'):
|
||||
if 'o' in usermodes(args[0], usernick) or 'q' in usermodes(args[0], usernick): _send(f"PRIVMSG {args[0]} :\x0307**PSA:\x0F {' '.join(args[1:])}")
|
||||
else: _send(f"PRIVMSG {chan} :Sorry, you aren't allowed to make public service anouncements.")
|
||||
elif command == "psa" and not args[0].startswith('#'): _send(f"PRIVMSG {chan} :Sorry, the syntax for PSA is: *.psa <channel> <MESSAGE>")
|
||||
elif 'o' in usermodes(chan, usernick) or 'q' in usermodes(chan, usernick):
|
||||
if command == "part": Chan.leave(chan, msg=f"Leaving per request of {usernick}")
|
||||
if command == "topic": _send(f"TOPIC {chan} :{' '.join(args[0:])}")
|
||||
except KeyboardInterrupt: _send("QUIT :^C")
|
||||
if line.command == 'PRIVMSG':
|
||||
if line.params[1] == '%reload':
|
||||
if Parse.is_admin(line.source):
|
||||
oldprs = Parse
|
||||
try:
|
||||
prs = importlib.reload(Parse)
|
||||
Parse = prs
|
||||
except Exception:
|
||||
_send(f"PRIVMSG {line.source.split('!')[0]} :Error reloading")
|
||||
Parse = oldprs
|
||||
Parse.Parse(_send=_send, srv=srv, line=line)
|
||||
except KeyboardInterrupt: _send("QUIT :^C")
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
import socket, asyncio, time, re, ircstates, importlib
|
||||
from config import *
|
||||
import shared
|
||||
|
||||
def _send(msg: str, log=True):
|
||||
shared.sock.send(f"{msg}\n".encode('utf-8'))
|
||||
if log == True: print(f"> {msg}")
|
||||
|
||||
def usermodes(chan, user):
|
||||
try: return srv.channels[chan].users[user].modes
|
||||
except NameError: return []
|
||||
|
||||
#def _send(msg: str, log=True):
|
||||
#s.send(f"{msg}\n".encode('utf-8'))
|
||||
#if log == True: print(f"> {msg}")
|
||||
|
||||
def botdesc():
|
||||
return fileload('desc')
|
||||
|
||||
db['oper'].insert(dict(hostmask='julian!julian@envs.net'))
|
||||
|
||||
def is_admin(source):
|
||||
#return 'julian@envs.net' == source.split('!')[1]
|
||||
#db['oper'].insert(dict(hostmask='friend!good@person'))
|
||||
#db['oper'].delete(hostmask='nick!badowner@bad.opers')
|
||||
try:
|
||||
for i in db['oper'].all():
|
||||
if source == i['hostmask']: return True
|
||||
else: continue
|
||||
except:
|
||||
_send(f"PRIVMSG julian :{source} isn't operator, permission denied for {source.split('!')[0]}.")
|
||||
return False
|
||||
return False
|
||||
|
||||
def nslogin():
|
||||
_send(f"PRIVMSG NickServ@services.tilde.chat :IDENTIFY {NICK} {SERVICES_PW}", log=False)
|
||||
print("Logged into services")
|
||||
_send(f"MODE {NICK} +B")
|
||||
Chan.joins()
|
||||
|
||||
class Chan:
|
||||
def joins():
|
||||
for i in db['chan'].all():
|
||||
_send(f"JOIN {','.join([i['name']])}")
|
||||
def join(channel):
|
||||
_send(f"JOIN {channel}")
|
||||
db['chan'].insert(dict(name=channel))
|
||||
def leave(channel, msg="", kick=False):
|
||||
if not kick: _send(f"PART {channel} :{msg}")
|
||||
db['chan'].delete(name=channel)
|
||||
|
||||
class Command:
|
||||
def ExplicitCommandParser(line):
|
||||
commandargs = line.params[1].split(' ')
|
||||
command = ''.join(commandargs[0].split('*.')[1:])
|
||||
args = commandargs[1:]
|
||||
usernick = line.source.split('!')[0]
|
||||
chan = line.params[0]
|
||||
if command == "help": _send(f"PRIVMSG {chan} :{botdesc()}")
|
||||
elif command == "echo": _send(f"PRIVMSG {chan} :\x033[Echo]\x0F \x032{usernick}\x0F said: \x039{' '.join(args[0:])}")
|
||||
elif command == "psa" and args[0].startswith('#'):
|
||||
if 'o' in usermodes(args[0], usernick) or 'q' in usermodes(args[0], usernick): _send(f"PRIVMSG {args[0]} :\x0307**PSA:\x0F {' '.join(args[1:])}")
|
||||
else: _send(f"PRIVMSG {chan} :Sorry, you aren't allowed to make public service anouncements.")
|
||||
elif command == "psa" and not args[0].startswith('#'): _send(f"PRIVMSG {chan} :Sorry, the syntax for PSA is: *.psa <channel> <MESSAGE>")
|
||||
elif 'o' in usermodes(chan, usernick) or 'q' in usermodes(chan, usernick):
|
||||
if command == "part": Chan.leave(chan, msg=f"Leaving per request of {usernick}")
|
||||
if command == "topic": _send(f"TOPIC {chan} :{' '.join(args[0:])}")
|
||||
|
||||
def AdminCommandParser(line):
|
||||
cmdargs = line.params[1].split(' ')
|
||||
cmd = ''.join(cmdargs[0].split('%')[1:])
|
||||
args = cmdargs[1:]
|
||||
usernick = line.source.split('!')[0]
|
||||
chan = line.params[0]
|
||||
if cmd == "mode": _send(f"MODE {chan} {' '.join(args)}")
|
||||
elif cmd == "quit": _send(f"QUIT :{' '.join(args)}")
|
||||
elif cmd == "msg": _send(f"PRIVMSG {args[0]} :{' '.join(args[1:])}")
|
||||
elif cmd == "raw": _send(f"{' '.join(args)}")
|
||||
elif cmd == "join": Chan.join(args[0])
|
||||
elif cmd == "part": Chan.leave(args[0])
|
||||
|
||||
def privmsg(line):
|
||||
if line.params[1].startswith('%') and is_admin(line.source): Command.AdminCommandParser(line)
|
||||
elif line.params[0] != NICK and line.params[1] == "!botlist": _send(f"PRIVMSG {line.params[0]} :{botdesc()}")
|
||||
elif line.params[0] == NICK and line.params[1] == "!botlist": _send(f"PRIVMSG {line.source.split('!')[0]} :{botdesc()}")
|
||||
elif line.params[1].startswith('*.'): Command.ExplicitCommandParser(line)
|
||||
|
||||
def Parse(_send=None, srv=None, line=None):
|
||||
if not line.command == "PING": print(f"< {line.format()}")
|
||||
if line.command == "PING":
|
||||
_send(f"PONG :{line.params[0]}", log=False)
|
||||
if shared.nick_has_changed: _send(f"NICK {NICK}")
|
||||
elif line.command == "433": _send(f"NICK {NICK}_"); shared.nick_has_changed = True
|
||||
elif (line.source == f"NickServ!{SERVICES_HOSTMASK}" and line.command == 'NOTICE' and line.params == ['util','If you do not change within 1 minute, I will change your nick.']): nslogin()
|
||||
elif (line.command == "INVITE" and line.params[0] == NICK): Chan.join(line.params[1])
|
||||
elif line.command == "KICK" and line.params[1] == NICK: Chan.leave(line.params[0], kick=True)
|
||||
elif line.command == "PRIVMSG": Command.privmsg(line)
|
Loading…
Reference in New Issue