# dustbot/bot.py (314 lines, 8.6 KiB, Python)

"""
Bot module
Core functionality for a bot instance
Contains a global module map containing loaded modules and their functions
Includes some builtin administrative commands
"""
import logging
import importlib
import sys
import toml
from protocol import IrcProtocol
import modules
# Global module map: maps each loaded module's short name to its registered
# commands and parsers.  Probing the bare name keeps the existing map when
# this file is executed again on a reload.
try:
    _MOD_MAP
except NameError:
    _MOD_MAP = {}

# Single global bot state dict (filled in by init()).
try:
    _BOT
except NameError:
    # first import: start with an empty bot state
    _BOT = {}
else:
    # On a reload, re-import the current module list from the config file.
    # This runs in the else branch so that a NameError raised while reading
    # the config or importing a module propagates to the caller instead of
    # being mistaken for "first import" and wiping the bot state.
    # _load_modules is defined further down; on a reload the definition left
    # by the previous execution of this file is the one that gets called.
    if '_cfg_file' in _BOT:
        with open( _BOT['_cfg_file'] ) as cfg_file:
            cfg = toml.loads( cfg_file.read() )
        _load_modules( cfg['bot'].get( 'modules', [] ) )
def add_module( name ):
    """ Return the module-map entry for a module, creating it if needed.

    The top-level package component is dropped from `name` (everything up to
    and including the first '.'); a name without a dot is used unchanged.
    A new entry is a dict with 'name', 'cmds' (empty dict) and 'parsers'
    (empty list); an already registered entry is returned as it is. """
    short_name = name.split( '.', 1 )[-1]
    try:
        return _MOD_MAP[short_name]
    except KeyError:
        logging.info( 'Registering \'%s\' module...', short_name )
        entry = {'name': short_name, 'cmds': {}, 'parsers': []}
        _MOD_MAP[short_name] = entry
        return entry
def command( name, admin=False ):
    """
    Decorator factory that registers a function as a bot command.

    name  -- command name; stored lowercased in the owning module's 'cmds' map.
    admin -- when true, the function is flagged with an 'admin' attribute,
             which the message dispatcher checks against the owner's nick.

    The function's docstring is the command's help text: a default text is
    used when it has none, and an optional '(Admin-Only) ' tag plus the
    owning module's name are appended to it.
    The dispatcher calls a command as func( parms, ctx ), where parms is the
    list of words after the command and ctx is a dict with 'con', 'dst' and
    'nick'.
    The decorated function itself is returned unchanged (not wrapped).
    """
    def __deco__( func ):
        owner = add_module( func.__module__ )
        if admin:
            func.__dict__['admin'] = True
        if not func.__doc__:
            func.__doc__ = 'No help available for command'
        admin_tag = '(Admin-Only) ' if admin else ''
        func.__doc__ = func.__doc__ + ' {}(Module: {})'.format( admin_tag, owner['name'] )
        owner['cmds'][name.lower()] = func
        return func
    return __deco__
def parser():
    """
    Decorator factory that registers a function as a message parser.

    The function is appended to the 'parsers' list of its module's entry and
    returned unchanged.  Registered parsers are called as
    func( con, dst, nick, msg ) for each private message not sent by the bot.
    """
    def __deco__( func ):
        add_module( func.__module__ )['parsers'].append( func )
        return func
    return __deco__
def init( loop, cfg_file ):
    """ Initialize the global bot state from a TOML config file.

    loop     -- event loop; stored and later used to schedule connection
                and command tasks.
    cfg_file -- path of the TOML config file; the path is remembered so the
                module list can be re-read when this module is reloaded.

    Imports the modules listed under [bot].modules, then starts a connection
    for every entry of the config's 'server' list. """
    _BOT['loop'] = loop
    _BOT['_cfg_file'] = cfg_file
    # read the config inside a context manager so the file handle is closed
    with open( cfg_file ) as cfg_fh:
        cfg = toml.loads( cfg_fh.read() )
    _BOT['cfg'] = cfg
    # import initial module list from config
    _load_modules( cfg['bot'].get( 'modules', [] ) )
    # list of network connections (filled by the connect callback)
    _BOT['cons'] = []
    # create and connect all servers in config
    for sv_cfg in cfg.get( 'server', [] ):
        add_connection( sv_cfg )
# public accessors for the global bot state
def get_loop():
    """ Return the event loop stored by init(), or None if there is none yet. """
    return _BOT.get( 'loop' )
def get_cfg():
    """ Return the parsed config stored by init(), or a new empty dict if unset. """
    try:
        return _BOT['cfg']
    except KeyError:
        return {}
def get_cons():
    """ Return the list of active connections.

    When no list has been stored yet a new empty list is returned; that
    fallback list is not kept in the bot state. """
    try:
        return _BOT['cons']
    except KeyError:
        return []
def add_connection( sv_cfg ):
    """ Create a connection for one server config and start connecting.

    sv_cfg is handed to the connection's do_connect(), which is scheduled as
    a task on the bot's event loop.  The connection is built from the loop
    and the 'user' section of the bot config. """
    event_loop = get_loop()
    user_cfg = get_cfg().get( 'user' )
    new_con = IrcProtocol( event_loop, user_cfg )
    # hook up the bot's message / connect / disconnect handlers
    new_con.add_message_callback( _on_priv_message )
    new_con.add_connect_callback( _on_connect )
    new_con.add_disconnect_callback( _on_disconnect )
    event_loop.create_task( new_con.do_connect( sv_cfg ) )
def _on_connect( con ):
    """ Connect callback: record the connection in the active list. """
    con.warning( 'ON CONNECT!' )
    active = get_cons()
    # the callback may fire again for the same connection (presumably on a
    # reconnect), so only add it when it is not already listed
    if con in active:
        return
    active.append( con )
def _on_disconnect( con ):
    """ Disconnect callback: drop the connection from the active list. """
    con.warning( 'ON DISCONNECT!' )
    active = get_cons()
    if con not in active:
        return
    active.remove( con )
def _on_priv_message( con, dst, nick, msg ):
    """ Message callback: dispatch commands, then run registered parsers.

    con  -- connection the message arrived on
    dst  -- destination of the message
    nick -- nick that sent the message
    msg  -- message text

    A message whose first non-blank characters equal the configured prefix
    is treated as a command: the first word (prefix removed, lowercased) is
    the command name and the remaining words are its parameters. """
    bot_cfg = get_cfg()
    prefix = bot_cfg['bot']['prefix']
    stripped = msg.lstrip()
    if stripped[:len( prefix )] == prefix:
        words = stripped.split()
        cmd = words[0][len( prefix ):].lower()
        parms = words[1:]
        # only the first module that provides the command handles it
        for mod in _MOD_MAP.values():
            if cmd not in mod['cmds']:
                continue
            cmd_func = mod['cmds'][cmd]
            con.log( '::CMD:: \'{}\' run by <{}> in ({}) with parms: {}'.format(
                cmd, nick, dst, parms ) )
            # a message addressed to the bot's own nick was sent as a pm:
            # answer the sender instead (this dst is also what parsers below see)
            if dst == bot_cfg['user']['nick']:
                dst = nick
            # commands flagged as admin may only be run by the owner
            owner_only = 'admin' in cmd_func.__dict__
            if owner_only and nick != con.cfg['usr']['owner']:
                con.say_to( dst, '> You do not have permission to execute this command.' )
            else:
                get_loop().create_task( cmd_func(
                    parms, {'con': con, 'dst': dst, 'nick': nick} ) )
            break
    # every message not sent by the bot itself goes to all registered parsers
    if nick != bot_cfg['user']['nick']:
        for mod in _MOD_MAP.values():
            for parse in mod['parsers']:
                parse( con, dst, nick, msg )
# core commands
# join: parms[0] is the channel; it must start with '#' and must not already
# be in the connection's channel list (compared lowercased)
@command( 'join', True )
async def cmd_join( parms, ctx ):
    """ Joins a channel. """
    con = ctx['con']
    if not parms:
        return
    chan = parms[0]
    if chan.lower() in con.cfg['sv']['channels'] or chan[0] != '#':
        return
    con.send( 'JOIN {}'.format( chan ) )
    # the channel list stores lowercased names
    con.cfg['sv']['channels'].append( chan.lower() )
# part: parms[0] is the channel; ignored unless its lowercased form is in the
# connection's channel list
@command( 'part', True )
async def cmd_part( parms, ctx ):
    """ Leaves a channel. """
    con = ctx['con']
    if not parms:
        return
    chan = parms[0]
    if chan.lower() not in con.cfg['sv']['channels']:
        return
    con.send( 'PART {}'.format( chan ) )
    con.cfg['sv']['channels'].remove( chan.lower() )
# help: replies with the docstring of the named command, whitespace collapsed
@command( 'help' )
async def cmd_help( p, c ):
    """ Shows help message for a specified command. """
    con, dst = c['con'], c['dst']
    if not p:
        con.say_to( dst, 'No command specified. See list of commands with \'list\'.' )
        return
    wanted = p[0].lower()
    reply = ''
    # no early exit: if several modules define the command, the last one wins
    for mod in _MOD_MAP.values():
        if wanted in mod['cmds']:
            doc_words = mod['cmds'][wanted].__doc__.split()
            reply = '> {}: {}'.format( wanted, ' '.join( doc_words ) )
    if not reply:
        con.say_to( dst, 'Command not found.' )
    else:
        con.say_to( dst, reply )
# list: without parameters replies with one list of command names per loaded
# module; with a module name (case-insensitive) replies with that module's
# command names only
@command( 'list' )
async def cmd_list( p, c ):
    """ Lists available commands. Specify a module name from 'modules' command to filter. """
    cn = c['con']
    if p:
        wanted = p[0].lower()
        # track the match explicitly: a module that registers no commands
        # (e.g. parsers only) exists and must not be reported as missing
        reply = 'Module not found.'
        for mod_name, mod in _MOD_MAP.items():
            if mod_name.lower() == wanted:
                reply = list( mod['cmds'] )
                break
    else:
        # grouped per module, in module-map order
        reply = [list( mod['cmds'] ) for mod in _MOD_MAP.values()]
    cn.say_to( c['dst'], '> {}'.format( reply ) )
# modules: replies with the names of all entries in the global module map
@command( 'modules' )
async def cmd_modules( p, c ):
    """ List all currently loaded modules. List commands within using 'list <module>' """
    loaded = [mod_name for mod_name in _MOD_MAP]
    c['con'].say_to( c['dst'], '> {}'.format( loaded ) )
# connections: replies with the configured server name of every connection
# in the active list
@command( 'connections', True )
async def cmd_connections( p, c ):
    """ List currently connected connections """
    names = []
    for active in get_cons():
        names.append( active.cfg['sv']['name'] )
    c['con'].say_to( c['dst'], '> {}'.format( names ) )
# reload: p[0] is the short module name as it appears in the module map
@command( 'reload', True )
async def cmd_import( p, ctx ):
    """ Reloads a previously loaded module. """
    con, dst = ctx['con'], ctx['dst']
    if not p or p[0] not in _MOD_MAP:
        con.say_to( dst, 'Module not found' )
        return
    mod_name = p[0]
    # drop the old entry so the reload registers the module from scratch
    previous = _MOD_MAP.pop( mod_name, None )
    # every module except the root 'bot' module lives in the modules package
    sys_name = mod_name if mod_name == 'bot' else 'modules.' + mod_name
    try:
        importlib.reload( sys.modules[sys_name] )
        con.say_to( dst, 'Success!' )
    except Exception as e:
        con.say_to( dst, str( e ) )
        # reload failed: put the previous entry back under its short name
        _MOD_MAP[mod_name] = previous
# connect: p[0] is the network name, p[1] the host; the port is fixed at 6697
@command( 'connect', True )
async def cmd_connect( p, c ):
    """
    Connects to a new network.
    Parameters: 'name' 'host'
    """
    con = c['con']
    # both parameters are required; the previous `not p and len( p ) < 2`
    # test let a single parameter through and then failed on p[1]
    if len( p ) < 2:
        return
    new_sv_cfg = {'host': p[1], 'port': 6697, 'name': p[0]}
    add_connection( new_sv_cfg )
    con.say_to( c['dst'], 'OMW...' )
# disconnect: sets the connection's _stop flag, then sends QUIT on the
# connection the command was received on
@command( 'disconnect', True )
async def cmd_discon( p, c ):
    """ Disconnects from currently connected network """
    current = c['con']
    current._stop = True
    current.send( 'QUIT :POOF' )
# names: replies with every key of the connection's names map whose value
# contains the destination, each followed by the literal '0|0'
# (presumably names maps nick -> channels; verify against IrcProtocol)
# No docstring here: the command decorator supplies the default help text.
@command( 'names', True )
async def cmd_names( p, c ):
    cn = c['con']
    where = c['dst']
    pieces = [str( who ) + '0|0' for who, places in cn.names.items() if where in places]
    cn.say_to( where, '> {}'.format( ''.join( pieces ) ) )
def _load_modules( mod_list ):
    """ Import each named module from the 'modules' package.

    Names are imported relative to the package, in list order. """
    for mod_name in mod_list:
        relative_name = '.{}'.format( mod_name )
        importlib.import_module( relative_name, 'modules' )