219 lines
5.7 KiB
Python
219 lines
5.7 KiB
Python
# Part of rabbitears. See LICENSE for permissions.
# Copyright (C) 2022 Matt Arnold
|
|
from IRC import IRCBot, IRCError, printred
|
|
import os
|
|
import random
|
|
import ssl
|
|
import socket
|
|
import signal
|
|
import sys
|
|
import irctokens
|
|
import json
|
|
import sqlite3
|
|
import logging
|
|
from daemonize import Daemonize
|
|
from random import randint, choice
|
|
import threading
|
|
from queue import Queue
|
|
import datetime
|
|
import math
|
|
|
|
# Set by hup_handle() and by the Ctrl-C path; polled by the worker loops.
shutdown = threading.Event()

# Default only: replaced by the "nodaemon" value from config.json below.
NODEAMON = False

# File handler first, then the module logger that writes through it.
fh = logging.FileHandler("bot.log", mode="w")
fh.setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = True
logger.addHandler(fh)

# Descriptor of the log file, handed to Daemonize as `keep_fds`.
keep_fds = [fh.stream.fileno()]
|
|
|
|
|
|
class NullDevice:
    """File-like sink whose ``write`` accepts text and discards it."""

    def write(self, s):
        """Ignore *s*; nothing is stored or forwarded."""
        return None
|
|
|
|
|
|
def get_tenpko_time(now=None):
    """Return the ``(ko, litu)`` pair derived from the UTC hour.

    The UTC day is cut into four six-hour "ko" (fire, air, water, earth, in
    that order starting at midnight) and every ko is cut into three two-hour
    "litu" (rising, living, ending).

    Args:
        now: Optional ``datetime.datetime`` to evaluate instead of the
            current time.  A naive value is taken to already be in UTC; an
            aware value is converted to UTC first.

    Returns:
        A ``(ko, litu)`` tuple of strings.
    """
    if now is None:
        # utcnow() is deprecated since Python 3.12; ask for aware UTC instead.
        now = datetime.datetime.now(datetime.timezone.utc)
    elif now.tzinfo is not None:
        now = now.astimezone(datetime.timezone.utc)

    # hour // 6 selects the ko, the remainder (0-5) selects the litu.
    ko_index, hour_in_ko = divmod(now.hour, 6)
    ko = ("fire", "air", "water", "earth")[ko_index]
    litu = ("rising", "living", "ending")[hour_in_ko // 2]
    return ko, litu
|
|
|
|
|
|
LINEEND = "\r\n"

# IRC Config
# Explicit UTF-8: without it open() falls back to the locale's encoding.
with open("config.json", encoding="utf-8") as f:
    config = json.load(f)

# Text fragments used by anarchist(): the file split at every "." and "?".
with open("auxfiles/conquest.txt", encoding="utf-8") as f:
    bread = f.read().replace("?", ".").split(".")

NODEAMON = config["nodaemon"]

# Path of the Unix domain socket that uds_thread() binds to.  Remove any
# file left at that path first.
uds_addr = config["uds"]
try:
    os.unlink(uds_addr)
except OSError:
    # A missing file is fine; an error that leaves the path in place is not.
    if os.path.exists(uds_addr):
        raise
|
|
|
|
|
|
def anarchist(lines=None):
    """Return one randomly chosen fragment as a single clean line.

    Args:
        lines: Optional sequence of candidate strings.  Defaults to the
            module-level ``bread`` list.

    Returns:
        A randomly selected candidate with leading/trailing whitespace
        removed and internal whitespace runs (including newlines) collapsed
        to single spaces, or ``""`` when no candidate has visible text.
    """
    if lines is None:
        lines = bread
    # Splitting text on "." leaves blank fragments and wrapped lines; drop
    # the former and flatten the latter so the result is always one line.
    candidates = [" ".join(fragment.split()) for fragment in lines]
    candidates = [text for text in candidates if text]
    if not candidates:
        return ""
    return choice(candidates)
|
|
|
|
|
|
def uds_thread(in_q):
    """Accept connections on the Unix socket and queue what they send.

    Each client gets one ``recv`` of at most 255 bytes.  A non-empty payload
    is answered with ``ACK`` and put on *in_q* as raw bytes; an empty one is
    answered with ``NACK``.  The loop ends once the module-level
    ``shutdown`` event is set.

    Args:
        in_q: Queue that receives the raw ``bytes`` payloads.
    """
    # `with` closes the listening socket when the loop ends.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as uds:
        uds.bind(uds_addr)
        uds.listen(1)
        # Without a timeout accept() blocks forever and the loop condition
        # is never re-evaluated after `shutdown` is set.
        uds.settimeout(1.0)
        while not shutdown.is_set():
            try:
                connection, _ = uds.accept()
            except socket.timeout:
                continue
            # Close every client socket, even when handling it fails.
            with connection:
                try:
                    data = connection.recv(255)
                    logger.debug("uds: received %d bytes", len(data))
                    if data:
                        connection.sendall(bytes("ACK" + LINEEND, "UTF-8"))
                        in_q.put_nowait(data)
                    else:
                        connection.sendall(bytes("NACK" + LINEEND, "UTF-8"))
                except Exception as e:
                    # Thread boundary: log through the module logger (the
                    # one with the file handler) and keep serving.
                    logger.exception(str(e))
|
|
|
|
|
|
# The IRCBot class must be handed a socket. It does not create one itself
# so that you can set up TLS, or not, as you need.
# These settings are good defaults, but your mileage may vary.
|
|
def do_connect():
    """Build a TLS-wrapped socket and connect an ``IRCBot`` through it.

    Hostname, port, channel, nick and nick password are read from the
    module-level ``config`` mapping.

    Returns:
        The ``IRCBot`` instance after ``connect`` has been called on it.
    """
    # A bare ssl.SSLContext() is deprecated since Python 3.10; name the
    # client-side protocol explicitly.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # NOTE(review): hostname and certificate checks are switched off, so the
    # server's identity is NOT verified.  Kept as-is to preserve behaviour.
    # check_hostname has to be cleared before verify_mode may be CERT_NONE.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    rawsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    oursock = context.wrap_socket(rawsock, server_hostname=config["hostname"])

    irc = IRCBot(oursock, isBad=config["isBad"])
    irc.connect(
        config["hostname"],
        config["port"],
        config["channel"],
        config["nick"],
        config["nickpass"],
    )
    return irc
|
|
|
|
|
|
def hup_handle(sig, fr):
    """Signal handler: set the module-level ``shutdown`` event.

    Both arguments are accepted to fit the handler signature and ignored.
    """
    shutdown.set()
|
|
|
|
|
|
def do_mean(db_path=None):
    """Return the ``data`` value of one randomly chosen row of ``mean``.

    Args:
        db_path: Path of the SQLite database.  Defaults to ``config["db"]``.

    Returns:
        The selected row's ``data`` column converted to ``str``.

    Raises:
        ValueError: If the ``mean`` table has no rows.
    """
    if db_path is None:
        db_path = config["db"]
    con = sqlite3.connect(db_path)
    try:
        # Let SQLite pick the row: drawing a rowid from 1..count(*) fails on
        # tables whose rowids have gaps (for example after deletions).
        row = con.execute(
            "SELECT data FROM mean ORDER BY RANDOM() LIMIT 1"
        ).fetchone()
    finally:
        # Always release the connection, even when the query raises.
        con.close()
    if row is None:
        raise ValueError("mean table is empty")
    return str(row[0])
|
|
|
|
|
|
def generate_response(person, message):
    """Pick the bot's reply to a channel message, if it has one.

    Args:
        person: Sender identification; only tested for the substring
            ``"hoot.hoot"``.
        message: Text of the message; CR/LF characters at either end are
            ignored.

    Returns:
        The reply string, or ``None`` when no rule matches.
    """
    msg = message.strip(LINEEND)
    lowered = msg.lower()

    # Rules are tried top to bottom; the first match wins.
    if "hoot.hoot" in person and lowered == "hello bot":
        return "Greetings Creator Mine"
    if lowered == "hello":
        return "Greetings Human!"
    if "Ground Control to Major Tom" in msg:
        return "Put your helmet on!"
    if "Ziggy Stardust" in msg:
        return "Looks good in leather pants"
    if lowered == "!roll20":
        return f"You rolled {randint(1, 20)}"
    return None
|
|
|
|
|
|
def _forward_queued(irc, q):
    """Send every payload waiting on *q* to the configured channel."""
    while not q.empty():
        payload = q.get_nowait()
        # Socket clients may send arbitrary bytes; never die on bad UTF-8.
        text = payload.decode("UTF-8", errors="replace")
        # An IRC message is a single line: split on line breaks, skip blanks.
        for line in text.splitlines():
            if line.strip():
                irc.send_privmsg(config["channel"], line)


def do_main_loop():
    """Connect to IRC and handle incoming messages until shutdown.

    Starts ``uds_thread`` in a daemon thread, then loops on
    ``irc.get_response()``:

    * ``MODE``: sends ``MODE <nick> +B``.
    * ``PRIVMSG`` to the configured channel: replies with whatever
      ``generate_response`` returns, if anything.
    * ``PING``: when the ko from ``get_tenpko_time`` has changed, rolls and
      may post a line from ``anarchist``.
    * Payloads queued by ``uds_thread`` are relayed to the channel.

    The function does not return: it always ends in ``sys.exit`` (status 1
    on ``IRCError``, 0 on Ctrl-C or once ``shutdown`` is set).
    """
    current_ko, _ = get_tenpko_time()
    irc = do_connect()
    q = Queue()
    x = threading.Thread(target=uds_thread, args=(q,))
    x.daemon = True
    x.start()
    while not shutdown.is_set():
        try:
            text = irc.get_response()
            # debug()'s first argument is a format string, so the three
            # fields must be passed as arguments to an explicit template.
            logger.debug("%s %s %s", text[0], text[1], text[2])
            if text[1] == "MODE":  # in lieu of RPL_WELCOME
                botmode = irctokens.build("MODE", [config["nick"], "+B"])
                irc.send_cmd(botmode)
            if text[1] == "PRIVMSG" and text[2][0] == config["channel"]:
                r = generate_response(text[0], text[2][1])
                if r is not None:
                    irc.send_privmsg(config["channel"], r)

            # NOTE(review): the nesting of the PING branch and the position
            # of the queue relay were reconstructed from de-indented
            # source; confirm against the intended behaviour.
            if text[1] == "PING":
                k, _ = get_tenpko_time()
                if k != current_ko:
                    current_ko = k
                    roll = randint(1, 20) + 1
                    if roll >= 17:
                        irc.send_privmsg(config["channel"], anarchist())
            _forward_queued(irc, q)

        except IRCError as e:
            # Use the module logger: it is the one with the file handler.
            logger.error(e)
            sys.exit(1)
        except KeyboardInterrupt:
            printred("shutdown ordered")
            logger.debug("Shutting down")
            irc.send_quit("daddy said to go night night")
            shutdown.set()
            x.join(5)
            sys.exit(0)
    printred("Shutdown ordered")
    irc.send_quit("Shutdown in progress")
    sys.exit(0)
|
|
|
|
|
|
# PID file handed to Daemonize.
pid = "bot.pid"

# SIGHUP requests a shutdown through hup_handle().
signal.signal(signal.SIGHUP, hup_handle)

if NODEAMON:
    # Foreground mode: run the loop directly in this process.
    do_main_loop()
else:
    # Daemon mode: Daemonize runs the loop; keep_fds is passed through to it.
    daemon = Daemonize(app="theodebot", pid=pid, keep_fds=keep_fds, action=do_main_loop)
    daemon.start()
|