More PHP work!
This commit is contained in:
parent
5bd5c77416
commit
bb499874ca
|
@ -144,6 +144,7 @@ return static function (CallableEventDispatcherInterface $dispatcher) {
|
|||
App\Sync\Task\RunBackupTask::class,
|
||||
App\Sync\Task\SendTimeOnSocketTask::class,
|
||||
App\Sync\Task\UpdateGeoLiteTask::class,
|
||||
App\Sync\Task\UpdateMeilisearchIndex::class,
|
||||
App\Sync\Task\UpdateStorageLocationSizesTask::class,
|
||||
]);
|
||||
}
|
||||
|
|
|
@ -192,15 +192,6 @@ class StationMedia implements
|
|||
]
|
||||
protected int $art_updated_at = 0;
|
||||
|
||||
#[
|
||||
OA\Property(
|
||||
description: "The latest time (UNIX timestamp) when the search record for this entry was updated.",
|
||||
example: OpenApi::SAMPLE_TIMESTAMP
|
||||
),
|
||||
ORM\Column
|
||||
]
|
||||
protected int $search_updated_at = 0;
|
||||
|
||||
/** @var Collection<int, StationPlaylistMedia> */
|
||||
#[
|
||||
OA\Property(type: "array", items: new OA\Items()),
|
||||
|
|
|
@ -14,7 +14,7 @@ use Meilisearch\Client;
|
|||
|
||||
final class Meilisearch
|
||||
{
|
||||
public const BATCH_SIZE = 50;
|
||||
public const BATCH_SIZE = 100;
|
||||
|
||||
public function __construct(
|
||||
private readonly Environment $environment,
|
||||
|
@ -52,12 +52,13 @@ final class Meilisearch
|
|||
|
||||
public function getIndex(StorageLocation $storageLocation): Index
|
||||
{
|
||||
$client = $this->getClient();
|
||||
|
||||
return $this->factory->make(
|
||||
Index::class,
|
||||
[
|
||||
'storageLocation' => $storageLocation,
|
||||
'indexUid' => self::getIndexUid($storageLocation),
|
||||
'client' => $this->getClient(),
|
||||
'indexClient' => $client->index(self::getIndexUid($storageLocation)),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
|
|
@ -9,7 +9,9 @@ use App\Entity\Repository\CustomFieldRepository;
|
|||
use App\Entity\Station;
|
||||
use App\Entity\StorageLocation;
|
||||
use App\Environment;
|
||||
use Meilisearch\Client;
|
||||
use App\Service\Meilisearch;
|
||||
use Meilisearch\Contracts\DocumentsQuery;
|
||||
use Meilisearch\Endpoints\Indexes;
|
||||
|
||||
final class Index
|
||||
{
|
||||
|
@ -17,9 +19,8 @@ final class Index
|
|||
private readonly ReloadableEntityManagerInterface $em,
|
||||
private readonly CustomFieldRepository $customFieldRepo,
|
||||
private readonly Environment $environment,
|
||||
private readonly Client $client,
|
||||
private readonly StorageLocation $storageLocation,
|
||||
private readonly string $indexUid
|
||||
private readonly Indexes $indexClient,
|
||||
) {
|
||||
}
|
||||
|
||||
|
@ -39,9 +40,7 @@ final class Index
|
|||
'isrc',
|
||||
];
|
||||
|
||||
foreach ($this->iterateStations() as $station) {
|
||||
$stationId = $station->getIdRequired();
|
||||
|
||||
foreach ($this->getStationIds() as $stationId) {
|
||||
$filterableAttributes[] = 'station_' . $stationId . '_playlists';
|
||||
$filterableAttributes[] = 'station_' . $stationId . '_is_requestable';
|
||||
$filterableAttributes[] = 'station_' . $stationId . '_is_on_demand';
|
||||
|
@ -52,7 +51,6 @@ final class Index
|
|||
}
|
||||
|
||||
$indexSettings = [
|
||||
'primaryKey' => 'id',
|
||||
'filterableAttributes' => $filterableAttributes,
|
||||
'sortableAttributes' => $mediaFields,
|
||||
'displayedAttributes' => $this->environment->isProduction()
|
||||
|
@ -61,10 +59,12 @@ final class Index
|
|||
];
|
||||
|
||||
// Avoid updating settings unless necessary to avoid triggering a reindex.
|
||||
$this->client->createIndex($this->indexUid);
|
||||
$this->indexClient->create(
|
||||
$this->indexClient->getUid(),
|
||||
['primaryKey' => 'id']
|
||||
);
|
||||
|
||||
$index = $this->client->index($this->indexUid);
|
||||
$currentSettings = $index->getSettings();
|
||||
$currentSettings = $this->indexClient->getSettings();
|
||||
$settingsToUpdate = [];
|
||||
|
||||
foreach ($indexSettings as $settingKey => $setting) {
|
||||
|
@ -75,10 +75,42 @@ final class Index
|
|||
}
|
||||
|
||||
if (!empty($settingsToUpdate)) {
|
||||
$index->updateSettings($settingsToUpdate);
|
||||
$this->indexClient->updateSettings($settingsToUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
public function getIdsInIndex(): iterable
|
||||
{
|
||||
$perPage = Meilisearch::BATCH_SIZE;
|
||||
$documentsQuery = (new DocumentsQuery())
|
||||
->setOffset(0)
|
||||
->setLimit($perPage)
|
||||
->setFields(['id']);
|
||||
|
||||
$documents = $this->indexClient->getDocuments($documentsQuery);
|
||||
foreach ($documents->getIterator() as $document) {
|
||||
yield $document['id'];
|
||||
}
|
||||
|
||||
if ($documents->getTotal() <= $perPage) {
|
||||
return;
|
||||
}
|
||||
|
||||
$totalPages = ceil($documents->getTotal() / $perPage);
|
||||
for ($page = 1; $page <= $totalPages; $page++) {
|
||||
$documentsQuery->setOffset($page * $perPage);
|
||||
$documents = $this->indexClient->getDocuments($documentsQuery);
|
||||
foreach ($documents->getIterator() as $document) {
|
||||
yield $document['id'];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function deleteIds(array $ids): void
|
||||
{
|
||||
$this->indexClient->deleteDocuments($ids);
|
||||
}
|
||||
|
||||
public function addMedia(array $ids): void
|
||||
{
|
||||
$this->refreshMedia($ids, true);
|
||||
|
@ -88,6 +120,148 @@ final class Index
|
|||
array $ids,
|
||||
bool $includePlaylists = false
|
||||
): void {
|
||||
if ($includePlaylists) {
|
||||
$mediaPlaylistsRaw = $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT spm.media_id, spm.playlist_id
|
||||
FROM App\Entity\StationPlaylistMedia spm
|
||||
WHERE spm.media_id IN (:mediaIds)
|
||||
DQL
|
||||
)->setParameter('mediaIds', $ids)
|
||||
->getArrayResult();
|
||||
|
||||
$mediaPlaylists = [];
|
||||
$playlistIds = [];
|
||||
|
||||
foreach ($mediaPlaylistsRaw as $mediaPlaylistRow) {
|
||||
$mediaId = $mediaPlaylistRow['media_id'];
|
||||
$playlistId = $mediaPlaylistRow['playlist_id'];
|
||||
|
||||
$playlistIds[$playlistId] = $playlistId;
|
||||
|
||||
$mediaPlaylists[$mediaId] ??= [];
|
||||
$mediaPlaylists[$mediaId][] = $playlistId;
|
||||
}
|
||||
|
||||
$stationIds = $this->getStationIds();
|
||||
|
||||
$playlistsRaw = $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT p.id, p.station_id, p.include_in_on_demand, p.include_in_requests
|
||||
FROM App\Entity\StationPlaylist p
|
||||
WHERE p.id IN (:playlistIds) AND p.station_id IN (:stationIds)
|
||||
AND p.is_enabled = 1
|
||||
DQL
|
||||
)->setParameter('playlistIds', $playlistIds)
|
||||
->setParameter('stationIds', $stationIds)
|
||||
->getArrayResult();
|
||||
|
||||
$playlists = [];
|
||||
foreach ($playlistsRaw as $playlistRow) {
|
||||
$playlists[$playlistRow['id']] = $playlistRow;
|
||||
}
|
||||
}
|
||||
|
||||
$customFieldsRaw = $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT smcf.media_id, smcf.field_id, smcf.value
|
||||
FROM App\Entity\StationMediaCustomField smcf
|
||||
WHERE smcf.media_id IN (:mediaIds)
|
||||
DQL
|
||||
)->setParameter('mediaIds', $ids)
|
||||
->getArrayResult();
|
||||
|
||||
$customFields = [];
|
||||
foreach ($customFieldsRaw as $customFieldRow) {
|
||||
$mediaId = $customFieldRow['media_id'];
|
||||
|
||||
$customFields[$mediaId] ??= [];
|
||||
$customFields[$mediaId]['custom_field_' . $customFieldRow['field_id']] = $customFieldRow['value'];
|
||||
}
|
||||
|
||||
$mediaRaw = $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT sm.id,
|
||||
sm.unique_id,
|
||||
sm.path,
|
||||
sm.mtime,
|
||||
sm.length_text,
|
||||
sm.title,
|
||||
sm.artist,
|
||||
sm.album,
|
||||
sm.genre,
|
||||
sm.isrc
|
||||
FROM App\Entity\StationMedia sm
|
||||
WHERE sm.storage_location = :storageLocation
|
||||
AND sm.id IN (:ids)
|
||||
DQL
|
||||
)->setParameter('storageLocation', $this->storageLocation)
|
||||
->setParameter('ids', $ids)
|
||||
->toIterable();
|
||||
|
||||
$media = [];
|
||||
|
||||
foreach ($mediaRaw as $row) {
|
||||
$mediaId = $row['id'];
|
||||
|
||||
$record = [
|
||||
'id' => $row['unique_id'],
|
||||
'path' => $row['path'],
|
||||
'mtime' => $row['mtime'],
|
||||
'duration' => $row['length_text'],
|
||||
'title' => $row['title'],
|
||||
'artist' => $row['artist'],
|
||||
'album' => $row['album'],
|
||||
'genre' => $row['genre'],
|
||||
'isrc' => $row['isrc'],
|
||||
];
|
||||
|
||||
if (isset($customFields[$mediaId])) {
|
||||
$record = array_merge($record, $customFields[$mediaId]);
|
||||
}
|
||||
|
||||
if ($includePlaylists) {
|
||||
foreach ($stationIds as $stationId) {
|
||||
$record['station_' . $stationId . '_playlists'] = [];
|
||||
$record['station_' . $stationId . '_is_requestable'] = false;
|
||||
$record['station_' . $stationId . '_is_on_demand'] = false;
|
||||
}
|
||||
|
||||
if (isset($mediaPlaylists[$mediaId])) {
|
||||
foreach ($mediaPlaylists[$mediaId] as $mediaPlaylistId) {
|
||||
if (!isset($playlists[$mediaPlaylistId])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$playlist = $playlists[$mediaPlaylistId];
|
||||
$stationId = $playlist['station_id'];
|
||||
|
||||
$record['station_' . $stationId . '_playlists'][] = $mediaPlaylistId;
|
||||
|
||||
if ($playlist['include_in_requests']) {
|
||||
$record['station_' . $stationId . '_is_requestable'] = true;
|
||||
}
|
||||
if ($playlist['include_in_on_demand']) {
|
||||
$record['station_' . $stationId . '_is_on_demand'] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$media[$mediaId] = $record;
|
||||
}
|
||||
|
||||
if ($includePlaylists) {
|
||||
$this->indexClient->addDocumentsInBatches(
|
||||
$media,
|
||||
Meilisearch::BATCH_SIZE
|
||||
);
|
||||
} else {
|
||||
$this->indexClient->updateDocumentsInBatches(
|
||||
$media,
|
||||
Meilisearch::BATCH_SIZE
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public function refreshPlaylists(Station $station): void
|
||||
|
@ -161,21 +335,22 @@ final class Index
|
|||
}
|
||||
}
|
||||
|
||||
$this->client->index($this->indexUid)->updateDocumentsInBatches(
|
||||
array_values($media)
|
||||
$this->indexClient->updateDocumentsInBatches(
|
||||
array_values($media),
|
||||
Meilisearch::BATCH_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
/** @return Station[] */
|
||||
private function iterateStations(): iterable
|
||||
/** @return int[] */
|
||||
private function getStationIds(): array
|
||||
{
|
||||
return $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT s FROM App\Entity\Station s
|
||||
SELECT s.id FROM App\Entity\Station s
|
||||
WHERE s.media_storage_location = :storageLocation
|
||||
AND s.is_enabled = 1
|
||||
DQL
|
||||
)->setParameter('storageLocation', $this->storageLocation)
|
||||
->toIterable();
|
||||
->getSingleColumnResult();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Sync\Task;
|
||||
|
||||
use App\Doctrine\ReloadableEntityManagerInterface;
|
||||
use App\Entity;
|
||||
use App\Message\AddMediaToSearchIndexMessage;
|
||||
use App\MessageQueue\QueueManagerInterface;
|
||||
use App\Service\Meilisearch;
|
||||
use Doctrine\ORM\AbstractQuery;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
|
||||
final class UpdateMeilisearchIndex extends AbstractTask
|
||||
{
|
||||
public function __construct(
|
||||
private readonly MessageBus $messageBus,
|
||||
private readonly QueueManagerInterface $queueManager,
|
||||
private readonly Meilisearch $meilisearch,
|
||||
ReloadableEntityManagerInterface $em,
|
||||
LoggerInterface $logger
|
||||
) {
|
||||
parent::__construct($em, $logger);
|
||||
}
|
||||
|
||||
public static function getSchedulePattern(): string
|
||||
{
|
||||
return '3-59/5 * * * *';
|
||||
}
|
||||
|
||||
public static function isLongTask(): bool
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public function run(bool $force = false): void
|
||||
{
|
||||
if (!$this->meilisearch->isSupported()) {
|
||||
$this->logger->debug('Meilisearch is not supported on this instance. Skipping sync task.');
|
||||
}
|
||||
|
||||
$storageLocations = $this->iterateStorageLocations(Entity\Enums\StorageLocationTypes::StationMedia);
|
||||
|
||||
foreach ($storageLocations as $storageLocation) {
|
||||
$this->logger->info(
|
||||
sprintf(
|
||||
'Updating MeiliSearch index for storage location %s...',
|
||||
$storageLocation
|
||||
)
|
||||
);
|
||||
|
||||
$this->updateIndex($storageLocation);
|
||||
}
|
||||
}
|
||||
|
||||
public function updateIndex(Entity\StorageLocation $storageLocation): void
|
||||
{
|
||||
$index = $this->meilisearch->getIndex($storageLocation);
|
||||
$index->configure();
|
||||
|
||||
$existingIdsRaw = iterator_to_array($index->getIdsInIndex(), false);
|
||||
$existingIds = array_combine($existingIdsRaw, $existingIdsRaw);
|
||||
|
||||
$queuedMedia = [];
|
||||
|
||||
foreach (
|
||||
$this->queueManager->getMessagesInTransport(
|
||||
QueueManagerInterface::QUEUE_NORMAL_PRIORITY
|
||||
) as $message
|
||||
) {
|
||||
if ($message instanceof AddMediaToSearchIndexMessage) {
|
||||
foreach ($message->media as $mediaId) {
|
||||
$queuedMedia[$mediaId] = $mediaId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$mediaRaw = $this->em->createQuery(
|
||||
<<<'DQL'
|
||||
SELECT sm.id, sm.unique_id
|
||||
FROM App\Entity\StationMedia sm
|
||||
WHERE sm.storage_location = :storageLocation
|
||||
DQL
|
||||
)->setParameter('storageLocation', $storageLocation)
|
||||
->toIterable([], AbstractQuery::HYDRATE_ARRAY);
|
||||
|
||||
$newIds = [];
|
||||
|
||||
foreach ($mediaRaw as $row) {
|
||||
if (
|
||||
isset($existingIds[$row['unique_id']])
|
||||
|| isset($queuedMedia[$row['id']])
|
||||
) {
|
||||
unset($existingIds[$row['unique_id']]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$newIds[] = $row['id'];
|
||||
}
|
||||
|
||||
foreach (array_chunk($newIds, Meilisearch::BATCH_SIZE) as $batchIds) {
|
||||
$message = new AddMediaToSearchIndexMessage();
|
||||
$message->storage_location_id = $storageLocation->getIdRequired();
|
||||
$message->media = $batchIds;
|
||||
$this->messageBus->dispatch($message);
|
||||
}
|
||||
|
||||
if (!empty($existingIds)) {
|
||||
$index->deleteIds($existingIds);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue