first commit

This commit is contained in:
leonardo 2022-05-14 16:32:27 +02:00
commit 977de95d01
2 changed files with 667 additions and 0 deletions

200
.gitignore vendored Normal file
View File

@ -0,0 +1,200 @@
# Created by https://www.toptal.com/developers/gitignore/api/python,macos
# Edit at https://www.toptal.com/developers/gitignore?templates=python,macos
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# End of https://www.toptal.com/developers/gitignore/api/python,macos

467
playlistener.py Executable file
View File

@ -0,0 +1,467 @@
#!/usr/bin/env python3
from time import sleep, time
from random import randint
from datetime import datetime, date
import musicbrainzngs
from musicbrainzngs.musicbrainz import ResponseError
from asyncio.subprocess import create_subprocess_exec, PIPE
from threading import Thread
from urllib.request import urlopen
from PIL import Image, ImageFont, ImageDraw, ImageFilter
import asyncio
import signal
from io import BytesIO
import json
from pathlib import Path
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from fuzzy_match import algorithims
from mastodon import Mastodon
from textdistance import overlap
import requests
CURRENT_TRACK_SCRIPT = """
on getOrNull(value)
if value is "" or value is 0 then
return "null"
else if class of value is text then
return quote & value & quote
else
return value
end if
end getOrNull
tell application "iTunes"
"name: " & my getOrNull(name of current track) & "
artist: " & my getOrNull(artist of current track) & "
album: " & my getOrNull(album of current track) & "
duration: " & (duration of current track as text) & "
albumArtist: " & my getOrNull(album artist of current track)
end tell
"""
PLAYER_POSITION_SCRIPT = """
tell application "iTunes"
"position: " & (player position as text)
end tell
"""
PLAYER_STATE_SCRIPT = """
tell application "iTunes"
"state: " & player state
end tell
"""
async def osa(script):
proc = await create_subprocess_exec("osascript", stdin=PIPE, stdout=PIPE)
stdout, stderr = await proc.communicate(script.encode())
# import pdb
# pdb.set_trace()
await proc.wait()
return (
proc.returncode,
stdout and stdout.decode() or "",
stderr and stderr.decode() or "",
)
async def current_track():
code, msg, err = await osa(CURRENT_TRACK_SCRIPT)
if code != 0:
raise Exception(err)
ret = {}
for line in msg.split("\n"):
if len(line.strip()) == 0:
continue
k, v = line.split(":", 1)
ret[k] = v.strip()
ret["duration"] = float(ret["duration"].replace(",", "."))
return ret
async def player_state():
code, msg, err = await osa(PLAYER_STATE_SCRIPT)
if code != 0:
raise Exception(err)
return msg.split(":", 1)[1].strip()
async def player_position():
code, msg, err = await osa(PLAYER_POSITION_SCRIPT)
if code != 0:
raise Exception(err)
return float(msg.split(":", 1)[1].replace(",", "."))
async def mb_getinfo(track):
name = track["name"]
artist = track["artist"]
album = track["album"]
musicbrainzngs.set_useragent("playlistener", "0.1")
q = f"+recording:{name}^10 +artist:{artist}^5 +release:{album}^1"
try:
result = musicbrainzngs.search_recordings(
query=q,
strict=True,
)
except:
return None
if "recording-list" not in result:
return None
res = None
for releases in result["recording-list"]:
img = None
if "release-list" in releases:
res = {
"title": releases["title"],
"artist": ", ".join(
[
a["name"]
for a in releases["artist-credit"]
if isinstance(a, dict)
]
),
}
for release in releases["release-list"]:
try:
trig = algorithims.trigram(release["title"], album)
if trig < 0.9:
continue
req = BytesIO(musicbrainzngs.get_image_front(release["id"], 250))
img = Image.open(req)
img = img.resize((100, 100), resample=Image.LANCZOS)
except Exception as e:
print(release["id"], e)
else:
break
res["image"] = img
return res
class Osa:
listening = True
def close(self):
self.listening = False
async def listener(self):
current = None
start_time = None
begin = datetime.now()
trackinfo = None
while self.listening:
try:
state = await player_state()
except Exception:
continue
if state == "playing":
try:
track = await current_track()
except Exception:
continue
if current is None:
current = track
start_time = int(time())
trackinfo = await mb_getinfo(track)
elif current != track:
old = current
end_time = int(time())
yield trackinfo, end_time, start_time, old
current = None
start_time = None
print(
datetime.fromtimestamp(start_time)
if start_time is not None
else "",
datetime.now(),
0,
current["name"] if current is not None else "",
end="\r",
)
elif start_time is not None:
try:
pos = await player_position()
except:
continue
start_time += 1 if pos > 0 else 0
await asyncio.sleep(1)
if start_time is None:
start_time = int(time())
end_time = int(time())
old = current
if trackinfo:
yield trackinfo, end_time, start_time, old
def stop(self):
self.close()
async def out_playlist(osa, args):
playlist = []
total_duration = 0
async for trackinfo, end_time, start_time, track in osa.listener():
try:
if end_time - start_time > int(track["duration"] * args.threshold):
playlist.append(trackinfo)
total_duration += track["duration"] // 60
print(
total_duration,
track["duration"],
track["duration"] // 60,
args.duration,
"\n\n",
)
if total_duration >= args.duration:
osa.stop()
print(
datetime.fromtimestamp(start_time),
datetime.fromtimestamp(end_time),
end_time - start_time,
track["name"],
"OK",
)
else:
print(
datetime.fromtimestamp(start_time),
datetime.fromtimestamp(end_time),
end_time - start_time,
track["name"],
"NO",
end_time - start_time,
"<",
int(track["duration"] * args.threshold),
)
except Exception as e:
pass
return playlist
def gen_collage_with_text(playlist, outdir):
collage = Image.new("RGBA", (320, len(playlist) * 120))
for y, item in enumerate(playlist):
try:
collage.paste(item["image"], (0, y * 120))
draw = ImageDraw.Draw(collage)
except:
pass
try:
titlefont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 16)
artistfont = ImageFont.FreeTypeFont("ChunkFive-Regular.ttf", 12)
draw.text(
(120, y * 120),
item["title"],
font=titlefont,
)
draw.text(
(120, y * 120 + 20),
item["artist"],
font=artistfont,
)
except Exception as e:
print(e, item)
collage.save((outdir / "collage.png").open("wb+"), format="PNG")
def gen_collage(pl, outdir):
playlist = list(pl)
# https://math.stackexchange.com/questions/2907234/organizing-objects-in-a-near-square-pattern
# n is the size of the playlist
n = len(playlist)
# m^2 and (m+1)^2 are the two consecutive squares in which lies n
m = int(n ** 0.5)
# k is the surplus n - m^2
k = n - m ** 2
if 0 <= n < m:
rows = cols = m
else:
rows = m
cols = m + 1
collage = Image.new("RGBA", (cols * 120, rows * 120))
for x in range(cols):
for y in range(rows):
try:
item = playlist.pop(0)
except IndexError:
break
try:
collage.paste(item["image"], (x * 120 + 10, y * 120 + 10))
draw = ImageDraw.Draw(collage)
except:
pass
base = Image.new("RGBA", collage.size)
basedraw = ImageDraw.Draw(base)
basedraw.rectangle(((0, 0), collage.size), fill=(17, 24, 39, 255))
base.paste(collage, (0, 0), collage)
base = base.filter(ImageFilter.GaussianBlur(40))
base.paste(collage, (0, 0), collage)
base.save((outdir / "collage.png").open("wb+"), format="PNG")
def gen_json(playlist, outdir):
p = [
{k: v for k, v in z.items() if k != "image"} for z in playlist if z is not None
]
json.dump(p, (outdir / "playlist.json").open("w+"))
def gen_youtube(playlist, outdir):
instances = [
k[1]["uri"]
for k in requests.get(
"https://api.invidious.io/instances.json?sort_by=type,users"
).json()
if k[1]["monitor"] is not None
and float(k[1]["monitor"]["30dRatio"]["ratio"]) > 99.8
]
instance = instances.pop()
video_ids = []
for item in playlist:
if item is None:
continue
res = requests.get(
f"{instance}/api/v1/search",
params={
"q": f"{item['artist']} {item['title']}",
"type": "video",
"sort_by": "relevance",
},
)
try:
if len(results := res.json()) > 0:
first = results[0]
if overlap(f"{item['artist']} {item['title']}", first["title"]) >= 0.95:
video_ids.append(first["videoId"])
except:
pass
with requests.session() as s:
res = s.get(
"https://www.youtube.com/watch_videos",
params={"video_ids": ",".join(video_ids)},
cookies={
"CONSENT": "YES+cb.20210328-17-p0.en-GB+FX+{}".format(randint(100, 999))
},
)
(outdir / "youtube.url").write_text(res.url)
return res.url
def cmd_listen(args):
osa = Osa()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, osa.stop)
playlist = loop.run_until_complete(out_playlist(osa, args))
loop.stop()
today = date.today()
if len(playlist) == 0:
return
outdir = Path(".") / "playlists" / str(today)
outdir.mkdir(parents=True, exist_ok=True)
gen_collage(playlist, outdir)
gen_json(playlist, outdir)
text = (
f"Here, have some music to listen ♫\n\n"
"My human friend plays some tracks and i "
"generate a playlist for you\n\n"
)
if args.do_youtube:
yt = gen_youtube(playlist, outdir)
text += f"you can listen it here: {yt} 🎶"
text += (
"\n\nI am still in a testing phase, please check before listening!"
"\n\n#mastomusic #mastoradio #music #fediplay #tootradio "
"#nowplaying #playlist #bot #fediversemusic"
)
if args.do_toot:
config = ConfigParser()
config.read_file(args.config)
mastodon = Mastodon(
client_id=config["mastodon"]["clientid"],
client_secret=config["mastodon"]["clientsecret"],
api_base_url=config["mastodon"]["instance"],
)
access_token = mastodon.log_in(
config["mastodon"]["username"], config["mastodon"]["password"]
)
media = mastodon.media_post((outdir / "collage.png").open("rb"), "image/png")
mastodon.status_post(text, media_ids=[media["id"]])
def cmd_request_mastodon_auth(args):
config = ConfigParser()
config.read_file(args.config)
if not "mastodon" in config.sections():
config.add_section("mastodon")
config.set("mastodon", "username", args.mastodon_username)
config.set("mastodon", "password", args.mastodon_password)
config.set("mastodon", "instance", args.mastodon_instance)
client_id, client_secret = Mastodon.create_app(
"playlistener",
api_base_url=args.mastodon_instance,
)
config.set("mastodon", "clientid", client_id)
config.set("mastodon", "clientsecret", client_secret)
args.config.seek(0)
config.write(args.config)
def main():
parser = ArgumentParser()
commands = [cmd[4:] for cmd in globals() if cmd.startswith("cmd_")]
parser.add_argument("command", choices=commands, type=str)
parser.add_argument(
"-t",
dest="threshold",
type=float,
default=0.8,
help="minimum %% of played tracks in order to be considered",
)
parser.add_argument(
"-d",
dest="duration",
type=int,
default=60,
help="maximum duration of the playlistin minutes",
)
parser.add_argument(
"-c", dest="config", type=FileType("r+"), default="playlistener.ini"
)
parser.add_argument("--mast-user", dest="mastodon_username", type=str)
parser.add_argument("--mast-pass", dest="mastodon_password", type=str)
parser.add_argument("--mast-instance", dest="mastodon_instance", type=str)
parser.add_argument("--toot", dest="do_toot", action="store_true")
parser.add_argument("--youtube", dest="do_youtube", action="store_true")
args = parser.parse_args()
globals()[f"cmd_{args.command}"](args)
main()