Move Now Playing tasks into separate per-station processes.

This commit is contained in:
Buster Neece 2023-01-11 21:13:51 -06:00
parent c25a60bc75
commit fe2b44df12
No known key found for this signature in database
GPG Key ID: F1D2E64A0005E80E
6 changed files with 92 additions and 167 deletions

View File

@ -29,7 +29,6 @@ return function (App\Event\BuildConsoleCommands $event) {
'azuracast:setup' => Command\SetupCommand::class,
'azuracast:radio:restart' => Command\RestartRadioCommand::class,
'azuracast:sync:nowplaying' => Command\Sync\NowPlayingCommand::class,
'azuracast:sync:nowplaying:station' => Command\Sync\NowPlayingPerStationCommand::class,
'azuracast:sync:run' => Command\Sync\RunnerCommand::class,
'azuracast:sync:task' => Command\Sync\SingleTaskCommand::class,
'azuracast:media:reprocess' => Command\ReprocessMediaCommand::class,

View File

@ -4,37 +4,42 @@ declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Entity\Repository\SettingsRepository;
use App\Environment;
use App\Lock\LockFactory;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use App\Console\Command\CommandAbstract;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity\Repository\StationRepository;
use App\Entity\Station;
use App\Sync\NowPlaying\Task\BuildQueueTask;
use App\Sync\NowPlaying\Task\NowPlayingTask;
use Monolog\Logger;
use Monolog\LogRecord;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function random_int;
use Throwable;
#[AsCommand(
name: 'azuracast:sync:nowplaying',
description: 'Task to run the Now Playing worker task.'
description: 'Task to run the Now Playing worker task for a specific station.',
)]
final class NowPlayingCommand extends AbstractSyncCommand
final class NowPlayingCommand extends CommandAbstract
{
public function __construct(
LoggerInterface $logger,
LockFactory $lockFactory,
Environment $environment,
private readonly EntityManagerInterface $em,
private readonly SettingsRepository $settingsRepo,
private readonly ReloadableEntityManagerInterface $em,
private readonly StationRepository $stationRepo,
private readonly BuildQueueTask $buildQueueTask,
private readonly NowPlayingTask $nowPlayingTask,
private readonly Logger $logger,
) {
parent::__construct($logger, $lockFactory, $environment);
parent::__construct();
}
protected function configure(): void
{
$this->addArgument('station', InputArgument::REQUIRED);
$this->addOption(
'timeout',
't',
@ -47,69 +52,64 @@ final class NowPlayingCommand extends AbstractSyncCommand
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$stationName = $input->getArgument('station');
$settings = $this->settingsRepo->readSettings();
if ($settings->getSyncDisabled()) {
$this->logger->error('Automated synchronization is temporarily disabled.');
$station = $this->stationRepo->findByIdentifier($stationName);
if (!($station instanceof Station)) {
$io->error('Station not found.');
return 1;
}
$timeout = (int)$input->getOption('timeout');
$this->loop($io, $timeout);
$this->logger->pushProcessor(
function (LogRecord $record) use ($station) {
$record->extra['station'] = [
'id' => $station->getId(),
'name' => $station->getName(),
];
return $record;
}
);
$this->logger->info('Starting Now Playing sync task.');
$this->loop($station, $timeout);
$this->logger->info('Now Playing sync task complete.');
$this->logger->popProcessor();
return 0;
}
private function loop(SymfonyStyle $io, int $timeout): void
private function loop(Station $station, int $timeout): void
{
$threshold = time() + $timeout;
while (time() < $threshold || !empty($this->processes)) {
// Check existing processes.
$this->checkRunningProcesses();
while (time() < $threshold) {
$station = $this->em->refetch($station);
// Ensure a process is running for every active station.
if (time() < $threshold - 5) {
$activeStations = $this->em->createQuery(
<<<'DQL'
SELECT s.id, s.short_name, s.nowplaying_timestamp
FROM App\Entity\Station s
WHERE s.is_enabled = 1 AND s.has_started = 1
DQL
)->getArrayResult();
try {
$this->buildQueueTask->run($station);
} catch (Throwable $e) {
$this->logger->error(
'Queue builder error: ' . $e->getMessage(),
['exception' => $e]
);
}
foreach ($activeStations as $activeStation) {
$shortName = $activeStation['short_name'];
if (!isset($this->processes[$shortName])) {
$npTimestamp = (int)$activeStation['nowplaying_timestamp'];
if (time() > $npTimestamp + random_int(5, 15)) {
$this->start($io, $shortName);
usleep(250000);
}
}
}
try {
$this->nowPlayingTask->run($station);
} catch (Throwable $e) {
$this->logger->error(
'Now Playing error: ' . $e->getMessage(),
['exception' => $e]
);
}
$this->em->clear();
gc_collect_cycles();
usleep(1000000);
usleep(5000000);
}
}
private function start(
SymfonyStyle $io,
string $shortName
): void {
$this->lockAndRunConsoleCommand(
$io,
$shortName,
'nowplaying',
[
'azuracast:sync:nowplaying:station',
$shortName,
]
);
}
}

View File

@ -1,87 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Console\Command\CommandAbstract;
use App\Entity\Repository\StationRepository;
use App\Entity\Station;
use App\Sync\NowPlaying\Task\BuildQueueTask;
use App\Sync\NowPlaying\Task\NowPlayingTask;
use Monolog\Logger;
use Monolog\LogRecord;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;
#[AsCommand(
name: 'azuracast:sync:nowplaying:station',
description: 'Task to run the Now Playing worker task for a specific station.',
)]
final class NowPlayingPerStationCommand extends CommandAbstract
{
public function __construct(
private readonly StationRepository $stationRepo,
private readonly BuildQueueTask $buildQueueTask,
private readonly NowPlayingTask $nowPlayingTask,
private readonly Logger $logger,
) {
parent::__construct();
}
protected function configure(): void
{
$this->addArgument('station', InputArgument::REQUIRED);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$stationName = $input->getArgument('station');
$station = $this->stationRepo->findByIdentifier($stationName);
if (!($station instanceof Station)) {
$io->error('Station not found.');
return 1;
}
$this->logger->pushProcessor(
function (LogRecord $record) use ($station) {
$record->extra['station'] = [
'id' => $station->getId(),
'name' => $station->getName(),
];
return $record;
}
);
$this->logger->info('Starting Now Playing sync task.');
try {
$this->buildQueueTask->run($station);
} catch (Throwable $e) {
$this->logger->error(
'Queue builder error: ' . $e->getMessage(),
['exception' => $e]
);
}
try {
$this->nowPlayingTask->run($station);
} catch (Throwable $e) {
$this->logger->error(
'Now Playing error: ' . $e->getMessage(),
['exception' => $e]
);
}
$this->logger->info('Now Playing sync task complete.');
$this->logger->popProcessor();
return 0;
}
}

View File

@ -127,7 +127,13 @@ final class Configuration
}
// Write group section of config
$programNames = [];
$stationGroup = self::getSupervisorGroupName($station);
$nowPlayingProgramName = self::getSupervisorProgramName($station, 'nowplaying');
$programNames = [
$nowPlayingProgramName,
];
$programs = [];
if (null !== $backend && $backend->hasCommand($station)) {
@ -144,12 +150,11 @@ final class Configuration
$programNames[] = $programName;
}
$stationGroup = self::getSupervisorGroupName($station);
$supervisorConfig[] = '[group:' . $stationGroup . ']';
$supervisorConfig[] = 'programs=' . implode(',', $programNames);
$supervisorConfig[] = '';
// Write backend/frontend programs
foreach ($programs as $programName => $adapter) {
$configLines = [
'user' => 'azuracast',
@ -174,6 +179,30 @@ final class Configuration
$supervisorConfig[] = '';
}
// Write Now Playing process
$configLines = [
'user' => 'azuracast',
'priority' => 975,
'startsecs' => 10,
'startretries' => 5,
'command' => 'php ' . $this->environment->getBaseDirectory()
. '/bin/console azuracast:sync:nowplaying ' . $station->getIdRequired(),
'directory' => $this->environment->getBaseDirectory(),
'autorestart' => 'true',
'stopasgroup' => 'true',
'killasgroup' => 'true',
'stdout_logfile' => '/proc/1/fd/1',
'stdout_logfile_maxbytes' => 0,
'stderr_logfile' => '/proc/1/fd/2',
'stderr_logfile_maxbytes' => 0,
];
$supervisorConfig[] = '[program:' . $nowPlayingProgramName . ']';
foreach ($configLines as $configKey => $configValue) {
$supervisorConfig[] = $configKey . '=' . $configValue;
}
$supervisorConfig[] = '';
// Write config contents
$supervisor_config_data = implode("\n", $supervisorConfig);
file_put_contents($supervisorConfigFile, $supervisor_config_data);

View File

@ -80,7 +80,6 @@ final class ServiceControl
'mariadb' => __('Database'),
'nginx' => __('Web server'),
'php-fpm' => __('PHP FastCGI Process Manager'),
'php-nowplaying' => __('Now Playing manager service'),
'php-worker' => __('PHP queue processing worker'),
'redis' => __('Cache'),
'sftpgo' => __('SFTP service'),

View File

@ -1,15 +0,0 @@
[program:php-nowplaying]
command=php /var/azuracast/www/bin/console azuracast:sync:nowplaying
user=azuracast
priority=600
numprocs=1
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/proc/1/fd/1
stdout_logfile_maxbytes=0
stderr_logfile=/proc/1/fd/2
stderr_logfile_maxbytes=0