"""
Bot module

Core functionality for a bot instance.
Contains a global module map containing loaded modules and their functions.
Includes some builtin administrative commands.
"""
import logging
import importlib
import sys
import toml
from protocol import IrcProtocol
import modules

# Global module map: maps the registered commands and parsers of each loaded
# module. Kept across importlib.reload() of this module, because a reload
# re-executes the file inside the existing module namespace.
try:
    _MOD_MAP
except NameError:
    _MOD_MAP = {}

# Single global bot object (a plain dict holding loop, config, connections).
try:
    _BOT
except NameError:
    # first import: nothing to restore
    _BOT = {}
else:
    # On a reload, re-import the current module list from the config file.
    # This lives in the `else` branch so that a NameError raised while
    # importing a plugin propagates instead of silently resetting _BOT.
    # `_load_modules` resolves to the definition left over from the previous
    # load; it is only redefined at the bottom of this file.
    if '_cfg_file' in _BOT:
        with open(_BOT['_cfg_file']) as cfg_file:
            cfg = toml.loads(cfg_file.read())
        _load_modules(cfg['bot'].get('modules', []))


def add_module(name):
    """
    Return the module-map entry for `name`, creating an empty one if needed.

    The passed module name is 'fixed' by removing everything up to and
    including the first '.', i.e. the top-level package name; a name without
    a dot is used unchanged.

    Returns the entry dict with keys 'name', 'cmds' (command name -> function)
    and 'parsers' (list of functions).
    """
    mod_name = name[name.find('.') + 1:]
    if mod_name not in _MOD_MAP:
        logging.info("Registering '%s' module...", mod_name)
        _MOD_MAP[mod_name] = {'name': mod_name, 'cmds': {}, 'parsers': []}
    return _MOD_MAP[mod_name]


def command(name, admin=False):
    """
    Command registering decorator.

    `name` is the name of the command (stored lower-cased); the decorated
    function's docstring becomes the command's help message, with the
    function's module name automatically appended. Set `admin` to True to
    allow only the bot's owner to execute the command.

    The command function is called with the list of parameters and a context
    dict containing 'con' (the connection object), 'dst' (where to reply) and
    'nick' (the nick that sent the command). Its result is scheduled with
    create_task, so it is expected to be a coroutine function.
    """
    def __deco__(func):
        mod = add_module(func.__module__)
        # set admin flag if needed; the dispatcher checks for its presence
        if admin:
            func.__dict__['admin'] = True
        if not func.__doc__:
            func.__doc__ = 'No help available for command'
        func.__doc__ += ' {}(Module: {})'.format(
            '(Admin-Only) ' if admin else '', mod['name'])
        mod['cmds'][name.lower()] = func
        return func
    return __deco__


def parser():
    """
    Message parser decorator.

    The decorated function gets called with (con, dst, nick, msg) on each
    privmsg; even the bot's own messages are sent to parsers.
    """
    def __deco__(func):
        mod = add_module(func.__module__)
        mod['parsers'].append(func)
        return func
    return __deco__


def init(loop, cfg_file):
    """
    Initialize the bot instance.

    `loop` is the event loop used to schedule connections and commands,
    `cfg_file` is the filename of the TOML config. Loads the config, imports
    the modules listed under bot.modules and starts a connection for every
    entry in the config's 'server' list.
    """
    _BOT['loop'] = loop
    _BOT['_cfg_file'] = cfg_file
    # load config file; the context manager closes the handle deterministically
    with open(cfg_file) as handle:
        cfg = toml.loads(handle.read())
    _BOT['cfg'] = cfg
    # import initial module list from config
    _load_modules(cfg['bot'].get('modules', []))
    # list of network connections
    _BOT['cons'] = []
    # create and connect all servers in config
    for sv_cfg in cfg.get('server', []):
        add_connection(sv_cfg)


# public getter functions for bot
def get_loop():
    """Return the bot's event loop, or None if not initialized."""
    return _BOT.get('loop', None)


def get_cfg():
    """Return the loaded config dict, or an empty dict if not initialized."""
    return _BOT.get('cfg', {})


def get_cons():
    """Return the list of active connections (a fresh empty list if unset)."""
    return _BOT.get('cons', [])


def add_connection(sv_cfg):
    """
    Add and connect to a new server.

    Creates an IrcProtocol with the config's 'user' section, registers the
    message/connect/disconnect callbacks and schedules do_connect(sv_cfg) on
    the bot's loop.
    """
    loop = get_loop()
    ncon = IrcProtocol(loop, get_cfg().get('user'))
    # add connection callbacks
    ncon.add_message_callback(_on_priv_message)
    ncon.add_connect_callback(_on_connect)
    ncon.add_disconnect_callback(_on_disconnect)
    loop.create_task(ncon.do_connect(sv_cfg))


def _on_connect(con):
    """Connect callback: track the connection in the active list."""
    con.warning('ON CONNECT!')
    cons = get_cons()
    # this could be a reconnect as well, so avoid duplicate entries
    if con not in cons:
        cons.append(con)


def _on_disconnect(con):
    """Disconnect callback: drop the connection from the active list."""
    con.warning('ON DISCONNECT!')
    cons = get_cons()
    if con in cons:
        cons.remove(con)


def _on_priv_message(con, dst, nick, msg):
    """
    Called when a connected connection receives a priv message.

    Gets passed the connection that received it, the destination of the
    message, the nick that sent it, and the message. Dispatches prefixed
    messages to the matching registered command, then hands the message to
    every registered parser.
    """
    bcfg = get_cfg()
    # check for command-like message
    # message split into words, first word is command, rest is the parms list
    pfx = bcfg['bot']['prefix']
    strp_msg = msg.lstrip()
    msg_words = strp_msg.split()
    # if prefixed and not from self; msg_words can only be empty here when the
    # prefix is empty and the message is blank
    if msg_words and strp_msg.startswith(pfx) and nick != bcfg['user']['nick']:
        cmd = msg_words[0][len(pfx):].lower()
        parms = msg_words[1:]
        # check if command is in a registered module; first match wins
        for mod in _MOD_MAP.values():
            if cmd not in mod['cmds']:
                continue
            cmd_func = mod['cmds'][cmd]
            con.log('::CMD:: \'{}\' run by <{}> in ({}) with parms: {}'.format(
                cmd, nick, dst, parms))
            # if msg was sent as pm,
            # change dst to nick so say_to's can respond in pm
            # (the parsers below also receive this rewritten dst)
            if dst == bcfg['user']['nick']:
                dst = nick
            # check for admin flag
            if 'admin' in cmd_func.__dict__ and nick != con.cfg['usr']['owner']:
                con.say_to(dst, '> You do not have permission to execute this command.')
                break
            get_loop().create_task(
                cmd_func(parms, {'con': con, 'dst': dst, 'nick': nick}))
            break
    # callback registered parsers, for self messages too
    for mod in _MOD_MAP.values():
        for parse in mod['parsers']:
            parse(con, dst, nick, msg)


# core commands
# NOTE: the docstrings of command functions are the user-facing help text,
# so further documentation for them is kept in comments.

# join / part
@command('join', True)
async def cmd_join(parms, ctx):
    """Joins a channel."""
    con = ctx['con']
    # only join '#'-prefixed channels that are not already in the channel list
    if parms \
            and parms[0].lower() not in con.cfg['sv']['channels'] \
            and parms[0][0] == '#':
        con.send('JOIN {}'.format(parms[0]))
        con.cfg['sv']['channels'].append(parms[0].lower())


@command('part', True)
async def cmd_part(parms, ctx):
    """Leaves a channel."""
    con = ctx['con']
    if parms and parms[0].lower() in con.cfg['sv']['channels']:
        con.send('PART {}'.format(parms[0]))
        con.cfg['sv']['channels'].remove(parms[0].lower())


# gets the doc string for specified command
@command('help')
async def cmd_help(p, c):
    """Shows help message for a specified command."""
    con = c['con']
    dst = c['dst']
    if not p:
        con.say_to(dst, 'No command specified. See list of commands with \'list\'.')
        return
    wanted = p[0].lower()
    helpstr = ''
    # every module is scanned; if several define the command, the last wins
    for mod in _MOD_MAP.values():
        if wanted in mod['cmds']:
            # collapse the docstring's whitespace into single spaces
            helpstr = '> {}: {}'.format(
                wanted, ' '.join(mod['cmds'][wanted].__doc__.split()))
    if helpstr:
        con.say_to(dst, helpstr)
    else:
        con.say_to(dst, 'Command not found.')


# list commands
@command('list')
async def cmd_list(p, c):
    """
    Lists available commands.
    Specify a module name from 'modules' command to filter.
    """
    cn = c['con']
    cmdlist = []
    for mod_name, mod in _MOD_MAP.items():
        if p:
            if p[0].lower() == mod_name.lower():
                cmdlist = list(mod['cmds'].keys())
                break
        else:
            # NOTE(review): append yields one sub-list per module in the
            # reply; extend may have been intended - confirm before changing.
            cmdlist.append(list(mod['cmds'].keys()))
    cn.say_to(c['dst'], '> {}'.format(cmdlist or 'Module not found.'))


# list loaded modules
@command('modules')
async def cmd_modules(p, c):
    """
    List all currently loaded modules.
    List commands within using 'list '
    """
    cn = c['con']
    modlist = list(_MOD_MAP.keys())
    cn.say_to(c['dst'], '> {}'.format(modlist))


# list active connections
@command('connections', True)
async def cmd_connections(p, c):
    """List currently connected connections"""
    conlist = [active.cfg['sv']['name'] for active in get_cons()]
    c['con'].say_to(c['dst'], '> {}'.format(conlist))


# reload a specified module
@command('reload', True)
async def cmd_import(p, ctx):
    """Reloads a previously loaded module."""
    con = ctx['con']
    dst = ctx['dst']
    if p and p[0] in _MOD_MAP:
        # drop the old registrations; the reload re-registers via decorators
        prev_mod = _MOD_MAP.pop(p[0], None)
        # if not reloading root bot module, re-append the modules. package name
        sys_mod = p[0]
        if p[0] != 'bot':
            sys_mod = 'modules.' + sys_mod
        try:
            importlib.reload(sys.modules[sys_mod])
            con.say_to(dst, 'Success!')
        except Exception as e:
            # deliberate catch-all: any failure while reloading plugin code is
            # reported to the caller and the previous registrations restored
            con.say_to(dst, str(e))
            # reinsert using original fixed name
            _MOD_MAP[p[0]] = prev_mod
    else:
        con.say_to(dst, 'Module not found')


# connect to a new server
@command('connect', True)
async def cmd_connect(p, c):
    """
    Connects to a new network.
    Parameters: 'name' 'host'
    """
    con = c['con']
    # both the name and the host are required
    if len(p) < 2:
        return
    new_sv_cfg = {'host': p[1], 'port': 6697, 'name': p[0]}
    add_connection(new_sv_cfg)
    con.say_to(c['dst'], 'OMW...')


# disconnect from currently connected server
@command('disconnect', True)
async def cmd_discon(p, c):
    """Disconnects from currently connected network"""
    con = c['con']
    # presumably tells the connection not to reconnect - verify in IrcProtocol
    con._stop = True
    con.send('QUIT :POOF')


# list of names on chan
@command('names', True)
async def cmd_names(p, c):
    cn = c['con']
    # keep every key of cn.names whose value contains the reply destination
    # (presumably nick -> channels; verify against IrcProtocol)
    nmstr = ''.join(
        str(key) + '0|0'
        for key, members in cn.names.items()
        if c['dst'] in members)
    cn.say_to(c['dst'], '> {}'.format(nmstr))


def _load_modules(mod_list):
    """Import each name in `mod_list` as a submodule of the modules package."""
    for mod_name in mod_list:
        importlib.import_module('.{}'.format(mod_name), 'modules')