From bb499874ca074d7bb92ce8a49623923a3ad55275 Mon Sep 17 00:00:00 2001 From: Buster Neece Date: Sat, 28 Jan 2023 08:13:02 -0600 Subject: [PATCH] More PHP work! --- config/events.php | 1 + src/Entity/StationMedia.php | 9 - src/Service/Meilisearch.php | 7 +- src/Service/Meilisearch/Index.php | 209 +++++++++++++++++++++-- src/Sync/Task/UpdateMeilisearchIndex.php | 114 +++++++++++++ 5 files changed, 311 insertions(+), 29 deletions(-) create mode 100644 src/Sync/Task/UpdateMeilisearchIndex.php diff --git a/config/events.php b/config/events.php index f1e7dd4fb..99f2f38a0 100644 --- a/config/events.php +++ b/config/events.php @@ -144,6 +144,7 @@ return static function (CallableEventDispatcherInterface $dispatcher) { App\Sync\Task\RunBackupTask::class, App\Sync\Task\SendTimeOnSocketTask::class, App\Sync\Task\UpdateGeoLiteTask::class, + App\Sync\Task\UpdateMeilisearchIndex::class, App\Sync\Task\UpdateStorageLocationSizesTask::class, ]); } diff --git a/src/Entity/StationMedia.php b/src/Entity/StationMedia.php index 738dd8c7d..f4476ed96 100644 --- a/src/Entity/StationMedia.php +++ b/src/Entity/StationMedia.php @@ -192,15 +192,6 @@ class StationMedia implements ] protected int $art_updated_at = 0; - #[ - OA\Property( - description: "The latest time (UNIX timestamp) when the search record for this entry was updated.", - example: OpenApi::SAMPLE_TIMESTAMP - ), - ORM\Column - ] - protected int $search_updated_at = 0; - /** @var Collection */ #[ OA\Property(type: "array", items: new OA\Items()), diff --git a/src/Service/Meilisearch.php b/src/Service/Meilisearch.php index b73d1904f..be89386f3 100644 --- a/src/Service/Meilisearch.php +++ b/src/Service/Meilisearch.php @@ -14,7 +14,7 @@ use Meilisearch\Client; final class Meilisearch { - public const BATCH_SIZE = 50; + public const BATCH_SIZE = 100; public function __construct( private readonly Environment $environment, @@ -52,12 +52,13 @@ final class Meilisearch public function getIndex(StorageLocation $storageLocation): Index { + $client = $this->getClient(); + return $this->factory->make( Index::class, [ 'storageLocation' => $storageLocation, - 'indexUid' => self::getIndexUid($storageLocation), - 'client' => $this->getClient(), + 'indexClient' => $client->index(self::getIndexUid($storageLocation)), ] ); } diff --git a/src/Service/Meilisearch/Index.php b/src/Service/Meilisearch/Index.php index 2a18dddfd..82dfacb51 100644 --- a/src/Service/Meilisearch/Index.php +++ b/src/Service/Meilisearch/Index.php @@ -9,7 +9,9 @@ use App\Entity\Repository\CustomFieldRepository; use App\Entity\Station; use App\Entity\StorageLocation; use App\Environment; -use Meilisearch\Client; +use App\Service\Meilisearch; +use Meilisearch\Contracts\DocumentsQuery; +use Meilisearch\Endpoints\Indexes; final class Index { @@ -17,9 +19,8 @@ final class Index private readonly ReloadableEntityManagerInterface $em, private readonly CustomFieldRepository $customFieldRepo, private readonly Environment $environment, - private readonly Client $client, private readonly StorageLocation $storageLocation, - private readonly string $indexUid + private readonly Indexes $indexClient, ) { } @@ -39,9 +40,7 @@ final class Index 'isrc', ]; - foreach ($this->iterateStations() as $station) { - $stationId = $station->getIdRequired(); - + foreach ($this->getStationIds() as $stationId) { $filterableAttributes[] = 'station_' . $stationId . '_playlists'; $filterableAttributes[] = 'station_' . $stationId . '_is_requestable'; $filterableAttributes[] = 'station_' . $stationId . '_is_on_demand'; @@ -52,7 +51,6 @@ final class Index } $indexSettings = [ - 'primaryKey' => 'id', 'filterableAttributes' => $filterableAttributes, 'sortableAttributes' => $mediaFields, 'displayedAttributes' => $this->environment->isProduction() @@ -61,10 +59,12 @@ final class Index ]; // Avoid updating settings unless necessary to avoid triggering a reindex. - $this->client->createIndex($this->indexUid); + $this->indexClient->create( + $this->indexClient->getUid(), + ['primaryKey' => 'id'] + ); - $index = $this->client->index($this->indexUid); - $currentSettings = $index->getSettings(); + $currentSettings = $this->indexClient->getSettings(); $settingsToUpdate = []; foreach ($indexSettings as $settingKey => $setting) { @@ -75,10 +75,42 @@ final class Index } if (!empty($settingsToUpdate)) { - $index->updateSettings($settingsToUpdate); + $this->indexClient->updateSettings($settingsToUpdate); } } + public function getIdsInIndex(): iterable + { + $perPage = Meilisearch::BATCH_SIZE; + $documentsQuery = (new DocumentsQuery()) + ->setOffset(0) + ->setLimit($perPage) + ->setFields(['id']); + + $documents = $this->indexClient->getDocuments($documentsQuery); + foreach ($documents->getIterator() as $document) { + yield $document['id']; + } + + if ($documents->getTotal() <= $perPage) { + return; + } + + $totalPages = ceil($documents->getTotal() / $perPage); + for ($page = 1; $page <= $totalPages; $page++) { + $documentsQuery->setOffset($page * $perPage); + $documents = $this->indexClient->getDocuments($documentsQuery); + foreach ($documents->getIterator() as $document) { + yield $document['id']; + } + } + } + + public function deleteIds(array $ids): void + { + $this->indexClient->deleteDocuments($ids); + } + public function addMedia(array $ids): void { $this->refreshMedia($ids, true); @@ -88,6 +120,148 @@ final class Index array $ids, bool $includePlaylists = false ): void { + if ($includePlaylists) { + $mediaPlaylistsRaw = $this->em->createQuery( + <<<'DQL' + SELECT spm.media_id, spm.playlist_id + FROM App\Entity\StationPlaylistMedia spm + WHERE spm.media_id IN (:mediaIds) + DQL + )->setParameter('mediaIds', $ids) + ->getArrayResult(); + + $mediaPlaylists = []; + $playlistIds = []; + + foreach ($mediaPlaylistsRaw as $mediaPlaylistRow) { + $mediaId = $mediaPlaylistRow['media_id']; + $playlistId = $mediaPlaylistRow['playlist_id']; + + $playlistIds[$playlistId] = $playlistId; + + $mediaPlaylists[$mediaId] ??= []; + $mediaPlaylists[$mediaId][] = $playlistId; + } + + $stationIds = $this->getStationIds(); + + $playlistsRaw = $this->em->createQuery( + <<<'DQL' + SELECT p.id, p.station_id, p.include_in_on_demand, p.include_in_requests + FROM App\Entity\StationPlaylist p + WHERE p.id IN (:playlistIds) AND p.station_id IN (:stationIds) + AND p.is_enabled = 1 + DQL + )->setParameter('playlistIds', $playlistIds) + ->setParameter('stationIds', $stationIds) + ->getArrayResult(); + + $playlists = []; + foreach ($playlistsRaw as $playlistRow) { + $playlists[$playlistRow['id']] = $playlistRow; + } + } + + $customFieldsRaw = $this->em->createQuery( + <<<'DQL' + SELECT smcf.media_id, smcf.field_id, smcf.value + FROM App\Entity\StationMediaCustomField smcf + WHERE smcf.media_id IN (:mediaIds) + DQL + )->setParameter('mediaIds', $ids) + ->getArrayResult(); + + $customFields = []; + foreach ($customFieldsRaw as $customFieldRow) { + $mediaId = $customFieldRow['media_id']; + + $customFields[$mediaId] ??= []; + $customFields[$mediaId]['custom_field_' . $customFieldRow['field_id']] = $customFieldRow['value']; + } + + $mediaRaw = $this->em->createQuery( + <<<'DQL' + SELECT sm.id, + sm.unique_id, + sm.path, + sm.mtime, + sm.length_text, + sm.title, + sm.artist, + sm.album, + sm.genre, + sm.isrc + FROM App\Entity\StationMedia sm + WHERE sm.storage_location = :storageLocation + AND sm.id IN (:ids) + DQL + )->setParameter('storageLocation', $this->storageLocation) + ->setParameter('ids', $ids) + ->toIterable(); + + $media = []; + + foreach ($mediaRaw as $row) { + $mediaId = $row['id']; + + $record = [ + 'id' => $row['unique_id'], + 'path' => $row['path'], + 'mtime' => $row['mtime'], + 'duration' => $row['length_text'], + 'title' => $row['title'], + 'artist' => $row['artist'], + 'album' => $row['album'], + 'genre' => $row['genre'], + 'isrc' => $row['isrc'], + ]; + + if (isset($customFields[$mediaId])) { + $record = array_merge($record, $customFields[$mediaId]); + } + + if ($includePlaylists) { + foreach ($stationIds as $stationId) { + $record['station_' . $stationId . '_playlists'] = []; + $record['station_' . $stationId . '_is_requestable'] = false; + $record['station_' . $stationId . '_is_on_demand'] = false; + } + + if (isset($mediaPlaylists[$mediaId])) { + foreach ($mediaPlaylists[$mediaId] as $mediaPlaylistId) { + if (!isset($playlists[$mediaPlaylistId])) { + continue; + } + + $playlist = $playlists[$mediaPlaylistId]; + $stationId = $playlist['station_id']; + + $record['station_' . $stationId . '_playlists'][] = $mediaPlaylistId; + + if ($playlist['include_in_requests']) { + $record['station_' . $stationId . '_is_requestable'] = true; + } + if ($playlist['include_in_on_demand']) { + $record['station_' . $stationId . '_is_on_demand'] = true; + } + } + } + } + + $media[$mediaId] = $record; + } + + if ($includePlaylists) { + $this->indexClient->addDocumentsInBatches( + $media, + Meilisearch::BATCH_SIZE + ); + } else { + $this->indexClient->updateDocumentsInBatches( + $media, + Meilisearch::BATCH_SIZE + ); + } } public function refreshPlaylists(Station $station): void @@ -161,21 +335,22 @@ final class Index } } - $this->client->index($this->indexUid)->updateDocumentsInBatches( - array_values($media) + $this->indexClient->updateDocumentsInBatches( + array_values($media), + Meilisearch::BATCH_SIZE ); } - /** @return Station[] */ - private function iterateStations(): iterable + /** @return int[] */ + private function getStationIds(): array { return $this->em->createQuery( <<<'DQL' - SELECT s FROM App\Entity\Station s + SELECT s.id FROM App\Entity\Station s WHERE s.media_storage_location = :storageLocation AND s.is_enabled = 1 DQL )->setParameter('storageLocation', $this->storageLocation) - ->toIterable(); + ->getSingleColumnResult(); } } diff --git a/src/Sync/Task/UpdateMeilisearchIndex.php b/src/Sync/Task/UpdateMeilisearchIndex.php new file mode 100644 index 000000000..25d9e29fd --- /dev/null +++ b/src/Sync/Task/UpdateMeilisearchIndex.php @@ -0,0 +1,114 @@ +meilisearch->isSupported()) { + $this->logger->debug('Meilisearch is not supported on this instance. Skipping sync task.'); + } + + $storageLocations = $this->iterateStorageLocations(Entity\Enums\StorageLocationTypes::StationMedia); + + foreach ($storageLocations as $storageLocation) { + $this->logger->info( + sprintf( + 'Updating MeiliSearch index for storage location %s...', + $storageLocation + ) + ); + + $this->updateIndex($storageLocation); + } + } + + public function updateIndex(Entity\StorageLocation $storageLocation): void + { + $index = $this->meilisearch->getIndex($storageLocation); + $index->configure(); + + $existingIdsRaw = iterator_to_array($index->getIdsInIndex(), false); + $existingIds = array_combine($existingIdsRaw, $existingIdsRaw); + + $queuedMedia = []; + + foreach ( + $this->queueManager->getMessagesInTransport( + QueueManagerInterface::QUEUE_NORMAL_PRIORITY + ) as $message + ) { + if ($message instanceof AddMediaToSearchIndexMessage) { + foreach ($message->media as $mediaId) { + $queuedMedia[$mediaId] = $mediaId; + } + } + } + + $mediaRaw = $this->em->createQuery( + <<<'DQL' + SELECT sm.id, sm.unique_id + FROM App\Entity\StationMedia sm + WHERE sm.storage_location = :storageLocation + DQL + )->setParameter('storageLocation', $storageLocation) + ->toIterable([], AbstractQuery::HYDRATE_ARRAY); + + $newIds = []; + + foreach ($mediaRaw as $row) { + if ( + isset($existingIds[$row['unique_id']]) + || isset($queuedMedia[$row['id']]) + ) { + unset($existingIds[$row['unique_id']]); + continue; + } + + $newIds[] = $row['id']; + } + + foreach (array_chunk($newIds, Meilisearch::BATCH_SIZE) as $batchIds) { + $message = new AddMediaToSearchIndexMessage(); + $message->storage_location_id = $storageLocation->getIdRequired(); + $message->media = $batchIds; + $this->messageBus->dispatch($message); + } + + if (!empty($existingIds)) { + $index->deleteIds($existingIds); + } + } +}