Implement Concurrent Optimized Sync and Now Playing Processes (#4941)

This commit is contained in:
Buster "Silver Eagle" Neece 2022-01-04 16:46:49 -06:00 committed by GitHub
parent 0cc3008e4f
commit 9cfc2ee95d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 1391 additions and 1380 deletions

View File

@ -18,10 +18,18 @@ release channel, you can take advantage of these new features and fixes.
- **Vue Components Everywhere**: As part of our Roadmap to 1.0, we've switched a vast majority of the AzuraCast
application to be powered by Vue frontend components that connect directly to, and exclusively use, our powerful REST
API to perform functions. Not only does this make for more snappy, responsive user experiences, but it also means that _everything_ you can do in the web application is now possible via the API as well; while we haven't documented
API to perform functions. Not only does this make for more snappy, responsive user experiences, but it also means
that _everything_ you can do in the web application is now possible via the API as well; while we haven't documented
all of these endpoints yet, you can use your browser's inspector console to see how we call our internal APIs and do
the same in your own applications.
- The routine synchronization process has been completely rebuilt from the ground up to be concurrent and asynchronous:
- The 1-minute, 5-minute and 1-hour sync tasks have been merged into a single task manager that staggers the tasks
across the hour to ensure CPU load never has huge peaks at the top of the hour.
- Each synchronized task is isolated in its own process, so any failure won't cause a failure for subsequent tasks.
- The "Now Playing" synchronization is now isolated and runs per-station, so an outage on a single station won't
affect other stations; this new worker-process method also ensures station metadata is checked more frequently.
- Storage Locations have been overhauled and made more useful:
- Quotas are now enforced for all storage location types (media, recordings, podcasts, and backups)
- Free space and used space is now shown on the podcast management page

View File

@ -38,6 +38,7 @@
"doctrine/dbal": "^2.8",
"doctrine/migrations": "^3.0",
"doctrine/orm": "~2.6",
"dragonmantank/cron-expression": "^3.1",
"gettext/gettext": "^4.4",
"guzzlehttp/guzzle": "^7.0",
"guzzlehttp/oauth-subscriber": "^0.6.0",

179
composer.lock generated
View File

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "a2801a551591a3d7b981e12ab81e7b8a",
"content-hash": "6b6bbb42f740ddb4d9916438a261865d",
"packages": [
{
"name": "aws/aws-crt-php",
@ -2225,6 +2225,67 @@
},
"time": "2021-10-25T19:59:10+00:00"
},
{
"name": "dragonmantank/cron-expression",
"version": "v3.1.0",
"source": {
"type": "git",
"url": "https://github.com/dragonmantank/cron-expression.git",
"reference": "7a8c6e56ab3ffcc538d05e8155bb42269abf1a0c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/dragonmantank/cron-expression/zipball/7a8c6e56ab3ffcc538d05e8155bb42269abf1a0c",
"reference": "7a8c6e56ab3ffcc538d05e8155bb42269abf1a0c",
"shasum": ""
},
"require": {
"php": "^7.2|^8.0",
"webmozart/assert": "^1.7.0"
},
"replace": {
"mtdowling/cron-expression": "^1.0"
},
"require-dev": {
"phpstan/extension-installer": "^1.0",
"phpstan/phpstan": "^0.12",
"phpstan/phpstan-webmozart-assert": "^0.12.7",
"phpunit/phpunit": "^7.0|^8.0|^9.0"
},
"type": "library",
"autoload": {
"psr-4": {
"Cron\\": "src/Cron/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Chris Tankersley",
"email": "chris@ctankersley.com",
"homepage": "https://github.com/dragonmantank"
}
],
"description": "CRON for PHP: Calculate the next or previous run date and determine if a CRON expression is due",
"keywords": [
"cron",
"schedule"
],
"support": {
"issues": "https://github.com/dragonmantank/cron-expression/issues",
"source": "https://github.com/dragonmantank/cron-expression/tree/v3.1.0"
},
"funding": [
{
"url": "https://github.com/dragonmantank",
"type": "github"
}
],
"time": "2020-11-24T19:55:57+00:00"
},
{
"name": "egulias/email-validator",
"version": "3.1.2",
@ -9399,6 +9460,64 @@
],
"time": "2020-12-02T01:58:49+00:00"
},
{
"name": "webmozart/assert",
"version": "1.10.0",
"source": {
"type": "git",
"url": "https://github.com/webmozarts/assert.git",
"reference": "6964c76c7804814a842473e0c8fd15bab0f18e25"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/webmozarts/assert/zipball/6964c76c7804814a842473e0c8fd15bab0f18e25",
"reference": "6964c76c7804814a842473e0c8fd15bab0f18e25",
"shasum": ""
},
"require": {
"php": "^7.2 || ^8.0",
"symfony/polyfill-ctype": "^1.8"
},
"conflict": {
"phpstan/phpstan": "<0.12.20",
"vimeo/psalm": "<4.6.1 || 4.6.2"
},
"require-dev": {
"phpunit/phpunit": "^8.5.13"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.10-dev"
}
},
"autoload": {
"psr-4": {
"Webmozart\\Assert\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Bernhard Schussek",
"email": "bschussek@gmail.com"
}
],
"description": "Assertions to validate method input/output with nice error messages.",
"keywords": [
"assert",
"check",
"validate"
],
"support": {
"issues": "https://github.com/webmozarts/assert/issues",
"source": "https://github.com/webmozarts/assert/tree/1.10.0"
},
"time": "2021-03-09T10:59:23+00:00"
},
{
"name": "wikimedia/composer-merge-plugin",
"version": "dev-master",
@ -13998,64 +14117,6 @@
}
],
"time": "2021-07-28T10:34:58+00:00"
},
{
"name": "webmozart/assert",
"version": "1.10.0",
"source": {
"type": "git",
"url": "https://github.com/webmozarts/assert.git",
"reference": "6964c76c7804814a842473e0c8fd15bab0f18e25"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/webmozarts/assert/zipball/6964c76c7804814a842473e0c8fd15bab0f18e25",
"reference": "6964c76c7804814a842473e0c8fd15bab0f18e25",
"shasum": ""
},
"require": {
"php": "^7.2 || ^8.0",
"symfony/polyfill-ctype": "^1.8"
},
"conflict": {
"phpstan/phpstan": "<0.12.20",
"vimeo/psalm": "<4.6.1 || 4.6.2"
},
"require-dev": {
"phpunit/phpunit": "^8.5.13"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.10-dev"
}
},
"autoload": {
"psr-4": {
"Webmozart\\Assert\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Bernhard Schussek",
"email": "bschussek@gmail.com"
}
],
"description": "Assertions to validate method input/output with nice error messages.",
"keywords": [
"assert",
"check",
"validate"
],
"support": {
"issues": "https://github.com/webmozarts/assert/issues",
"source": "https://github.com/webmozarts/assert/tree/1.10.0"
},
"time": "2021-03-09T10:59:23+00:00"
}
],
"aliases": [],

View File

@ -31,7 +31,10 @@ return function (App\Event\BuildConsoleCommands $event) {
'azuracast:setup:fixtures' => Command\SetupFixturesCommand::class,
'azuracast:setup' => Command\SetupCommand::class,
'azuracast:radio:restart' => Command\RestartRadioCommand::class,
'azuracast:sync:run' => Command\SyncCommand::class,
'azuracast:sync:nowplaying' => Command\Sync\NowPlayingCommand::class,
'azuracast:sync:nowplaying:station' => Command\Sync\NowPlayingPerStationCommand::class,
'azuracast:sync:run' => Command\Sync\RunnerCommand::class,
'azuracast:sync:task' => Command\Sync\SingleTaskCommand::class,
'azuracast:media:reprocess' => Command\ReprocessMediaCommand::class,
'azuracast:api:docs' => Command\GenerateApiDocsCommand::class,
'locale:generate' => Command\Locale\GenerateCommand::class,
@ -39,6 +42,5 @@ return function (App\Event\BuildConsoleCommands $event) {
'queue:process' => Command\MessageQueue\ProcessCommand::class,
'queue:clear' => Command\MessageQueue\ClearCommand::class,
'cache:clear' => Command\ClearCacheCommand::class,
'sync:run' => Command\SyncCommand::class,
]);
};

View File

@ -124,12 +124,32 @@ return function (CallableEventDispatcherInterface $dispatcher) {
}
);
// Other event subscribers from across the application.
$dispatcher->addCallableListener(
Event\GetSyncTasks::class,
App\Sync\TaskLocator::class
$dispatcher->addListener(
App\Event\GetSyncTasks::class,
function (App\Event\GetSyncTasks $e) {
$e->addTasks([
App\Sync\Task\CheckFolderPlaylistsTask::class,
App\Sync\Task\CheckMediaTask::class,
App\Sync\Task\CheckRequestsTask::class,
App\Sync\Task\CheckUpdatesTask::class,
App\Sync\Task\CleanupHistoryTask::class,
App\Sync\Task\CleanupLoginTokensTask::class,
App\Sync\Task\CleanupRelaysTask::class,
App\Sync\Task\CleanupStorageTask::class,
App\Sync\Task\MoveBroadcastsTask::class,
App\Sync\Task\ReactivateStreamerTask::class,
App\Sync\Task\ReloadFrontendAfterSslChangeTask::class,
App\Sync\Task\RotateLogsTask::class,
App\Sync\Task\RunAnalyticsTask::class,
App\Sync\Task\RunAutomatedAssignmentTask::class,
App\Sync\Task\RunBackupTask::class,
App\Sync\Task\UpdateGeoLiteTask::class,
App\Sync\Task\UpdateStorageLocationSizesTask::class,
]);
}
);
// Other event subscribers from across the application.
$dispatcher->addCallableListener(
Event\GetNotifications::class,
App\Notification\Check\ComposeVersionCheck::class
@ -171,8 +191,7 @@ return function (CallableEventDispatcherInterface $dispatcher) {
App\Radio\AutoDJ\Queue::class,
App\Radio\AutoDJ\Annotations::class,
App\Radio\Backend\Liquidsoap\ConfigWriter::class,
App\Sync\Task\NowPlayingTask::class,
App\Controller\Api\NowPlayingAction::class,
App\Sync\NowPlaying\Task\NowPlayingTask::class,
]
);
};

View File

@ -6,19 +6,15 @@ use App\Sync\Task;
use Symfony\Component\Mailer;
return [
Message\AddNewMediaMessage::class => Task\CheckMediaTask::class,
Message\AddNewMediaMessage::class => Task\CheckMediaTask::class,
Message\ReprocessMediaMessage::class => Task\CheckMediaTask::class,
Message\WritePlaylistFileMessage::class => Liquidsoap\ConfigWriter::class,
Message\UpdateNowPlayingMessage::class => Task\NowPlayingTask::class,
Message\BackupMessage::class => Task\RunBackupTask::class,
Message\RunSyncTaskMessage::class => App\Sync\Runner::class,
Message\DispatchWebhookMessage::class => App\Webhook\Dispatcher::class,
Message\TestWebhookMessage::class => App\Webhook\Dispatcher::class,
Message\TestWebhookMessage::class => App\Webhook\Dispatcher::class,
Mailer\Messenger\SendEmailMessage::class => Mailer\Messenger\MessageHandler::class,
];

View File

@ -27,7 +27,7 @@ return static function (RouteCollectorProxy $app) {
)
->setName('admin:debug:clear-queue');
$group->get('/sync/{type}', Controller\Admin\DebugController::class . ':syncAction')
$group->get('/sync/{task}', Controller\Admin\DebugController::class . ':syncAction')
->setName('admin:debug:sync');
$group->get('/log/{path}', Controller\Admin\DebugController::class . ':logAction')
@ -36,6 +36,13 @@ return static function (RouteCollectorProxy $app) {
$group->group(
'/station/{station_id}',
function (RouteCollectorProxy $group) {
$group->map(
['GET', 'POST'],
'/nowplaying',
Controller\Admin\DebugController::class . ':nowplayingAction'
)
->setName('admin:debug:nowplaying');
$group->map(
['GET', 'POST'],
'/nextsong',

View File

@ -43,7 +43,6 @@ class ClearCacheCommand extends CommandAbstract
// Clear cached system settings.
$settings = $this->settingsRepo->readSettings();
$settings->setNowplaying(null);
$settings->updateUpdateLastRun();
$settings->setUpdateResults(null);

View File

@ -0,0 +1,170 @@
<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Console\Command\CommandAbstract;
use App\Entity\Repository\SettingsRepository;
use App\Environment;
use App\LockFactory;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Process\Process;
#[AsCommand(
name: 'azuracast:sync:nowplaying',
description: 'Task to run the Now Playing worker task.'
)]
class NowPlayingCommand extends CommandAbstract
{
protected array $processes = [];
public function __construct(
protected EntityManagerInterface $em,
protected SettingsRepository $settingsRepo,
protected LockFactory $lockFactory,
protected LoggerInterface $logger,
protected Environment $environment,
) {
parent::__construct();
}
protected function configure(): void
{
$this->addOption(
'timeout',
't',
InputOption::VALUE_OPTIONAL,
'Amount of time (in seconds) to run the worker process.',
600
);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$settings = $this->settingsRepo->readSettings();
if ($settings->getSyncDisabled()) {
$io->error('Automated synchronization is temporarily disabled.');
return 1;
}
$timeout = (int)$input->getOption('timeout');
$this->loop($io, $timeout);
return 0;
}
protected function loop(SymfonyStyle $io, int $timeout): void
{
$threshold = time() + $timeout;
while (time() < $threshold || !empty($this->processes)) {
// Check existing processes.
foreach ($this->processes as $processName => $processGroup) {
/** @var Lock $lock */
$lock = $processGroup['lock'];
/** @var Process $process */
$process = $processGroup['process'];
// 10% chance that refresh will be called
if (\random_int(1, 100) <= 10) {
$lock->refresh();
}
if ($process->isRunning()) {
continue;
}
if ($process->isSuccessful()) {
$io->success('Task completed: ' . $processName);
} else {
$io->error('Task failed: ' . $processName);
}
$lock->release();
unset($this->processes[$processName]);
}
// Ensure a process is running for every active station.
if (time() < $threshold - 5) {
$activeStations = $this->em->createQuery(
<<<'DQL'
SELECT s.id, s.short_name, s.nowplaying_timestamp
FROM App\Entity\Station s
WHERE s.is_enabled = 1
DQL
)->getArrayResult();
foreach ($activeStations as $activeStation) {
$shortName = $activeStation['short_name'];
if (!isset($this->processes[$shortName])) {
$npTimestamp = (int)$activeStation['nowplaying_timestamp'];
if (time() > $npTimestamp + \random_int(5, 15)) {
$this->start($shortName, $io);
}
}
}
}
$this->em->clear();
gc_collect_cycles();
\usleep(1500000);
}
}
protected function start(
string $shortName,
SymfonyStyle $io
): void {
$lockName = 'nowplaying_' . $shortName;
$lock = $this->lockFactory->createAndAcquireLock($lockName, 30);
if (false === $lock) {
$this->logger->error(
sprintf('Could not obtain lock for task "%s"; skipping it.', $shortName)
);
return;
}
$process = new Process([
'php',
$this->environment->getBaseDirectory() . '/bin/console',
'azuracast:sync:nowplaying:station',
$shortName,
], $this->environment->getBaseDirectory());
$process->setTimeout(60);
$process->setIdleTimeout(60);
$stdout = [];
$stderr = [];
$io->info('Starting task: ' . $shortName);
$process->run(function ($type, $data) use ($process, $io, &$stdout, &$stderr): void {
if ($process::ERR === $type) {
$io->getErrorStyle()->write($data);
$stderr[] = $data;
} else {
$io->write($data);
$stdout[] = $data;
}
}, getenv());
$this->processes[$shortName] = [
'process' => $process,
'lock' => $lock,
];
}
}

View File

@ -0,0 +1,54 @@
<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Console\Command\CommandAbstract;
use App\Entity\Repository\StationRepository;
use App\Entity\Station;
use App\Sync\NowPlaying\Task\BuildQueueTask;
use App\Sync\NowPlaying\Task\NowPlayingTask;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
#[AsCommand(
name: 'azuracast:sync:nowplaying:station',
description: 'Task to run the Now Playing worker task for a specific station.',
)]
class NowPlayingPerStationCommand extends CommandAbstract
{
protected array $processes = [];
public function __construct(
protected StationRepository $stationRepo,
protected BuildQueueTask $buildQueueTask,
protected NowPlayingTask $nowPlayingTask,
) {
parent::__construct();
}
protected function configure(): void
{
$this->addArgument('station', InputArgument::REQUIRED);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$stationName = $input->getArgument('station');
$station = $this->stationRepo->findByIdentifier($stationName);
if (!($station instanceof Station)) {
$io->error('Station not found.');
return 1;
}
$this->nowPlayingTask->run($station);
$this->buildQueueTask->run($station);
return 0;
}
}

View File

@ -0,0 +1,156 @@
<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Console\Command\CommandAbstract;
use App\Entity\Repository\SettingsRepository;
use App\Environment;
use App\Event\GetSyncTasks;
use App\LockFactory;
use Carbon\CarbonImmutable;
use Cron\CronExpression;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Process\Process;
#[AsCommand(
name: 'azuracast:sync:run',
description: 'Task to run the minute\'s synchronized tasks.'
)]
class RunnerCommand extends CommandAbstract
{
protected array $processes = [];
public function __construct(
protected EventDispatcherInterface $dispatcher,
protected LockFactory $lockFactory,
protected Environment $environment,
protected LoggerInterface $logger,
protected SettingsRepository $settingsRepo
) {
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$settings = $this->settingsRepo->readSettings();
if ($settings->getSyncDisabled()) {
$io->error('Automated synchronization is temporarily disabled.');
return 1;
}
$syncTasksEvent = new GetSyncTasks();
$this->dispatcher->dispatch($syncTasksEvent);
$now = CarbonImmutable::now(new \DateTimeZone('UTC'));
foreach ($syncTasksEvent->getTasks() as $taskClass) {
$schedulePattern = $taskClass::getSchedulePattern();
$cronExpression = new CronExpression($schedulePattern);
if ($cronExpression->isDue($now)) {
$this->start($taskClass, $io);
}
}
$this->manageStartedEvents($io);
$settings->updateSyncLastRun();
$this->settingsRepo->writeSettings($settings);
return 0;
}
/**
* @param class-string $taskClass
* @param SymfonyStyle $io
*/
protected function start(
string $taskClass,
SymfonyStyle $io
): void {
$taskShortName = (new \ReflectionClass($taskClass))->getShortName();
$lockName = 'sync_task_' . $taskShortName;
$lock = $this->lockFactory->createAndAcquireLock($lockName, 60);
if (false === $lock) {
$this->logger->error(
sprintf('Could not obtain lock for task "%s"; skipping it.', $taskShortName)
);
return;
}
$process = new Process([
'php',
$this->environment->getBaseDirectory() . '/bin/console',
'azuracast:sync:task',
$taskClass,
], $this->environment->getBaseDirectory());
$process->setTimeout(600);
$process->setIdleTimeout(600);
$stdout = [];
$stderr = [];
$io->info('Starting task: ' . $taskShortName);
$process->run(function ($type, $data) use ($process, $io, &$stdout, &$stderr): void {
if ($process::ERR === $type) {
$io->getErrorStyle()->write($data);
$stderr[] = $data;
} else {
$io->write($data);
$stdout[] = $data;
}
}, getenv());
$this->processes[$taskShortName] = [
'process' => $process,
'lock' => $lock,
];
}
protected function manageStartedEvents(SymfonyStyle $io): void
{
while ($this->processes) {
foreach ($this->processes as $processName => $processGroup) {
/** @var Lock $lock */
$lock = $processGroup['lock'];
/** @var Process $process */
$process = $processGroup['process'];
// 10% chance that refresh will be called
$refreshLocks = (\random_int(1, 100) <= 10);
if ($refreshLocks) {
$lock->refresh();
}
if ($process->isRunning()) {
continue;
}
if ($process->isSuccessful()) {
$io->success('Task completed: ' . $processName);
} else {
$io->error('Task failed: ' . $processName);
}
$lock->release();
unset($this->processes[$processName]);
}
}
\usleep(250000);
}
}

View File

@ -0,0 +1,91 @@
<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Console\Command\CommandAbstract;
use App\Sync\Task\AbstractTask;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
#[AsCommand(
name: 'azuracast:sync:task',
description: 'Task to run a specific scheduled task.',
)]
class SingleTaskCommand extends CommandAbstract
{
protected array $processes = [];
public function __construct(
protected ContainerInterface $di,
protected CacheInterface $cache,
protected LoggerInterface $logger,
) {
parent::__construct();
}
protected function configure(): void
{
$this->addArgument('task', InputArgument::REQUIRED);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$task = $input->getArgument('task');
try {
$this->runTask($task);
} catch (\InvalidArgumentException $e) {
$io->error($e->getMessage());
return 1;
}
return 0;
}
/**
* @param class-string $task
*/
public function runTask(string $task): void
{
if (!$this->di->has($task)) {
throw new \InvalidArgumentException('Task not found.');
}
$taskClass = $this->di->get($task);
if (!($taskClass instanceof AbstractTask)) {
throw new \InvalidArgumentException('Specified class is not a synchronized task.');
}
$cacheKey = self::getCacheKey($task);
$startTime = microtime(true);
$this->logger->debug('Starting sync task: ' . $cacheKey);
$taskClass->run();
$this->logger->debug('Sync task completed: ' . $cacheKey, [
'time' => microtime(true) - $startTime,
]);
$this->cache->set($cacheKey, time(), 86400);
}
/**
* @param class-string $taskClass
* @return string
*/
public static function getCacheKey(string $taskClass): string
{
$taskShortName = (new \ReflectionClass($taskClass))->getShortName();
return 'sync_last_run.' . $taskShortName;
}
}

View File

@ -1,42 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Console\Command;
use App;
use App\Sync\Runner;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
name: 'azuracast:sync:run',
description: 'Run one or more scheduled synchronization tasks.',
aliases: ['sync:run']
)]
class SyncCommand extends CommandAbstract
{
public function __construct(
protected Runner $sync,
) {
parent::__construct();
}
protected function configure(): void
{
$this->addArgument('task', InputArgument::OPTIONAL)
->addOption('force', null, InputOption::VALUE_NONE);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$task = $input->getArgument('task') ?? App\Event\GetSyncTasks::SYNC_NOWPLAYING;
$force = (bool)$input->getOption('force');
$this->sync->runSyncTask($task, $force);
return 0;
}
}

View File

@ -5,22 +5,26 @@ declare(strict_types=1);
namespace App\Controller\Admin;
use App\Console\Application;
use App\Console\Command\Sync\SingleTaskCommand;
use App\Controller\AbstractLogViewerController;
use App\Entity;
use App\Event\GetSyncTasks;
use App\Http\Response;
use App\Http\ServerRequest;
use App\Message\RunSyncTaskMessage;
use App\MessageQueue\AbstractQueueManager;
use App\MessageQueue\QueueManagerInterface;
use App\Radio\AutoDJ;
use App\Radio\Backend\Liquidsoap;
use App\Session\Flash;
use App\Sync\Runner;
use App\Utilities\File;
use App\Sync\NowPlaying\Task\NowPlayingTask;
use Carbon\CarbonImmutable;
use Cron\CronExpression;
use Doctrine\ORM\EntityManagerInterface;
use Monolog\Handler\TestHandler;
use Monolog\Logger;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\Messenger\MessageBus;
class DebugController extends AbstractLogViewerController
@ -39,8 +43,9 @@ class DebugController extends AbstractLogViewerController
ServerRequest $request,
Response $response,
Entity\Repository\StationRepository $stationRepo,
Runner $sync,
QueueManagerInterface $queueManager
QueueManagerInterface $queueManager,
EventDispatcherInterface $dispatcher,
CacheInterface $cache,
): ResponseInterface {
$queues = AbstractQueueManager::getAllQueues();
@ -49,48 +54,96 @@ class DebugController extends AbstractLogViewerController
$queueTotals[$queue] = $queueManager->getQueueCount($queue);
}
$syncTimes = [];
$now = CarbonImmutable::now(new \DateTimeZone('UTC'));
$syncTasksEvent = new GetSyncTasks();
$dispatcher->dispatch($syncTasksEvent);
foreach ($syncTasksEvent->getTasks() as $task) {
$cacheKey = SingleTaskCommand::getCacheKey($task);
$pattern = $task::getSchedulePattern();
$cronExpression = new CronExpression($pattern);
$syncTimes[$task] = [
'pattern' => $pattern,
'time' => $cache->get($cacheKey, 0),
'nextRun' => $cronExpression->getNextRunDate($now)->getTimestamp(),
];
}
return $request->getView()->renderToResponse(
$response,
'admin/debug/index',
[
'sync_times' => $sync->getSyncTimes(),
'queue_totals' => $queueTotals,
'stations' => $stationRepo->fetchArray(),
'sync_times' => $syncTimes,
'stations' => $stationRepo->fetchArray(),
]
);
}
/**
* @param ServerRequest $request
* @param Response $response
* @param SingleTaskCommand $taskCommand
* @param class-string|string $task
* @return ResponseInterface
*/
public function syncAction(
ServerRequest $request,
Response $response,
string $type
SingleTaskCommand $taskCommand,
EventDispatcherInterface $eventDispatcher,
string $task
): ResponseInterface {
$tempFile = File::generateTempPath('sync_task_' . $type . '.log');
$this->logger->pushHandler($this->testHandler);
$message = new RunSyncTaskMessage();
$message->type = $type;
$message->outputPath = $tempFile;
if ('all' === $task) {
$syncTasksEvent = new GetSyncTasks();
$eventDispatcher->dispatch($syncTasksEvent);
foreach ($syncTasksEvent->getTasks() as $taskClass) {
$taskCommand->runTask($taskClass);
}
} else {
/** @var class-string $task */
$taskCommand->runTask($task);
}
$this->messageBus->dispatch($message);
$this->logger->popHandler();
return $request->getView()->renderToResponse(
$response,
'admin/debug/sync',
'system/log_view',
[
'title' => __('Run Synchronized Task'),
'outputLog' => basename($tempFile),
'sidebar' => null,
'title' => __('Debug Output'),
'log_records' => $this->testHandler->getRecords(),
]
);
}
public function logAction(
public function nowplayingAction(
ServerRequest $request,
Response $response,
string $path
NowPlayingTask $nowPlayingTask
): ResponseInterface {
$logPath = File::validateTempPath($path);
$this->logger->pushHandler($this->testHandler);
return $this->streamLogToResponse($request, $response, $logPath, true);
$station = $request->getStation();
$nowPlayingTask->run($station);
$this->logger->popHandler();
return $request->getView()->renderToResponse(
$response,
'system/log_view',
[
'sidebar' => null,
'title' => __('Debug Output'),
'log_records' => $this->testHandler->getRecords(),
]
);
}
public function nextsongAction(

View File

@ -4,12 +4,10 @@ declare(strict_types=1);
namespace App\Controller\Admin;
use App\Acl;
use App\Environment;
use App\Http\Response;
use App\Http\ServerRequest;
use App\Radio\Quota;
use App\Sync\Runner;
use Brick\Math\BigInteger;
use Psr\Http\Message\ResponseInterface;
@ -18,7 +16,6 @@ class IndexAction
public function __invoke(
ServerRequest $request,
Response $response,
Runner $sync,
Environment $environment
): ResponseInterface {
$view = $request->getView();
@ -26,17 +23,6 @@ class IndexAction
// Remove the sidebar on the homepage.
$view->addData(['sidebar' => null]);
// Synchronization statuses
$acl = $request->getAcl();
if ($acl->isAllowed(Acl::GLOBAL_ALL)) {
$view->addData(
[
'sync_times' => $sync->getSyncTimes(),
]
);
}
$stationsBaseDir = $environment->getStationDirectory();
$spaceTotalFloat = disk_total_space($stationsBaseDir);

View File

@ -5,158 +5,158 @@ declare(strict_types=1);
namespace App\Controller\Api;
use App\Entity;
use App\Event\Radio\LoadNowPlaying;
use App\Http\Response;
use App\Http\RouterInterface;
use App\Http\ServerRequest;
use App\OpenApi;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\NoResultException;
use OpenApi\Attributes as OA;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class NowPlayingAction implements EventSubscriberInterface
#[
OA\Get(
path: '/nowplaying',
description: "Returns a full summary of all stations' current state.",
tags: ['Now Playing'],
parameters: [],
responses: [
new OA\Response(
response: 200,
description: 'Success',
content: new OA\JsonContent(
type: 'array',
items: new OA\Items(ref: '#/components/schemas/Api_NowPlaying')
)
),
]
),
OA\Get(
path: '/nowplaying/{station_id}',
description: "Returns a full summary of the specified station's current state.",
tags: ['Now Playing'],
parameters: [
new OA\Parameter(ref: OpenApi::STATION_ID_REQUIRED),
],
responses: [
new OA\Response(
response: 200,
description: 'Success',
content: new OA\JsonContent(ref: '#/components/schemas/Api_NowPlaying')
),
new OA\Response(
response: 404,
description: 'Station not found'
),
]
)
]
class NowPlayingAction
{
public function __construct(
protected EntityManagerInterface $em,
protected Entity\Repository\SettingsRepository $settingsRepo,
protected CacheInterface $cache,
protected EventDispatcherInterface $dispatcher
protected Entity\Repository\StationRepository $stationRepo,
protected CacheInterface $cache
) {
}
/**
* Returns an array of event names this subscriber wants to listen to.
*
* The array keys are event names and the value can be:
*
* * The method name to call (priority defaults to 0)
* * An array composed of the method name to call and the priority
* * An array of arrays composed of the method names to call and respective
* priorities, or 0 if unset
*
* For instance:
*
* * array('eventName' => 'methodName')
* * array('eventName' => array('methodName', $priority))
* * array('eventName' => array(array('methodName1', $priority), array('methodName2')))
*
* @return mixed[] The event names to listen to
*/
public static function getSubscribedEvents(): array
{
return [
LoadNowPlaying::class => [
['loadFromCache', 5],
['loadFromSettings', 0],
['loadFromStations', -5],
],
];
}
/**
* @param ServerRequest $request
* @param Response $response
* @param int|string|null $station_id
*/
#[
OA\Get(
path: '/nowplaying',
description: "Returns a full summary of all stations' current state.",
tags: ['Now Playing'],
parameters: [],
responses: [
new OA\Response(
response: 200,
description: 'Success',
content: new OA\JsonContent(
type: 'array',
items: new OA\Items(ref: '#/components/schemas/Api_NowPlaying')
)
),
]
),
OA\Get(
path: '/nowplaying/{station_id}',
description: "Returns a full summary of the specified station's current state.",
tags: ['Now Playing'],
parameters: [
new OA\Parameter(ref: OpenApi::STATION_ID_REQUIRED),
],
responses: [
new OA\Response(
response: 200,
description: 'Success',
content: new OA\JsonContent(ref: '#/components/schemas/Api_NowPlaying')
),
new OA\Response(
response: 404,
description: 'Station not found'
),
]
)
]
public function __invoke(ServerRequest $request, Response $response, $station_id = null): ResponseInterface
{
$router = $request->getRouter();
// Pull NP data from the fastest/first available source using the EventDispatcher.
$event = new LoadNowPlaying();
$this->dispatcher->dispatch($event);
if (!$event->hasNowPlaying()) {
return $response->withStatus(408)
->withJson(new Entity\Api\Error(408, 'Now Playing data has not loaded yet. Please try again later.'));
}
if (!empty($station_id)) {
$npStation = $event->getForStation($station_id);
if (null !== $npStation) {
$npStation->resolveUrls($router->getBaseUrl());
return $response->withJson($npStation);
$np = $this->getForStation($station_id, $router);
if (null !== $np) {
return $response->withJson($np);
}
return $response->withStatus(404)
->withJson(Entity\Api\Error::notFound());
}
// If unauthenticated, hide non-public stations from full view.
$np = ($request->getAttribute('user') === null)
? $event->getAllPublic()
: $event->getNowPlaying();
return $response->withJson(
$this->getForAllStations(
$router,
$request->getAttribute('user') === null // If unauthenticated, hide non-public stations from full view.
)
);
}
foreach ($np as $npRow) {
$npRow->resolveUrls($router->getBaseUrl());
protected function getForStation(
string|int $station,
RouterInterface $router
): ?Entity\Api\NowPlaying\NowPlaying {
// Check cache first.
$np = $this->cache->get('nowplaying.' . $station);
if (!($np instanceof Entity\Api\NowPlaying\NowPlaying)) {
// Pull from DB if possible.
if (is_numeric($station)) {
$dql = <<<'DQL'
SELECT s.nowplaying FROM App\Entity\Station s
WHERE s.id = :id
DQL;
} else {
$dql = <<<'DQL'
SELECT s.nowplaying FROM App\Entity\Station s
WHERE s.short_name = :id
DQL;
}
try {
$npResult = $this->em->createQuery($dql)
->setParameter('id', $station)
->setMaxResults(1)
->getSingleResult();
$np = $npResult['nowplaying'] ?? null;
} catch (NoResultException $e) {
return null;
}
}
return $response->withJson($np);
if ($np instanceof Entity\Api\NowPlaying\NowPlaying) {
$np->resolveUrls($router->getBaseUrl());
return $np;
}
return null;
}
public function loadFromCache(LoadNowPlaying $event): void
{
$event->setNowPlaying((array)$this->cache->get('nowplaying'), 'redis');
}
public function loadFromSettings(LoadNowPlaying $event): void
{
$settings = $this->settingsRepo->readSettings();
$event->setNowPlaying((array)$settings->getNowplaying(), 'settings');
}
public function loadFromStations(LoadNowPlaying $event): void
{
$nowplaying_db = $this->em->createQuery(
<<<'DQL'
SELECT s.nowplaying FROM App\Entity\Station s WHERE s.is_enabled = 1
DQL
)->getArrayResult();
protected function getForAllStations(
RouterInterface $router,
bool $publicOnly = false,
): array {
if ($publicOnly) {
$dql = <<<'DQL'
SELECT s.nowplaying FROM App\Entity\Station s
WHERE s.is_enabled = 1 AND s.enable_public_page = 1
DQL;
} else {
$dql = <<<'DQL'
SELECT s.nowplaying FROM App\Entity\Station s
WHERE s.is_enabled = 1
DQL;
}
$np = [];
foreach ($nowplaying_db as $np_row) {
$np[] = $np_row['nowplaying'];
$baseUrl = $router->getBaseUrl();
foreach ($this->em->createQuery($dql)->getArrayResult() as $row) {
$npRow = $row['nowplaying'];
if ($npRow instanceof Entity\Api\NowPlaying\NowPlaying) {
$npRow->resolveUrls($baseUrl);
$np[] = $npRow;
}
}
$event->setNowPlaying($np, 'station');
return $np;
}
}

View File

@ -5,7 +5,6 @@ declare(strict_types=1);
namespace App\Entity\ApiGenerator;
use App\Entity;
use App\Event\Radio\LoadNowPlaying;
use GuzzleHttp\Psr7\Uri;
use NowPlaying\Result\CurrentSong;
use NowPlaying\Result\Result;
@ -126,10 +125,7 @@ class NowPlayingApiGenerator
Entity\Station $station,
?UriInterface $baseUri = null
): Entity\Api\NowPlaying\NowPlaying {
$event = new LoadNowPlaying();
$this->eventDispatcher->dispatch($event);
$np = $event->getForStation($station);
$np = $station->getNowplaying();
if (null !== $np) {
return $np;
}

View File

@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace App\Entity\Migration;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\AbstractMigration;
final class Version20220102033308 extends AbstractMigration
{
public function getDescription(): string
{
return 'New Settings columns for new sync process.';
}
public function up(Schema $schema): void
{
$this->addSql('ALTER TABLE settings ADD sync_disabled TINYINT(1) NOT NULL, ADD sync_last_run INT NOT NULL, DROP nowplaying, DROP sync_nowplaying_last_run, DROP sync_short_last_run, DROP sync_medium_last_run, DROP sync_long_last_run');
}
public function down(Schema $schema): void
{
$this->addSql('ALTER TABLE settings ADD nowplaying LONGTEXT CHARACTER SET utf8mb4 DEFAULT NULL COLLATE `utf8mb4_general_ci` COMMENT \'(DC2Type:json)\', ADD sync_short_last_run INT NOT NULL, ADD sync_medium_last_run INT NOT NULL, ADD sync_long_last_run INT NOT NULL, DROP sync_disabled, CHANGE sync_last_run sync_nowplaying_last_run INT NOT NULL');
}
}

View File

@ -7,7 +7,6 @@ namespace App\Entity;
use App\Customization;
use App\Doctrine\Generator\UuidV6Generator;
use App\Entity;
use App\Event\GetSyncTasks;
use App\OpenApi;
use App\Service\Avatar;
use App\Utilities\Urls;
@ -675,140 +674,41 @@ class Settings implements Stringable
$this->setSetupCompleteTime(time());
}
/**
* @var mixed[]|null
*/
#[
OA\Property(description: "The current cached now playing data.", example: ""),
ORM\Column(type: 'json', nullable: true),
OA\Property(description: "Temporarily disable all sync tasks.", example: "false"),
ORM\Column,
Attributes\AuditIgnore
]
protected ?array $nowplaying = null;
protected bool $sync_disabled = false;
/**
* @return mixed[]|null
*/
public function getNowplaying(): ?array
public function getSyncDisabled(): bool
{
return $this->nowplaying;
return $this->sync_disabled;
}
public function setNowplaying(?array $nowplaying): void
public function setSyncDisabled(bool $sync_disabled): void
{
$this->nowplaying = $nowplaying;
$this->sync_disabled = $sync_disabled;
}
#[
OA\Property(
description: "The UNIX timestamp when the now playing sync task was last run.",
description: "The last run timestamp for the unified sync task.",
example: OpenApi::SAMPLE_TIMESTAMP
),
ORM\Column,
Attributes\AuditIgnore
]
protected int $sync_nowplaying_last_run = 0;
protected int $sync_last_run = 0;
public function getSyncNowplayingLastRun(): int
public function updateSyncLastRun(): void
{
return $this->sync_nowplaying_last_run;
$this->sync_last_run = time();
}
public function setSyncNowplayingLastRun(int $syncNowplayingLastRun): void
public function getSyncLastRun(): int
{
$this->sync_nowplaying_last_run = $syncNowplayingLastRun;
}
#[
OA\Property(
description: "The UNIX timestamp when the 60-second 'short' sync task was last run.",
example: OpenApi::SAMPLE_TIMESTAMP
),
ORM\Column,
Attributes\AuditIgnore
]
protected int $sync_short_last_run = 0;
public function getSyncShortLastRun(): int
{
return $this->sync_short_last_run;
}
public function setSyncShortLastRun(int $syncShortLastRun): void
{
$this->sync_short_last_run = $syncShortLastRun;
}
#[
OA\Property(
description: "The UNIX timestamp when the 5-minute 'medium' sync task was last run.",
example: OpenApi::SAMPLE_TIMESTAMP
),
ORM\Column,
Attributes\AuditIgnore
]
protected int $sync_medium_last_run = 0;
public function getSyncMediumLastRun(): int
{
return $this->sync_medium_last_run;
}
public function setSyncMediumLastRun(int $syncMediumLastRun): void
{
$this->sync_medium_last_run = $syncMediumLastRun;
}
#[
OA\Property(
description: "The UNIX timestamp when the 1-hour 'long' sync task was last run.",
example: OpenApi::SAMPLE_TIMESTAMP
),
ORM\Column,
Attributes\AuditIgnore
]
protected int $sync_long_last_run = 0;
public function getSyncLongLastRun(): int
{
return $this->sync_long_last_run;
}
public function setSyncLongLastRun(int $syncLongLastRun): void
{
$this->sync_long_last_run = $syncLongLastRun;
}
public function getSyncLastRunTime(string $type): int
{
$timesByType = [
GetSyncTasks::SYNC_NOWPLAYING => $this->sync_nowplaying_last_run,
GetSyncTasks::SYNC_SHORT => $this->sync_short_last_run,
GetSyncTasks::SYNC_MEDIUM => $this->sync_medium_last_run,
GetSyncTasks::SYNC_LONG => $this->sync_long_last_run,
];
return $timesByType[$type] ?? 0;
}
public function updateSyncLastRunTime(string $type): void
{
switch ($type) {
case GetSyncTasks::SYNC_NOWPLAYING:
$this->sync_nowplaying_last_run = time();
break;
case GetSyncTasks::SYNC_SHORT:
$this->sync_short_last_run = time();
break;
case GetSyncTasks::SYNC_MEDIUM:
$this->sync_medium_last_run = time();
break;
case GetSyncTasks::SYNC_LONG:
$this->sync_long_last_run = time();
break;
}
return $this->sync_last_run;
}
#[

View File

@ -9,45 +9,28 @@ use Generator;
class GetSyncTasks
{
public const SYNC_NOWPLAYING = 'nowplaying';
public const SYNC_SHORT = 'short';
public const SYNC_MEDIUM = 'medium';
public const SYNC_LONG = 'long';
protected array $tasks = [];
public function __construct(
protected string $type
) {
}
public function getType(): string
{
return $this->type;
}
/**
* @return Generator|AbstractTask[]
* @return Generator|class-string<AbstractTask>[]
*/
public function getTasks(): Generator
{
yield from $this->tasks;
}
public function addTask(AbstractTask $task, ?string $key = null): void
/**
* @param class-string<AbstractTask> $className
*/
public function addTask(string $className): void
{
if (null === $key) {
$taskClassParts = explode("\\", get_class($task));
$key = array_pop($taskClassParts);
}
$this->tasks[$key] = $task;
$this->tasks[] = $className;
}
public function removeTask(string $key): void
public function addTasks(array $classNames): void
{
if (isset($this->tasks[$key])) {
unset($this->tasks[$key]);
foreach ($classNames as $className) {
$this->addTask($className);
}
}
}

View File

@ -1,89 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Event\Radio;
use App\Entity\Api\NowPlaying\NowPlaying;
use App\Entity\Station;
use Symfony\Contracts\EventDispatcher\Event;
class LoadNowPlaying extends Event
{
/** @var NowPlaying[] */
protected array $np = [];
/**
* @param array $np_raw
* @param string|null $source
*/
public function setNowPlaying(array $np_raw, ?string $source = null): void
{
$np = array_filter(
$np_raw,
static function ($np_row) {
return $np_row instanceof NowPlaying;
}
);
if (0 !== count($np)) {
if ($source !== null) {
foreach ($np as $np_row) {
/** @var NowPlaying $np_row */
$np_row->cache = $source;
$np_row->update();
}
}
$this->np = $np;
$this->stopPropagation();
}
}
public function hasNowPlaying(): bool
{
return (0 !== count($this->np));
}
/**
* @return NowPlaying[]
*/
public function getNowPlaying(): array
{
return $this->np;
}
/**
* @return NowPlaying[]
*/
public function getAllPublic(): array
{
return array_values(
array_filter(
$this->np,
static function (NowPlaying $np_row) {
return $np_row->station->is_public;
}
)
);
}
public function getForStation(
int|string|Station|null $stationId
): ?NowPlaying {
if ($stationId instanceof Station) {
$stationId = $stationId->getId();
}
if (null === $stationId) {
return null;
}
foreach ($this->np as $npRow) {
if ($npRow->station->id === (int)$stationId || $npRow->station->shortcode === $stationId) {
return $npRow;
}
}
return null;
}
}

View File

@ -1,19 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Message;
use App\MessageQueue\QueueManagerInterface;
class BuildStationQueue extends AbstractMessage
{
public int $station_id;
public bool $force = false;
public function getQueue(): string
{
return QueueManagerInterface::QUEUE_HIGH_PRIORITY;
}
}

View File

@ -1,31 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Message;
use App\Environment;
use App\MessageQueue\QueueManagerInterface;
class RunSyncTaskMessage extends AbstractUniqueMessage
{
public string $type;
/** @var string|null The path to log output of the Backup command to. */
public ?string $outputPath = null;
public function getIdentifier(): string
{
return 'SyncTask_' . $this->type;
}
public function getTtl(): ?float
{
return Environment::getInstance()->getSyncLongExecutionTime();
}
public function getQueue(): string
{
return QueueManagerInterface::QUEUE_HIGH_PRIORITY;
}
}

View File

@ -1,17 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Message;
use App\MessageQueue\QueueManagerInterface;
class UpdateNowPlayingMessage extends AbstractMessage
{
public int $station_id;
public function getQueue(): string
{
return QueueManagerInterface::QUEUE_HIGH_PRIORITY;
}
}

View File

@ -8,12 +8,10 @@ use App\Acl;
use App\Entity;
use App\Event\GetNotifications;
use App\Session\Flash;
use App\Sync\Runner;
class SyncTaskCheck
{
public function __construct(
protected Runner $syncRunner,
protected Entity\Repository\SettingsRepository $settingsRepo
) {
}
@ -27,35 +25,44 @@ class SyncTaskCheck
return;
}
$setupComplete = $this->settingsRepo->readSettings()->getSetupCompleteTime();
foreach ($this->syncRunner->getSyncTimes() as $taskKey => $task) {
$interval = $task['interval'];
$diff = $task['diff'];
$settings = $this->settingsRepo->readSettings();
// Don't show notification if this installation is freshly installed.
$threshold = time() - ($interval * 5);
if ($setupComplete >= $threshold) {
continue;
}
$setupComplete = $settings->getSetupCompleteTime();
if ($setupComplete > (time() - 60 * 60 * 2)) {
return;
}
if ($diff > ($interval * 5)) {
// phpcs:disable Generic.Files.LineLength
$notification = new Entity\Api\Notification();
$notification->title = __('Synchronized Task Not Recently Run');
$notification->body = __(
'The "%s" synchronization task has not run recently. This may indicate an error with your installation.',
$task['name']
);
$notification->type = Flash::ERROR;
if ($settings->getSyncDisabled()) {
// phpcs:disable Generic.Files.LineLength
$notification = new Entity\Api\Notification();
$notification->title = __('Synchronization Disabled');
$notification->body = __(
'Routine synchronization is currently disabled. Make sure to re-enable it to resume routine maintenance tasks.'
);
$notification->type = Flash::ERROR;
// phpcs:enable
$router = $request->getRouter();
$event->addNotification($notification);
return;
}
$notification->actionLabel = __('Manually Run Task');
$notification->actionUrl = (string)$router->named('admin:debug:sync', ['type' => $taskKey]);
// phpcs:enable
$syncLastRun = $settings->getSyncLastRun();
if ($syncLastRun < (time() - 60 * 5)) {
// phpcs:disable Generic.Files.LineLength
$notification = new Entity\Api\Notification();
$notification->title = __('Synchronization Not Recently Run');
$notification->body = __(
'The routine synchronization task has not run recently. This may indicate an error with your installation.'
);
$notification->type = Flash::ERROR;
$event->addNotification($notification);
}
$router = $request->getRouter();
$notification->actionLabel = __('System Debugger');
$notification->actionUrl = (string)$router->named('admin:debug:index');
// phpcs:enable
$event->addNotification($notification);
}
}
}

View File

@ -7,14 +7,12 @@ namespace App\Radio\AutoDJ;
use App\Entity;
use App\Event\Radio\AnnotateNextSong;
use App\Flysystem\StationFilesystems;
use App\Message\BuildStationQueue;
use App\Radio\Adapters;
use Doctrine\ORM\EntityManagerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBus;
class Annotations implements EventSubscriberInterface
{
@ -25,7 +23,6 @@ class Annotations implements EventSubscriberInterface
protected Adapters $adapters,
protected LoggerInterface $logger,
protected EventDispatcherInterface $eventDispatcher,
protected MessageBus $messageBus
) {
}
@ -67,11 +64,6 @@ class Annotations implements EventSubscriberInterface
],
]
);
$message = new BuildStationQueue();
$message->station_id = $station->getIdRequired();
$this->messageBus->dispatch($message);
return '';
}

View File

@ -7,7 +7,6 @@ namespace App\Radio\AutoDJ;
use App\Entity;
use App\Event\Radio\BuildQueue;
use App\LockFactory;
use App\Message;
use App\Radio\PlaylistParser;
use Carbon\CarbonImmutable;
use Carbon\CarbonInterface;
@ -59,25 +58,6 @@ class Queue implements EventSubscriberInterface
];
}
/**
* Handle event dispatch.
*
* @param Message\AbstractMessage $message
*/
public function __invoke(Message\AbstractMessage $message): void
{
if ($message instanceof Message\BuildStationQueue) {
$station = $this->em->find(Entity\Station::class, $message->station_id);
if ($station instanceof Entity\Station) {
$this->buildQueue(
station: $station,
force: $message->force
);
}
}
}
public function buildQueue(
Entity\Station $station,
bool $force = false

View File

@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
namespace App\Sync\NowPlaying\Task;
use App\Entity\Station;
use App\Radio\AutoDJ;
class BuildQueueTask implements NowPlayingTaskInterface
{
public function __construct(
protected AutoDJ\Queue $queue
) {
}
public function run(Station $station, bool $force = false): void
{
if ($station->useManualAutoDJ()) {
return;
}
$this->queue->buildQueue($station, $force);
}
}

View File

@ -0,0 +1,214 @@
<?php
declare(strict_types=1);
namespace App\Sync\NowPlaying\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Entity\Api\NowPlaying\NowPlaying;
use App\Entity\Station;
use App\Environment;
use App\Event\Radio\GenerateRawNowPlaying;
use App\Http\RouterInterface;
use App\Message;
use App\Radio\Adapters;
use DeepCopy\DeepCopy;
use Exception;
use Monolog\Logger;
use NowPlaying\Result\Result;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBus;
class NowPlayingTask implements NowPlayingTaskInterface, EventSubscriberInterface
{
public function __construct(
protected Adapters $adapters,
protected CacheInterface $cache,
protected EventDispatcherInterface $eventDispatcher,
protected MessageBus $messageBus,
protected RouterInterface $router,
protected Entity\Repository\ListenerRepository $listenerRepo,
protected Entity\Repository\SettingsRepository $settingsRepo,
protected Entity\ApiGenerator\NowPlayingApiGenerator $nowPlayingApiGenerator,
protected ReloadableEntityManagerInterface $em,
protected Logger $logger,
) {
}
/**
* @inheritDoc
*/
public static function getSubscribedEvents(): array
{
if (Environment::getInstance()->isTesting()) {
return [];
}
return [
GenerateRawNowPlaying::class => [
['loadRawFromFrontend', 10],
['addToRawFromRemotes', 0],
],
];
}
public function run(Station $station, bool $force = false): void
{
if (!$station->getIsEnabled()) {
return;
}
$this->logger->pushProcessor(
function ($record) use ($station) {
$record['extra']['station'] = [
'id' => $station->getId(),
'name' => $station->getName(),
];
return $record;
}
);
$settings = $this->settingsRepo->readSettings();
$include_clients = (Entity\Analytics::LEVEL_NONE !== $settings->getAnalytics());
$frontend_adapter = $this->adapters->getFrontendAdapter($station);
$remote_adapters = $this->adapters->getRemoteAdapters($station);
// Build the new "raw" NowPlaying data.
try {
$event = new GenerateRawNowPlaying(
$station,
$frontend_adapter,
$remote_adapters,
$include_clients
);
$this->eventDispatcher->dispatch($event);
$npResult = $event->getResult();
} catch (Exception $e) {
$this->logger->log(
Logger::ERROR,
$e->getMessage(),
[
'file' => $e->getFile(),
'line' => $e->getLine(),
'code' => $e->getCode(),
]
);
$npResult = Result::blank();
}
$this->logger->debug(
'Final NowPlaying Response for Station',
[
'id' => $station->getId(),
'name' => $station->getName(),
'np' => $npResult,
]
);
// Update detailed listener statistics, if they exist for the station
if ($include_clients && null !== $npResult->clients) {
$this->listenerRepo->update($station, $npResult->clients);
}
$np = ($this->nowPlayingApiGenerator)($station, $npResult);
// Trigger the dispatching of webhooks.
$this->dispatchWebhooks($station, $np);
// Update caches
$this->cache->set('nowplaying.' . $station->getIdRequired(), $np, 120);
$this->cache->set('nowplaying.' . $station->getShortName(), $np, 120);
$station->setNowplaying($np);
$this->em->persist($station);
$this->em->flush();
$this->logger->popProcessor();
}
public function loadRawFromFrontend(GenerateRawNowPlaying $event): void
{
try {
$result = $event
->getFrontend()
->getNowPlaying($event->getStation(), $event->includeClients());
} catch (Exception $e) {
$this->logger->error(sprintf('NowPlaying adapter error: %s', $e->getMessage()));
return;
}
$event->setResult($result);
}
public function addToRawFromRemotes(GenerateRawNowPlaying $event): void
{
$result = $event->getResult();
// Loop through all remotes and update NP data accordingly.
foreach ($event->getRemotes() as $ra_proxy) {
try {
$result = $ra_proxy->getAdapter()->updateNowPlaying(
$result,
$ra_proxy->getRemote(),
$event->includeClients()
);
} catch (Exception $e) {
$this->logger->error(sprintf('NowPlaying adapter error: %s', $e->getMessage()));
}
}
$event->setResult($result);
}
protected function dispatchWebhooks(
Entity\Station $station,
NowPlaying $npOriginal
): void {
/** @var NowPlaying $np */
$np = (new DeepCopy())->copy($npOriginal);
$np->resolveUrls($this->router->getBaseUrl());
$np->cache = 'event';
$npOld = $station->getNowplaying();
$triggers = [
Entity\StationWebhook::TRIGGER_ALL,
];
if ($npOld instanceof NowPlaying) {
if ($npOld->now_playing?->song?->id !== $np->now_playing?->song?->id) {
$triggers[] = Entity\StationWebhook::TRIGGER_SONG_CHANGED;
}
if ($npOld->listeners->current > $np->listeners->current) {
$triggers[] = Entity\StationWebhook::TRIGGER_LISTENER_LOST;
} elseif ($npOld->listeners->current < $np->listeners->current) {
$triggers[] = Entity\StationWebhook::TRIGGER_LISTENER_GAINED;
}
if (!$npOld->live->is_live && $np->live->is_live) {
$triggers[] = Entity\StationWebhook::TRIGGER_LIVE_CONNECT;
} elseif ($npOld->live->is_live && !$np->live->is_live) {
$triggers[] = Entity\StationWebhook::TRIGGER_LIVE_DISCONNECT;
}
if ($npOld->is_online && !$np->is_online) {
$triggers[] = Entity\StationWebhook::TRIGGER_STATION_OFFLINE;
} elseif (!$npOld->is_online && $np->is_online) {
$triggers[] = Entity\StationWebhook::TRIGGER_STATION_ONLINE;
}
}
$message = new Message\DispatchWebhookMessage();
$message->station_id = (int)$station->getId();
$message->np = $np;
$message->triggers = $triggers;
$this->messageBus->dispatch($message);
}
}

View File

@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace App\Sync\NowPlaying\Task;
use App\Entity\Station;
interface NowPlayingTaskInterface
{
public function run(Station $station, bool $force = false): void;
}

View File

@ -1,198 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Sync;
use App\Entity\Repository\SettingsRepository;
use App\Environment;
use App\Event\GetSyncTasks;
use App\LockFactory;
use App\Message;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use InvalidArgumentException;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LogLevel;
/**
* The runner of scheduled synchronization tasks.
*/
class Runner
{
public function __construct(
protected SettingsRepository $settingsRepo,
protected Environment $environment,
protected Logger $logger,
protected LockFactory $lockFactory,
protected EventDispatcherInterface $eventDispatcher,
protected EntityManagerInterface $em
) {
}
public function __invoke(Message\AbstractMessage $message): void
{
if ($message instanceof Message\RunSyncTaskMessage) {
$outputPath = $message->outputPath;
if (null !== $outputPath) {
$logHandler = new StreamHandler($outputPath, LogLevel::DEBUG, true);
$this->logger->pushHandler($logHandler);
}
$this->runSyncTask($message->type, true);
if (null !== $outputPath) {
$this->logger->popHandler();
}
}
}
public function runSyncTask(string $type, bool $force = false): void
{
// Immediately halt if setup is not complete.
$settings = $this->settingsRepo->readSettings();
if (!$settings->isSetupComplete()) {
$this->logger->notice(
sprintf('Skipping sync task %s; setup not complete.', $type)
);
return;
}
$allSyncInfo = $this->getSyncTimes();
if (!isset($allSyncInfo[$type])) {
throw new InvalidArgumentException(sprintf('Invalid sync task: %s', $type));
}
$syncInfo = $allSyncInfo[$type];
set_time_limit($syncInfo['timeout']);
$this->logger->notice(
sprintf('Running sync task: %s', $syncInfo['name']),
[
'force' => $force,
]
);
$lock = $this->lockFactory->createAndAcquireLock(
resource: 'sync_' . $type,
ttl: $syncInfo['timeout'],
force: $force
);
if (false === $lock) {
return;
}
try {
$event = new GetSyncTasks($type);
$this->eventDispatcher->dispatch($event);
foreach ($event->getTasks() as $taskClass => $task) {
try {
$lock->refresh($syncInfo['timeout']);
} catch (Exception) {
// Noop
}
$this->logger->debug(
sprintf(
'Starting sub-task: %s',
$taskClass
)
);
$start_time = microtime(true);
$task->run($force);
$end_time = microtime(true);
$time_diff = $end_time - $start_time;
$this->logger->debug(
sprintf(
'Timer "%s" completed in %01.3f second(s).',
$taskClass,
round($time_diff, 3)
)
);
unset($task);
$this->em->clear();
gc_collect_cycles();
}
$settings = $this->settingsRepo->readSettings();
$settings->updateSyncLastRunTime($type);
$this->settingsRepo->writeSettings($settings);
} finally {
$lock->release();
}
$this->logger->debug(
sprintf('Sync task "%s" completed successfully.', $syncInfo['name']),
);
}
/**
* @return mixed[]
*/
public function getSyncTimes(): array
{
$shortTaskTimeout = $this->environment->getSyncShortExecutionTime();
$longTaskTimeout = $this->environment->getSyncLongExecutionTime();
$settings = $this->settingsRepo->readSettings();
$syncs = [
GetSyncTasks::SYNC_NOWPLAYING => [
'name' => __('Now Playing Data'),
'contents' => [
__('Now Playing Data'),
],
'timeout' => $shortTaskTimeout,
'latest' => $settings->getSyncNowplayingLastRun(),
'interval' => 15,
],
GetSyncTasks::SYNC_SHORT => [
'name' => __('1-Minute Sync'),
'contents' => [
__('Song Requests Queue'),
],
'timeout' => $shortTaskTimeout,
'latest' => $settings->getSyncShortLastRun(),
'interval' => 60,
],
GetSyncTasks::SYNC_MEDIUM => [
'name' => __('5-Minute Sync'),
'contents' => [
__('Check Media Folders'),
],
'timeout' => $shortTaskTimeout,
'latest' => $settings->getSyncMediumLastRun(),
'interval' => 300,
],
GetSyncTasks::SYNC_LONG => [
'name' => __('1-Hour Sync'),
'contents' => [
__('Analytics/Statistics'),
__('Cleanup'),
],
'timeout' => $longTaskTimeout,
'latest' => $settings->getSyncLongLastRun(),
'interval' => 3600,
],
];
foreach ($syncs as &$sync_info) {
$sync_info['diff'] = time() - $sync_info['latest'];
}
return $syncs;
}
}

View File

@ -9,7 +9,7 @@ use App\Entity;
use Azura\DoctrineBatchUtils\ReadWriteBatchIteratorAggregate;
use Psr\Log\LoggerInterface;
abstract class AbstractTask
abstract class AbstractTask implements ScheduledTaskInterface
{
public function __construct(
protected ReloadableEntityManagerInterface $em,

View File

@ -1,45 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Sync\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\LockFactory;
use App\Radio\AutoDJ;
use Psr\Log\LoggerInterface;
class BuildQueueTask extends AbstractTask
{
public function __construct(
protected AutoDJ\Queue $queue,
protected LockFactory $lockFactory,
ReloadableEntityManagerInterface $em,
LoggerInterface $logger,
) {
parent::__construct($em, $logger);
}
public function run(bool $force = false): void
{
$stations = $this->em->getRepository(Entity\Station::class)
->findBy(['is_enabled' => 1]);
foreach ($stations as $station) {
/** @var Entity\Station $station */
$this->processStation($station, $force);
}
}
public function processStation(
Entity\Station $station,
bool $force = false
): void {
if ($station->useManualAutoDJ()) {
return;
}
$this->queue->buildQueue($station, $force);
}
}

View File

@ -22,6 +22,11 @@ class CheckFolderPlaylistsTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '*/5 * * * *';
}
public function run(bool $force = false): void
{
foreach ($this->iterateStations() as $station) {

View File

@ -13,7 +13,6 @@ use App\Radio\Quota;
use Azura\Files\Attributes\FileAttributes;
use Brick\Math\BigInteger;
use Doctrine\ORM\AbstractQuery;
use Doctrine\ORM\Query;
use League\Flysystem\FilesystemException;
use League\Flysystem\StorageAttributes;
use League\Flysystem\UnableToRetrieveMetadata;
@ -34,6 +33,11 @@ class CheckMediaTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '1-59/5 * * * *';
}
/**
* Handle event dispatch.
*

View File

@ -12,7 +12,7 @@ use App\Radio\Backend\Liquidsoap;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
class CheckRequests extends AbstractTask
class CheckRequestsTask extends AbstractTask
{
public function __construct(
protected Entity\Repository\StationRequestRepository $requestRepo,
@ -24,6 +24,11 @@ class CheckRequests extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
/**
* Manually process any requests for stations that use "Manual AutoDJ" mode.
*

View File

@ -24,6 +24,11 @@ class CheckUpdatesTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '3-59/5 * * * *';
}
public function run(bool $force = false): void
{
$settings = $this->settingsRepo->readSettings();

View File

@ -21,6 +21,11 @@ class CleanupHistoryTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '17 * * * *';
}
public function run(bool $force = false): void
{
$daysToKeep = $this->settingsRepo->readSettings()->getHistoryKeepDays();

View File

@ -18,6 +18,11 @@ class CleanupLoginTokensTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '12 * * * *';
}
public function run(bool $force = false): void
{
$this->loginTokenRepo->cleanup();

View File

@ -6,6 +6,11 @@ namespace App\Sync\Task;
class CleanupRelaysTask extends AbstractTask
{
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
public function run(bool $force = false): void
{
// Relays should update every 15 seconds, so be fairly aggressive with this.

View File

@ -11,6 +11,11 @@ use Symfony\Component\Finder\Finder;
class CleanupStorageTask extends AbstractTask
{
public static function getSchedulePattern(): string
{
return '24 * * * *';
}
public function run(bool $force = false): void
{
foreach ($this->iterateStations() as $station) {

View File

@ -11,6 +11,11 @@ use Symfony\Component\Finder\Finder;
class MoveBroadcastsTask extends AbstractTask
{
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
public function __construct(
ReloadableEntityManagerInterface $em,
LoggerInterface $logger,

View File

@ -1,317 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Sync\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Entity\Api\NowPlaying\NowPlaying;
use App\Environment;
use App\Event\Radio\GenerateRawNowPlaying;
use App\Http\RouterInterface;
use App\LockFactory;
use App\Message;
use App\Radio\Adapters;
use DeepCopy\DeepCopy;
use Exception;
use Monolog\Logger;
use NowPlaying\Result\Result;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBus;
class NowPlayingTask extends AbstractTask implements EventSubscriberInterface
{
public function __construct(
protected Adapters $adapters,
protected CacheInterface $cache,
protected EventDispatcherInterface $eventDispatcher,
protected MessageBus $messageBus,
protected LockFactory $lockFactory,
protected RouterInterface $router,
protected Entity\Repository\ListenerRepository $listenerRepo,
protected Entity\Repository\SettingsRepository $settingsRepo,
protected Entity\ApiGenerator\NowPlayingApiGenerator $nowPlayingApiGenerator,
ReloadableEntityManagerInterface $em,
LoggerInterface $logger,
) {
parent::__construct($em, $logger);
}
/**
* @return mixed[]
*/
public static function getSubscribedEvents(): array
{
if (Environment::getInstance()->isTesting()) {
return [];
}
return [
GenerateRawNowPlaying::class => [
['loadRawFromFrontend', 10],
['addToRawFromRemotes', 0],
],
];
}
/**
* Handle event dispatch.
*
* @param Message\AbstractMessage $message
*/
public function __invoke(Message\AbstractMessage $message): void
{
if ($message instanceof Message\UpdateNowPlayingMessage) {
$station = $this->em->find(Entity\Station::class, $message->station_id);
if ($station instanceof Entity\Station) {
$this->processStation(
station: $station,
standalone: true
);
}
}
}
public function run(bool $force = false): void
{
$nowplaying = $this->loadNowPlaying($force);
$this->cache->set('nowplaying', $nowplaying, 120);
$settings = $this->settingsRepo->readSettings();
$settings->setNowplaying($nowplaying);
$this->settingsRepo->writeSettings($settings);
}
/**
* @param bool $force
*
* @return NowPlaying[]
*/
protected function loadNowPlaying(bool $force = false): array
{
$nowplaying = [];
foreach ($this->iterateStations() as $station) {
if (!$station->getIsEnabled()) {
continue;
}
$nowplaying[] = $this->processStation(
station: $station,
force: $force
);
}
return $nowplaying;
}
public function processStation(
Entity\Station $station,
bool $standalone = false,
bool $force = false
): NowPlaying {
$lock = $this->lockFactory->createAndAcquireLock(
resource: 'nowplaying_station_' . $station->getId(),
ttl: 600,
force: $force
);
if (false === $lock) {
return $this->nowPlayingApiGenerator->currentOrEmpty($station);
}
try {
/** @var Logger $logger */
$logger = $this->logger;
$logger->pushProcessor(
function ($record) use ($station) {
$record['extra']['station'] = [
'id' => $station->getId(),
'name' => $station->getName(),
];
return $record;
}
);
$settings = $this->settingsRepo->readSettings();
$include_clients = (Entity\Analytics::LEVEL_NONE !== $settings->getAnalytics());
$frontend_adapter = $this->adapters->getFrontendAdapter($station);
$remote_adapters = $this->adapters->getRemoteAdapters($station);
// Build the new "raw" NowPlaying data.
try {
$event = new GenerateRawNowPlaying(
$station,
$frontend_adapter,
$remote_adapters,
$include_clients
);
$this->eventDispatcher->dispatch($event);
$npResult = $event->getResult();
} catch (Exception $e) {
$this->logger->log(
Logger::ERROR,
$e->getMessage(),
[
'file' => $e->getFile(),
'line' => $e->getLine(),
'code' => $e->getCode(),
]
);
$npResult = Result::blank();
}
$this->logger->debug(
'Final NowPlaying Response for Station',
[
'id' => $station->getId(),
'name' => $station->getName(),
'np' => $npResult,
]
);
// Update detailed listener statistics, if they exist for the station
if ($include_clients && null !== $npResult->clients) {
$this->listenerRepo->update($station, $npResult->clients);
}
$np = ($this->nowPlayingApiGenerator)($station, $npResult);
// Trigger the dispatching of webhooks.
$this->dispatchWebhooks($station, $np);
if ($standalone) {
$this->updateCaches($station, $np);
}
$station->setNowplaying($np);
$this->em->persist($station);
$this->em->flush();
$logger->popProcessor();
return $np;
} finally {
$lock->release();
}
}
public function loadRawFromFrontend(GenerateRawNowPlaying $event): void
{
try {
$result = $event
->getFrontend()
->getNowPlaying($event->getStation(), $event->includeClients());
} catch (Exception $e) {
$this->logger->error(sprintf('NowPlaying adapter error: %s', $e->getMessage()));
return;
}
$event->setResult($result);
}
public function addToRawFromRemotes(GenerateRawNowPlaying $event): void
{
$result = $event->getResult();
// Loop through all remotes and update NP data accordingly.
foreach ($event->getRemotes() as $ra_proxy) {
try {
$result = $ra_proxy->getAdapter()->updateNowPlaying(
$result,
$ra_proxy->getRemote(),
$event->includeClients()
);
} catch (Exception $e) {
$this->logger->error(sprintf('NowPlaying adapter error: %s', $e->getMessage()));
}
}
$event->setResult($result);
}
protected function dispatchWebhooks(
Entity\Station $station,
NowPlaying $npOriginal,
bool $isStandalone = true
): void {
/** @var NowPlaying $np */
$np = (new DeepCopy())->copy($npOriginal);
$np->resolveUrls($this->router->getBaseUrl());
$np->cache = 'event';
$npOld = $station->getNowplaying();
$triggers = [
Entity\StationWebhook::TRIGGER_ALL,
];
if ($npOld instanceof NowPlaying) {
if ($npOld->now_playing?->song?->id !== $np->now_playing?->song?->id) {
$triggers[] = Entity\StationWebhook::TRIGGER_SONG_CHANGED;
}
if ($npOld->listeners->current > $np->listeners->current) {
$triggers[] = Entity\StationWebhook::TRIGGER_LISTENER_LOST;
} elseif ($npOld->listeners->current < $np->listeners->current) {
$triggers[] = Entity\StationWebhook::TRIGGER_LISTENER_GAINED;
}
if (!$npOld->live->is_live && $np->live->is_live) {
$triggers[] = Entity\StationWebhook::TRIGGER_LIVE_CONNECT;
} elseif ($npOld->live->is_live && !$np->live->is_live) {
$triggers[] = Entity\StationWebhook::TRIGGER_LIVE_DISCONNECT;
}
if ($npOld->is_online && !$np->is_online) {
$triggers[] = Entity\StationWebhook::TRIGGER_STATION_OFFLINE;
} elseif (!$npOld->is_online && $np->is_online) {
$triggers[] = Entity\StationWebhook::TRIGGER_STATION_ONLINE;
}
}
$message = new Message\DispatchWebhookMessage();
$message->station_id = (int)$station->getId();
$message->np = $np;
$message->triggers = $triggers;
$this->messageBus->dispatch($message);
}
protected function updateCaches(
Entity\Station $station,
NowPlaying $np
): void {
// Replace the relevant station information in the cache and database.
$this->logger->debug('Updating NowPlaying cache...');
$np_full = $this->cache->get('nowplaying');
if ($np_full) {
$np_new = [];
foreach ($np_full as $np_old) {
/** @var NowPlaying $np_old */
if ($np_old->station->id === $station->getId()) {
$np_new[] = $np;
} else {
$np_new[] = $np_old;
}
}
$this->cache->set('nowplaying', $np_new, 120);
$settings = $this->settingsRepo->readSettings();
$settings->setNowplaying($np_new);
$this->settingsRepo->writeSettings($settings);
}
}
}

View File

@ -18,6 +18,11 @@ class ReactivateStreamerTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
public function run(bool $force = false): void
{
foreach ($this->streamerRepo->getStreamersDueForReactivation() as $streamer) {

View File

@ -21,9 +21,14 @@ class ReloadFrontendAfterSslChangeTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '2,19,32,47 * * * *';
}
public function run(bool $force = false): void
{
$threshold = $this->settingsRepo->readSettings()->getSyncLongLastRun();
$threshold = time() - 15;
$certs = CertificateLocator::findCertificate();

View File

@ -27,6 +27,11 @@ class RotateLogsTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '34 * * * *';
}
public function run(bool $force = false): void
{
// Rotate logs for individual stations.

View File

@ -22,6 +22,11 @@ class RunAnalyticsTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '4 * * * *';
}
public function run(bool $force = false): void
{
$analytics_level = $this->settingsRepo->readSettings()->getAnalytics();

View File

@ -25,6 +25,11 @@ class RunAutomatedAssignmentTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '7 * * * *';
}
/**
* Iterate through all stations and attempt to run automated assignment.
*

View File

@ -25,6 +25,11 @@ class RunBackupTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
/**
* Handle event dispatch.
*

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace App\Sync\Task;
interface ScheduledTaskInterface
{
public const SCHEDULE_EVERY_MINUTE = '* * * * *';
public const SCHEDULE_EVERY_FIVE_MINUTES = '*/5 * * * *';
/**
* The CRON-styled pattern for execution of this task.
*
* @return string
*/
public static function getSchedulePattern(): string;
public function run(bool $force = false): void;
}

View File

@ -29,6 +29,11 @@ class UpdateGeoLiteTask extends AbstractTask
parent::__construct($em, $logger);
}
public static function getSchedulePattern(): string
{
return '42 */3 * * *';
}
public function run(bool $force = false): void
{
$settings = $this->settingsRepo->readSettings();

View File

@ -14,6 +14,11 @@ use League\Flysystem\StorageAttributes;
class UpdateStorageLocationSizesTask extends AbstractTask
{
public static function getSchedulePattern(): string
{
return '27 * * * *';
}
public function run(bool $force = false): void
{
$iterator = ReadWriteBatchIteratorAggregate::fromQuery(

View File

@ -1,59 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Sync;
use App\Event\GetSyncTasks;
use Psr\Container\ContainerInterface;
class TaskLocator
{
protected array $tasks;
public function __construct(
protected ContainerInterface $di
) {
$this->tasks = [
GetSyncTasks::SYNC_NOWPLAYING => [
Task\BuildQueueTask::class,
Task\NowPlayingTask::class,
Task\ReactivateStreamerTask::class,
],
GetSyncTasks::SYNC_SHORT => [
Task\CheckRequests::class,
Task\RunBackupTask::class,
Task\CleanupRelaysTask::class,
Task\MoveBroadcastsTask::class,
],
GetSyncTasks::SYNC_MEDIUM => [
Task\CheckMediaTask::class,
Task\CheckFolderPlaylistsTask::class,
Task\CheckUpdatesTask::class,
],
GetSyncTasks::SYNC_LONG => [
Task\RunAnalyticsTask::class,
Task\RunAutomatedAssignmentTask::class,
Task\CleanupLoginTokensTask::class,
Task\CleanupHistoryTask::class,
Task\CleanupStorageTask::class,
Task\UpdateStorageLocationSizesTask::class,
Task\RotateLogsTask::class,
Task\UpdateGeoLiteTask::class,
Task\ReloadFrontendAfterSslChangeTask::class,
],
];
}
public function __invoke(GetSyncTasks $event): void
{
$type = $event->getType();
if (!isset($this->tasks[$type])) {
return;
}
foreach ($this->tasks[$type] as $taskClass) {
$event->addTask($this->di->get($taskClass));
}
}
}

View File

@ -1,23 +1,32 @@
<?php
/**
* @var \App\Assets $assets
*/
$this->layout('main', [
'title' => __('System Debugger'),
'manual' => true,
]);
$assets
->load('luxon')
->addInlineJs($this->fetch('admin/debug/index.js'), 99);
?>
<h2 class="outside-card-header mb-1"><?=__('System Debugger')?></h2>
<h2 class="outside-card-header mb-1"><?= __('System Debugger') ?></h2>
<div class="row">
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header bg-primary-dark">
<h2 class="card-title"><?=__('Clear Cache')?></h2>
<h2 class="card-title"><?= __('Clear Cache') ?></h2>
</div>
<div class="card-body">
<p><?=__('Clearing the application cache may log you out of your session.')?></p>
<p><?= __('Clearing the application cache may log you out of your session.') ?></p>
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?=$router->named('admin:debug:clear-cache')?>">
<?=__('Clear Cache')?>
<a class="btn btn-sm btn-primary" role="button"
href="<?= $router->named('admin:debug:clear-cache') ?>">
<?= __('Clear Cache') ?>
</a>
</div>
</div>
@ -26,13 +35,14 @@ $this->layout('main', [
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header bg-primary-dark">
<h2 class="card-title"><?=__('Clear All Message Queues')?></h2>
<h2 class="card-title"><?= __('Clear All Message Queues') ?></h2>
</div>
<div class="card-body">
<p><?=__('This will clear any pending unprocessed messages in all message queues.')?></p>
<p><?= __('This will clear any pending unprocessed messages in all message queues.') ?></p>
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?=$router->named('admin:debug:clear-queue')?>">
<?=__('Clear All Message Queues')?>
<a class="btn btn-sm btn-primary" role="button"
href="<?= $router->named('admin:debug:clear-queue') ?>">
<?= __('Clear All Message Queues') ?>
</a>
</div>
</div>
@ -42,42 +52,79 @@ $this->layout('main', [
<div class="card mb-3">
<div class="card-header bg-primary-dark">
<h2 class="card-title"><?=__('Synchronization Tasks')?></h2>
<h2 class="card-title"><?= __('Synchronization Tasks') ?></h2>
</div>
<div class="card-body">
<div class="row">
<?php foreach ($sync_times as $sync_key => $sync_info): ?>
<div class="col">
<h5 class="mb-0"><?=$sync_info['name']?></h5>
<p><?=implode(', ', $sync_info['contents'])?></p>
<table class="table">
<colgroup>
<col width="40%">
<col width="20%">
<col width="20%">
<col width="20%">
</colgroup>
<thead>
<tr>
<th><?= __('Task Name') ?></th>
<th><?= __('Last Run') ?></th>
<th><?= __('Next Run') ?></th>
<th><?= __('Actions') ?></th>
</tr>
</thead>
<?php
foreach ($sync_times as $task => $taskInfo): ?>
<tr>
<td>
<big><?= $task ?></big><br>
<?= $taskInfo['pattern'] ?>
</td>
<td>
<?php
if (0 === $taskInfo['time']): ?>
<?= __('Not Run') ?>
<?php
else: ?>
<time <?php
if ($taskInfo['time'] < time() - (60 * 60 * 3)): ?>class="text-danger"<?php
endif; ?> data-duration="<?= $taskInfo['time'] ?>"></time>
<?php
endif; ?>
</td>
<td>
<time data-duration="<?= $taskInfo['nextRun'] ?>"></time>
</td>
<td>
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?=$router->named('admin:debug:sync',
['type' => $sync_key])?>">
<?=__('Run Task')?>
<a class="btn btn-sm btn-primary" role="button" href="<?= $router->named(
'admin:debug:sync',
['task' => $task]
) ?>">
<?= __('Run Task') ?>
</a>
</div>
</div>
<?php endforeach; ?>
</div>
</div>
</td>
</tr>
<?php
endforeach; ?>
</table>
</div>
<div class="card mb-3">
<div class="card-header bg-primary-dark">
<h2 class="card-title"><?=__('Message Queues')?></h2>
<h2 class="card-title"><?= __('Message Queues') ?></h2>
</div>
<div class="card-body">
<div class="row">
<?php foreach ($queue_totals as $queueType => $queueTotal): ?>
<?php
foreach ($queue_totals as $queueType => $queueTotal): ?>
<div class="col">
<h5 class="mb-0"><?=$queueType?></h5>
<p><?=__('%d queued messages', $queueTotal)?></p>
<h5 class="mb-0"><?= $queueType ?></h5>
<p><?= __('%d queued messages', $queueTotal) ?></p>
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?=$router->named('admin:debug:clear-queue',
['queue' => $queueType])?>">
<?=__('Clear Queue')?>
<a class="btn btn-sm btn-primary" role="button" href="<?= $router->named(
'admin:debug:clear-queue',
['queue' => $queueType]
) ?>">
<?= __('Clear Queue') ?>
</a>
</div>
</div>
@ -101,32 +148,50 @@ $this->layout('main', [
</div>
<div class="tab-content">
<?php foreach ($stations as $station): ?>
<div class="card-body card-padding-sm tab-pane" id="debug_station_<?=$station['id']?>">
<h3><?=$station['name']?></h3>
<div class="card-body card-padding-sm tab-pane" id="debug_station_<?= $station['id'] ?>">
<h3><?= $station['name'] ?></h3>
<div class="row">
<div class="col">
<h5><?=__('Rebuild AutoDJ Queue')?></h5>
<div class="col-md-3">
<h5><?= __('Rebuild AutoDJ Queue') ?></h5>
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?=$router->named('admin:debug:nextsong',
['station_id' => $station['id']])?>">
<?=__('Run Test')?>
<a class="btn btn-sm btn-primary" role="button" href="<?= $router->named(
'admin:debug:nextsong',
['station_id' => $station['id']]
) ?>">
<?= __('Run Task') ?>
</a>
</div>
</div>
<?php if ($station['backend_type'] === \App\Radio\Adapters::BACKEND_LIQUIDSOAP): ?>
<div class="col">
<h5><?=__('Send Liquidsoap Telnet Command')?></h5>
<div class="col-md-3">
<h5><?= __('Get Now Playing') ?></h5>
<form class="form" method="POST" action="<?=$router->named('admin:debug:telnet',
['station_id' => $station['id']])?>">
<div class="buttons">
<a class="btn btn-sm btn-primary" role="button" href="<?= $router->named(
'admin:debug:nowplaying',
['station_id' => $station['id']]
) ?>">
<?= __('Run Task') ?>
</a>
</div>
</div>
<?php
if ($station['backend_type'] === \App\Radio\Adapters::BACKEND_LIQUIDSOAP): ?>
<div class="col-md-6">
<h5><?= __('Send Liquidsoap Telnet Command') ?></h5>
<form class="form" method="POST" action="<?= $router->named(
'admin:debug:telnet',
['station_id' => $station['id']]
) ?>">
<div class="form-group">
<label for="<?=$station['id']?>_command"><?=__('Command')?>:</label>
<input id="<?=$station['id']?>_command" name="command" class="form-control" type="text">
<label for="<?= $station['id'] ?>_command"><?= __('Command') ?>:</label>
<input id="<?= $station['id'] ?>_command" name="command" class="form-control"
type="text">
</div>
<button type="submit" class="btn btn-primary"><?=__('Execute Command')?></button>
<button type="submit" class="btn btn-primary"><?= __('Execute Command') ?></button>
</form>
</div>
<?php endif; ?>

View File

@ -9,10 +9,6 @@ $this->layout('main', [
'manual' => true,
'page_class' => 'page-admin',
]);
$assets
->load('luxon')
->addInlineJs($this->fetch('admin/index/index.js'), 99);
?>
<h2 class="outside-card-header mb-1"><?=__('Administration')?></h2>
@ -87,32 +83,3 @@ $assets
</div>
</div>
</div>
<h2 class="outside-card-header mb-1"><?= __('Synchronization Tasks') ?></h2>
<div class="card-deck">
<?php
foreach ($sync_times as $sync_key => $sync_info): ?>
<section class="card" role="region">
<div class="card-header bg-primary-dark">
<h2 class="card-title"><?= $sync_info['name'] ?></h2>
<h3 class="card-subtitle"><?= implode(', ', $sync_info['contents']) ?></h3>
</div>
<div class="card-body">
<p class="card-text"><?= __(
'Last run: %s',
'<time data-duration="' . $sync_info['latest'] . '"></time>'
)?></p>
</div>
<div class="card-actions">
<a class="btn btn-outline-primary" role="button" href="<?=$router->named(
'admin:debug:sync',
['type' => $sync_key]
)?>">
<i class="material-icons" aria-hidden="true">send</i>
<?=__('Run Task')?>
</a>
</div>
</section>
<?php endforeach; ?>
</div>

View File

@ -6,47 +6,10 @@ class Admin_DebugCest extends CestAbstract
* @before setupComplete
* @before login
*/
public function runNowPlayingSync(FunctionalTester $I): void
public function syncTasks(FunctionalTester $I)
{
$I->wantTo('Run now-playing synchronization task.');
$I->amOnPage('/admin/debug/sync/nowplaying');
$I->seeInSource('Run Synchronization Task');
}
/**
* @before setupComplete
* @before login
*/
public function runShortSync(FunctionalTester $I): void
{
$I->wantTo('Run short synchronization task.');
$I->amOnPage('/admin/debug/sync/short');
$I->seeInSource('Run Synchronization Task');
}
/**
* @before setupComplete
* @before login
*/
public function runMediumSync(FunctionalTester $I): void
{
$I->wantTo('Run medium synchronization task.');
$I->amOnPage('/admin/debug/sync/medium');
$I->seeInSource('Run Synchronization Task');
}
/**
* @before setupComplete
* @before login
*/
public function runLongSync(FunctionalTester $I): void
{
$I->wantTo('Run long synchronization task.');
$I->amOnPage('/admin/debug/sync/long');
$I->seeInSource('Run Synchronization Task');
$I->wantTo('Test All Synchronized Tasks');
$I->amOnPage('/admin/debug/sync/all');
$I->seeResponseCodeIsSuccessful();
}
}

View File

@ -34,7 +34,7 @@ else
fi
APP_ENV="${APP_ENV:-production}"
UPDATE_REVISION="${UPDATE_REVISION:-70}"
UPDATE_REVISION="${UPDATE_REVISION:-71}"
echo "Updating AzuraCast (Environment: $APP_ENV, Update revision: $UPDATE_REVISION)"

View File

@ -1,9 +1,3 @@
0 * * * * php {{ www_base }}/bin/console sync:run long
*/5 * * * * php {{ www_base }}/bin/console sync:run medium
*/5 * * * * php {{ www_base }}/bin/console queue:process 275
* * * * * php {{ www_base }}/bin/console sync:run short
* * * * * php {{ www_base }}/bin/console sync:run nowplaying
* * * * * sleep 15; php {{ www_base }}/bin/console sync:run nowplaying
* * * * * sleep 30; php {{ www_base }}/bin/console sync:run nowplaying
* * * * * sleep 45; php {{ www_base }}/bin/console sync:run nowplaying
* * * * * php {{ www_base }}/bin/console azuracast:sync:run
*/5 * * * * php {{ www_base }}/bin/console azuracast:sync:nowplaying --timeout=300
0 */6 * * * tmpreaper 12h /var/azuracast/stations/*/temp

View File

@ -28,6 +28,6 @@
- { role : ufw, when : update_revision|int < 12 }
- { role : dbip, when : update_revision|int < 51 }
- { role : services, when : update_revision|int < 13 }
- { role : azuracast-cron, when : update_revision|int < 48 }
- { role: azuracast-cron, when: update_revision|int < 71 }
- azuracast-build
- azuracast-setup

View File

@ -1,8 +1,2 @@
0 * * * * root /usr/local/bin/cron_task azuracast_cli sync:run long
*/5 * * * * root /usr/local/bin/cron_task azuracast_cli sync:run medium
* * * * * root /usr/local/bin/cron_task azuracast_cli sync:run short
* * * * * root /usr/local/bin/cron_task azuracast_cli sync:run nowplaying
* * * * * root sleep 15; /usr/local/bin/cron_task azuracast_cli sync:run nowplaying
* * * * * root sleep 30; /usr/local/bin/cron_task azuracast_cli sync:run nowplaying
* * * * * root sleep 45; /usr/local/bin/cron_task azuracast_cli sync:run nowplaying
* * * * * root /usr/local/bin/cron_task azuracast_cli azuracast:sync:run
30 */6 * * * root /usr/local/bin/temp_cleanup

View File

@ -0,0 +1,3 @@
#!/bin/bash
sudo -E -u azuracast php /var/azuracast/www/bin/console azuracast:sync:nowplaying