398 lines
15 KiB
Python
398 lines
15 KiB
Python
import emoji
|
|
import json
|
|
import random
|
|
from dns.resolver import query
|
|
from email.message import EmailMessage
|
|
from email.utils import parseaddr
|
|
from mastodon import Mastodon
|
|
from smtplib import SMTP
|
|
from src import ModuleManager, utils
|
|
|
|
CHANNEL = "#tilderadio"
|
|
NOTIFY_CHANNELS = [CHANNEL]
|
|
LISTEN_URL = "https://tilderadio.org/listen"
|
|
SCHEDULE_URL = "https://tilderadio.org/schedule/"
|
|
SOURCE_URL = "https://tildegit.org/ben/bitbot-modules"
|
|
AZURACAST_API_BASE = "https://azuracast.tilderadio.org/api"
|
|
ICECAST_API_BASE = "https://azuracast.tilderadio.org/radio/8000"
|
|
TOOT_CREDS_FILE = "/home/ben/.bitbot/tilderadio-toot.json"
|
|
SLOGANS = [
|
|
"The soundtrack in your cat's head",
|
|
"Your cat says she likes us better",
|
|
"radio for the people",
|
|
"radio by bots, for bots",
|
|
"On the Internet, no one knows you're a weakly superhuman AGI with a really good speech synth module.",
|
|
"We found your cat",
|
|
"cats.",
|
|
"a hacker's excuse for radio",
|
|
"0::1 is where the heart is",
|
|
"now with the longest slogan in human history",
|
|
"reticulating splines",
|
|
"Hold the shift",
|
|
"a very serious place for very serious happenings",
|
|
"sorry, this extension is invalid",
|
|
"This is the radio you were looking for",
|
|
"Not The Illuminati, We Promise",
|
|
"the radio is coming from INSIDE the house",
|
|
"Dont Sue Us!",
|
|
"it's radio minus radio",
|
|
"where your passwords are stored in plain text",
|
|
"Like radio. Only better.",
|
|
"tomasino 0wnz y0u!",
|
|
"that escalated quickly",
|
|
"music for your brain meats",
|
|
"003 Days Since Last Workplace Accident",
|
|
"*heavy breathing*",
|
|
"Dizzy? We stop the world from spinning",
|
|
"Where Disney Princesses Go Slumming",
|
|
'any slogan but "eat the poop or die"',
|
|
"that's not what she said!",
|
|
"not product placement, we promise! (shop.tildeverse.org)",
|
|
"no longer crashes on russian metadata",
|
|
"SYNTH TUBA JAZZ VOMIT",
|
|
]
|
|
EMAIL_TEMPLATE = """
|
|
{dj} just came on the air on tilderadio.org!
|
|
|
|
now playing: {now_playing}
|
|
|
|
have a listen here: {link}
|
|
|
|
~tildebot
|
|
"""
|
|
|
|
|
|
class Module(ModuleManager.BaseModule):
|
|
now_playing = ""
|
|
dj = ""
|
|
song = ""
|
|
listeners = 0
|
|
mastodon = None
|
|
is_online = False
|
|
|
|
def save_nowplaying(self, jsontxt):
|
|
if jsontxt == "":
|
|
data = utils.http.request(AZURACAST_API_BASE + "/nowplaying/1").json()
|
|
else:
|
|
data = json.loads(jsontxt)
|
|
|
|
# get the song name directly from icecast
|
|
icecast_data = utils.http.request(ICECAST_API_BASE + "/status-json.xsl").json()
|
|
np = icecast_data["icestats"]["source"]
|
|
self.song = data["now_playing"]["song"]["text"]
|
|
self.listeners = sum(i["listeners"] for i in np)
|
|
|
|
# azuracast's now playing info is broken
|
|
# https://github.com/AzuraCast/AzuraCast/issues/3142
|
|
# self.song = data["now_playing"]["song"]["text"]
|
|
self.is_online = data["live"]["is_live"]
|
|
self.dj = data["live"]["streamer_name"]
|
|
self.broadcast_start = data["live"]["broadcast_start"]
|
|
# self.listeners = data["listeners"]["current"]
|
|
|
|
def format_nowplaying(self):
|
|
ret = ""
|
|
if self.is_online:
|
|
ret = f"({self.dj}) "
|
|
ret += f"now playing: {self.song} ~~ {self.listeners} listeners"
|
|
return ret
|
|
|
|
def on_load(self):
|
|
self.save_nowplaying("")
|
|
|
|
with open(TOOT_CREDS_FILE, "r") as f:
|
|
creds = json.load(f)
|
|
self.mastodon = Mastodon(
|
|
client_id=creds["client_id"],
|
|
client_secret=creds["client_secret"],
|
|
access_token=creds["access_token"],
|
|
api_base_url=creds["base_url"],
|
|
)
|
|
|
|
@utils.hook("api.get.slogan", authenticated=False)
|
|
def httpslogan(self, event):
|
|
return random.choice(SLOGANS)
|
|
|
|
@utils.hook("api.post.radio")
|
|
def hook(self, event):
|
|
previous_dj = self.dj
|
|
previous_song = self.song
|
|
self.save_nowplaying(event["data"])
|
|
|
|
# new dj online!
|
|
if self.is_online:
|
|
|
|
server = self.bot.get_server_by_alias("tilde")
|
|
if server is not None:
|
|
|
|
# a different dj is online
|
|
if self.dj != "" and self.dj != previous_dj:
|
|
newdj_message = f"{self.dj} is now streaming on tilderadio! tune in here now: {LISTEN_URL}"
|
|
|
|
# post toot to @tilderadio
|
|
self.mastodon.toot(newdj_message)
|
|
|
|
# send dm notifications
|
|
for n in self.bot.get_setting("tilderadio-subscriptions", []):
|
|
if n in server.users:
|
|
user = server.users[n]
|
|
self.events.on("send.stdout").call(
|
|
target=user,
|
|
module_name="Tilderadio",
|
|
server=server,
|
|
message=newdj_message,
|
|
)
|
|
|
|
# send email notifications
|
|
with SMTP("localhost") as smtp:
|
|
for nick, email in self.bot.get_setting(
|
|
"tilderadio-email-subscriptions", {}
|
|
).items():
|
|
msg = EmailMessage()
|
|
msg["Subject"] = f"tilderadio: {self.dj} is now on the air"
|
|
msg["From"] = "tildebot@tilde.chat"
|
|
msg["To"] = f"{nick} <{email}>"
|
|
msg.set_content(
|
|
EMAIL_TEMPLATE.format(
|
|
dj=self.dj,
|
|
now_playing=self.format_nowplaying(),
|
|
link=LISTEN_URL,
|
|
)
|
|
)
|
|
smtp.send_message(msg)
|
|
|
|
# post to all registered channels
|
|
for channel_name in NOTIFY_CHANNELS:
|
|
if channel_name in server.channels:
|
|
channel = server.channels[channel_name]
|
|
|
|
self.events.on("send.stdout").call(
|
|
target=channel,
|
|
module_name="Tilderadio",
|
|
server=server,
|
|
message=newdj_message,
|
|
)
|
|
|
|
# post new songs while streaming
|
|
if self.song != "" and self.song != previous_song:
|
|
if CHANNEL in server.channels:
|
|
channel = server.channels[CHANNEL]
|
|
|
|
self.events.on("send.stdout").call(
|
|
target=channel,
|
|
module_name="Tilderadio",
|
|
server=server,
|
|
message=utils.irc.color(
|
|
self.format_nowplaying(), utils.consts.GREEN
|
|
),
|
|
)
|
|
|
|
@utils.hook("received.command.np", alias_of="nowplaying")
|
|
@utils.hook("received.command.nowplaying")
|
|
@utils.kwarg("help", "show the current song on tilderadio")
|
|
def nowplaying(self, event):
|
|
self.save_nowplaying("")
|
|
event["stdout"].write(self.format_nowplaying())
|
|
|
|
@utils.hook("received.command.schedule")
|
|
@utils.kwarg("help", "show a link to the tilderadio schedule")
|
|
def schedule(self, event):
|
|
event["stdout"].write(f"you can find the schedule here: {SCHEDULE_URL}")
|
|
|
|
@utils.hook("received.command.un", alias_of="upnext")
|
|
@utils.hook("received.command.upnext")
|
|
@utils.kwarg("help", "show who's up next to stream")
|
|
def upnext(self, event):
|
|
js = utils.http.request(
|
|
AZURACAST_API_BASE + "/station/1/schedule"
|
|
).json()
|
|
if len(js) < 1:
|
|
event["stdout"].write("nothing scheduled")
|
|
else:
|
|
data = js[0]
|
|
start = utils.datetime.parse.iso8601(data["start"])
|
|
now = utils.datetime.utcnow()
|
|
total_secs = (start - now).total_seconds()
|
|
event["stdout"].write(
|
|
"{} is up next at {} UTC in {}!".format(
|
|
data["name"],
|
|
utils.datetime.format.datetime_human(start),
|
|
utils.datetime.format.to_pretty_time(total_secs),
|
|
)
|
|
)
|
|
|
|
@utils.hook("received.command.unn", alias_of="upnextnext")
|
|
@utils.hook("received.command.upnextnext")
|
|
@utils.kwarg("help", "show who's up after the next to stream")
|
|
def upnextnext(self, event):
|
|
js = utils.http.request(
|
|
AZURACAST_API_BASE + "/station/1/schedule"
|
|
).json()
|
|
if len(js) < 1:
|
|
event["stdout"].write("nothing scheduled")
|
|
else:
|
|
data = js[1]
|
|
start = utils.datetime.parse.iso8601(data["start"])
|
|
now = utils.datetime.utcnow()
|
|
total_secs = (start - now).total_seconds()
|
|
event["stdout"].write(
|
|
"{} is up next next at {} UTC in {}!".format(
|
|
data["name"],
|
|
utils.datetime.format.datetime_human(start),
|
|
utils.datetime.format.to_pretty_time(total_secs),
|
|
)
|
|
)
|
|
|
|
@utils.hook("received.command.subscribe")
|
|
@utils.kwarg("help", "sign up to get a direct message when a new dj goes online")
|
|
def subscribe(self, event):
|
|
subs = self.bot.get_setting("tilderadio-subscriptions", [])
|
|
nick = event["user"].nickname
|
|
if not nick in subs:
|
|
subs.append(nick)
|
|
self.bot.set_setting("tilderadio-subscriptions", subs)
|
|
event["stdout"].write("i'll send you a message when a dj goes online")
|
|
else:
|
|
event["stdout"].write("you're already subscribed!")
|
|
|
|
@utils.hook("received.command.unsubscribe")
|
|
@utils.kwarg("help", "unsubscribe from dj announcements")
|
|
def unsubscribe(self, event):
|
|
subs = self.bot.get_setting("tilderadio-subscriptions", [])
|
|
nick = event["user"].nickname
|
|
if nick in subs:
|
|
subs.remove(nick)
|
|
self.bot.set_setting("tilderadio-subscriptions", subs)
|
|
event["stdout"].write("ok, i'll no longer send you dj announcements")
|
|
else:
|
|
event["stdout"].write("you weren't subscribed. unable to remove")
|
|
|
|
@utils.hook("received.command.emailsubscribe")
|
|
@utils.kwarg("help", "sign up to get an email when a dj goes online")
|
|
@utils.spec("!<email>word")
|
|
def email_subscribe(self, event):
|
|
email = event["spec"][0]
|
|
_, addr = parseaddr(email)
|
|
if "@" in addr:
|
|
domain = addr.rsplit("@", 1)[-1]
|
|
try:
|
|
mxfound = bool(query(domain, "MX"))
|
|
subs = self.bot.get_setting("tilderadio-email-subscriptions", {})
|
|
nick = event["user"].nickname
|
|
subs.update({nick: addr})
|
|
self.bot.set_setting("tilderadio-email-subscriptions", subs)
|
|
|
|
event["stdout"].write(
|
|
f"ok, i'll send an email to you at {addr} when a dj goes online"
|
|
)
|
|
except:
|
|
event["stdout"].write(
|
|
f"your specified mail server {domain} is not configured to receive mail"
|
|
)
|
|
else:
|
|
event["stdout"].write(f"invalid email address")
|
|
|
|
@utils.hook("received.command.emailunsubscribe")
|
|
@utils.kwarg("help", "stop sending email notifications")
|
|
def email_unsubscribe(self, event):
|
|
subs = self.bot.get_setting("tilderadio-email-subscriptions", {})
|
|
nick = event["user"].nickname
|
|
if nick in subs:
|
|
subs.pop(nick)
|
|
self.bot.set_setting("tilderadio-email-subscriptions", subs)
|
|
event["stdout"].write("ok i'll stop sending you emails")
|
|
else:
|
|
event["stdout"].write("you weren't subscribed. unable to remove")
|
|
|
|
@utils.hook("received.command.dj")
|
|
@utils.kwarg("help", "check if someone is currently streaming")
|
|
def showdj(self, event):
|
|
if self.dj == "":
|
|
message = "no one is currently on the air"
|
|
else:
|
|
message = f"{self.dj} is now streaming!"
|
|
if self.broadcast_start:
|
|
now = utils.datetime.utcnow().timestamp()
|
|
total_seconds = now - self.broadcast_start
|
|
message += " (for {})".format(
|
|
utils.datetime.format.to_pretty_time(total_seconds)
|
|
)
|
|
event["stdout"].write(message)
|
|
|
|
@utils.hook("received.command.slogan")
|
|
@utils.kwarg("help", "get a random tilderadio slogan")
|
|
def slogan(self, event):
|
|
event["stdout"].write(random.choice(SLOGANS))
|
|
|
|
@utils.hook("received.command.paymybills")
|
|
def paymybills(self, event):
|
|
event["stdout"].write("whaddya mean?! i'm broker than you!")
|
|
|
|
@utils.hook("received.command.toot")
|
|
@utils.kwarg("require_mode", "v")
|
|
@utils.kwarg("help", "send a toot from the tilderadio mastodon account")
|
|
@utils.spec("!<status>string")
|
|
def toot(self, event):
|
|
status = event["spec"][0]
|
|
if event["target"].name != CHANNEL:
|
|
event["stderr"].write(f"tooting can only be used in {CHANNEL}")
|
|
return
|
|
|
|
if len(status) > 8:
|
|
nick = event["user"].nickname
|
|
status = emoji.emojize(status, use_aliases=True)
|
|
uri = self.mastodon.toot(f"{status}\n~~{nick}")["uri"]
|
|
event["stdout"].write(f"tooted!: {uri}")
|
|
|
|
@utils.hook("received.command.radiochannels")
|
|
@utils.kwarg("help", "show tilderadio notification channels")
|
|
def radiochannel(self, event):
|
|
event["stdout"].write(
|
|
f"my primary channel is {CHANNEL} and i will send dj announcements to {NOTIFY_CHANNELS}"
|
|
)
|
|
|
|
@utils.hook("received.command.now", alias_of="utcnow")
|
|
@utils.hook("received.command.utcnow")
|
|
@utils.kwarg("help", "show current utc time")
|
|
def utcnow(self, event):
|
|
event["stdout"].write(
|
|
utils.datetime.format.datetime_human(utils.datetime.utcnow())
|
|
)
|
|
|
|
@utils.hook("received.command.setdjtopic")
|
|
@utils.kwarg("require_mode", "v")
|
|
@utils.kwarg("help", "set your personal DJ topic")
|
|
@utils.spec("!<topic>string")
|
|
def setdjtopic(self, event):
|
|
if event["target"].name != CHANNEL:
|
|
event["stderr"].write(f"setdjtopic command can only be used in {CHANNEL}")
|
|
return
|
|
|
|
topic = event["spec"][0]
|
|
event["user"].set_setting("tilderadio-djtopic", topic)
|
|
event["stdout"].write(
|
|
f"Set {event['user'].nickname}'s DJ topic to \"{topic}\"."
|
|
)
|
|
|
|
@utils.hook("received.command.djtopic")
|
|
@utils.kwarg("help", "get the DJ topic for the active DJ or for any DJ")
|
|
@utils.spec("?<dj>user")
|
|
def djtopic(self, event):
|
|
dj = event["spec"][0]
|
|
if dj is None:
|
|
if self.dj == "":
|
|
event["stderr"].write(f"Nobody is streaming right now.")
|
|
return
|
|
if event["server"].has_user_id(self.dj):
|
|
dj = event["server"].get_user(self.dj)
|
|
else:
|
|
event["stderr"].write(
|
|
f"I can't find an account for {self.dj}. Try manually putting in their nickname?"
|
|
)
|
|
return
|
|
topic = dj.get_setting("tilderadio-djtopic", None)
|
|
if topic is None:
|
|
event["stderr"].write(f"{dj.nickname} doesn't have a DJ topic set.")
|
|
return
|
|
event["stdout"].write(f"DJ topic for {dj.nickname}: {topic}")
|