Broadcast recording overhaul pt 1

- Write broadcasts to a temp file before moving to a final path
 - Create routine sync task to move recordings to permanent storage location.
This commit is contained in:
Buster "Silver Eagle" Neece 2021-12-06 02:28:53 -06:00
parent 829f870d43
commit f59c524442
No known key found for this signature in database
GPG Key ID: F1D2E64A0005E80E
7 changed files with 137 additions and 44 deletions

View File

@ -57,8 +57,16 @@ class StationStreamerBroadcastRepository extends Repository
public function getActiveBroadcasts(Entity\Station $station): array
{
return $this->repository->findBy([
'station' => $station,
'station' => $station,
'timestampEnd' => 0,
]);
}
public function findByPath(Entity\Station $station, string $path): ?Entity\StationStreamerBroadcast
{
return $this->repository->findOneBy([
'station' => $station,
'recordingPath' => $path,
]);
}
}

View File

@ -84,20 +84,18 @@ class StationStreamerRepository extends Repository
$record = new Entity\StationStreamerBroadcast($streamer);
$this->em->persist($record);
if (Adapters::BACKEND_LIQUIDSOAP === $station->getBackendType()) {
$backendConfig = $station->getBackendConfig();
$recordStreams = $backendConfig->recordStreams();
$backendConfig = $station->getBackendConfig();
$recordStreams = $backendConfig->recordStreams();
if ($recordStreams) {
$format = $backendConfig->getRecordStreamsFormat(
) ?? Entity\Interfaces\StationMountInterface::FORMAT_MP3;
$recordingPath = $record->generateRecordingPath($format);
$this->em->persist($record);
$this->em->flush();
if ($recordStreams) {
$format = $backendConfig->getRecordStreamsFormat()
?? Entity\Interfaces\StationMountInterface::FORMAT_MP3;
$recordingPath = $record->generateRecordingPath($format);
return (new StationFilesystems($station))->getTempFilesystem()
->getLocalPath($recordingPath);
}
$this->em->persist($record);
$this->em->flush();
return $recordingPath;
}
$this->em->flush();
@ -111,26 +109,6 @@ class StationStreamerRepository extends Repository
$fsRecordings = $fs->getRecordingsFilesystem();
foreach ($this->broadcastRepo->getActiveBroadcasts($station) as $broadcast) {
$broadcastPath = $broadcast->getRecordingPath();
if ((null !== $broadcastPath) && $fsTemp->fileExists($broadcastPath)) {
$recordingsStorageLocation = $station->getRecordingsStorageLocation();
$tempPath = $fsTemp->getLocalPath($broadcastPath);
if ($recordingsStorageLocation->canHoldFile($fsTemp->fileSize($broadcastPath))) {
$fsRecordings->uploadAndDeleteOriginal($tempPath, $broadcastPath);
} else {
$this->logger->error(
'Storage location full; broadcast not moved to storage location. '
. 'Check temporary directory at path to recover file.',
[
'storageLocation' => (string)$recordingsStorageLocation,
'path' => $tempPath,
]
);
}
}
$broadcast->setTimestampEnd(time());
$this->em->persist($broadcast);
}

View File

@ -24,6 +24,8 @@ class StationStreamerBroadcast implements IdentifiableEntityInterface
use Traits\HasAutoIncrementId;
use Traits\TruncateStrings;
public const PATH_PREFIX = 'stream';
#[ORM\ManyToOne(inversedBy: 'streamer_broadcasts')]
#[ORM\JoinColumn(name: 'station_id', referencedColumnName: 'id', nullable: false, onDelete: 'CASCADE')]
protected Station $station;
@ -92,7 +94,9 @@ class StationStreamerBroadcast implements IdentifiableEntityInterface
$this->timestampStart,
$this->station->getTimezoneObject()
);
$this->recordingPath = $this->streamer->getStreamerUsername() . '/' . $now->format('Ymd-His') . '.' . $ext;
$this->recordingPath = $this->streamer->getStreamerUsername()
. '/' . self::PATH_PREFIX . '_' . $now->format('Ymd-His') . '.' . $ext;
return $this->recordingPath;
}

View File

@ -8,6 +8,7 @@ use App\Entity;
use App\Environment;
use App\Event\Radio\WriteLiquidsoapConfiguration;
use App\Exception;
use App\Flysystem\StationFilesystems;
use Doctrine\ORM\EntityManagerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Http\Message\UriInterface;
@ -341,7 +342,9 @@ class Liquidsoap extends AbstractBackend
$resp = $this->streamerRepo->onConnect($station, $user);
if (is_string($resp)) {
$this->command($station, 'recording.start ' . $resp);
$finalPath = (new StationFilesystems($station))->getTempFilesystem()->getLocalPath($resp);
$this->command($station, 'recording.start ' . $finalPath);
return 'recording';
}

View File

@ -838,12 +838,7 @@ class ConfigWriter implements EventSubscriberInterface
<<< EOF
# Record Live Broadcasts
stop_recording_f = ref (fun () -> ())
def start_recording(path) =
output_live_recording = output.file({$formatString}, fallible=true, reopen_on_metadata=false, "#{path}", live)
stop_recording_f := fun () -> output_live_recording.shutdown()
end
def stop_recording() =
f = !stop_recording_f
f ()
@ -851,6 +846,16 @@ class ConfigWriter implements EventSubscriberInterface
stop_recording_f := fun () -> ()
end
def start_recording(path) =
stop_recording ()
output_live_recording = output.file({$formatString}, fallible=true, reopen_on_metadata=false, "#{path}.tmp", live)
stop_recording_f := fun() -> begin
output_live_recording.shutdown()
process.run("mv #{path}.tmp #{path}")
end
end
server.register(namespace="recording", description="Start recording.", usage="recording.start filename", "start", fun (s) -> begin start_recording(s) "Done!" end)
server.register(namespace="recording", description="Stop recording.", usage="recording.stop", "stop", fun (s) -> begin stop_recording() "Done!" end)
EOF

View File

@ -0,0 +1,94 @@
<?php
declare(strict_types=1);
namespace App\Sync\Task;
use App\Doctrine\ReloadableEntityManagerInterface;
use App\Entity;
use Psr\Log\LoggerInterface;
use Symfony\Component\Finder\Finder;
class MoveBroadcastsTask extends AbstractTask
{
public function __construct(
ReloadableEntityManagerInterface $em,
LoggerInterface $logger,
protected Entity\Repository\StationStreamerBroadcastRepository $broadcastRepo,
protected Entity\Repository\StorageLocationRepository $storageLocationRepo,
) {
parent::__construct($em, $logger);
}
public function run(bool $force = false): void
{
foreach ($this->iterateStorageLocations(Entity\StorageLocation::TYPE_STATION_RECORDINGS) as $storageLocation) {
$this->processForStorageLocation($storageLocation);
}
}
protected function processForStorageLocation(Entity\StorageLocation $storageLocation): void
{
if ($storageLocation->isStorageFull()) {
$this->logger->error('Storage location is full; skipping broadcasts.', [
'storageLocation' => (string)$storageLocation,
]);
return;
}
$fs = $storageLocation->getFilesystem();
$stations = $this->storageLocationRepo->getStationsUsingLocation($storageLocation);
foreach ($stations as $station) {
$finder = (new Finder())
->files()
->in($station->getRadioTempDir())
->name(Entity\StationStreamerBroadcast::PATH_PREFIX . '_*')
->notName('*.tmp')
->depth(1);
$this->logger->debug('Files', ['files', iterator_to_array($finder)]);
foreach ($finder as $file) {
$this->logger->debug('File', ['file' => $file]);
$recordingPath = $file->getRelativePathname();
if (!$storageLocation->canHoldFile($file->getSize())) {
$this->logger->error(
'Storage location full; broadcast not moved to storage location. '
. 'Check temporary directory at path to recover file.',
[
'storageLocation' => (string)$storageLocation,
'path' => $recordingPath,
]
);
break;
}
$broadcast = $this->broadcastRepo->findByPath($station, $recordingPath);
if (null !== $broadcast) {
$tempPath = $file->getPathname();
$fs->uploadAndDeleteOriginal($tempPath, $recordingPath);
$this->logger->info(
'Uploaded broadcast to storage location.',
[
'storageLocation' => (string)$storageLocation,
'path' => $recordingPath,
]
);
} else {
@unlink($file->getPathname());
$this->logger->info(
'Could not find a corresponding broadcast.',
[
'path' => $recordingPath,
]
);
}
}
}
}
}

View File

@ -20,17 +20,18 @@ class TaskLocator
Task\NowPlayingTask::class,
Task\ReactivateStreamerTask::class,
],
GetSyncTasks::SYNC_SHORT => [
GetSyncTasks::SYNC_SHORT => [
Task\CheckRequests::class,
Task\RunBackupTask::class,
Task\CleanupRelaysTask::class,
],
GetSyncTasks::SYNC_MEDIUM => [
GetSyncTasks::SYNC_MEDIUM => [
Task\CheckMediaTask::class,
Task\CheckFolderPlaylistsTask::class,
Task\CheckUpdatesTask::class,
Task\MoveBroadcastsTask::class,
],
GetSyncTasks::SYNC_LONG => [
GetSyncTasks::SYNC_LONG => [
Task\RunAnalyticsTask::class,
Task\RunAutomatedAssignmentTask::class,
Task\CleanupLoginTokensTask::class,