468 lines
14 KiB
Python
Executable File
468 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from time import sleep, time
|
|
from random import randint
|
|
from datetime import datetime, date
|
|
import musicbrainzngs
|
|
from musicbrainzngs.musicbrainz import ResponseError
|
|
from asyncio.subprocess import create_subprocess_exec, PIPE
|
|
from threading import Thread
|
|
from urllib.request import urlopen
|
|
from PIL import Image, ImageFont, ImageDraw, ImageFilter
|
|
import asyncio
|
|
import signal
|
|
from io import BytesIO
|
|
import json
|
|
from pathlib import Path
|
|
from argparse import ArgumentParser, FileType
|
|
from configparser import ConfigParser
|
|
from fuzzy_match import algorithims
|
|
from mastodon import Mastodon
|
|
from textdistance import overlap
|
|
import requests
|
|
|
|
|
|
CURRENT_TRACK_SCRIPT = """
|
|
on getOrNull(value)
|
|
if value is "" or value is 0 then
|
|
return "null"
|
|
else if class of value is text then
|
|
return quote & value & quote
|
|
else
|
|
return value
|
|
end if
|
|
end getOrNull
|
|
tell application "iTunes"
|
|
"name: " & my getOrNull(name of current track) & "
|
|
artist: " & my getOrNull(artist of current track) & "
|
|
album: " & my getOrNull(album of current track) & "
|
|
duration: " & (duration of current track as text) & "
|
|
albumArtist: " & my getOrNull(album artist of current track)
|
|
end tell
|
|
"""
|
|
|
|
PLAYER_POSITION_SCRIPT = """
|
|
tell application "iTunes"
|
|
"position: " & (player position as text)
|
|
end tell
|
|
"""
|
|
|
|
PLAYER_STATE_SCRIPT = """
|
|
tell application "iTunes"
|
|
"state: " & player state
|
|
end tell
|
|
"""
|
|
|
|
|
|
async def osa(script):
    """Run an AppleScript through ``osascript`` and collect its result.

    The script source is written to the standard input of ``osascript``.

    Args:
        script: AppleScript source code as a ``str``.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. Both streams are decoded to
        ``str``; a stream that produced nothing is returned as ``""``.
    """
    # stderr has to be piped too: otherwise communicate() always hands back
    # None for it and callers end up raising exceptions with an empty message.
    proc = await create_subprocess_exec(
        "osascript", stdin=PIPE, stdout=PIPE, stderr=PIPE
    )
    # communicate() feeds stdin, drains the pipes and waits for the process
    # to exit, so returncode is already set when it returns.
    stdout, stderr = await proc.communicate(script.encode())
    return (
        proc.returncode,
        stdout.decode() if stdout else "",
        stderr.decode() if stderr else "",
    )
|
|
|
|
|
|
async def current_track():
    """Return the fields of the current iTunes track as a dict.

    Runs ``CURRENT_TRACK_SCRIPT`` and splits every non-blank output line at
    its first colon into a key and a whitespace-stripped value. The
    ``duration`` entry is converted to ``float`` (a decimal comma is accepted).

    Raises:
        Exception: when the script exits with a non-zero status; the message
            is the script's error output.
    """
    status, output, error = await osa(CURRENT_TRACK_SCRIPT)
    if status != 0:
        raise Exception(error)

    pairs = (raw.split(":", 1) for raw in output.split("\n") if raw.strip())
    fields = {key: value.strip() for key, value in pairs}

    # The duration text may use a decimal comma; normalise it before parsing.
    seconds_text = fields["duration"].replace(",", ".")
    fields["duration"] = float(seconds_text)
    return fields
|
|
|
|
|
|
async def player_state():
    """Return the iTunes player state reported by ``PLAYER_STATE_SCRIPT``.

    The value is whatever follows the first colon of the script output, with
    surrounding whitespace removed.

    Raises:
        Exception: when the script exits with a non-zero status; the message
            is the script's error output.
    """
    status, output, error = await osa(PLAYER_STATE_SCRIPT)
    if status != 0:
        raise Exception(error)

    parts = output.split(":", 1)
    return parts[1].strip()
|
|
|
|
|
|
async def player_position():
    """Return the iTunes player position reported by ``PLAYER_POSITION_SCRIPT``.

    The text after the first colon of the script output is parsed as a
    ``float``; a decimal comma is accepted.

    Raises:
        Exception: when the script exits with a non-zero status; the message
            is the script's error output.
    """
    status, output, error = await osa(PLAYER_POSITION_SCRIPT)
    if status != 0:
        raise Exception(error)

    parts = output.split(":", 1)
    position_text = parts[1].replace(",", ".")
    return float(position_text)
|
|
|
|
|
|
def _mb_cover_thumbnail(releases, album):
    """Return a 100x100 front-cover image for the first usable release, or None.

    A release is considered only when the trigram similarity between its title
    and *album* is at least 0.9. Failures for one release are printed and the
    next release is tried.
    """
    for release in releases:
        try:
            if algorithims.trigram(release["title"], album) < 0.9:
                continue
            data = BytesIO(musicbrainzngs.get_image_front(release["id"], 250))
            cover = Image.open(data)
            return cover.resize((100, 100), resample=Image.LANCZOS)
        except Exception as e:
            # Best effort: a missing or broken cover must not abort the lookup.
            print(release["id"], e)
    return None


async def mb_getinfo(track):
    """Look a track up on MusicBrainz and attach a cover thumbnail.

    Args:
        track: mapping with the ``name``, ``artist`` and ``album`` keys; their
            values are interpolated verbatim into the search query.

    Returns:
        ``None`` when the search fails, when the response has no
        ``recording-list``, or when no recording in it carries a
        ``release-list``. Otherwise a dict built from the first recording that
        has a ``release-list``, with the keys ``title``, ``artist`` (the
        ``name`` of every dict entry of ``artist-credit`` joined with ", ")
        and ``image`` (a 100x100 PIL image, or ``None`` when no cover could be
        loaded).
    """
    name = track["name"]
    artist = track["artist"]
    album = track["album"]
    musicbrainzngs.set_useragent("playlistener", "0.1")
    query = f"+recording:{name}^10 +artist:{artist}^5 +release:{album}^1"
    try:
        result = musicbrainzngs.search_recordings(query=query, strict=True)
    except Exception as e:
        # Only ordinary errors are treated as "no result"; a bare ``except``
        # would also swallow task cancellation and KeyboardInterrupt.
        print("musicbrainz search failed:", e)
        return None
    if "recording-list" not in result:
        return None

    for recording in result["recording-list"]:
        if "release-list" not in recording:
            continue
        # Stop at the first recording that has releases, so cover art is
        # fetched for one recording only.
        return {
            "title": recording["title"],
            "artist": ", ".join(
                credit["name"]
                for credit in recording["artist-credit"]
                if isinstance(credit, dict)
            ),
            "image": _mb_cover_thumbnail(recording["release-list"], album),
        }
    return None
|
|
|
|
|
|
class Osa:
    """Polls the iTunes player and reports each track once it is over."""

    # The listener loop runs while this is true; close()/stop() clear it on
    # the instance.
    listening = True

    def close(self):
        """Ask the listener loop to finish after its current iteration."""
        self.listening = False

    async def listener(self):
        """Poll the player about once per second and yield finished tracks.

        Yields:
            ``(trackinfo, end_time, start_time, track)`` tuples, where
            ``trackinfo`` is the result of ``mb_getinfo`` for the track (may
            be ``None``), the two times are integer Unix timestamps and
            ``track`` is the dict returned by ``current_track``. A tuple is
            produced when the current track changes, and once more after the
            loop stops if a track is still current and has ``trackinfo``.
        """
        current = None
        start_time = None
        trackinfo = None
        while self.listening:
            try:
                state = await player_state()
            except Exception:
                # Back off before retrying instead of respawning osascript in
                # a tight loop.
                await asyncio.sleep(1)
                continue
            if state == "playing":
                try:
                    track = await current_track()
                except Exception:
                    await asyncio.sleep(1)
                    continue
                if current is None:
                    current = track
                    start_time = int(time())
                    trackinfo = await mb_getinfo(track)
                elif current != track:
                    yield trackinfo, int(time()), start_time, current
                    # The new track is picked up on the next iteration.
                    current = None
                    start_time = None
                print(
                    datetime.fromtimestamp(start_time)
                    if start_time is not None
                    else "",
                    datetime.now(),
                    0,
                    current["name"] if current is not None else "",
                    end="\r",
                )
            elif start_time is not None:
                try:
                    pos = await player_position()
                except Exception:
                    await asyncio.sleep(1)
                    continue
                # Not playing but positioned inside a track: move the start
                # forward by one second per poll - presumably so that paused
                # time is not counted as listening time.
                if pos > 0:
                    start_time += 1
            await asyncio.sleep(1)

        # Report the track that was current when listening stopped. Right
        # after a track change ``current`` is None while ``trackinfo`` still
        # belongs to the previous track, so there is nothing valid to report.
        if trackinfo and current is not None:
            if start_time is None:
                start_time = int(time())
            yield trackinfo, int(time()), start_time, current

    def stop(self):
        """Alias of :meth:`close`."""
        self.close()
|
|
|
|
|
|
async def out_playlist(osa, args):
    """Collect the tracks reported by ``osa.listener()`` into a playlist.

    A track is accepted when the time between its start and end exceeds
    ``int(track["duration"] * args.threshold)`` seconds. For every accepted
    track, ``track["duration"] // 60`` whole minutes are added to a running
    total; once that total reaches ``args.duration`` the listener is asked to
    stop through ``osa.stop()``.

    Args:
        osa: object providing an async-iterable ``listener()`` and ``stop()``.
        args: object with the ``threshold`` and ``duration`` attributes.

    Returns:
        The list of ``trackinfo`` values of the accepted tracks (entries may
        be ``None`` when no track information was available).
    """
    playlist = []
    total_duration = 0
    async for trackinfo, end_time, start_time, track in osa.listener():
        if track is None:
            # A report without a track carries nothing to evaluate.
            continue
        try:
            played = end_time - start_time
            required = int(track["duration"] * args.threshold)
            if played > required:
                playlist.append(trackinfo)
                minutes = track["duration"] // 60
                total_duration += minutes
                print(
                    total_duration,
                    track["duration"],
                    minutes,
                    args.duration,
                    "\n\n",
                )
                if total_duration >= args.duration:
                    osa.stop()
                print(
                    datetime.fromtimestamp(start_time),
                    datetime.fromtimestamp(end_time),
                    played,
                    track["name"],
                    "OK",
                )
            else:
                print(
                    datetime.fromtimestamp(start_time),
                    datetime.fromtimestamp(end_time),
                    played,
                    track["name"],
                    "NO",
                    played,
                    "<",
                    required,
                )
        except Exception as e:
            # Keep listening, but make the problem visible instead of
            # dropping it silently.
            print("skipping malformed track report:", e)
    return playlist
|
|
|
|
|
|
def gen_collage_with_text(playlist, outdir):
    """Write ``collage.png``: one 120px-high row per playlist entry.

    Each row shows the entry's cover at the left edge and its title and
    artist as text starting at x=120.

    Args:
        playlist: sequence of dicts with ``title``, ``artist`` and ``image``
            (a PIL image or ``None``); ``None`` entries leave their row empty.
        outdir: ``pathlib.Path`` of the directory that receives the file.
    """
    collage = Image.new("RGBA", (320, len(playlist) * 120))
    # One drawing context and one pair of fonts serve every row. Creating the
    # context up front also keeps text working for rows without a cover.
    draw = ImageDraw.Draw(collage)
    try:
        titlefont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 16)
        artistfont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 12)
    except (OSError, ImportError) as e:
        # font=None makes ImageDraw.text use Pillow's default font.
        print(e)
        titlefont = artistfont = None

    for row, item in enumerate(playlist):
        if item is None:
            continue
        top = row * 120
        try:
            if item["image"] is not None:
                collage.paste(item["image"], (0, top))
            draw.text((120, top), item["title"], font=titlefont)
            draw.text((120, top + 20), item["artist"], font=artistfont)
        except Exception as e:
            # Best effort: one broken entry must not prevent the collage.
            print(e, item)

    with (outdir / "collage.png").open("wb") as fh:
        collage.save(fh, format="PNG")
|
|
|
|
|
|
def _grid_shape(n):
    """Return ``(rows, cols)`` of a near-square grid with at least *n* cells.

    https://math.stackexchange.com/questions/2907234/organizing-objects-in-a-near-square-pattern
    """
    # m^2 and (m+1)^2 are the two consecutive squares between which n lies
    m = int(n ** 0.5)
    # k is the surplus n - m^2
    k = n - m ** 2
    if k == 0:
        return m, m
    if k <= m:
        return m, m + 1
    return m + 1, m + 1


def gen_collage(pl, outdir):
    """Write ``collage.png``: the covers of *pl* in a near-square grid.

    Covers are placed in 120x120 cells with a 10px offset, filling the grid
    column by column. The result is composed of a dark background carrying a
    blurred copy of the covers, with the sharp covers pasted on top.

    Args:
        pl: iterable of dicts with an ``image`` entry (a PIL image or
            ``None``); ``None`` entries and entries without an image leave
            their cell empty.
        outdir: ``pathlib.Path`` of the directory that receives the file.
    """
    playlist = list(pl)
    # The grid always has room for every entry, so no track is dropped.
    rows, cols = _grid_shape(len(playlist))
    collage = Image.new("RGBA", (cols * 120, rows * 120))
    for index, item in enumerate(playlist):
        if item is None:
            continue
        # Column-major order: fill a column top to bottom, then move right.
        x, y = divmod(index, rows)
        try:
            if item["image"] is not None:
                collage.paste(item["image"], (x * 120 + 10, y * 120 + 10))
        except Exception as e:
            # Best effort: a broken cover only leaves its cell empty.
            print(e, item)

    base = Image.new("RGBA", collage.size, (17, 24, 39, 255))
    base.paste(collage, (0, 0), collage)
    base = base.filter(ImageFilter.GaussianBlur(40))
    base.paste(collage, (0, 0), collage)
    with (outdir / "collage.png").open("wb") as fh:
        base.save(fh, format="PNG")
|
|
|
|
|
|
def gen_json(playlist, outdir):
    """Write the playlist to ``playlist.json`` inside *outdir*.

    ``None`` entries are skipped and the ``image`` key of every entry is left
    out of the output.

    Args:
        playlist: iterable of dicts (or ``None``) describing the tracks.
        outdir: ``pathlib.Path`` of the directory that receives the file.
    """
    entries = [
        {key: value for key, value in item.items() if key != "image"}
        for item in playlist
        if item is not None
    ]
    # The context manager guarantees the data is flushed and the handle closed.
    with (outdir / "playlist.json").open("w") as fh:
        json.dump(entries, fh)
|
|
|
|
|
|
def gen_youtube(playlist, outdir):
    """Build a YouTube playlist URL for the tracks and store it on disk.

    Each track is searched on an Invidious instance taken from
    api.invidious.io (only instances whose monitor reports a 30-day ratio
    above 99.8 are considered). The first search hit is kept when the overlap
    between "<artist> <title>" and the video title is at least 0.95. The
    collected video ids are then sent to ``youtube.com/watch_videos`` and the
    URL that request ends up at is written to ``youtube.url`` in *outdir*.

    Args:
        playlist: iterable of dicts with ``artist`` and ``title``; ``None``
            entries are skipped.
        outdir: ``pathlib.Path`` of the directory that receives the file.

    Returns:
        The resulting URL as a string.

    Raises:
        IndexError: when no Invidious instance qualifies.
    """
    listing = requests.get(
        "https://api.invidious.io/instances.json?sort_by=type,users"
    ).json()
    instances = [
        entry[1]["uri"]
        for entry in listing
        if entry[1]["monitor"] is not None
        and float(entry[1]["monitor"]["30dRatio"]["ratio"]) > 99.8
    ]
    if not instances:
        raise IndexError("no Invidious instance with a 30d ratio above 99.8")
    instance = instances.pop()

    video_ids = []
    for item in playlist:
        if item is None:
            continue
        wanted = f"{item['artist']} {item['title']}"
        try:
            results = requests.get(
                f"{instance}/api/v1/search",
                params={
                    "q": wanted,
                    "type": "video",
                    "sort_by": "relevance",
                },
            ).json()
            if len(results) > 0:
                first = results[0]
                if overlap(wanted, first["title"]) >= 0.95:
                    video_ids.append(first["videoId"])
        except (requests.RequestException, ValueError, LookupError, TypeError) as e:
            # Best effort per track: a failed or malformed search only means
            # this track is missing from the playlist.
            print("youtube lookup failed for", wanted, e)

    with requests.session() as s:
        res = s.get(
            "https://www.youtube.com/watch_videos",
            params={"video_ids": ",".join(video_ids)},
            cookies={
                "CONSENT": "YES+cb.20210328-17-p0.en-GB+FX+{}".format(randint(100, 999))
            },
        )
    (outdir / "youtube.url").write_text(res.url)
    return res.url
|
|
|
|
|
|
def cmd_listen(args):
    """Record what is played, then publish the resulting playlist.

    Listens until the requested playlist duration is reached or SIGINT is
    received. If at least one track was accepted, a collage and a JSON file
    are written to ``playlists/<today>``; with ``args.do_youtube`` a YouTube
    link is generated as well, and with ``args.do_toot`` the collage and the
    status text are posted to Mastodon using the ``[mastodon]`` section of
    ``args.config``.

    Args:
        args: parsed command line (``threshold``, ``duration``, ``config``,
            ``do_youtube``, ``do_toot``).
    """
    osa = Osa()

    async def record():
        # SIGINT only asks the listener to finish, so the tracks collected so
        # far are still returned.
        asyncio.get_running_loop().add_signal_handler(signal.SIGINT, osa.stop)
        return await out_playlist(osa, args)

    # asyncio.run creates the event loop and closes it again afterwards.
    playlist = asyncio.run(record())

    today = date.today()

    if len(playlist) == 0:
        return

    outdir = Path(".") / "playlists" / str(today)
    outdir.mkdir(parents=True, exist_ok=True)
    gen_collage(playlist, outdir)
    gen_json(playlist, outdir)

    text = (
        "Here, have some music to listen ♫\n\n"
        "My human friend plays some tracks and i "
        "generate a playlist for you\n\n"
    )

    if args.do_youtube:
        yt = gen_youtube(playlist, outdir)
        text += f"you can listen it here: {yt} 🎶"

    text += (
        "\n\nI am still in a testing phase, please check before listening!"
        "\n\n#mastomusic #mastoradio #music #fediplay #tootradio "
        "#nowplaying #playlist #bot #fediversemusic"
    )

    if args.do_toot:
        config = ConfigParser()
        config.read_file(args.config)
        settings = config["mastodon"]

        mastodon = Mastodon(
            client_id=settings["clientid"],
            client_secret=settings["clientsecret"],
            api_base_url=settings["instance"],
        )
        mastodon.log_in(settings["username"], settings["password"])
        # Close the image file once it has been uploaded.
        with (outdir / "collage.png").open("rb") as collage:
            media = mastodon.media_post(collage, "image/png")
        mastodon.status_post(text, media_ids=[media["id"]])
|
|
|
|
|
|
def cmd_request_mastodon_auth(args):
    """Register the app on a Mastodon instance and store the credentials.

    The user name, password and instance URL from the command line, together
    with the client id and secret returned by ``Mastodon.create_app``, are
    written to the ``[mastodon]`` section of the configuration file.

    Args:
        args: parsed command line; ``config`` is the configuration file opened
            for reading and writing, and ``mastodon_username``,
            ``mastodon_password`` and ``mastodon_instance`` hold the values to
            store (``ConfigParser.set`` accepts strings only, so all three
            have to be given).
    """
    config = ConfigParser()
    config.read_file(args.config)

    if "mastodon" not in config.sections():
        config.add_section("mastodon")

    config.set("mastodon", "username", args.mastodon_username)
    config.set("mastodon", "password", args.mastodon_password)
    config.set("mastodon", "instance", args.mastodon_instance)

    client_id, client_secret = Mastodon.create_app(
        "playlistener",
        api_base_url=args.mastodon_instance,
    )

    config.set("mastodon", "clientid", client_id)
    config.set("mastodon", "clientsecret", client_secret)

    # Rewrite the file from the start and drop the old content first, so no
    # stale bytes remain when the new content is shorter than the old one.
    args.config.seek(0)
    args.config.truncate()
    config.write(args.config)
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected ``cmd_*`` function.

    The available commands are the module-level names starting with ``cmd_``,
    offered without that prefix. The chosen function receives the parsed
    arguments.
    """
    commands = []
    for name in globals():
        if name.startswith("cmd_"):
            commands.append(name[4:])

    # (flags, keyword options) for every argument, in registration order
    arguments = (
        (("command",), {"choices": commands, "type": str}),
        (
            ("-t",),
            {
                "dest": "threshold",
                "type": float,
                "default": 0.8,
                "help": "minimum %% of played tracks in order to be considered",
            },
        ),
        (
            ("-d",),
            {
                "dest": "duration",
                "type": int,
                "default": 60,
                "help": "maximum duration of the playlistin minutes",
            },
        ),
        (
            ("-c",),
            {
                "dest": "config",
                "type": FileType("r+"),
                "default": "playlistener.ini",
            },
        ),
        (("--mast-user",), {"dest": "mastodon_username", "type": str}),
        (("--mast-pass",), {"dest": "mastodon_password", "type": str}),
        (("--mast-instance",), {"dest": "mastodon_instance", "type": str}),
        (("--toot",), {"dest": "do_toot", "action": "store_true"}),
        (("--youtube",), {"dest": "do_youtube", "action": "store_true"}),
    )

    parser = ArgumentParser()
    for flags, options in arguments:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()

    handler = globals()[f"cmd_{args.command}"]
    handler(args)
|
|
|
|
|
|
# Run the command line interface only when executed as a script, so that
# importing this module does not parse sys.argv or start listening.
if __name__ == "__main__":
    main()
|