addArgument('station', InputArgument::REQUIRED);

        $this->addOption(
            'timeout',
            't',
            InputOption::VALUE_OPTIONAL,
            'Amount of time (in seconds) to run the worker process.',
            600
        );
    }

    /**
     * Run the Now Playing sync worker for a single station.
     *
     * Looks the station up via the `station` argument, works out how long the
     * worker should run (`timeout` option) and how long to pause between
     * iterations, then hands off to {@see self::loop()}.
     *
     * While the worker runs, every log record is tagged with the station's ID
     * and name under `extra['station']`.
     *
     * @param InputInterface $input Console input carrying the `station` argument and `timeout` option.
     * @param OutputInterface $output Console output used to report a missing station.
     *
     * @return int 0 when the worker finishes its run, 1 when the station cannot be found.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $stationName = $input->getArgument('station');
        $station = $this->stationRepo->findByIdentifier($stationName);

        if (!($station instanceof Station)) {
            $io->error('Station not found.');
            return 1;
        }

        $timeout = (int)$input->getOption('timeout');

        $delay = $this->environment->getNowPlayingDelayTime();

        // If delay is default value, auto-scale it with station count:
        // 4 seconds plus one more for every 5 active stations, capped at 20.
        if ($delay < 1) {
            $numStations = $this->stationRepo->getActiveCount();
            $delay = (int)min(
                4 + round($numStations / 5),
                20
            );
        }

        // Tag every record logged during this run with the station being processed.
        // The closure does not use $this, so it is declared static.
        $this->logger->pushProcessor(
            static function (LogRecord $record) use ($station) {
                $record->extra['station'] = [
                    'id' => $station->getId(),
                    'name' => $station->getName(),
                ];
                return $record;
            }
        );

        try {
            $this->logger->info('Starting Now Playing sync task.');

            $this->loop($station, $timeout, $delay);

            $this->logger->info('Now Playing sync task complete.');
        } finally {
            // Always remove the processor pushed above, even if the loop throws,
            // so the station tag does not stay attached to the logger.
            $this->logger->popProcessor();
        }

        return 0;
    }

    /**
     * Repeatedly run the queue builder and Now Playing tasks for one station.
     *
     * Each iteration refetches the station through the entity manager, runs
     * both tasks, clears the entity manager, triggers a garbage collection
     * cycle and then sleeps for `$delay` seconds. Iterations continue until
     * `$timeout` seconds have passed since the loop started.
     *
     * Any Throwable raised by either task is logged and suppressed so the
     * other task and later iterations still run. The refetch call is not
     * guarded, so a failure there propagates to the caller.
     *
     * The sleep happens after every iteration, including the last one, so the
     * method may return up to `$delay` seconds after the timeout has elapsed.
     *
     * @param Station $station Station to process; refetched at the start of every iteration.
     * @param int $timeout Number of seconds to keep iterating.
     * @param int $delay Number of seconds to sleep at the end of each iteration.
     */
    private function loop(Station $station, int $timeout, int $delay): void
    {
        $threshold = time() + $timeout;

        while (time() < $threshold) {
            // The entity manager is cleared at the end of each pass, so fetch the station again.
            $station = $this->em->refetch($station);

            try {
                $this->buildQueueTask->run($station);
            } catch (Throwable $e) {
                $this->logger->error(
                    'Queue builder error: ' . $e->getMessage(),
                    ['exception' => $e]
                );
            }

            try {
                $this->nowPlayingTask->run($station);
            } catch (Throwable $e) {
                $this->logger->error(
                    'Now Playing error: ' . $e->getMessage(),
                    ['exception' => $e]
                );
            }

            $this->em->clear();
            gc_collect_cycles();

            sleep($delay);
        }
    }
}