196 lines
5.1 KiB
Python
196 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from irctokens import build, Line
|
|
from ircrobots import Bot as BaseBot
|
|
from ircrobots import Server as BaseServer
|
|
from ircrobots import ConnectionParams, SASLUserPass
|
|
from random import randint, choice
|
|
from tracery.modifiers import base_english
|
|
import asyncio
|
|
import configparser
|
|
import glob
|
|
import json
|
|
import os
|
|
import random
|
|
import traceback
|
|
import tracery
|
|
|
|
# Grammar cache filled by populate(): grammar name -> list of loaded grammars.
DB = {}

# The "list" converter gives sections a getlist() accessor that splits a
# comma-separated option into whitespace-stripped items.
config = configparser.ConfigParser(
    converters={"list": lambda raw: list(map(str.strip, raw.split(",")))}
)
config.read("config.ini")

# Settings taken from the [irc] section of config.ini.
bot = config["irc"]
|
|
|
|
# Optional credentials: the [nickserv] section of account.ini is only loaded
# when that file is present, otherwise `account` is left undefined.
if os.path.isfile("account.ini"):
    _account_file = configparser.ConfigParser()
    _account_file.read("account.ini")
    account = _account_file["nickserv"]
|
|
|
|
|
|
def grammar(rules):
    """Build a tracery grammar from *rules* with the English modifiers added.

    Any exception raised while building is printed and swallowed, in which
    case the function returns None.
    """
    try:
        built = tracery.Grammar(rules)
        built.add_modifiers(base_english)
    except Exception as err:
        # Best effort: a broken rule set must not stop the caller.
        print(err)
        return None
    return built
|
|
|
|
|
|
def load_rules(path):
    """Read the JSON document stored at *path* and return the parsed value.

    The file is decoded as UTF-8 (the encoding JSON text is specified to use)
    instead of whatever the process locale happens to be.

    Returns None when the file cannot be opened, decoded or parsed; the error
    is printed rather than raised so one bad file cannot stop the caller.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Deliberately broad: the files are user-supplied and may fail in
        # many ways (missing, a directory, bad encoding, invalid JSON, ...).
        print(e)
        return None
|
|
|
|
|
|
def populate():
    """Rebuild the global DB cache from every user's ~/.tracery directory.

    Each file matching /home/*/.tracery/* whose extension is ".json" or empty
    is loaded and stored under its base name (extension removed). Files from
    different users that share a name are collected in the same list.

    The cache is rebuilt into a fresh dict and swapped in at the end, so:
    - repeated calls do not accumulate duplicates,
    - names whose files have disappeared are dropped,
    - files that fail to load or to build a grammar are skipped instead of
      being stored as None.
    """
    global DB
    fresh = {}
    for path in glob.glob("/home/*/.tracery/*"):
        name, ext = os.path.splitext(os.path.basename(path))
        if name.startswith(".") or ext not in (".json", ""):
            continue
        rules = load_rules(path)
        if rules is None:
            continue
        built = grammar(rules)
        if built is None:
            continue
        # Group by grammar name (the original tested the *path* against the
        # name-keyed dict, so same-named grammars overwrote each other).
        fresh.setdefault(name, []).append(built)
    DB = fresh
|
|
|
|
|
|
# Fill the DB cache once when the module is loaded.
populate()
|
|
|
|
|
|
def generate(rule):
    """Return one expansion of a randomly chosen grammar named *rule*.

    The cache is refreshed through populate() first. Returns None when no
    usable grammar with that name exists. Entries that are None (grammars
    that failed to build) are ignored rather than dereferenced.
    """
    populate()
    candidates = [g for g in DB.get(rule, ()) if g is not None]
    if not candidates:
        return None
    return random.choice(candidates).flatten("#origin#")
|
|
|
|
|
|
def listify(col):
    """Return *col* unchanged if it is a list, otherwise wrap it in a list.

    isinstance() is used so list subclasses are also passed through as-is
    instead of being nested inside a new list.
    """
    return col if isinstance(col, list) else [col]
|
|
|
|
|
|
def shuffle(col):
    """Swap the values of two randomly chosen keys of *col* in place.

    Nothing is swapped when either chosen key is "origin", so that entry
    always keeps its own value. An empty mapping is returned untouched
    (random.choice would raise IndexError on it).

    Returns the same mapping object that was passed in.
    """
    keys = list(col)
    if not keys:
        return col
    first = random.choice(keys)
    second = random.choice(keys)
    if "origin" in (first, second):
        return col
    col[first], col[second] = col[second], col[first]
    return col
|
|
|
|
|
|
def fuse(argv):
    """Merge the rules of several grammars and return one expansion.

    For every name in *argv* found in DB, one grammar is chosen at random and
    its raw rules are merged in: a symbol defined by several grammars gets
    the concatenation of all their expansions. The merged values are then
    swapped between symbols by 20 rounds of shuffle() before the result is
    expanded from "#origin#".

    Returns None when none of the names yields a usable grammar or when the
    merged rules cannot be built into a grammar. Names that are unknown, and
    cache entries that are None, are skipped.
    """
    populate()
    merged = {}
    for name in argv:
        candidates = [g for g in DB.get(name, ()) if g is not None]
        if not candidates:
            continue
        rules = random.choice(candidates).raw
        for symbol, expansions in rules.items():
            if symbol in merged:
                merged[symbol] = listify(merged[symbol]) + listify(expansions)
            else:
                merged[symbol] = expansions

    # Nothing matched: there is nothing to shuffle or expand.
    if not merged:
        return None

    for _ in range(20):
        merged = shuffle(merged)

    fused = grammar(merged)
    if fused is None:
        return None
    return fused.flatten("#origin#")
|
|
|
|
|
|
def think(line):
    """Work out the reply for one channel message, or None for no reply.

    The message text is line.params[1], split on single spaces; its first
    word selects the command:

    - "!!list": every name in DB, each followed by a space.
    - "!!fuse a b [| prefix...]": fuse() of the names before "|", with the
      words after "|" put in front of the result.
    - "!!source": the source repository URL.
    - "!botlist" / "!!help": a short description of the bot.
    - "!!name [| prefix...]": generate() for "name", with an optional prefix.
    """
    words = line.params[1].split(" ")
    command = words[0]

    if command == "!!list":
        return "".join(name + " " for name in DB)

    if command == "!!fuse":
        if "|" in words:
            bar = words.index("|")
            fused = fuse(words[1:bar])
            if fused:
                return " ".join(words[bar + 1 :]) + " " + fused
            return None
        fused = fuse(words[1:])
        if fused:
            return fused
        return None

    if command == "!!source":
        return "https://tildegit.org/ben/tracer"

    if command in ("!botlist", "!!help"):
        return "helo i'm a tracery bot that makes cool things from tracery grammars in your ~/.tracery. see http://tracery.io for more info"

    if command.startswith("!!"):
        print(words)
        generated = generate(command[2:])
        if not generated:
            return None
        # A prefix is only honoured when "|" is the second word and at least
        # one word follows it.
        if len(words) >= 3 and words[1] == "|":
            return " ".join(words[2:]) + " " + generated
        return generated

    return None
|
|
|
|
|
|
class Server(BaseServer):
    """IRC connection that logs traffic and answers tracery commands.

    Every line sent or received is printed. After the 001 welcome numeric the
    configured channels are joined, INVITEs are accepted, and channel
    messages are handed to think(), whose reply (if any) is sent back to the
    channel the message came from.
    """

    async def line_send(self, line: Line):
        """Print each outgoing line, prefixed with the server name."""
        print(f"{self.name} > {line.format()}")

    async def line_read(self, line: Line):
        """Print each incoming line and react to 001, INVITE and PRIVMSG."""
        print(f"{self.name} < {line.format()}")

        if line.command == "001":
            # Registration finished: join the channels from config.ini and
            # set user mode +B (presumably the network's "bot" mode).
            await self.send(build("JOIN", [",".join(bot.getlist("channels"))]))
            await self.send(build("MODE", [self.nickname, "+B"]))

        if line.command == "INVITE":
            print(f"received invite to {line.params[1]}")
            await self.send(build("JOIN", [line.params[1]]))

        if line.command != "PRIVMSG":
            return

        # irctokens types Line.tags as Optional; a message without tags
        # would otherwise make the membership tests below raise TypeError.
        tags = line.tags or {}

        wanted = (
            self.has_channel(line.params[0])
            and line.hostmask is not None
            # ignore our own messages
            and self.casefold(line.hostmask.nickname) != self.nickname_lower
            # ignore messages carrying batch tag "1"
            and tags.get("batch") != "1"
            # ignore senders tagged as bots
            and "inspircd.org/bot" not in tags
            and self.has_user(line.hostmask.nickname)
        )
        if not wanted:
            return

        try:
            response = think(line)
            if response is not None:
                # Grammar text comes from user-editable files: flatten line
                # breaks so a reply can never carry a second IRC command.
                response = response.replace("\r", " ").replace("\n", " ")
                # An empty PRIVMSG has nothing to deliver, so skip it.
                if response:
                    await self.send(build("PRIVMSG", [line.params[0], response]))
        except Exception as e:
            # A failing grammar or command must not take the connection down.
            print("ERROR", line)
            print(e)
            traceback.print_exc()
|
|
|
|
|
|
class Bot(BaseBot):
    """Bot whose connections are handled by the Server class of this file."""

    def create_server(self, name: str):
        """Return a new Server bound to this bot under the given name."""
        server = Server(self, name)
        return server
|
|
|
|
|
|
async def main():
    """Connect to the server described by the [irc] settings and run the bot.

    Reads nick, server, port and tls from the `bot` section. SASL
    credentials are used only when account.ini was found at import time;
    otherwise the connection is made without SASL instead of failing with
    NameError.

    Raises:
        KeyError: if a required [irc] option is missing.
        ValueError: if port is not an integer or tls is not a recognised
            boolean word (1/yes/true/on, 0/no/false/off).
    """
    ircrobot = Bot()

    # `account` is only defined at module level when account.ini exists.
    credentials = globals().get("account")
    sasl = None
    if credentials is not None:
        sasl = SASLUserPass(credentials["username"], credentials["password"])

    # bool("false") is True, so parse the option as a real boolean. An empty
    # value keeps its previous meaning of "no TLS".
    tls_setting = bot["tls"].strip()
    use_tls = bot.getboolean("tls") if tls_setting else False

    params = ConnectionParams(
        bot["nick"],
        host=bot["server"],
        port=int(bot["port"]),
        tls=use_tls,
        sasl=sasl,
    )

    await ircrobot.add_server("tilde", params)
    await ircrobot.run()
|
|
|
|
|
|
# Run the bot only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|