147 lines
4.0 KiB
Python
147 lines
4.0 KiB
Python
# Part of rabbitears See LICENSE for permissions
|
|
# Copyright (C) 2022 Matt Arnold
|
|
from IRC import *
|
|
import os
|
|
import random
|
|
import ssl
|
|
import socket
|
|
import sys
|
|
import irctokens
|
|
import json
|
|
import sqlite3
|
|
import logging
|
|
from daemonize import Daemonize
|
|
import threading
|
|
from queue import Queue, Empty
|
|
|
|
NODEAMON = True
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.propagate = True
|
|
fh = logging.FileHandler("bot.log", "w")
|
|
fh.setLevel(logging.DEBUG)
|
|
logger.addHandler(fh)
|
|
keep_fds = [fh.stream.fileno()]
|
|
|
|
|
|
class NullDevice:
|
|
def write(self,s):
|
|
pass
|
|
|
|
LINEEND = '\r\n'
|
|
# IRC Config
|
|
config = None
|
|
with open('config.json') as f:
|
|
jld = f.read()
|
|
config = json.loads(jld)
|
|
|
|
|
|
uds_addr = config['uds']
|
|
try:
|
|
os.unlink(uds_addr)
|
|
except OSError:
|
|
if os.path.exists(uds_addr):
|
|
raise
|
|
|
|
def uds_thread(in_q):
|
|
uds = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
|
|
uds.bind(uds_addr)
|
|
uds.listen(1)
|
|
while True:
|
|
connection, client_address = uds.accept()
|
|
try:
|
|
data = connection.recv(255)
|
|
print(str(type(data)))
|
|
if data:
|
|
#pds = data.strip()
|
|
#pds = bytes(pds.strip(LINEEND))
|
|
connection.sendall(bytes("ACK" + LINEEND, 'UTF-8'))
|
|
in_q.put_nowait(data)
|
|
else:
|
|
connection.sendall(bytes("NACK" + LINEEND))
|
|
continue
|
|
except Exception as e:
|
|
logging.exception(str(e))
|
|
|
|
|
|
# Need to pass the IRCBot class a socket the reason it doesn't do this itself is
|
|
# so you can set up TLS or not as you need it
|
|
# These provide good defaults. But your milage may vary
|
|
def do_connect():
|
|
oursock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
context = ssl.SSLContext()
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
oursock = context.wrap_socket(oursock, server_hostname=config['hostname'])
|
|
|
|
irc = IRCBot(oursock, isBad=config["isBad"])
|
|
irc.connect(config['hostname'],
|
|
config['port'],
|
|
config['channel'],
|
|
config['nick'],
|
|
config['nickpass'])
|
|
return irc
|
|
|
|
def hup_handle(sig, fr):
|
|
sys.exit()
|
|
|
|
def do_mean():
|
|
con = sqlite3.connect(config['db'])
|
|
cur = con.cursor()
|
|
qr = cur.execute("SELECT count(*) from mean").fetchone()
|
|
maxrows = int(qr[0])
|
|
slrow = str(random.randint(1, maxrows))
|
|
qr = cur.execute("SELECT data from mean where rowid=(?)", (slrow,)).fetchone()
|
|
result = str(qr[0])
|
|
return result
|
|
|
|
def generate_response(person, message):
|
|
msg = message.strip(LINEEND)
|
|
if 'cool.person' in person and msg.lower() == "hello botley":
|
|
return "Greetings Master"
|
|
elif msg.lower() == "hello":
|
|
return "Greetings Human!"
|
|
elif "Ground Control to Major Tom" in msg:
|
|
return "Put your helmet on!"
|
|
elif "Ziggy Stardust" in msg:
|
|
return "Looks good in leather pants"
|
|
elif msg.lower() == "!mean":
|
|
return do_mean()
|
|
else:
|
|
return None
|
|
|
|
def do_main_loop():
|
|
irc = do_connect()
|
|
q = Queue()
|
|
x = threading.Thread(target=uds_thread, args=(q,))
|
|
x.start()
|
|
while True:
|
|
try:
|
|
|
|
text = irc.get_response()
|
|
logging.debug(text[0],text[1],text[2])
|
|
if text[1] == "MODE": # in leiu of RPL_WELCOME
|
|
botmode = irctokens.build("MODE", [config['nick'], '+B'])
|
|
irc.send_cmd(botmode)
|
|
if text[1] == 'PRIVMSG' and text[2][0] == config['channel']:
|
|
r = generate_response(text[0],text[2][1])
|
|
if r is not None:
|
|
irc.send_privmsg(config['channel'],r)
|
|
try:
|
|
d = q.get_nowait()
|
|
irc.send_privmsg(config['channel'], d.decode("UTF-8"))
|
|
except Empty:
|
|
printred("Empty")
|
|
|
|
|
|
|
|
except IRCError as e:
|
|
logging.error(e)
|
|
sys.exit(1)
|
|
|
|
pid = "bot.pid"
|
|
if not NODEAMON:
|
|
daemon = Daemonize(app="theodebot", pid=pid,keep_fds=keep_fds, action=do_main_loop)
|
|
daemon.start()
|
|
else:
|
|
do_main_loop() |