playlistener/playlistener.py

468 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
from time import sleep, time
from random import randint
from datetime import datetime, date
import musicbrainzngs
from musicbrainzngs.musicbrainz import ResponseError
from asyncio.subprocess import create_subprocess_exec, PIPE
from threading import Thread
from urllib.request import urlopen
from PIL import Image, ImageFont, ImageDraw, ImageFilter
import asyncio
import signal
from io import BytesIO
import json
from pathlib import Path
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from fuzzy_match import algorithims
from mastodon import Mastodon
from textdistance import overlap
import requests
CURRENT_TRACK_SCRIPT = """
on getOrNull(value)
if value is "" or value is 0 then
return "null"
else if class of value is text then
return quote & value & quote
else
return value
end if
end getOrNull
tell application "iTunes"
"name: " & my getOrNull(name of current track) & "
artist: " & my getOrNull(artist of current track) & "
album: " & my getOrNull(album of current track) & "
duration: " & (duration of current track as text) & "
albumArtist: " & my getOrNull(album artist of current track)
end tell
"""
PLAYER_POSITION_SCRIPT = """
tell application "iTunes"
"position: " & (player position as text)
end tell
"""
PLAYER_STATE_SCRIPT = """
tell application "iTunes"
"state: " & player state
end tell
"""
async def osa(script):
proc = await create_subprocess_exec("osascript", stdin=PIPE, stdout=PIPE)
stdout, stderr = await proc.communicate(script.encode())
# import pdb
# pdb.set_trace()
await proc.wait()
return (
proc.returncode,
stdout and stdout.decode() or "",
stderr and stderr.decode() or "",
)
async def current_track():
code, msg, err = await osa(CURRENT_TRACK_SCRIPT)
if code != 0:
raise Exception(err)
ret = {}
for line in msg.split("\n"):
if len(line.strip()) == 0:
continue
k, v = line.split(":", 1)
ret[k] = v.strip()
ret["duration"] = float(ret["duration"].replace(",", "."))
return ret
async def player_state():
code, msg, err = await osa(PLAYER_STATE_SCRIPT)
if code != 0:
raise Exception(err)
return msg.split(":", 1)[1].strip()
async def player_position():
code, msg, err = await osa(PLAYER_POSITION_SCRIPT)
if code != 0:
raise Exception(err)
return float(msg.split(":", 1)[1].replace(",", "."))
async def mb_getinfo(track):
name = track["name"]
artist = track["artist"]
album = track["album"]
musicbrainzngs.set_useragent("playlistener", "0.1")
q = f"+recording:{name}^10 +artist:{artist}^5 +release:{album}^1"
try:
result = musicbrainzngs.search_recordings(
query=q,
strict=True,
)
except:
return None
if "recording-list" not in result:
return None
res = None
for releases in result["recording-list"]:
img = None
if "release-list" in releases:
res = {
"title": releases["title"],
"artist": ", ".join(
[
a["name"]
for a in releases["artist-credit"]
if isinstance(a, dict)
]
),
}
for release in releases["release-list"]:
try:
trig = algorithims.trigram(release["title"], album)
if trig < 0.9:
continue
req = BytesIO(musicbrainzngs.get_image_front(release["id"], 250))
img = Image.open(req)
img = img.resize((100, 100), resample=Image.LANCZOS)
except Exception as e:
print(release["id"], e)
else:
break
res["image"] = img
return res
class Osa:
listening = True
def close(self):
self.listening = False
async def listener(self):
current = None
start_time = None
begin = datetime.now()
trackinfo = None
while self.listening:
try:
state = await player_state()
except Exception:
continue
if state == "playing":
try:
track = await current_track()
except Exception:
continue
if current is None:
current = track
start_time = int(time())
trackinfo = await mb_getinfo(track)
elif current != track:
old = current
end_time = int(time())
yield trackinfo, end_time, start_time, old
current = None
start_time = None
print(
datetime.fromtimestamp(start_time)
if start_time is not None
else "",
datetime.now(),
0,
current["name"] if current is not None else "",
end="\r",
)
elif start_time is not None:
try:
pos = await player_position()
except:
continue
start_time += 1 if pos > 0 else 0
await asyncio.sleep(1)
if start_time is None:
start_time = int(time())
end_time = int(time())
old = current
if trackinfo:
yield trackinfo, end_time, start_time, old
def stop(self):
self.close()
async def out_playlist(osa, args):
playlist = []
total_duration = 0
async for trackinfo, end_time, start_time, track in osa.listener():
try:
if end_time - start_time > int(track["duration"] * args.threshold):
playlist.append(trackinfo)
total_duration += track["duration"] // 60
print(
total_duration,
track["duration"],
track["duration"] // 60,
args.duration,
"\n\n",
)
if total_duration >= args.duration:
osa.stop()
print(
datetime.fromtimestamp(start_time),
datetime.fromtimestamp(end_time),
end_time - start_time,
track["name"],
"OK",
)
else:
print(
datetime.fromtimestamp(start_time),
datetime.fromtimestamp(end_time),
end_time - start_time,
track["name"],
"NO",
end_time - start_time,
"<",
int(track["duration"] * args.threshold),
)
except Exception as e:
pass
return playlist
def gen_collage_with_text(playlist, outdir):
collage = Image.new("RGBA", (320, len(playlist) * 120))
for y, item in enumerate(playlist):
try:
collage.paste(item["image"], (0, y * 120))
draw = ImageDraw.Draw(collage)
except:
pass
try:
titlefont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 16)
artistfont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 12)
draw.text(
(120, y * 120),
item["title"],
font=titlefont,
)
draw.text(
(120, y * 120 + 20),
item["artist"],
font=artistfont,
)
except Exception as e:
print(e, item)
collage.save((outdir / "collage.png").open("wb+"), format="PNG")
def gen_collage(pl, outdir):
playlist = list(pl)
# https://math.stackexchange.com/questions/2907234/organizing-objects-in-a-near-square-pattern
# n is the size of the playlist
n = len(playlist)
# m^2 and (m+1)^2 are the two consecutive squares in which lies n
m = int(n ** 0.5)
# k is the surplus n - m^2
k = n - m ** 2
if 0 <= n < m:
rows = cols = m
else:
rows = m
cols = m + 1
collage = Image.new("RGBA", (cols * 120, rows * 120))
for x in range(cols):
for y in range(rows):
try:
item = playlist.pop(0)
except IndexError:
break
try:
collage.paste(item["image"], (x * 120 + 10, y * 120 + 10))
draw = ImageDraw.Draw(collage)
except:
pass
base = Image.new("RGBA", collage.size)
basedraw = ImageDraw.Draw(base)
basedraw.rectangle(((0, 0), collage.size), fill=(17, 24, 39, 255))
base.paste(collage, (0, 0), collage)
base = base.filter(ImageFilter.GaussianBlur(40))
base.paste(collage, (0, 0), collage)
base.save((outdir / "collage.png").open("wb+"), format="PNG")
def gen_json(playlist, outdir):
p = [
{k: v for k, v in z.items() if k != "image"} for z in playlist if z is not None
]
json.dump(p, (outdir / "playlist.json").open("w+"))
def gen_youtube(playlist, outdir):
instances = [
k[1]["uri"]
for k in requests.get(
"https://api.invidious.io/instances.json?sort_by=type,users"
).json()
if k[1]["monitor"] is not None
and float(k[1]["monitor"]["30dRatio"]["ratio"]) > 99.8
]
instance = instances.pop()
video_ids = []
for item in playlist:
if item is None:
continue
res = requests.get(
f"{instance}/api/v1/search",
params={
"q": f"{item['artist']} {item['title']}",
"type": "video",
"sort_by": "relevance",
},
)
try:
if len(results := res.json()) > 0:
first = results[0]
if overlap(f"{item['artist']} {item['title']}", first["title"]) >= 0.95:
video_ids.append(first["videoId"])
except:
pass
with requests.session() as s:
res = s.get(
"https://www.youtube.com/watch_videos",
params={"video_ids": ",".join(video_ids)},
cookies={
"CONSENT": "YES+cb.20210328-17-p0.en-GB+FX+{}".format(randint(100, 999))
},
)
(outdir / "youtube.url").write_text(res.url)
return res.url
def cmd_listen(args):
osa = Osa()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, osa.stop)
playlist = loop.run_until_complete(out_playlist(osa, args))
loop.stop()
today = date.today()
if len(playlist) == 0:
return
outdir = Path(".") / "playlists" / str(today)
outdir.mkdir(parents=True, exist_ok=True)
gen_collage(playlist, outdir)
gen_json(playlist, outdir)
text = (
f"Here, have some music to listen ♫\n\n"
"My human friend plays some tracks and i "
"generate a playlist for you\n\n"
)
if args.do_youtube:
yt = gen_youtube(playlist, outdir)
text += f"you can listen it here: {yt} 🎶"
text += (
"\n\nI am still in a testing phase, please check before listening!"
"\n\n#mastomusic #mastoradio #music #fediplay #tootradio "
"#nowplaying #playlist #bot #fediversemusic"
)
if args.do_toot:
config = ConfigParser()
config.read_file(args.config)
mastodon = Mastodon(
client_id=config["mastodon"]["clientid"],
client_secret=config["mastodon"]["clientsecret"],
api_base_url=config["mastodon"]["instance"],
)
access_token = mastodon.log_in(
config["mastodon"]["username"], config["mastodon"]["password"]
)
media = mastodon.media_post((outdir / "collage.png").open("rb"), "image/png")
mastodon.status_post(text, media_ids=[media["id"]])
def cmd_request_mastodon_auth(args):
config = ConfigParser()
config.read_file(args.config)
if not "mastodon" in config.sections():
config.add_section("mastodon")
config.set("mastodon", "username", args.mastodon_username)
config.set("mastodon", "password", args.mastodon_password)
config.set("mastodon", "instance", args.mastodon_instance)
client_id, client_secret = Mastodon.create_app(
"playlistener",
api_base_url=args.mastodon_instance,
)
config.set("mastodon", "clientid", client_id)
config.set("mastodon", "clientsecret", client_secret)
args.config.seek(0)
config.write(args.config)
def main():
parser = ArgumentParser()
commands = [cmd[4:] for cmd in globals() if cmd.startswith("cmd_")]
parser.add_argument("command", choices=commands, type=str)
parser.add_argument(
"-t",
dest="threshold",
type=float,
default=0.8,
help="minimum %% of played tracks in order to be considered",
)
parser.add_argument(
"-d",
dest="duration",
type=int,
default=60,
help="maximum duration of the playlistin minutes",
)
parser.add_argument(
"-c", dest="config", type=FileType("r+"), default="playlistener.ini"
)
parser.add_argument("--mast-user", dest="mastodon_username", type=str)
parser.add_argument("--mast-pass", dest="mastodon_password", type=str)
parser.add_argument("--mast-instance", dest="mastodon_instance", type=str)
parser.add_argument("--toot", dest="do_toot", action="store_true")
parser.add_argument("--youtube", dest="do_youtube", action="store_true")
args = parser.parse_args()
globals()[f"cmd_{args.command}"](args)
main()