187 lines
4.8 KiB
Python
187 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from irctokens import build, Line
|
|
from ircrobots import Bot as BaseBot
|
|
from ircrobots import Server as BaseServer
|
|
from ircrobots import ConnectionParams, SASLUserPass
|
|
from tracery.modifiers import base_english
|
|
import asyncio
|
|
import configparser
|
|
import glob
|
|
import json
|
|
import os
|
|
import random
|
|
import traceback
|
|
import tracery
|
|
|
|
HELP_TEXT = "helo i'm a tracery bot that makes cool things from tracery grammars in your ~/.tracery. see " \
|
|
"https://tracery.io for more info "
|
|
REPO_LINK = "https://tildegit.org/ben/tracer"
|
|
|
|
DB = {}
|
|
|
|
# read configuration
|
|
if not os.path.isfile("config.ini"):
|
|
print("you must create a config.ini file")
|
|
exit(1)
|
|
|
|
parser = configparser.ConfigParser()
|
|
parser.read("config.ini")
|
|
config = parser["irc"]
|
|
account = parser["sasl"]
|
|
|
|
|
|
def grammar(rules):
|
|
try:
|
|
res = tracery.Grammar(rules)
|
|
res.add_modifiers(base_english)
|
|
return res
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
def load_rules(path):
|
|
try:
|
|
with open(path) as f:
|
|
return json.loads(f.read())
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
def populate():
|
|
global DB
|
|
DB = {}
|
|
for p in glob.glob("/home/*/.tracery/*"):
|
|
name, ext = os.path.splitext(p)
|
|
name = os.path.basename(name)
|
|
if name.startswith(".") or ext not in [".json", ""]:
|
|
continue
|
|
if p in DB:
|
|
DB[name].append(p)
|
|
else:
|
|
DB[name] = [p]
|
|
|
|
|
|
populate()
|
|
|
|
|
|
def generate(rule):
|
|
populate()
|
|
if rule in DB:
|
|
return grammar(load_rules(random.choice(DB[rule]))).flatten("#origin#")
|
|
|
|
|
|
def listify(col):
|
|
return col if type(col) == type([]) else [col]
|
|
|
|
|
|
def shuffle(col):
|
|
a = random.choice(list(col))
|
|
b = random.choice(list(col))
|
|
if "origin" in [a, b]:
|
|
return col
|
|
col[a], col[b] = col[b], col[a]
|
|
return col
|
|
|
|
|
|
def fuse(argv):
|
|
populate()
|
|
raw = {}
|
|
for gk in argv:
|
|
if gk in DB:
|
|
g = grammar(load_rules(random.choice(DB[gk]))).raw
|
|
for k in g:
|
|
if k in raw:
|
|
raw[k] = listify(raw[k]) + listify(g[k])
|
|
else:
|
|
raw[k] = g[k]
|
|
for i in range(20):
|
|
raw = shuffle(raw)
|
|
return grammar(raw).flatten("#origin#")
|
|
|
|
|
|
def think(line):
|
|
command, *words = line.params[1].split(" ")
|
|
|
|
if command == "!!list":
|
|
return " ".join(DB)
|
|
elif command == "!!source":
|
|
return REPO_LINK
|
|
elif command in ["!!help", "!botlist"]:
|
|
return HELP_TEXT
|
|
elif command == "!!fuse":
|
|
if "|" in words:
|
|
w = words.index("|")
|
|
res = fuse(words[:w])
|
|
if res:
|
|
return " ".join(words[w + 1:]) + " " + res
|
|
else:
|
|
res = fuse(words)
|
|
if res:
|
|
return res
|
|
elif command.startswith("!!"):
|
|
res = generate(command[2:])
|
|
if res:
|
|
if "|" in words:
|
|
w = words.index("|")
|
|
return " ".join(words[w + 1:]) + " " + res
|
|
else:
|
|
return res
|
|
|
|
|
|
class Server(BaseServer):
|
|
async def line_send(self, line: Line):
|
|
print(f"{self.name} > {line.format()}")
|
|
|
|
async def line_read(self, line: Line):
|
|
print(f"{self.name} < {line.format()}")
|
|
|
|
if line.command == "001":
|
|
await self.send(build("MODE", [self.nickname, "+B"]))
|
|
|
|
if line.command == "INVITE":
|
|
await self.send(build("JOIN", [line.params[1]]))
|
|
|
|
if (
|
|
line.command == "PRIVMSG"
|
|
and self.has_channel(line.params[0])
|
|
and self.has_user(line.hostmask.nickname)
|
|
and len(line.params[1].split(" ")) > 0
|
|
and line.hostmask is not None
|
|
and not self.casefold(line.hostmask.nickname) == self.nickname_lower
|
|
and not ("batch" in line.tags and line.tags["batch"] == "1")
|
|
and "inspircd.org/bot" not in line.tags
|
|
and line.params[1].startswith("!")
|
|
):
|
|
try:
|
|
response = think(line)
|
|
if response is not None:
|
|
await self.send(build("PRIVMSG", [line.params[0], response]))
|
|
except Exception as e:
|
|
print("ERROR", line)
|
|
print(e)
|
|
traceback.print_exc()
|
|
|
|
|
|
class Bot(BaseBot):
|
|
def create_server(self, name: str):
|
|
return Server(self, name)
|
|
|
|
|
|
async def main():
|
|
params = ConnectionParams(
|
|
config["nick"],
|
|
host=config["server"],
|
|
port=config.getint("port"),
|
|
tls=config.getboolean("tls"),
|
|
sasl=SASLUserPass(account["username"], account["password"]),
|
|
autojoin=[config["channels"]]
|
|
)
|
|
bot = Bot()
|
|
await bot.add_server(config["server"], params)
|
|
await bot.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|