minerbot2/plugins/radio.py

100 lines
3.1 KiB
Python

import requests, traceback, time, subprocess
from bot import IRCLine
# Bot instance used by every handler in this module; stays None until
# register() stores the live bot here.
BOT = None
# Status endpoint whose JSON is read under the "icestats" key
# (per-source current title and listener counts).
URL = "http://radio.tildeverse.org:8000/status-json.xsl"
# Now-playing API endpoint; its JSON "live" section is read by the
# "dj" subcommand.
AZURA = "https://radio.tildeverse.org/api/nowplaying/1"


def cround(x):
    """Round floats to two decimal places; return any other value unchanged."""
    # isinstance (rather than an exact type comparison) so float subclasses
    # are rounded as well.
    return round(x, 2) if isinstance(x, float) else x


def cdivmod(*args):
    """Return ``divmod(*args)`` as a ``[quotient, remainder]`` list.

    The quotient is truncated to ``int``; the remainder is rounded to two
    decimal places when it is a float and passed through untouched otherwise.
    """
    quotient, remainder = divmod(*args)
    return [int(quotient), cround(remainder)]
def respond(event, msg, prefix=True):
    """Send *msg* as a PRIVMSG in reply to *event*.

    The reply goes to ``event.target`` when that target starts with "#",
    otherwise to the sender's nick. When *prefix* is truthy and the reply
    goes to a "#" target, the text is prefixed with ``"<nick>: "``.
    Does nothing while no bot has been registered (``BOT`` is falsy).

    Args:
        event: Incoming event exposing ``target`` and ``hostmask.nick``.
        msg: Text to send (a ``str``).
        prefix: Whether to address the sender by nick in "#" targets.
    """
    if not BOT:
        return
    is_channel = event.target.startswith("#")
    if is_channel:
        target = event.target
        lead = f"{event.hostmask.nick}: " if prefix else ""
    else:
        target = event.hostmask.nick
        lead = ""
    # msg frequently carries text fetched from remote services. CR/LF ends an
    # IRC message, so collapse embedded line breaks to spaces to keep the
    # reply on a single protocol line.
    text = msg.replace("\r", " ").replace("\n", " ")
    BOT.socket.send(IRCLine("PRIVMSG", [target, ":" + lead + text]))
def request(func, azura=False, timeout=10):
    """Fetch a radio endpoint and return ``func`` applied to its decoded JSON.

    Args:
        func: Callable that receives the decoded JSON document and extracts
            the value of interest.
        azura: Query ``AZURA`` when true, ``URL`` otherwise.
        timeout: Seconds to wait for the server before giving up, so a
            stalled endpoint cannot block the caller indefinitely.

    Returns:
        Whatever ``func`` returns, or ``False`` when the request, the JSON
        decoding, or ``func`` itself fails (the traceback is printed).
    """
    endpoint = AZURA if azura else URL
    try:
        r = requests.get(endpoint, timeout=timeout)
        r.raise_for_status()
        return func(r.json())
    except Exception:
        # Best-effort boundary: func is arbitrary, so any ordinary error is
        # logged and reported as False. KeyboardInterrupt/SystemExit are
        # deliberately allowed to propagate.
        print("Error performing request:")
        traceback.print_exc()
        return False
def privmsg(event):
    """Post the real current title when tildebot reports the stream offline.

    Only messages from the nick "tildebot" containing
    "now playing: Stream Offline" are acted on, and nothing is done while
    ``BOT.in_batch`` is set. The title is read from the first source of the
    status endpoint and sent without a nick prefix.
    """
    if BOT.in_batch:
        return
    if event.hostmask.nick != "tildebot":
        return
    if "now playing: Stream Offline" not in event.message:
        return
    song = request(lambda x: x["icestats"]["source"][0]["yp_currently_playing"])
    if not song:
        # Explicit check instead of assert (asserts vanish under -O).
        # request() has already printed the traceback for a failed lookup.
        return
    try:
        respond(event, "now playing: " + song, False)
    except Exception:
        # Best effort: a failed reply must not take the handler down.
        print("Error formatting response:")
        traceback.print_exc()
def _format_duration(seconds):
    """Render *seconds* as "Ss", "Mm Ss" or "Hh Mm Ss"."""
    if seconds < 60:
        return f"{seconds}s"
    minutes, seconds = cdivmod(seconds, 60)
    if minutes < 60:
        return f"{minutes}m {seconds}s"
    hours, minutes = cdivmod(minutes, 60)
    return f"{hours}h {minutes}m {seconds}s"


def on_command_radiopull(event):
    """Handle the ``radiopull`` command.

    The first element of ``event.parts`` selects the subcommand:

    * ``np`` / ``nowplaying`` / ``now``: current title of the first source
      plus the listener count summed over all sources.
    * ``dj``: who is streaming live and, when the start time is known,
      for how long.
    * ``un``: the text returned by the tilderadio "nextdj" page.

    Anything else (including no subcommand) gets a usage line.
    """
    command = event.parts[0] if event.parts else ""

    if command in ("np", "nowplaying", "now"):
        def now_playing(x):
            sources = x["icestats"]["source"]
            return [
                sources[0]["yp_currently_playing"],
                sum(source["listeners"] for source in sources),
            ]

        song = request(now_playing)
        if not song:
            respond(event, "Cannot find currently playing song for some reason.")
            return
        respond(event, f"now playing: {song[0]} (for {song[1]!s} listeners)")
    elif command == "dj":
        live = request(
            lambda x: [
                x["live"].get(a)
                for a in ("is_live", "streamer_name", "broadcast_start")
            ],
            True,
        )
        if not live:
            # request() returns False on failure; unpacking that would raise.
            respond(event, "Cannot find the current DJ for some reason.")
            return
        is_live, name, start = live
        # Exactly one reply per invocation: each outcome returns or is the
        # final statement of the branch.
        if not is_live or (not name and start is None):
            respond(event, "Nobody is streaming at the moment (AutoDJ?)")
            return
        if start is None:
            respond(event, f"{name} is streaming right now! (I don't know for how long though...)")
            return
        # start is subtracted from time.time(), i.e. it is treated as a
        # timestamp in seconds (presumably Unix epoch; confirm against API).
        ts = _format_duration(round(time.time() - start, 2))
        if not name:
            respond(event, f"Whoever's streaming has been streaming for {ts}. (This should never be seen; consult tildebot for dj name)")
        else:
            respond(event, f"{name} is streaming right now! (Streaming for {ts})")
    elif command == "un":
        try:
            # The request itself sits inside the try so connection errors
            # and timeouts are reported instead of crashing the handler.
            r = requests.get("https://tilderadio.org/schedule/nextdj.php", timeout=10)
            r.raise_for_status()
        except requests.RequestException:
            traceback.print_exc()
            respond(event, "Error with tilderadio API")
            return
        respond(event, r.text.strip())
    else:
        respond(event, f"Usage: {BOT.prefix}radiopull <np/nowplaying/now/dj/un>")
def register(bot):
    """Store *bot* in the module-level ``BOT`` and subscribe this module's handlers.

    ``privmsg`` is attached to the "privmsg" event and
    ``on_command_radiopull`` to the "command_radiopull" event, in that order.
    """
    global BOT
    BOT = bot
    subscriptions = (
        ("privmsg", privmsg),
        ("command_radiopull", on_command_radiopull),
    )
    for event_name, handler in subscriptions:
        bot.event_manager.on(event_name, handler)