More Meilisearch infrastructure work.

This commit is contained in:
Buster Neece 2023-01-29 08:15:26 -06:00
parent bb499874ca
commit 81d16d619a
No known key found for this signature in database
GPG Key ID: F1D2E64A0005E80E
20 changed files with 620 additions and 226 deletions

View File

@ -20,5 +20,8 @@ return [
Message\DispatchWebhookMessage::class => App\Webhook\Dispatcher::class,
Message\TestWebhookMessage::class => App\Webhook\Dispatcher::class,
Message\Meilisearch\AddMediaMessage::class => App\Service\Meilisearch\MessageHandler::class,
Message\Meilisearch\UpdatePlaylistsMessage::class => App\Service\Meilisearch\MessageHandler::class,
Mailer\Messenger\SendEmailMessage::class => Mailer\Messenger\MessageHandler::class,
];

View File

@ -125,6 +125,7 @@ return [
// $config->setSQLLogger(new Doctrine\DBAL\Logging\EchoSQLLogger);
$config->addCustomNumericFunction('RAND', DoctrineExtensions\Query\Mysql\Rand::class);
$config->addCustomStringFunction('FIELD', DoctrineExtensions\Query\Mysql\Field::class);
if (!Doctrine\DBAL\Types\Type::hasType('carbon_immutable')) {
Doctrine\DBAL\Types\Type::addType('carbon_immutable', Carbon\Doctrine\CarbonImmutableType::class);

View File

@ -191,7 +191,6 @@ services:
- stereo_tool_install:/var/azuracast/servers/stereo_tool
- geolite_install:/var/azuracast/geoip
- sftpgo_data:/var/azuracast/sftpgo/persist
- meilisearch_data:/var/azuracast/meilisearch/persist
- backups:/var/azuracast/backups
- acme:/var/azuracast/acme
- db_data:/var/lib/mysql
@ -222,7 +221,6 @@ volumes:
stereo_tool_install: { }
geolite_install: { }
sftpgo_data: { }
meilisearch_data: { }
station_data: { }
www_uploads: { }
backups: { }

View File

@ -58,9 +58,9 @@
</a>
</template>
</template>
<template #cell(media_art)="row">
<template #cell(art)="row">
<a
:href="row.item.media_art"
:href="row.item.media.art"
class="album-art"
target="_blank"
data-fancybox="gallery"
@ -68,7 +68,7 @@
<img
class="media_manager_album_art"
:alt="$gettext('Album Art')"
:src="row.item.media_art"
:src="row.item.media.art"
>
</a>
</template>
@ -93,6 +93,7 @@ import {forEach} from 'lodash';
import Icon from '~/components/Common/Icon';
import PlayButton from "~/components/Common/PlayButton";
import {useTranslate} from "~/vendor/gettext";
import formatFileSize from "../../functions/formatFileSize";
const props = defineProps({
listUrl: {
@ -119,11 +120,29 @@ const {$gettext} = useTranslate();
let fields = [
{key: 'download_url', label: ' '},
{key: 'media_art', label: $gettext('Art')},
{key: 'media_title', label: $gettext('Title'), sortable: true, selectable: true},
{key: 'media_artist', label: $gettext('Artist'), sortable: true, selectable: true},
{key: 'media_album', label: $gettext('Album'), sortable: true, selectable: true, visible: false},
{key: 'playlist', label: $gettext('Playlist'), sortable: true, selectable: true, visible: false}
{key: 'art', label: $gettext('Art')},
{
key: 'title',
label: $gettext('Title'),
sortable: true,
selectable: true,
formatter: (value, key, item) => item.media.title,
},
{
key: 'artist',
label: $gettext('Artist'),
sortable: true,
selectable: true,
formatter: (value, key, item) => item.media.artist,
},
{
key: 'album',
label: $gettext('Album'),
sortable: true,
selectable: true,
visible: false,
formatter: (value, key, item) => item.media.album
}
];
forEach(props.customFields.slice(), (field) => {
@ -132,7 +151,8 @@ forEach(props.customFields.slice(), (field) => {
label: field.label,
sortable: true,
selectable: true,
visible: false
visible: false,
formatter: (value, key, item) => item.media.custom_fields[field.key]
});
});
</script>

View File

@ -7,6 +7,7 @@ namespace App\Controller\Api\Stations\Files;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Event\Radio\AnnotateNextSong;
use App\Flysystem\ExtendedFilesystemInterface;
use App\Flysystem\StationFilesystems;
use App\Http\Response;
use App\Http\ServerRequest;
@ -18,7 +19,6 @@ use App\Radio\Backend\Liquidsoap;
use App\Radio\Enums\BackendAdapters;
use App\Radio\Enums\LiquidsoapQueues;
use App\Utilities\File;
use App\Flysystem\ExtendedFilesystemInterface;
use Exception;
use InvalidArgumentException;
use League\Flysystem\StorageAttributes;
@ -154,7 +154,11 @@ final class BatchAction
/*
* NOTE: This iteration clears the entity manager.
*/
$mediaToReindex = [];
foreach ($this->batchUtilities->iterateMedia($storageLocation, $result->files) as $media) {
$mediaToReindex[] = $media->getIdRequired();
try {
$mediaPlaylists = $this->playlistMediaRepo->clearPlaylistsFromMedia($media, $station);
foreach ($mediaPlaylists as $playlistId => $playlistRecord) {
@ -193,6 +197,11 @@ final class BatchAction
$this->em->flush();
$this->batchUtilities->queuePlaylistsForUpdate(
$station,
$mediaToReindex
);
$this->writePlaylistChanges($station, $affectedPlaylists);
return $result;
@ -214,12 +223,18 @@ final class BatchAction
$this->batchUtilities->iterateUnprocessableMedia($storageLocation, $result->files),
];
$mediaToReindex = [];
foreach ($toMove as $iterator) {
foreach ($iterator as $record) {
/** @var Entity\Interfaces\PathAwareInterface $record */
$oldPath = $record->getPath();
$newPath = File::renameDirectoryInPath($oldPath, $from, $to);
if ($record instanceof Entity\StationMedia) {
$mediaToReindex[] = $record->getIdRequired();
}
try {
$fs->move($oldPath, $newPath);
$record->setPath($newPath);
@ -242,6 +257,10 @@ final class BatchAction
foreach ($toMove as $iterator) {
foreach ($iterator as $record) {
if ($record instanceof Entity\StationMedia) {
$mediaToReindex[] = $record->getIdRequired();
}
/** @var Entity\Interfaces\PathAwareInterface $record */
try {
$record->setPath(
@ -255,6 +274,10 @@ final class BatchAction
}
}
if (!empty($mediaToReindex)) {
$this->batchUtilities->queueMediaForIndex($storageLocation, $mediaToReindex);
}
return $result;
}

View File

@ -11,6 +11,7 @@ use App\Flysystem\StationFilesystems;
use App\Http\Response;
use App\Http\ServerRequest;
use App\Media\MediaProcessor;
use App\Message\Meilisearch\AddMediaMessage;
use App\Message\WritePlaylistFileMessage;
use App\OpenApi;
use App\Radio\Adapters;
@ -320,6 +321,9 @@ final class FilesController extends AbstractStationApiCrudController
$this->em->flush();
// Reindex file in search.
$this->reindexMedia($record);
// Handle playlist changes.
$backend = $this->adapters->getBackendAdapter($station);
if ($backend instanceof Liquidsoap) {
@ -396,6 +400,9 @@ final class FilesController extends AbstractStationApiCrudController
throw new InvalidArgumentException(sprintf('Record must be an instance of %s.', $this->entityClass));
}
// Trigger search reindex.
$this->reindexMedia($record);
// Delete the media file off the filesystem.
// Write new PLS playlist configuration.
foreach ($this->mediaRepo->remove($record, true) as $playlist_id => $playlist) {
@ -409,4 +416,15 @@ final class FilesController extends AbstractStationApiCrudController
}
}
}
private function reindexMedia(Entity\StationMedia $media): void
{
$indexMessage = new AddMediaMessage();
$indexMessage->storage_location_id = $media->getStorageLocation()->getIdRequired();
$indexMessage->media_ids = [
$media->getIdRequired(),
];
$indexMessage->include_playlists = true;
$this->messageBus->dispatch($indexMessage);
}
}

View File

@ -4,27 +4,20 @@ declare(strict_types=1);
namespace App\Controller\Api\Stations\OnDemand;
use App\Doctrine\ReadOnlyBatchIteratorAggregate;
use App\Entity;
use App\Http\Response;
use App\Http\RouterInterface;
use App\Http\ServerRequest;
use App\Paginator;
use App\Utilities;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Criteria;
use App\Service\Meilisearch;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\Cache\CacheItem;
use Symfony\Contracts\Cache\CacheInterface;
final class ListAction
final readonly class ListAction
{
public function __construct(
private readonly EntityManagerInterface $em,
private readonly Entity\Repository\CustomFieldRepository $customFieldRepo,
private readonly Entity\ApiGenerator\SongApiGenerator $songApiGenerator,
private readonly CacheInterface $cache,
private EntityManagerInterface $em,
private Entity\ApiGenerator\SongApiGenerator $songApiGenerator,
private Meilisearch $meilisearch
) {
}
@ -41,92 +34,50 @@ final class ListAction
->withJson(new Entity\Api\Error(403, __('This station does not support on-demand streaming.')));
}
$cacheKey = 'ondemand_' . $station->getId();
$trackList = $this->cache->get(
$cacheKey,
function (CacheItem $item) use ($station, $request) {
$item->expiresAfter(300);
return $this->buildTrackList($station, $request->getRouter());
}
);
if (!$this->meilisearch->isSupported()) {
return $response->withStatus(403)
->withJson(new Entity\Api\Error(403, __('This feature is not supported on this installation.')));
}
$trackList = new ArrayCollection($trackList);
$index = $this->meilisearch->getIndex($station->getMediaStorageLocation());
$queryParams = $request->getQueryParams();
$searchPhrase = trim($queryParams['searchPhrase'] ?? '');
if (!empty($searchPhrase)) {
$searchFields = [
'media_title',
'media_artist',
'media_album',
'playlist',
];
foreach (array_keys($this->customFieldRepo->getFieldIds()) as $customField) {
$searchFields[] = 'media_custom_fields_' . $customField;
}
$trackList = $trackList->filter(
function ($row) use ($searchFields, $searchPhrase) {
foreach ($searchFields as $searchField) {
if (false !== stripos($row[$searchField] ?? '', $searchPhrase)) {
return true;
}
}
return false;
}
);
}
$searchParams = [];
if (!empty($queryParams['sort'])) {
$sortField = (string)$queryParams['sort'];
$sortDirection = (string)($queryParams['sortOrder'] ?? Criteria::ASC);
$criteria = new Criteria();
$criteria->orderBy([$sortField => $sortDirection]);
$trackList = $trackList->matching($criteria);
$sortDirection = strtolower($queryParams['sortOrder'] ?? 'asc');
$searchParams['sort'] = [$sortField . ':' . $sortDirection];
}
return Paginator::fromCollection($trackList, $request)
->write($response);
}
$hydrateCallback = function (array $results) {
$ids = array_column($results, 'id');
/**
* @return mixed[]
*/
private function buildTrackList(Entity\Station $station, RouterInterface $router): array
{
$list = [];
$playlists = $this->em->createQuery(
<<<'DQL'
SELECT sp FROM App\Entity\StationPlaylist sp
WHERE sp.station = :station
AND sp.id IS NOT NULL
AND sp.is_enabled = 1
AND sp.include_in_on_demand = 1
DQL
)->setParameter('station', $station)
->getArrayResult();
foreach ($playlists as $playlist) {
$query = $this->em->createQuery(
return $this->em->createQuery(
<<<'DQL'
SELECT sm FROM App\Entity\StationMedia sm
WHERE sm.id IN (
SELECT spm.media_id
FROM App\Entity\StationPlaylistMedia spm
WHERE spm.playlist_id = :playlist_id
)
ORDER BY sm.artist ASC, sm.title ASC
SELECT sm
FROM App\Entity\StationMedia sm
WHERE sm.id IN (:ids)
ORDER BY FIELD(sm.id, :ids)
DQL
)->setParameter('playlist_id', $playlist['id']);
)->setParameter('ids', $ids)
->toIterable();
};
foreach (ReadOnlyBatchIteratorAggregate::fromQuery($query, 50) as $media) {
/** @var Entity\StationMedia $media */
$paginatorAdapter = $index->getOnDemandSearchPaginator(
$station,
$hydrateCallback,
$searchPhrase,
$searchParams,
);
$paginator = Paginator::fromAdapter($paginatorAdapter, $request);
$router = $request->getRouter();
$paginator->setPostprocessor(
function (Entity\StationMedia $media) use ($station, $router) {
$row = new Entity\Api\StationOnDemand();
$row->track_id = $media->getUniqueId();
@ -134,7 +85,7 @@ final class ListAction
song: $media,
station: $station
);
$row->playlist = $playlist['name'];
$row->download_url = $router->named(
'api:stations:ondemand:download',
[
@ -145,10 +96,10 @@ final class ListAction
$row->resolveUrls($router->getBaseUrl());
$list[] = Utilities\Arrays::flattenArray($row, '_');
return $row;
}
}
);
return $list;
return $paginator->write($response);
}
}

View File

@ -5,15 +5,21 @@ declare(strict_types=1);
namespace App\Controller\Api\Stations;
use App\Controller\Api\Traits\CanSortResults;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Http\Response;
use App\Http\ServerRequest;
use App\Message\Meilisearch\UpdatePlaylistsMessage;
use App\OpenApi;
use App\Radio\AutoDJ\Scheduler;
use Carbon\CarbonInterface;
use InvalidArgumentException;
use OpenApi\Attributes as OA;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Serializer\Normalizer\AbstractNormalizer;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Validator\Validator\ValidatorInterface;
/** @extends AbstractScheduledEntityController<Entity\StationPlaylist> */
#[
@ -145,6 +151,17 @@ final class PlaylistsController extends AbstractScheduledEntityController
protected string $entityClass = Entity\StationPlaylist::class;
protected string $resourceRouteName = 'api:stations:playlist';
public function __construct(
Entity\Repository\StationScheduleRepository $scheduleRepo,
Scheduler $scheduler,
ReloadableEntityManagerInterface $em,
Serializer $serializer,
ValidatorInterface $validator,
private readonly MessageBus $messageBus
) {
parent::__construct($scheduleRepo, $scheduler, $em, $serializer, $validator);
}
public function listAction(
ServerRequest $request,
Response $response,
@ -323,4 +340,38 @@ final class PlaylistsController extends AbstractScheduledEntityController
)
);
}
public function editAction(
ServerRequest $request,
Response $response,
string $station_id,
string $id
): ResponseInterface {
$result = parent::editAction($request, $response, $station_id, $id);
$this->reindexPlaylists($request->getStation());
return $result;
}
public function deleteAction(
ServerRequest $request,
Response $response,
string $station_id,
string $id
): ResponseInterface {
$result = parent::deleteAction($request, $response, $station_id, $id);
$this->reindexPlaylists($request->getStation());
return $result;
}
private function reindexPlaylists(Entity\Station $station): void
{
$indexMessage = new UpdatePlaylistsMessage();
$indexMessage->station_id = $station->getIdRequired();
$this->messageBus->dispatch($indexMessage);
}
}

View File

@ -45,7 +45,7 @@ final class OnDemandAction
$customFields = [];
foreach ($customFieldsRaw as $row) {
$customFields[] = [
'display_key' => 'media_custom_fields_' . $row['short_name'],
'display_key' => 'custom_field_' . $row['id'],
'key' => $row['short_name'],
'label' => $row['name'],
];

View File

@ -7,16 +7,20 @@ namespace App\Media;
use App\Doctrine\ReadWriteBatchIteratorAggregate;
use App\Entity;
use App\Flysystem\ExtendedFilesystemInterface;
use App\Message\Meilisearch\AddMediaMessage;
use App\Message\Meilisearch\UpdatePlaylistsMessage;
use App\Utilities\File;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBus;
use Throwable;
final class BatchUtilities
final readonly class BatchUtilities
{
public function __construct(
private readonly EntityManagerInterface $em,
private readonly Entity\Repository\StationMediaRepository $mediaRepo,
private readonly Entity\Repository\UnprocessableMediaRepository $unprocessableMediaRepo,
private EntityManagerInterface $em,
private Entity\Repository\StationMediaRepository $mediaRepo,
private Entity\Repository\UnprocessableMediaRepository $unprocessableMediaRepo,
private MessageBus $messageBus,
) {
}
@ -36,8 +40,14 @@ final class BatchUtilities
$this->iteratePlaylistFoldersInDirectory($storageLocation, $from),
];
$mediaToReindex = [];
foreach ($toRename as $iterator) {
foreach ($iterator as $record) {
if ($record instanceof Entity\StationMedia) {
$mediaToReindex[] = $record->getIdRequired();
}
/** @var Entity\Interfaces\PathAwareInterface $record */
$record->setPath(
File::renameDirectoryInPath($record->getPath(), $from, $to)
@ -45,6 +55,8 @@ final class BatchUtilities
$this->em->persist($record);
}
}
$this->queueMediaForIndex($storageLocation, $mediaToReindex);
} else {
$record = $this->mediaRepo->findByPath($from, $storageLocation);
@ -52,6 +64,8 @@ final class BatchUtilities
$record->setPath($to);
$this->em->persist($record);
$this->em->flush();
$this->queueMediaForIndex($storageLocation, [$record->getIdRequired()]);
} else {
$record = $this->unprocessableMediaRepo->findByPath($from, $storageLocation);
@ -84,7 +98,11 @@ final class BatchUtilities
/*
* NOTE: This iteration clears the entity manager.
*/
$mediaToReindex = [];
foreach ($this->iterateMedia($storageLocation, $files) as $media) {
$mediaToReindex[] = $media->getIdRequired();
try {
foreach ($this->mediaRepo->remove($media, false, $fs) as $playlistId => $playlist) {
if (!isset($affectedPlaylists[$playlistId])) {
@ -95,6 +113,8 @@ final class BatchUtilities
}
}
$this->queueMediaForIndex($storageLocation, $mediaToReindex);
/*
* NOTE: This iteration clears the entity manager.
*/
@ -113,6 +133,30 @@ final class BatchUtilities
return $affectedPlaylists;
}
public function queueMediaForIndex(
Entity\StorageLocation $storageLocation,
array $ids,
bool $includePlaylists = false
): void {
$queueMessage = new AddMediaMessage();
$queueMessage->storage_location_id = $storageLocation->getIdRequired();
$queueMessage->media_ids = $ids;
$queueMessage->include_playlists = $includePlaylists;
$this->messageBus->dispatch($queueMessage);
}
public function queuePlaylistsForUpdate(
Entity\Station $station,
?array $ids = null
): void {
$queueMessage = new UpdatePlaylistsMessage();
$queueMessage->station_id = $station->getIdRequired();
$queueMessage->media_ids = $ids;
$this->messageBus->dispatch($queueMessage);
}
/**
* Iterate through the found media records, while occasionally flushing and clearing the entity manager.
*

View File

@ -1,33 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Message;
use App\MessageQueue\QueueManagerInterface;
final class AddMediaToSearchIndexMessage extends AbstractUniqueMessage
{
/** @var int The numeric identifier for the StorageLocation entity. */
public int $storage_location_id;
/** @var int[] An array of media IDs to process. */
public array $media;
public function getIdentifier(): string
{
$messageHash = md5(
json_encode([
'id' => $this->storage_location_id,
'media' => $this->media,
], JSON_THROW_ON_ERROR)
);
return 'AddMediaToSearchIndexMessage_' . $messageHash;
}
public function getQueue(): string
{
return QueueManagerInterface::QUEUE_MEDIA;
}
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
namespace App\Message\Meilisearch;
use App\Message\AbstractMessage;
final class AddMediaMessage extends AbstractMessage
{
/** @var int The numeric identifier for the StorageLocation entity. */
public int $storage_location_id;
/** @var int[] An array of media IDs to process. */
public array $media_ids;
/** @var bool Whether to include playlist data. */
public bool $include_playlists = false;
}

View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace App\Message\Meilisearch;
use App\Message\AbstractMessage;
final class UpdatePlaylistsMessage extends AbstractMessage
{
/** @var int The numeric identifier for the Station entity. */
public int $station_id;
/** @var int[]|null Only update for specific media IDs. */
public ?array $media_ids = null;
}

View File

@ -13,6 +13,7 @@ use Doctrine\ORM\Query;
use Doctrine\ORM\QueryBuilder;
use Generator;
use IteratorAggregate;
use Pagerfanta\Adapter\AdapterInterface;
use Pagerfanta\Adapter\ArrayAdapter;
use Pagerfanta\Doctrine\Collections\CollectionAdapter;
use Pagerfanta\Doctrine\ORM\QueryAdapter;
@ -185,6 +186,22 @@ final class Paginator implements IteratorAggregate, Countable
);
}
/**
* @template X of mixed
*
* @param AdapterInterface<X> $adapter
* @return static<array-key, X>
*/
public static function fromAdapter(
AdapterInterface $adapter,
ServerRequestInterface $request
): self {
return new self(
new Pagerfanta($adapter),
$request
);
}
/**
* @template XKey of array-key
* @template X of mixed
@ -194,10 +211,7 @@ final class Paginator implements IteratorAggregate, Countable
*/
public static function fromArray(array $input, ServerRequestInterface $request): self
{
return new self(
new Pagerfanta(new ArrayAdapter($input)),
$request
);
return self::fromAdapter(new ArrayAdapter($input), $request);
}
/**
@ -209,10 +223,7 @@ final class Paginator implements IteratorAggregate, Countable
*/
public static function fromCollection(Collection $collection, ServerRequestInterface $request): self
{
return new self(
new Pagerfanta(new CollectionAdapter($collection)),
$request,
);
return self::fromAdapter(new CollectionAdapter($collection), $request);
}
/**
@ -220,10 +231,7 @@ final class Paginator implements IteratorAggregate, Countable
*/
public static function fromQueryBuilder(QueryBuilder $qb, ServerRequestInterface $request): self
{
return new self(
new Pagerfanta(new QueryAdapter($qb)),
$request
);
return self::fromAdapter(new QueryAdapter($qb), $request);
}
/**
@ -231,9 +239,6 @@ final class Paginator implements IteratorAggregate, Countable
*/
public static function fromQuery(Query $query, ServerRequestInterface $request): self
{
return new self(
new Pagerfanta(new QueryAdapter($query)),
$request
);
return self::fromAdapter(new QueryAdapter($query), $request);
}
}

View File

@ -12,14 +12,14 @@ use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\Psr7\HttpFactory;
use Meilisearch\Client;
final class Meilisearch
final readonly class Meilisearch
{
public const BATCH_SIZE = 100;
public function __construct(
private readonly Environment $environment,
private readonly GuzzleClient $httpClient,
private readonly FactoryInterface $factory
private Environment $environment,
private GuzzleClient $httpClient,
private FactoryInterface $factory
) {
}

View File

@ -10,17 +10,19 @@ use App\Entity\Station;
use App\Entity\StorageLocation;
use App\Environment;
use App\Service\Meilisearch;
use Doctrine\ORM\AbstractQuery;
use Meilisearch\Contracts\DocumentsQuery;
use Meilisearch\Endpoints\Indexes;
use Meilisearch\Exceptions\ApiException;
final class Index
final readonly class Index
{
public function __construct(
private readonly ReloadableEntityManagerInterface $em,
private readonly CustomFieldRepository $customFieldRepo,
private readonly Environment $environment,
private readonly StorageLocation $storageLocation,
private readonly Indexes $indexClient,
private ReloadableEntityManagerInterface $em,
private CustomFieldRepository $customFieldRepo,
private Environment $environment,
private StorageLocation $storageLocation,
private Indexes $indexClient,
) {
}
@ -59,38 +61,54 @@ final class Index
];
// Avoid updating settings unless necessary to avoid triggering a reindex.
$this->indexClient->create(
$this->indexClient->getUid(),
['primaryKey' => 'id']
);
try {
$this->indexClient->fetchRawInfo();
} catch (ApiException) {
$response = $this->indexClient->create(
$this->indexClient->getUid() ?? '',
['primaryKey' => 'id']
);
$this->indexClient->waitForTask($response['taskUid']);
}
$currentSettings = $this->indexClient->getSettings();
$settingsToUpdate = [];
foreach ($indexSettings as $settingKey => $setting) {
$currentSetting = $currentSettings[$settingKey] ?? [];
sort($setting);
if ($currentSetting !== $setting) {
$settingsToUpdate[$settingKey] = $setting;
}
}
if (!empty($settingsToUpdate)) {
$this->indexClient->updateSettings($settingsToUpdate);
$response = $this->indexClient->updateSettings($settingsToUpdate);
$this->indexClient->waitForTask($response['taskUid']);
}
}
public function getIdsInIndex(): iterable
public function getIdsInIndex(): array
{
$ids = [];
foreach ($this->getAllDocuments(['id', 'mtime']) as $document) {
$ids[$document['id']] = $document['mtime'];
}
return $ids;
}
public function getAllDocuments(array $fields = ['*']): iterable
{
$perPage = Meilisearch::BATCH_SIZE;
$documentsQuery = (new DocumentsQuery())
->setOffset(0)
->setLimit($perPage)
->setFields(['id']);
->setFields($fields);
$documents = $this->indexClient->getDocuments($documentsQuery);
foreach ($documents->getIterator() as $document) {
yield $document['id'];
}
yield from $documents->getIterator();
if ($documents->getTotal() <= $perPage) {
return;
@ -100,9 +118,7 @@ final class Index
for ($page = 1; $page <= $totalPages; $page++) {
$documentsQuery->setOffset($page * $perPage);
$documents = $this->indexClient->getDocuments($documentsQuery);
foreach ($documents->getIterator() as $document) {
yield $document['id'];
}
yield from $documents->getIterator();
}
}
@ -111,11 +127,6 @@ final class Index
$this->indexClient->deleteDocuments($ids);
}
public function addMedia(array $ids): void
{
$this->refreshMedia($ids, true);
}
public function refreshMedia(
array $ids,
bool $includePlaylists = false
@ -182,7 +193,6 @@ final class Index
$mediaRaw = $this->em->createQuery(
<<<'DQL'
SELECT sm.id,
sm.unique_id,
sm.path,
sm.mtime,
sm.length_text,
@ -197,7 +207,7 @@ final class Index
DQL
)->setParameter('storageLocation', $this->storageLocation)
->setParameter('ids', $ids)
->toIterable();
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
$media = [];
@ -205,7 +215,7 @@ final class Index
$mediaId = $row['id'];
$record = [
'id' => $row['unique_id'],
'id' => $row['id'],
'path' => $row['path'],
'mtime' => $row['mtime'],
'duration' => $row['length_text'],
@ -264,30 +274,44 @@ final class Index
}
}
public function refreshPlaylists(Station $station): void
{
public function refreshPlaylists(
Station $station,
?array $ids = null
): void {
$stationId = $station->getIdRequired();
$playlistsKey = 'station_' . $stationId . '_playlists';
$isRequestableKey = 'station_' . $stationId . '_is_requestable';
$isOnDemandKey = 'station_' . $stationId . '_is_on_demand';
$allMediaRaw = $this->em->createQuery(
<<<'DQL'
SELECT m.id, m.unique_id FROM App\Entity\StationMedia m
WHERE m.storage_location = :storageLocation
DQL
)->setParameter('storageLocation', $this->storageLocation)
->getArrayResult();
$media = [];
foreach ($allMediaRaw as $mediaRow) {
$media[$mediaRow['id']] = [
'id' => $mediaRow['unique_id'],
$playlistsKey => [],
$isRequestableKey => false,
$isOnDemandKey => false,
];
if (null === $ids) {
$allMediaRaw = $this->em->createQuery(
<<<'DQL'
SELECT m.id FROM App\Entity\StationMedia m
WHERE m.storage_location = :storageLocation
DQL
)->setParameter('storageLocation', $this->storageLocation)
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
foreach ($allMediaRaw as $mediaRow) {
$media[$mediaRow['id']] = [
'id' => $mediaRow['id'],
$playlistsKey => [],
$isRequestableKey => false,
$isOnDemandKey => false,
];
}
} else {
foreach ($ids as $mediaId) {
$media[$mediaId] = [
'id' => $mediaId,
$playlistsKey => [],
$isRequestableKey => false,
$isOnDemandKey => false,
];
}
}
$allPlaylists = $this->em->createQuery(
@ -313,14 +337,27 @@ final class Index
}
}
$mediaInPlaylists = $this->em->createQuery(
<<<'DQL'
SELECT spm.media_id, spm.playlist_id
FROM App\Entity\StationPlaylistMedia spm
WHERE spm.playlist_id IN (:allPlaylistIds)
DQL
)->setParameter('allPlaylistIds', $allPlaylistIds)
->toIterable();
if (null === $ids) {
$mediaInPlaylists = $this->em->createQuery(
<<<'DQL'
SELECT spm.media_id, spm.playlist_id
FROM App\Entity\StationPlaylistMedia spm
WHERE spm.playlist_id IN (:allPlaylistIds)
DQL
)->setParameter('allPlaylistIds', $allPlaylistIds)
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
} else {
$mediaInPlaylists = $this->em->createQuery(
<<<'DQL'
SELECT spm.media_id, spm.playlist_id
FROM App\Entity\StationPlaylistMedia spm
WHERE spm.playlist_id IN (:allPlaylistIds)
AND spm.media_id IN (:mediaIds)
DQL
)->setParameter('allPlaylistIds', $allPlaylistIds)
->setParameter('mediaIds', $ids)
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
}
foreach ($mediaInPlaylists as $spmRow) {
$mediaId = $spmRow['media_id'];
@ -341,6 +378,74 @@ final class Index
);
}
/**
* @return PaginatorAdapter<int|string, mixed>
*/
public function getRequestableSearchPaginator(
Station $station,
callable $hydrateCallback,
?string $query,
array $searchParams = [],
array $options = [],
): PaginatorAdapter {
return $this->getSearchPaginator(
$hydrateCallback,
$query,
[
...$searchParams,
'filter' => [
[
'station_' . $station->getIdRequired() . '_is_requestable = true',
],
],
],
$options
);
}
/**
* @return PaginatorAdapter<int|string, mixed>
*/
public function getOnDemandSearchPaginator(
Station $station,
callable $hydrateCallback,
?string $query,
array $searchParams = [],
array $options = [],
): PaginatorAdapter {
return $this->getSearchPaginator(
$hydrateCallback,
$query,
[
...$searchParams,
'filter' => [
[
'station_' . $station->getIdRequired() . '_is_on_demand = true',
],
],
],
$options
);
}
/**
* @return PaginatorAdapter<int|string, mixed>
*/
public function getSearchPaginator(
callable $hydrateCallback,
?string $query,
array $searchParams = [],
array $options = [],
): PaginatorAdapter {
return new PaginatorAdapter(
$this->indexClient,
$hydrateCallback(...),
$query,
$searchParams,
$options,
);
}
/** @return int[] */
private function getStationIds(): array
{

View File

@ -0,0 +1,65 @@
<?php
declare(strict_types=1);
namespace App\Service\Meilisearch;
use App\Entity\Repository\StationRepository;
use App\Entity\Repository\StorageLocationRepository;
use App\Entity\Station;
use App\Entity\StorageLocation;
use App\Message\AbstractMessage;
use App\Message\Meilisearch\AddMediaMessage;
use App\Message\Meilisearch\UpdatePlaylistsMessage;
use App\Service\Meilisearch;
final readonly class MessageHandler
{
public function __construct(
private Meilisearch $meilisearch,
private StorageLocationRepository $storageLocationRepo,
private StationRepository $stationRepo
) {
}
public function __invoke(AbstractMessage $message): void
{
if (!$this->meilisearch->isSupported()) {
return;
}
match (true) {
$message instanceof AddMediaMessage => $this->addMedia($message),
$message instanceof UpdatePlaylistsMessage => $this->updatePlaylists($message),
default => null,
};
}
private function addMedia(AddMediaMessage $message): void
{
$storageLocation = $this->storageLocationRepo->find($message->storage_location_id);
if (!($storageLocation instanceof StorageLocation)) {
return;
}
$index = $this->meilisearch->getIndex($storageLocation);
$index->refreshMedia(
$message->media_ids,
$message->include_playlists
);
}
private function updatePlaylists(UpdatePlaylistsMessage $message): void
{
$station = $this->stationRepo->find($message->station_id);
if (!($station instanceof Station)) {
return;
}
$storageLocation = $station->getMediaStorageLocation();
$index = $this->meilisearch->getIndex($storageLocation);
$index->refreshPlaylists($station, $message->media_ids);
}
}

View File

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
namespace App\Service\Meilisearch;
use Closure;
use Meilisearch\Endpoints\Indexes;
use Meilisearch\Search\SearchResult;
use Pagerfanta\Adapter\AdapterInterface;
/**
* Adapter which uses Meilisearch to perform a search, then uses a callback to hydrate with database records.
*
* @template TKey of array-key
* @template T
* @implements AdapterInterface<T>
*/
final readonly class PaginatorAdapter implements AdapterInterface
{
public function __construct(
private Indexes $indexClient,
private Closure $hydrateCallback,
private ?string $query,
private array $searchParams = [],
private array $options = [],
) {
}
public function getNbResults(): int
{
/** @var SearchResult $results */
$results = $this->indexClient->search(
$this->query,
[
...$this->searchParams,
'hitsPerPage' => 0,
],
$this->options
);
return abs($results->getTotalHits() ?? 0);
}
public function getSlice(int $offset, int $length): iterable
{
/** @var SearchResult $results */
$results = $this->indexClient->search(
$this->query,
[
...$this->searchParams,
'offset' => $offset,
'limit' => $length,
],
$this->options
);
return ($this->hydrateCallback)($results->getHits());
}
}

View File

@ -6,15 +6,18 @@ namespace App\Sync\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Flysystem\StationFilesystems;
use App\Flysystem\ExtendedFilesystemInterface;
use App\Flysystem\StationFilesystems;
use App\Message\Meilisearch\UpdatePlaylistsMessage;
use Doctrine\ORM\Query;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\MessageBus;
final class CheckFolderPlaylistsTask extends AbstractTask
{
public function __construct(
private readonly Entity\Repository\StationPlaylistMediaRepository $spmRepo,
private readonly MessageBus $messageBus,
ReloadableEntityManagerInterface $em,
LoggerInterface $logger,
) {
@ -110,6 +113,8 @@ final class CheckFolderPlaylistsTask extends AbstractTask
->getArrayResult();
$addedRecords = 0;
$mediaToIndex = [];
foreach ($mediaInFolderRaw as $row) {
$mediaId = $row['id'];
@ -119,12 +124,21 @@ final class CheckFolderPlaylistsTask extends AbstractTask
if ($media instanceof Entity\StationMedia) {
$this->spmRepo->addMediaToPlaylist($media, $playlist);
$mediaToIndex[] = $mediaId;
$mediaInPlaylist[$mediaId] = $mediaId;
$addedRecords++;
}
}
}
if (!empty($mediaToIndex)) {
$indexMessage = new UpdatePlaylistsMessage();
$indexMessage->station_id = $station->getIdRequired();
$indexMessage->media_ids = $mediaToIndex;
$this->messageBus->dispatch($indexMessage);
}
$logMessage = (0 === $addedRecords)
? 'No changes detected in folder.'
: sprintf('%d media records added from folder.', $addedRecords);

View File

@ -6,7 +6,7 @@ namespace App\Sync\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use App\Message\AddMediaToSearchIndexMessage;
use App\Message\Meilisearch\AddMediaMessage;
use App\MessageQueue\QueueManagerInterface;
use App\Service\Meilisearch;
use Doctrine\ORM\AbstractQuery;
@ -57,11 +57,19 @@ final class UpdateMeilisearchIndex extends AbstractTask
public function updateIndex(Entity\StorageLocation $storageLocation): void
{
$stats = [
'existing' => 0,
'queued' => 0,
'added' => 0,
'updated' => 0,
'deleted' => 0,
];
$index = $this->meilisearch->getIndex($storageLocation);
$index->configure();
$existingIdsRaw = iterator_to_array($index->getIdsInIndex(), false);
$existingIds = array_combine($existingIdsRaw, $existingIdsRaw);
$existingIds = $index->getIdsInIndex();
$stats['existing'] = count($existingIds);
$queuedMedia = [];
@ -70,16 +78,17 @@ final class UpdateMeilisearchIndex extends AbstractTask
QueueManagerInterface::QUEUE_NORMAL_PRIORITY
) as $message
) {
if ($message instanceof AddMediaToSearchIndexMessage) {
foreach ($message->media as $mediaId) {
if ($message instanceof AddMediaMessage) {
foreach ($message->media_ids as $mediaId) {
$queuedMedia[$mediaId] = $mediaId;
$stats['queued']++;
}
}
}
$mediaRaw = $this->em->createQuery(
<<<'DQL'
SELECT sm.id, sm.unique_id
SELECT sm.id, sm.mtime
FROM App\Entity\StationMedia sm
WHERE sm.storage_location = :storageLocation
DQL
@ -87,28 +96,53 @@ final class UpdateMeilisearchIndex extends AbstractTask
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
$newIds = [];
$idsToUpdate = [];
foreach ($mediaRaw as $row) {
if (
isset($existingIds[$row['unique_id']])
|| isset($queuedMedia[$row['id']])
) {
unset($existingIds[$row['unique_id']]);
$mediaId = $row['id'];
if (isset($queuedMedia[$mediaId])) {
unset($existingIds[$mediaId]);
continue;
}
$newIds[] = $row['id'];
if (isset($existingIds[$mediaId])) {
if ($existingIds[$mediaId] < $row['mtime']) {
$idsToUpdate[] = $mediaId;
$stats['updated']++;
}
unset($existingIds[$mediaId]);
continue;
}
$newIds[] = $mediaId;
$stats['added']++;
}
foreach (array_chunk($idsToUpdate, Meilisearch::BATCH_SIZE) as $batchIds) {
$message = new AddMediaMessage();
$message->storage_location_id = $storageLocation->getIdRequired();
$message->media_ids = $batchIds;
$message->include_playlists = true;
$this->messageBus->dispatch($message);
}
foreach (array_chunk($newIds, Meilisearch::BATCH_SIZE) as $batchIds) {
$message = new AddMediaToSearchIndexMessage();
$message = new AddMediaMessage();
$message->storage_location_id = $storageLocation->getIdRequired();
$message->media = $batchIds;
$message->media_ids = $batchIds;
$message->include_playlists = true;
$this->messageBus->dispatch($message);
}
if (!empty($existingIds)) {
$stats['deleted'] = count($existingIds);
$index->deleteIds($existingIds);
}
$this->logger->debug(sprintf('Meilisearch processed for "%s".', $storageLocation), $stats);
}
}