notice( 'Starting new Message Queue worker process.', [ 'runtime' => $runtime, 'workerName' => $workerName, ] ); if (null !== $workerName) { $queueManager->setWorkerName($workerName); } $receivers = $queueManager->getTransports(); $eventDispatcher->addServiceSubscriber(ClearEntityManagerSubscriber::class); $eventDispatcher->addServiceSubscriber(LogWorkerExceptionSubscriber::class); $eventDispatcher->addServiceSubscriber(ResetArrayCacheMiddleware::class); if ($runtime <= 0) { $runtime = $environment->isProduction() ? 300 : 30; } $eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($runtime, $logger)); try { $worker = new Worker($receivers, $messageBus, $eventDispatcher, $logger); $worker->run(); } catch (Throwable $e) { $logger->error( sprintf('Message queue error: %s', $e->getMessage()), [ 'workerName' => $workerName, 'exception' => $e, ] ); return 1; } return 0; } }