#5623 -- Optimize ingest of new listeners with LOAD DATA LOCAL INFILE.
This commit is contained in:
parent
c44fd64124
commit
b48cb5e49a
|
@ -92,6 +92,8 @@ return [
|
|||
'driverOptions' => [
|
||||
// PDO::MYSQL_ATTR_INIT_COMMAND = 1002;
|
||||
1002 => 'SET NAMES utf8mb4 COLLATE utf8mb4_general_ci',
|
||||
// PDO::MYSQL_ATTR_LOCAL_INFILE = 1001
|
||||
1001 => true,
|
||||
],
|
||||
'platform' => new Doctrine\DBAL\Platforms\MariaDb1027Platform(),
|
||||
]
|
||||
|
|
|
@ -9,11 +9,14 @@ use App\Doctrine\Repository;
|
|||
use App\Entity;
|
||||
use App\Service\DeviceDetector;
|
||||
use App\Service\IpGeolocation;
|
||||
use App\Utilities\Logger;
|
||||
use App\Utilities\File;
|
||||
use Carbon\CarbonImmutable;
|
||||
use DateTimeInterface;
|
||||
use Doctrine\DBAL\Connection;
|
||||
use League\Csv\Writer;
|
||||
use NowPlaying\Result\Client;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Filesystem\Filesystem;
|
||||
use Throwable;
|
||||
|
||||
/**
|
||||
|
@ -30,7 +33,8 @@ final class ListenerRepository extends Repository
|
|||
public function __construct(
|
||||
ReloadableEntityManagerInterface $em,
|
||||
private readonly DeviceDetector $deviceDetector,
|
||||
private readonly IpGeolocation $ipGeolocation
|
||||
private readonly IpGeolocation $ipGeolocation,
|
||||
private readonly LoggerInterface $logger
|
||||
) {
|
||||
parent::__construct($em);
|
||||
|
||||
|
@ -111,17 +115,11 @@ final class ListenerRepository extends Repository
|
|||
$existingClients[$client['listener_hash']] = $client['id'];
|
||||
}
|
||||
|
||||
foreach ($clients as $client) {
|
||||
$identifier = Entity\Listener::calculateListenerHash($client);
|
||||
|
||||
// Check for an existing record for this client.
|
||||
if (isset($existingClients[$identifier])) {
|
||||
unset($existingClients[$identifier]);
|
||||
} else {
|
||||
// Create a new record.
|
||||
$this->batchAddRow($station, $client);
|
||||
}
|
||||
}
|
||||
$this->batchAddClients(
|
||||
$station,
|
||||
$clients,
|
||||
$existingClients
|
||||
);
|
||||
|
||||
// Mark the end of all other clients on this station.
|
||||
if (!empty($existingClients)) {
|
||||
|
@ -139,7 +137,82 @@ final class ListenerRepository extends Repository
|
|||
);
|
||||
}
|
||||
|
||||
public function batchAddRow(Entity\Station $station, Client $client): array
|
||||
private function batchAddClients(
|
||||
Entity\Station $station,
|
||||
array &$clients,
|
||||
array &$existingClients
|
||||
): void {
|
||||
$tempCsvPath = File::generateTempPath('mariadb_listeners.csv');
|
||||
(new Filesystem())->chmod($tempCsvPath, 0o777);
|
||||
|
||||
$csv = Writer::createFromPath($tempCsvPath);
|
||||
$csv->setEscape('');
|
||||
$csv->addFormatter(function ($row) {
|
||||
return array_map(function ($col) {
|
||||
if (null === $col) {
|
||||
return '\N';
|
||||
}
|
||||
|
||||
return is_string($col)
|
||||
? str_replace('"', '""', $col)
|
||||
: $col;
|
||||
}, $row);
|
||||
});
|
||||
|
||||
$csvColumns = null;
|
||||
|
||||
foreach ($clients as $client) {
|
||||
$identifier = Entity\Listener::calculateListenerHash($client);
|
||||
|
||||
// Check for an existing record for this client.
|
||||
if (isset($existingClients[$identifier])) {
|
||||
unset($existingClients[$identifier]);
|
||||
} else {
|
||||
// Create a new record.
|
||||
$record = $this->batchAddRow($station, $client);
|
||||
|
||||
if (null === $csvColumns) {
|
||||
$csvColumns = array_keys($record);
|
||||
}
|
||||
|
||||
$csv->insertOne($record);
|
||||
}
|
||||
}
|
||||
|
||||
if (null === $csvColumns) {
|
||||
@unlink($tempCsvPath);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use LOAD DATA INFILE for listener dumps
|
||||
$csvLoadQuery = sprintf(
|
||||
<<<'SQL'
|
||||
LOAD DATA LOCAL INFILE %s IGNORE
|
||||
INTO TABLE %s
|
||||
FIELDS TERMINATED BY ','
|
||||
OPTIONALLY ENCLOSED BY '"'
|
||||
LINES TERMINATED BY '\n'
|
||||
(%s)
|
||||
SQL,
|
||||
$this->conn->quote($tempCsvPath),
|
||||
$this->conn->quoteIdentifier($this->tableName),
|
||||
implode(
|
||||
',',
|
||||
array_map(
|
||||
fn($col) => $this->conn->quoteIdentifier($col),
|
||||
$csvColumns
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
try {
|
||||
$this->conn->executeQuery($csvLoadQuery);
|
||||
} finally {
|
||||
@unlink($tempCsvPath);
|
||||
}
|
||||
}
|
||||
|
||||
private function batchAddRow(Entity\Station $station, Client $client): array
|
||||
{
|
||||
$record = [
|
||||
'station_id' => $station->getId(),
|
||||
|
@ -149,6 +222,21 @@ final class ListenerRepository extends Repository
|
|||
'listener_user_agent' => $this->truncateString($client->userAgent ?? ''),
|
||||
'listener_ip' => $client->ip,
|
||||
'listener_hash' => Entity\Listener::calculateListenerHash($client),
|
||||
'mount_id' => null,
|
||||
'remote_id' => null,
|
||||
'hls_stream_id' => null,
|
||||
'device_client' => null,
|
||||
'device_is_browser' => null,
|
||||
'device_is_mobile' => null,
|
||||
'device_is_bot' => null,
|
||||
'device_browser_family' => null,
|
||||
'device_os_family' => null,
|
||||
'location_description' => null,
|
||||
'location_region' => null,
|
||||
'location_city' => null,
|
||||
'location_country' => null,
|
||||
'location_lat' => null,
|
||||
'location_lon' => null,
|
||||
];
|
||||
|
||||
if (!empty($client->mount)) {
|
||||
|
@ -163,15 +251,13 @@ final class ListenerRepository extends Repository
|
|||
}
|
||||
}
|
||||
|
||||
$record = $this->batchAddDeviceDetails($record);
|
||||
$record = $this->batchAddLocationDetails($record);
|
||||
|
||||
$this->conn->insert($this->tableName, $record);
|
||||
$this->batchAddDeviceDetails($record);
|
||||
$this->batchAddLocationDetails($record);
|
||||
|
||||
return $record;
|
||||
}
|
||||
|
||||
private function batchAddDeviceDetails(array $record): array
|
||||
private function batchAddDeviceDetails(array &$record): void
|
||||
{
|
||||
$userAgent = $record['listener_user_agent'];
|
||||
|
||||
|
@ -185,16 +271,14 @@ final class ListenerRepository extends Repository
|
|||
$record['device_browser_family'] = $this->truncateNullableString($browserResult->browserFamily, 150);
|
||||
$record['device_os_family'] = $this->truncateNullableString($browserResult->osFamily, 150);
|
||||
} catch (Throwable $e) {
|
||||
Logger::getInstance()->error('Device Detector error: ' . $e->getMessage(), [
|
||||
$this->logger->error('Device Detector error: ' . $e->getMessage(), [
|
||||
'user_agent' => $userAgent,
|
||||
'exception' => $e,
|
||||
]);
|
||||
}
|
||||
|
||||
return $record;
|
||||
}
|
||||
|
||||
private function batchAddLocationDetails(array $record): array
|
||||
private function batchAddLocationDetails(array &$record): void
|
||||
{
|
||||
$ip = $record['listener_ip'];
|
||||
|
||||
|
@ -208,15 +292,13 @@ final class ListenerRepository extends Repository
|
|||
$record['location_lat'] = $ipInfo->lat;
|
||||
$record['location_lon'] = $ipInfo->lon;
|
||||
} catch (Throwable $e) {
|
||||
Logger::getInstance()->error('IP Geolocation error: ' . $e->getMessage(), [
|
||||
$this->logger->error('IP Geolocation error: ' . $e->getMessage(), [
|
||||
'ip' => $ip,
|
||||
'exception' => $e,
|
||||
]);
|
||||
|
||||
$record['location_description'] = 'Unknown';
|
||||
}
|
||||
|
||||
return $record;
|
||||
}
|
||||
|
||||
public function clearAll(): void
|
||||
|
|
Loading…
Reference in New Issue