Implement message queue and worker for media processing/import tasks.

This commit is contained in:
Buster "Silver Eagle" Neece 2018-12-30 04:18:48 -06:00
parent 2dd69c241b
commit 13e0c744c6
17 changed files with 467 additions and 41 deletions

View File

@ -13,16 +13,20 @@
"ext-json": "*",
"ext-redis": "*",
"ext-PDO": "*",
,
"azuracast/azuracore": "dev-master",
"azuracast/azuraforms": "dev-master",
"azuracast/nowplaying": "dev-master",
"bernard/bernard": "dev-master",
"cakephp/chronos": "^1.1",
"doctrine/annotations": "^1.6",
"gettext/gettext": "^4.4",
"guzzlehttp/oauth-subscriber": "^0.3.0",
"influxdb/influxdb-php": "^1.14.3",
"james-heinrich/getid3": "dev-master",
"league/flysystem": "^1.0",
"league/flysystem-aws-s3-v3": "^1.0",
"league/flysystem-cached-adapter": "^1.0",
"lstrojny/fxmlrpc": "^0.14.0",
"maxmind-db/reader": "~1.0",
"mobiledetect/mobiledetectlib": "^2.8",
@ -37,9 +41,6 @@
"symfony/serializer": "^4.2",
"symfony/validator": "^4.2",
"ramsey/uuid": "^3.8",
"league/flysystem": "^1.0",
"league/flysystem-aws-s3-v3": "^1.0",
"league/flysystem-cached-adapter": "^1.0",
"wikimedia/composer-merge-plugin": "^1.4",
"zircote/swagger-php": "^3.0"
},

175
composer.lock generated
View File

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "6b5e3ad06e662f08423b65d1fed3687b",
"content-hash": "57ed1ca392dc17245799a7c90fd53736",
"packages": [
{
"name": "aws/aws-sdk-php",
@ -234,6 +234,178 @@
"description": "A lightweight PHP adapter for viewing the current now playing data in Icecast and SHOUTcast 1/2. A part of the AzuraCast software suite.",
"time": "2018-09-13T17:15:20+00:00"
},
{
"name": "beberlei/assert",
"version": "v2.9.6",
"source": {
"type": "git",
"url": "https://github.com/beberlei/assert.git",
"reference": "ec9e4cf0b63890edce844ee3922e2b95a526e936"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/beberlei/assert/zipball/ec9e4cf0b63890edce844ee3922e2b95a526e936",
"reference": "ec9e4cf0b63890edce844ee3922e2b95a526e936",
"shasum": ""
},
"require": {
"ext-mbstring": "*",
"php": ">=5.3"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.1.1",
"phpunit/phpunit": "^4.8.35|^5.7"
},
"type": "library",
"autoload": {
"psr-4": {
"Assert\\": "lib/Assert"
},
"files": [
"lib/Assert/functions.php"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"BSD-2-Clause"
],
"authors": [
{
"name": "Benjamin Eberlei",
"email": "kontakt@beberlei.de",
"role": "Lead Developer"
},
{
"name": "Richard Quadling",
"email": "rquadling@gmail.com",
"role": "Collaborator"
}
],
"description": "Thin assertion library for input validation in business models.",
"keywords": [
"assert",
"assertion",
"validation"
],
"time": "2018-06-11T17:15:25+00:00"
},
{
"name": "bernard/bernard",
"version": "dev-master",
"source": {
"type": "git",
"url": "https://github.com/bernardphp/bernard.git",
"reference": "3ff88921bea87186d1baffc93708dd7aee4e963e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/bernardphp/bernard/zipball/3ff88921bea87186d1baffc93708dd7aee4e963e",
"reference": "3ff88921bea87186d1baffc93708dd7aee4e963e",
"shasum": ""
},
"require": {
"beberlei/assert": "^2.1",
"bernard/normalt": "^1.0",
"php": "^5.6 || ^7.0",
"symfony/event-dispatcher": "^3.0 || ^4.0"
},
"require-dev": {
"aws/aws-sdk-php": "^3.0",
"doctrine/dbal": "^2.5",
"doctrine/instantiator": "^1.0.5",
"iron-io/iron_mq": "^4.0",
"leanphp/phpspec-code-coverage": "^3.0 || ^4.0",
"pda/pheanstalk": "^3.0",
"php-amqplib/php-amqplib": "^2.5",
"phpspec/phpspec": "^3.0 || ^4.0",
"phpunit/phpunit": "^5.7 || ^6.0 || ^7.0",
"predis/predis": "^1.0",
"psr/container": "^1.0",
"psr/log": "^1.0",
"queue-interop/amqp-interop": "^0.6",
"queue-interop/queue-interop": "^0.6",
"symfony/console": "^3.0 || ^4.0"
},
"suggest": {
"aws/aws-sdk-php": "Allow sending messages to AWS services like Simple Queue Service",
"doctrine/dbal": "Allow sending messages to simulated message queue in a database via doctrine dbal",
"iron-io/iron_mq": "Allow sending messages to IronMQ",
"mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver",
"pda/pheanstalk": "Allow sending messages to Beanstalk using pheanstalk",
"php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib",
"predis/predis": "Allow sending messages to Redis using predis",
"queue-interop/amqp-interop": "Allow sending messages using amqp interop compatible transports",
"queue-interop/queue-interop": "Allow sending messages using queue interop compatible transports"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.0.x-dev"
}
},
"autoload": {
"psr-4": {
"Bernard\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Message queue abstraction layer",
"homepage": "https://github.com/bernardphp/bernard",
"keywords": [
"bernard",
"message",
"message queue",
"queue"
],
"time": "2018-06-24T11:41:28+00:00"
},
{
"name": "bernard/normalt",
"version": "v1.2.0",
"source": {
"type": "git",
"url": "https://github.com/bernardphp/normalt.git",
"reference": "9ed9b6bbc657bb1e5a52ff4da6607e32152b390e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/bernardphp/normalt/zipball/9ed9b6bbc657bb1e5a52ff4da6607e32152b390e",
"reference": "9ed9b6bbc657bb1e5a52ff4da6607e32152b390e",
"shasum": ""
},
"require": {
"php": "^5.6||^7.0",
"symfony/serializer": "^2.3 || ^3.0 || ^4.0"
},
"require-dev": {
"doctrine/common": "^2.1",
"phpspec/phpspec": "^2.5"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
}
},
"autoload": {
"psr-4": {
"Normalt\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Normalt is a extension to Symfony Serializer that only implements the Normalization part",
"keywords": [
"denormalization",
"normalization"
],
"time": "2018-01-13T09:47:09+00:00"
},
{
"name": "cakephp/chronos",
"version": "1.2.3",
@ -7319,6 +7491,7 @@
"azuracast/azuraforms": 20,
"azuracast/nowplaying": 20,
"james-heinrich/getid3": 20,
"bernard/bernard": 20,
"roave/security-advisories": 20
},
"prefer-stable": true,

View File

@ -38,6 +38,7 @@ return function (\Azura\EventDispatcher $dispatcher)
// Maintenance
new Command\RestartRadio,
new Command\Sync,
new Command\ProcessMessageQueue,
new Command\ReprocessMedia,
new Command\GenerateApiDocs,

6
config/messagequeue.php Normal file
View File

@ -0,0 +1,6 @@
<?php
// An array of message queue types and the DI classes responsible for handling them.
return [
\App\Message\AddNewMedia::class => \App\Sync\Task\Media::class,
\App\Message\ReprocessMedia::class => \App\Sync\Task\Media::class,
];

View File

@ -208,6 +208,7 @@ return function (\Azura\Container $di)
return $view;
});
// MaxMind (IP Geolocation database for listener metadata)
$di[\MaxMind\Db\Reader::class] = function($di) {
$mmdb_path = dirname(APP_INCLUDE_ROOT).'/geoip/GeoLite2-City.mmdb';
return new \MaxMind\Db\Reader($mmdb_path);
@ -225,6 +226,39 @@ return function (\Azura\Container $di)
return $dispatcher;
});
$di[\App\MessageQueue::class] = function($di) {
// Build QueueFactory
/** @var \Redis $redis */
$redis = $di[\Redis::class];
$redis->select(4);
$driver = new \Bernard\Driver\PhpRedis\Driver($redis);
$serializer = new \Bernard\Serializer(new \Normalt\Normalizer\AggregateNormalizer([
new \Bernard\Normalizer\EnvelopeNormalizer(),
new \Symfony\Component\Serializer\Normalizer\PropertyNormalizer()
]));
$queue_factory = new \Bernard\QueueFactory\PersistentFactory($driver, $serializer);
// Event dispatcher
$dispatcher = new \Symfony\Component\EventDispatcher\EventDispatcher;
$dispatcher->addSubscriber(new \Bernard\EventListener\LoggerSubscriber($di[\Monolog\Logger::class]));
// Build Producer
$producer = new \Bernard\Producer($queue_factory, $dispatcher);
// Build Consumer
$receivers = require __DIR__.'/messagequeue.php';
$router = new \Bernard\Router\ReceiverMapRouter($receivers, new \Bernard\Router\ContainerReceiverResolver($di));
$consumer = new Bernard\Consumer($router, $dispatcher);
return new \App\MessageQueue(
$queue_factory,
$producer,
$consumer
);
};
//
// AzuraCast-specific dependencies
//

View File

@ -0,0 +1,44 @@
<?php
namespace App\Console\Command;
use App\MessageQueue;
use Azura\Console\Command\CommandAbstract;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class ProcessMessageQueue extends CommandAbstract
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this->setName('queue:process')
->setDescription('Process the message queue.')
->addArgument(
'runtime',
InputArgument::OPTIONAL,
'The total length of time (in seconds) to spend processing requests before exiting.',
0
);
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$runtime = (int)$input->getArgument('runtime');
if ($runtime < 1) {
$runtime = \PHP_INT_MAX;
}
/** @var MessageQueue $message_queue */
$message_queue = $this->get(MessageQueue::class);
$message_queue->consume([
'max-runtime' => $runtime,
]);
}
}

View File

@ -151,7 +151,7 @@ class StationMediaRepository extends Repository
$media_mtime = $fs->getTimestamp($media_uri);
// No need to update if all of these conditions are true.
if (!$force && $media->songMatches() && $media_mtime <= $media->getMtime()) {
if (!$force && !$media->needsReprocessing($media_mtime)) {
return false;
}

View File

@ -523,6 +523,23 @@ class StationMedia
$this->custom_fields = $custom_fields;
}
/**
* Indicate whether this media needs reprocessing given certain factors.
*
* @param int $current_mtime
* @return bool
*/
public function needsReprocessing($current_mtime = 0): bool
{
if ($current_mtime > $this->mtime) {
return true;
}
if (!$this->songMatches()) {
return true;
}
return false;
}
/**
* Assemble a list of annotations for LiquidSoap.
*

View File

@ -0,0 +1,15 @@
<?php
namespace App\Message;
use Bernard\Message;
abstract class AbstractMessage implements Message
{
/**
* @return string
*/
public function getName(): string
{
return get_called_class();
}
}

View File

@ -0,0 +1,11 @@
<?php
namespace App\Message;
class AddNewMedia extends AbstractMessage
{
/** @var int The numeric identifier for the station. */
public $station_id;
/** @var string The relative path for the media file to be processed. */
public $path;
}

View File

@ -0,0 +1,11 @@
<?php
namespace App\Message;
class ReprocessMedia extends AbstractMessage
{
/** @var int The numeric identifier for the StationMedia record being processed. */
public $media_id;
/** @var bool Whether to force reprocessing even if checks indicate it is not necessary. */
public $force = false;
}

78
src/MessageQueue.php Normal file
View File

@ -0,0 +1,78 @@
<?php
namespace App;
use Bernard\Consumer;
use Bernard\Message;
use Bernard\Producer;
use Bernard\Queue;
use Bernard\QueueFactory;
class MessageQueue
{
const GLOBAL_QUEUE_NAME = 'azuracast';
/** @var QueueFactory */
protected $queues;
/** @var Producer */
protected $producer;
/** @var Consumer */
protected $consumer;
/**
* @param QueueFactory $queues
* @param Producer $producer
* @param Consumer $consumer
*/
public function __construct(QueueFactory $queues, Producer $producer, Consumer $consumer)
{
$this->queues = $queues;
$this->producer = $producer;
$this->consumer = $consumer;
}
/**
* @return Queue
*/
public function getGlobalQueue(): Queue
{
return $this->queues->create(self::GLOBAL_QUEUE_NAME);
}
/**
* @return Producer
*/
public function getProducer(): Producer
{
return $this->producer;
}
/**
* @return Consumer
*/
public function getConsumer(): Consumer
{
return $this->consumer;
}
/**
* Produce (send) a message to the queue.
*
* @param Message $message
*/
public function produce(Message $message): void
{
$this->producer->produce($message, self::GLOBAL_QUEUE_NAME);
}
/**
* Consume (receive) messages from the queue.
*
* @param array $options
*/
public function consume(array $options = []): void
{
$this->consumer->consume($this->queues->create(self::GLOBAL_QUEUE_NAME), $options);
}
}

View File

@ -68,7 +68,8 @@ class SyncProvider implements ServiceProviderInterface
return new Task\Media(
$di[\Doctrine\ORM\EntityManager::class],
$di[\Monolog\Logger::class],
$di[\App\Radio\Filesystem::class]
$di[\App\Radio\Filesystem::class],
$di[\App\MessageQueue::class]
);
};

View File

@ -1,33 +1,78 @@
<?php
namespace App\Sync\Task;
use App\MessageQueue;
use App\Message;
use App\Radio\Filesystem;
use Doctrine\Common\Persistence\Mapping\MappingException;
use Doctrine\ORM\EntityManager;
use App\Entity;
use Monolog\Logger;
use Symfony\Component\Finder\Finder;
use Symfony\Component\VarDumper\VarDumper;
class Media extends AbstractTask
{
/** @var Filesystem */
protected $filesystem;
/** @var MessageQueue */
protected $message_queue;
/**
* @param EntityManager $em
* @param Logger $logger
* @param Filesystem $filesystem
* @param MessageQueue $message_queue
*
* @see \App\Provider\SyncProvider
*/
public function __construct(EntityManager $em, Logger $logger, Filesystem $filesystem)
{
public function __construct(
EntityManager $em,
Logger $logger,
Filesystem $filesystem,
MessageQueue $message_queue
) {
parent::__construct($em, $logger);
$this->filesystem = $filesystem;
$this->message_queue = $message_queue;
}
/**
* Handle event dispatch.
*
* @param Message\AbstractMessage $message
* @throws MappingException
*/
public function __invoke(Message\AbstractMessage $message)
{
/** @var Entity\Repository\StationMediaRepository $media_repo */
$media_repo = $this->em->getRepository(Entity\StationMedia::class);
try {
if ($message instanceof Message\ReprocessMedia) {
$media_row = $media_repo->find($message->media_id);
if ($media_row instanceof Entity\StationMedia) {
$media_repo->processMedia($media_row, $message->force);
$this->em->flush($media_row);
}
} else if ($message instanceof Message\AddNewMedia) {
$station = $this->em->find(Entity\Station::class, $message->station_id);
if ($station instanceof Entity\Station) {
$media_repo->getOrCreate($station, $message->path);
}
}
} finally {
$this->em->clear();
}
}
/**
* @inheritdoc
*/
public function run($force = false): void
{
$station_repo = $this->em->getRepository(Entity\Station::class);
@ -57,10 +102,8 @@ class Media extends AbstractTask
continue;
}
$path_short = $file['path'];
$path_hash = md5($path_short);
$music_files[$path_hash] = $path_short;
$path_hash = md5($file['path']);
$music_files[$path_hash] = $file;
}
$stats['total_files'] = count($music_files);
@ -89,20 +132,20 @@ class Media extends AbstractTask
$force_reprocess = true;
}
try {
$processed = $media_repo->processMedia($media_row, $force_reprocess);
} catch (\App\Exception\MediaProcessing $e) {
$this->logger->error(sprintf('Error processing media ID %d: %s', $media_row->getId(), $e->getMessage()));
continue;
} finally {
unset($music_files[$path_hash]);
}
$file_info = $music_files[$path_hash];
if ($force_reprocess || $media_row->needsReprocessing($file_info['timestamp'])) {
$message = new Message\ReprocessMedia;
$message->media_id = $media_row->getId();
$message->force = $force_reprocess;
$this->message_queue->produce($message);
if ($processed) {
$stats['updated']++;
} else {
$stats['unchanged']++;
}
unset($music_files[$path_hash]);
} else {
// Delete the now-nonexistent media item.
$this->em->remove($media_row);
@ -121,24 +164,14 @@ class Media extends AbstractTask
$this->_flushAndClearRecords();
// Create files that do not currently exist.
$i = 0;
foreach ($music_files as $path_hash => $new_music_file) {
$message = new Message\AddNewMedia;
$message->station_id = $station->getId();
$message->path = $new_music_file['path'];
foreach ($music_files as $new_file_path) {
try {
$media_repo->getOrCreate($station, $new_file_path);
} catch (\App\Exception\MediaProcessing $e) {
$this->logger->error(sprintf('Error creating media from path %s: %s', $new_file_path, $e->getMessage()));
continue;
}
$this->message_queue->produce($message);
$stats['created']++;
if ($i % $records_per_batch === 0) {
$this->_flushAndClearRecords();
}
++$i;
}
$this->_flushAndClearRecords();
@ -149,7 +182,7 @@ class Media extends AbstractTask
/**
* Flush the Doctrine Entity Manager and clear associated records to save memory space.
*/
protected function _flushAndClearRecords()
protected function _flushAndClearRecords(): void
{
$this->em->flush();

View File

@ -32,7 +32,7 @@ else
fi
APP_ENV="${APP_ENV:-production}"
UPDATE_REVISION="${UPDATE_REVISION:-33}"
UPDATE_REVISION="${UPDATE_REVISION:-34}"
echo "Updating AzuraCast (Environment: $APP_ENV, Update revision: $UPDATE_REVISION)"

View File

@ -1,5 +1,6 @@
0 * * * * php {{ util_base }}/cli.php sync:run long
*/5 * * * * php {{ util_base }}/cli.php sync:run medium
*/5 * * * * php {{ util_base }}/cli.php queue:process 275
* * * * * php {{ util_base }}/cli.php sync:run short
* * * * * php {{ util_base }}/cli.php sync:run nowplaying
* * * * * sleep 15; php {{ util_base }}/cli.php sync:run nowplaying

View File

@ -24,5 +24,5 @@
- { role: ufw, when: update_revision|int < 12 }
- { role: maxmind, when: update_revision|int < 24 }
- { role: services, when: update_revision|int < 13 }
- { role: azuracast-cron, when: update_revision|int < 2 }
- { role: azuracast-cron, when: update_revision|int < 34 }
- azuracast-setup