Webhook and Sync Task Refactor

Moving service locator dependencies to their own Collections and make webhooks dispatch via MessageQueue messages.
This commit is contained in:
Buster "Silver Eagle" Neece 2020-08-28 02:04:23 -05:00
parent bc1352e279
commit 93ff68881f
No known key found for this signature in database
GPG Key ID: 6D9E12FF03411F4E
11 changed files with 235 additions and 220 deletions

View File

@ -13,4 +13,6 @@ return [
Message\UpdateNowPlayingMessage::class => Task\NowPlaying::class,
Message\BackupMessage::class => Task\Backup::class,
Message\DispatchWebhookMessage::class => App\Webhook\Dispatcher::class,
];

View File

@ -377,47 +377,37 @@ return [
},
// Synchronized (Cron) Tasks
App\Sync\Runner::class => function (
ContainerInterface $di,
Monolog\Logger $logger,
App\Lock\LockManager $lockManager,
App\Entity\Repository\SettingsRepository $settingsRepo
) {
return new App\Sync\Runner(
$settingsRepo,
$logger,
$lockManager,
[ // Every 15 seconds tasks
App\Sync\TaskCollection::class => function (ContainerInterface $di) {
return new App\Sync\TaskCollection([
App\Sync\TaskCollection::SYNC_NOWPLAYING => [
$di->get(App\Sync\Task\BuildQueue::class),
$di->get(App\Sync\Task\NowPlaying::class),
$di->get(App\Sync\Task\ReactivateStreamer::class),
],
[ // Every minute tasks
App\Sync\TaskCollection::SYNC_SHORT => [
$di->get(App\Sync\Task\RadioRequests::class),
$di->get(App\Sync\Task\Backup::class),
$di->get(App\Sync\Task\RelayCleanup::class),
],
[ // Every 5 minutes tasks
App\Sync\TaskCollection::SYNC_MEDIUM => [
$di->get(App\Sync\Task\Media::class),
$di->get(App\Sync\Task\FolderPlaylists::class),
$di->get(App\Sync\Task\CheckForUpdates::class),
],
[ // Every hour tasks
App\Sync\TaskCollection::SYNC_LONG => [
$di->get(App\Sync\Task\Analytics::class),
$di->get(App\Sync\Task\RadioAutomation::class),
$di->get(App\Sync\Task\HistoryCleanup::class),
$di->get(App\Sync\Task\RotateLogs::class),
$di->get(App\Sync\Task\UpdateGeoLiteDatabase::class),
]
);
],
]);
},
// Web Hooks
App\Webhook\Dispatcher::class => function (
App\Webhook\ConnectorCollection::class => function (
ContainerInterface $di,
App\Config $config,
Monolog\Logger $logger,
App\ApiUtilities $apiUtils
App\Config $config
) {
$webhooks = $config->get('webhooks');
$services = [];
@ -425,6 +415,6 @@ return [
$services[$webhook_key] = $di->get($webhook_info['class']);
}
return new App\Webhook\Dispatcher($logger, $apiUtils, $services);
return new App\Webhook\ConnectorCollection($services);
},
];

View File

@ -7,9 +7,6 @@ use App\Webhook\Connector;
return [
'webhooks' => [
Connector\Local::NAME => [
'class' => Connector\Local::class,
],
Connector\Generic::NAME => [
'class' => Connector\Generic::class,
'name' => __('Generic Web Hook'),

View File

@ -3,7 +3,6 @@ namespace App\Event;
use App\Entity\Api\NowPlaying;
use App\Entity\Station;
use App\Http\Router;
use Symfony\Contracts\EventDispatcher\Event;
class SendWebhooks extends Event
@ -12,8 +11,6 @@ class SendWebhooks extends Event
protected NowPlaying $np;
protected Router $router;
protected array $triggers = [];
protected bool $is_standalone = true;
@ -21,30 +18,38 @@ class SendWebhooks extends Event
public function __construct(
Station $station,
NowPlaying $np,
$np_old = null,
$is_standalone = true
bool $is_standalone = true,
?array $triggers = []
) {
$this->station = $station;
$this->np = $np;
$this->is_standalone = $is_standalone;
if (empty($triggers)) {
$triggers = ['all'];
}
$this->triggers = $triggers;
}
public function computeTriggers($np_old): void
{
$to_trigger = ['all'];
if ($np_old instanceof NowPlaying) {
if ($np_old->now_playing->song->id !== $np->now_playing->song->id) {
if ($np_old->now_playing->song->id !== $this->np->now_playing->song->id) {
$to_trigger[] = 'song_changed';
}
if ($np_old->listeners->current > $np->listeners->current) {
if ($np_old->listeners->current > $this->np->listeners->current) {
$to_trigger[] = 'listener_lost';
} elseif ($np_old->listeners->current < $np->listeners->current) {
} elseif ($np_old->listeners->current < $this->np->listeners->current) {
$to_trigger[] = 'listener_gained';
}
if ($np_old->live->is_live === false && $np->live->is_live === true) {
if ($np_old->live->is_live === false && $this->np->live->is_live === true) {
$to_trigger[] = 'live_connect';
} elseif ($np_old->live->is_live === true && $np->live->is_live === false) {
} elseif ($np_old->live->is_live === true && $this->np->live->is_live === false) {
$to_trigger[] = 'live_disconnect';
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace App\Message;
use App\Entity\Api\NowPlaying;
class DispatchWebhookMessage extends AbstractMessage
{
/** @var int The numeric identifier for the StationWebhook record being processed. */
public int $webhook_id;
public NowPlaying $np;
public bool $is_standalone = true;
public array $triggers = [];
}

View File

@ -18,35 +18,18 @@ class Runner
protected LockManager $lockManager;
/** @var Task\AbstractTask[] */
protected array $tasks_nowplaying;
/** @var Task\AbstractTask[] */
protected array $tasks_short;
/** @var Task\AbstractTask[] */
protected array $tasks_medium;
/** @var Task\AbstractTask[] */
protected array $tasks_long;
protected TaskCollection $taskCollection;
public function __construct(
SettingsRepository $settingsRepo,
Logger $logger,
LockManager $lockManager,
array $tasks_nowplaying,
array $tasks_short,
array $tasks_medium,
array $tasks_long
TaskCollection $taskCollection
) {
$this->settingsRepo = $settingsRepo;
$this->logger = $logger;
$this->lockManager = $lockManager;
$this->tasks_nowplaying = $tasks_nowplaying;
$this->tasks_short = $tasks_short;
$this->tasks_medium = $tasks_medium;
$this->tasks_long = $tasks_long;
$this->taskCollection = $taskCollection;
}
/**
@ -58,21 +41,7 @@ class Runner
*/
public function syncNowplaying($force = false): void
{
$this->logger->info('Running Now Playing sync task');
$this->initSync(600);
$lock = $this->lockManager->getLock('sync_nowplaying', 600, $force);
$lock->run(function () use ($force) {
foreach ($this->tasks_nowplaying as $task) {
$this->runTimer(get_class($task), function () use ($task, $force) {
/** @var Task\AbstractTask $task */
$task->run($force);
});
}
$this->settingsRepo->setSetting(Entity\Settings::NOWPLAYING_LAST_RUN, time());
});
$this->runSyncTask(TaskCollection::SYNC_NOWPLAYING);
}
/**
@ -83,21 +52,7 @@ class Runner
*/
public function syncShort($force = false): void
{
$this->logger->info('Running 1-minute sync task');
$this->initSync(600);
$lock = $this->lockManager->getLock('sync_short', 600, $force);
$lock->run(function () use ($force) {
foreach ($this->tasks_short as $task) {
$this->runTimer(get_class($task), function () use ($task, $force) {
/** @var Task\AbstractTask $task */
$task->run($force);
});
}
$this->settingsRepo->setSetting(Entity\Settings::SHORT_SYNC_LAST_RUN, time());
});
$this->runSyncTask(TaskCollection::SYNC_SHORT);
}
/**
@ -108,21 +63,7 @@ class Runner
*/
public function syncMedium($force = false): void
{
$this->logger->info('Running 5-minute sync task');
$this->initSync(600);
$lock = $this->lockManager->getLock('sync_medium', 600, $force);
$lock->run(function () use ($force) {
foreach ($this->tasks_medium as $task) {
$this->runTimer(get_class($task), function () use ($task, $force) {
/** @var Task\AbstractTask $task */
$task->run($force);
});
}
$this->settingsRepo->setSetting(Entity\Settings::MEDIUM_SYNC_LAST_RUN, time());
});
$this->runSyncTask(TaskCollection::SYNC_MEDIUM);
}
/**
@ -133,20 +74,55 @@ class Runner
*/
public function syncLong($force = false): void
{
$this->logger->info('Running 1-hour sync task');
$this->initSync(1800);
$this->runSyncTask(TaskCollection::SYNC_LONG);
}
$lock = $this->lockManager->getLock('sync_medium', 1800, $force);
public function runSyncTask(string $type, bool $force = false): void
{
// Immediately halt if setup is not complete.
if ($this->settingsRepo->getSetting(Entity\Settings::SETUP_COMPLETE, 0) == 0) {
die('Setup not complete; halting synchronized task.');
}
$lock->run(function () use ($force) {
foreach ($this->tasks_long as $task) {
$this->runTimer(get_class($task), function () use ($task, $force) {
/** @var Task\AbstractTask $task */
$task->run($force);
});
$allSyncInfo = $this->getSyncTimes();
$syncInfo = $allSyncInfo[$type];
set_time_limit($syncInfo['timeout']);
ini_set('memory_limit', '256M');
if (Settings::getInstance()->isCli()) {
error_reporting(E_ALL & ~E_STRICT & ~E_NOTICE);
ini_set('display_errors', 1);
ini_set('log_errors', 1);
}
$this->logger->info(sprintf('Running sync task: %s', $syncInfo['name']));
$lock = $this->lockManager->getLock('sync_' . $type, $syncInfo['timeout'], $force);
$lock->run(function () use ($syncInfo, $type, $force) {
$tasks = $this->taskCollection->getTasks($type);
foreach ($tasks as $task) {
// Filter namespace name
$timer_description_parts = explode("\\", get_class($task));
$timer_description = array_pop($timer_description_parts);
$start_time = microtime(true);
$task->run($force);
$end_time = microtime(true);
$time_diff = $end_time - $start_time;
$this->logger->debug(sprintf(
'Timer "%s" completed in %01.3f second(s).',
$timer_description,
round($time_diff, 3)
));
}
$this->settingsRepo->setSetting(Entity\Settings::LONG_SYNC_LAST_RUN, time());
$this->settingsRepo->setSetting($syncInfo['lastRunSetting'], time());
});
}
@ -155,74 +131,46 @@ class Runner
$this->settingsRepo->clearCache();
$syncs = [
'nowplaying' => [
TaskCollection::SYNC_NOWPLAYING => [
'name' => __('Now Playing Data'),
'latest' => $this->settingsRepo->getSetting(Entity\Settings::NOWPLAYING_LAST_RUN, 0),
'contents' => [
__('Now Playing Data'),
],
'lastRunSetting' => Entity\Settings::NOWPLAYING_LAST_RUN,
'timeout' => 600,
],
'short' => [
TaskCollection::SYNC_SHORT => [
'name' => __('1-Minute Sync'),
'latest' => $this->settingsRepo->getSetting(Entity\Settings::SHORT_SYNC_LAST_RUN, 0),
'contents' => [
__('Song Requests Queue'),
],
'lastRunSetting' => Entity\Settings::SHORT_SYNC_LAST_RUN,
'timeout' => 600,
],
'medium' => [
TaskCollection::SYNC_MEDIUM => [
'name' => __('5-Minute Sync'),
'latest' => $this->settingsRepo->getSetting(Entity\Settings::MEDIUM_SYNC_LAST_RUN, 0),
'contents' => [
__('Check Media Folders'),
],
'lastRunSetting' => Entity\Settings::MEDIUM_SYNC_LAST_RUN,
'timeout' => 600,
],
'long' => [
TaskCollection::SYNC_LONG => [
'name' => __('1-Hour Sync'),
'latest' => $this->settingsRepo->getSetting(Entity\Settings::LONG_SYNC_LAST_RUN, 0),
'contents' => [
__('Analytics/Statistics'),
__('Cleanup'),
],
'lastRunSetting' => Entity\Settings::LONG_SYNC_LAST_RUN,
'timeout' => 1800,
],
];
foreach ($syncs as &$sync_info) {
$sync_info['latest'] = $this->settingsRepo->getSetting($sync_info['lastRunSetting'], 0);
$sync_info['diff'] = time() - $sync_info['latest'];
}
return $syncs;
}
protected function initSync($script_timeout = 60): void
{
// Immediately halt if setup is not complete.
if ($this->settingsRepo->getSetting(Entity\Settings::SETUP_COMPLETE, 0) == 0) {
die('Setup not complete; halting synchronized task.');
}
set_time_limit($script_timeout);
ini_set('memory_limit', '256M');
if (Settings::getInstance()->isCli()) {
error_reporting(E_ALL & ~E_STRICT & ~E_NOTICE);
ini_set('display_errors', 1);
ini_set('log_errors', 1);
}
}
protected function runTimer($timer_description, callable $timed_function): void
{
// Filter namespace name
$timer_description_parts = explode("\\", $timer_description);
$timer_description = array_pop($timer_description_parts);
$start_time = microtime(true);
$timed_function();
$end_time = microtime(true);
$time_diff = $end_time - $start_time;
$this->logger->debug('Timer "' . $timer_description . '" completed in ' . round($time_diff, 3) . ' second(s).');
}
}

View File

@ -330,7 +330,9 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
$np_event->resolveUrls($this->api_utils->getRouter()->getBaseUrl(false));
$np_event->cache = 'event';
$webhook_event = new SendWebhooks($station, $np_event, $np_old, $standalone);
$webhook_event = new SendWebhooks($station, $np_event, $standalone);
$webhook_event->computeTriggers($np_old);
$this->event_dispatcher->dispatch($webhook_event);
$logger->popProcessor();

View File

@ -0,0 +1,26 @@
<?php
namespace App\Sync;
use App\Sync\Task\AbstractTask;
use Doctrine\Common\Collections\ArrayCollection;
class TaskCollection extends ArrayCollection
{
public const SYNC_NOWPLAYING = 'nowplaying';
public const SYNC_SHORT = 'short';
public const SYNC_MEDIUM = 'medium';
public const SYNC_LONG = 'long';
/**
* @param string $type
*
* @return AbstractTask[]
*/
public function getTasks(string $type): array
{
return $this->get($type);
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace App\Webhook;
use App\Webhook\Connector\ConnectorInterface;
use Doctrine\Common\Collections\ArrayCollection;
class ConnectorCollection extends ArrayCollection
{
public function getConnector(string $name): ConnectorInterface
{
if ($this->offsetExists($name)) {
return $this->get($name);
}
throw new \InvalidArgumentException('Invalid web hook connector type specified.');
}
}

View File

@ -5,29 +5,41 @@ use App\ApiUtilities;
use App\Entity;
use App\Event\SendWebhooks;
use App\Exception;
use App\Message;
use App\Settings;
use InvalidArgumentException;
use Doctrine\ORM\EntityManagerInterface;
use Monolog\Handler\TestHandler;
use Monolog\Logger;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBus;
/**
* @package App\Webhook
* @see WebhookProvider
*/
class Dispatcher implements EventSubscriberInterface
{
protected Logger $logger;
/** @var Connector\ConnectorInterface[] */
protected array $connectors;
protected MessageBus $messageBus;
protected LocalWebhookHandler $localHandler;
protected ConnectorCollection $connectors;
protected ApiUtilities $apiUtils;
public function __construct(Logger $logger, ApiUtilities $apiUtils, array $connectors)
{
protected EntityManagerInterface $em;
public function __construct(
Logger $logger,
EntityManagerInterface $em,
MessageBus $messageBus,
ApiUtilities $apiUtils,
LocalWebhookHandler $localHandler,
ConnectorCollection $connectors
) {
$this->logger = $logger;
$this->em = $em;
$this->messageBus = $messageBus;
$this->apiUtils = $apiUtils;
$this->localHandler = $localHandler;
$this->connectors = $connectors;
}
@ -45,6 +57,31 @@ class Dispatcher implements EventSubscriberInterface
];
}
/**
* Handle event dispatch.
*
* @param Message\AbstractMessage $message
*/
public function __invoke(Message\AbstractMessage $message)
{
if ($message instanceof Message\DispatchWebhookMessage) {
$webhook = $this->em->find(Entity\StationWebhook::class, $message->webhook_id);
if (!($webhook instanceof Entity\StationWebhook)) {
return;
}
$event = new SendWebhooks(
$webhook->getStation(),
$message->np,
$message->is_standalone,
$message->triggers
);
$this->dispatchWebhook($event, $webhook);
}
}
/**
* Always dispatch the special "local" updater task for standalone updates.
*
@ -52,9 +89,7 @@ class Dispatcher implements EventSubscriberInterface
*/
public function localDispatch(SendWebhooks $event): void
{
/** @var Connector\Local $connector_obj */
$connector_obj = $this->connectors[Connector\Local::NAME];
$connector_obj->dispatch($event);
$this->localHandler->dispatch($event);
}
/**
@ -70,39 +105,28 @@ class Dispatcher implements EventSubscriberInterface
}
// Assemble list of webhooks for the station
$station_webhooks = $event->getStation()->getWebhooks();
if (0 === $station_webhooks->count()) {
$stationWebhooks = $event->getStation()->getWebhooks();
if (0 === $stationWebhooks->count()) {
return;
}
/** @var Entity\StationWebhook[] $connectors */
$connectors = [];
foreach ($station_webhooks as $webhook) {
/** @var Entity\StationWebhook[] $enabledWebhooks */
$enabledWebhooks = $stationWebhooks->filter(function ($webhook) {
/** @var Entity\StationWebhook $webhook */
if ($webhook->isEnabled()) {
$connectors[] = $webhook;
}
}
return $webhook->isEnabled();
});
$this->logger->debug('Triggering events: ' . implode(', ', $event->getTriggers()));
// Trigger all appropriate webhooks.
foreach ($connectors as $connector) {
if (!isset($this->connectors[$connector->getType()])) {
$this->logger->error(sprintf('Webhook connector "%s" does not exist; skipping.',
$connector->getType()));
continue;
}
foreach ($enabledWebhooks as $webhook) {
$message = new Message\DispatchWebhookMessage;
$message->webhook_id = $webhook->getId();
$message->np = $event->getNowPlaying();
$message->triggers = $event->getTriggers();
$message->is_standalone = $event->isStandalone();
/** @var Connector\ConnectorInterface $connector_obj */
$connector_obj = $this->connectors[$connector->getType()];
if ($connector_obj->shouldDispatch($event, $connector)) {
$this->logger->debug(sprintf('Dispatching connector "%s".', $connector->getType()));
$connector_obj->dispatch($event, $connector);
}
$this->messageBus->dispatch($message);
}
}
@ -118,43 +142,31 @@ class Dispatcher implements EventSubscriberInterface
*/
public function testDispatch(Entity\Station $station, Entity\StationWebhook $webhook): TestHandler
{
$webhook_type = $webhook->getType();
if (!isset($this->connectors[$webhook_type])) {
throw new Exception(sprintf('Webhook connector "%s" does not exist; skipping.', $webhook_type));
}
$handler = new TestHandler(Logger::DEBUG, false);
$this->logger->pushHandler($handler);
/** @var Connector\ConnectorInterface $connector_obj */
$connector_obj = $this->connectors[$webhook_type];
$np = $station->getNowplaying();
$np->resolveUrls($this->apiUtils->getRouter()->getBaseUrl(false));
$np->cache = 'event';
$event = new SendWebhooks($station, $np);
$connector_obj->dispatch($event, $webhook);
$this->dispatchWebhook($event, $webhook);
$this->logger->popHandler();
return $handler;
}
/**
* Directly access a webhook connector of the specified type.
*
* @param string $type
*
* @return Connector\ConnectorInterface
*/
public function getConnector($type): Connector\ConnectorInterface
{
if (isset($this->connectors[$type])) {
return $this->connectors[$type];
}
protected function dispatchWebhook(
SendWebhooks $event,
Entity\StationWebhook $webhook
): void {
$connectorObj = $this->connectors->getConnector($webhook->getType());
throw new InvalidArgumentException('Invalid web hook connector type specified.');
if ($connectorObj->shouldDispatch($event, $webhook)) {
$this->logger->debug(sprintf('Dispatching connector "%s".', $webhook->getType()));
$connectorObj->dispatch($event, $webhook);
}
}
}

View File

@ -1,5 +1,5 @@
<?php
namespace App\Webhook\Connector;
namespace App\Webhook;
use App\Entity;
use App\Event\SendWebhooks;
@ -13,11 +13,11 @@ use Psr\SimpleCache\CacheInterface;
use RuntimeException;
use const JSON_PRETTY_PRINT;
class Local
class LocalWebhookHandler
{
public const NAME = 'local';
protected Client $http_client;
protected Client $httpClient;
protected Logger $logger;
@ -25,20 +25,20 @@ class Local
protected CacheInterface $cache;
protected Entity\Repository\SettingsRepository $settings_repo;
protected Entity\Repository\SettingsRepository $settingsRepo;
public function __construct(
Logger $logger,
Client $http_client,
Client $httpClient,
Database $influx,
CacheInterface $cache,
Entity\Repository\SettingsRepository $settings_repo
Entity\Repository\SettingsRepository $settingsRepo
) {
$this->logger = $logger;
$this->http_client = $http_client;
$this->httpClient = $httpClient;
$this->influx = $influx;
$this->cache = $cache;
$this->settings_repo = $settings_repo;
$this->settingsRepo = $settingsRepo;
}
public function dispatch(SendWebhooks $event): void
@ -77,7 +77,7 @@ class Local
}
$this->cache->set('api_nowplaying_data', $np_new, 120);
$this->settings_repo->setSetting('nowplaying', $np_new);
$this->settingsRepo->setSetting('nowplaying', $np_new);
}
}
@ -118,7 +118,7 @@ class Local
if (NChan::isSupported()) {
$this->logger->debug('Dispatching Nchan notification...');
$this->http_client->post('http://localhost:9010/pub/' . urlencode($station->getShortName()), [
$this->httpClient->post('http://localhost:9010/pub/' . urlencode($station->getShortName()), [
'json' => $np,
]);
}