settings = $settings; $this->eventModel = $eventModel; $this->eventsSender = $eventsSender; $this->logger = $logger; $this->cache = $cache; // Получаем конфигурацию из настроек пользователя $this->maxAttempts = (int) $this->settings->get('pulse.max_attempts', env('PULSE_MAX_ATTEMPTS', 3)); $this->batchSize = (int) $this->settings->get('pulse.batch_size', env('PULSE_BATCH_SIZE', 50)); } public function execute(): void { try { // Получаем события со статусом pending $events = $this->eventModel->findPending($this->batchSize); if (empty($events)) { $this->logger->debug('No pending events to send'); return; } $count = count($events); $this->logger->info("Processing pending events: $count", [ 'count' => $count, ]); $processed = 0; $succeeded = 0; $failed = 0; foreach ($events as $event) { try { $result = $this->processEvent($event); $result ? $succeeded++ : $failed++; } catch (Throwable $e) { $this->logger->error("Failed to process event {$event['id']}: " . $e->getMessage(), [ 'event_id' => $event['id'], 'event' => $event['event'] ?? null, 'payload' => $event['payload'] ?? null, 'exception' => $e, ]); $failed++; } finally { $processed++; } } $this->logger->info("Events processing completed", [ 'processed' => $processed, 'succeeded' => $succeeded, 'failed' => $failed, ]); } catch (Throwable $e) { $this->logger->error("AcmeShopPulseSendEventsTask failed: " . $e->getMessage(), [ 'exception' => $e, ]); } finally { // Сбрасываем кеш статистики после каждого прогона $this->clearStatsCache(); } } /** * Обработать одно событие * * @param array $event Данные события из БД * @return bool true если событие успешно отправлено, false если требуется повторная попытка * @throws Throwable */ private function processEvent(array $event): bool { $eventId = (int) $event['id']; $attemptsCount = (int) $event['attempts_count']; try { // Пытаемся отправить событие $success = $this->eventsSender->sendEvent($event); if ($success) { // Успешная отправка $this->eventModel->updateStatus($eventId, 'sent'); $this->logger->debug("Event {$eventId} sent successfully", [ 'event_id' => $eventId, 'event' => $event['event'], ]); return true; } // AcmeShop Pulse не вернул подтверждение $errorReason = 'No confirmation received from AcmeShop Pulse'; $this->handleFailedAttempt($eventId, $attemptsCount, $errorReason); } catch (GuzzleException $e) { // Ошибка HTTP запроса $errorReason = 'HTTP error: ' . $e->getMessage(); $this->handleFailedAttempt($eventId, $attemptsCount, $errorReason); } catch (Throwable $e) { // Другие ошибки (валидация, подпись и т.д.) $errorReason = 'Error: ' . $e->getMessage(); $this->handleFailedAttempt($eventId, $attemptsCount, $errorReason); } return false; } /** * Обработать неудачную попытку отправки * * @param int $eventId ID события * @param int $currentAttempts Текущее количество попыток * @param string $errorReason Причина ошибки */ private function handleFailedAttempt(int $eventId, int $currentAttempts, string $errorReason): void { $newAttempts = $currentAttempts + 1; if ($newAttempts >= $this->maxAttempts) { // Превышен лимит попыток - переводим в failed $this->eventModel->updateStatus($eventId, 'failed', $errorReason); $this->logger->warning("Event {$eventId} marked as failed after {$newAttempts} attempts", [ 'event_id' => $eventId, 'attempts' => $newAttempts, 'error' => $errorReason, ]); return; } // Увеличиваем счетчик попыток, оставляем статус pending $this->eventModel->incrementAttempts($eventId); $this->logger->debug("Event {$eventId} attempt failed, will retry", [ 'event_id' => $eventId, 'attempts' => $newAttempts, 'max_attempts' => $this->maxAttempts, 'error' => $errorReason, ]); } /** * Сбросить кеш статистики */ private function clearStatsCache(): void { $this->cache->delete('acmeshop_pulse_stats'); } }