feat(pulse): implement reliable event tracking and delivery system

Implement comprehensive event tracking system for TeleCart Pulse that ensures
all user interactions and order events are reliably captured and delivered to
the analytics platform, even in case of network failures or service outages.

Business Value:
- Guaranteed event delivery: All events are stored in database before sending,
  ensuring no data loss even if SaaS service is temporarily unavailable
- Automatic retry mechanism: Failed events are automatically retried with
  configurable attempts, reducing manual intervention
- Real-time monitoring: Admin dashboard displays event statistics (pending,
  sent, failed) to track system health and delivery status
- Data integrity: Idempotency keys prevent duplicate events, ensuring accurate
  analytics and metrics
- Performance optimization: Statistics are cached for 1 hour to reduce database
  load while maintaining visibility

Key Features:
- Event queue system: Events are queued in database with status tracking
  (pending/sent/failed)
- Asynchronous processing: Events are sent via background tasks, not blocking
  user interactions
- Error tracking: Failed events include detailed error reasons for debugging
- Campaign tracking: Only events with valid campaign_id and tracking_id are
  stored, ensuring data quality
- Admin visibility: Statistics dashboard shows delivery status at a glance

This system ensures reliable data collection for campaign analytics, A/B testing,
and performance metrics, providing accurate insights for business decisions.
This commit is contained in:
2025-12-07 19:46:33 +03:00
committed by Nikita Kiselev
parent 1f5ef4353d
commit 4a3dcc11d1
19 changed files with 745 additions and 5468 deletions

View File

@@ -77,6 +77,7 @@ export const useSettingsStore = defineStore('settings', {
pulse: { pulse: {
api_key: '', api_key: '',
batch_size: 50,
}, },
cron: { cron: {

View File

@@ -1,15 +1,97 @@
<template> <template>
<ItemInput label="API ключ" <div class="tw:space-y-6">
v-model="settings.items.pulse.api_key" <SettingsItem v-if="settings.items.pulse.api_key" label="Статистика за 7 дней">
placeholder="AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE" <template #default>
> <div v-if="stats" class="tw:space-y-4">
Используется для обмена информацией по кампаниям, рассылкам, сбору метрик. <div class="tw:flex tw:gap-3 tw:max-w-2xl">
</ItemInput> <div class="tw:group tw:bg-white tw:rounded-lg tw:shadow tw:p-3 tw:relative tw:flex-1 tw:transition-all tw:duration-200 tw:cursor-default tw:hover:shadow-md tw:hover:-translate-y-0.5">
<div class="tw:flex tw:justify-between tw:items-start tw:mb-1.5">
<div class="tw:text-xs tw:font-medium tw:text-gray-700">В очереди</div>
<div
class="tw:w-8 tw:h-8 tw:rounded-lg tw:bg-gradient-to-br tw:from-yellow-400 tw:to-yellow-600 tw:flex tw:items-center tw:justify-center tw:transition-transform tw:duration-200 tw:group-hover:scale-110">
<i class="fa fa-clock-o tw:text-white tw:text-xs"></i>
</div>
</div>
<div class="tw:text-3xl tw:font-bold tw:text-gray-800 tw:mb-0.5">{{
stats.pending
}}
</div>
<div class="tw:text-xs tw:text-gray-500">Ожидают отправки</div>
</div>
<div class="tw:group tw:bg-white tw:rounded-lg tw:shadow tw:p-3 tw:relative tw:flex-1 tw:transition-all tw:duration-200 tw:cursor-default tw:hover:shadow-md tw:hover:-translate-y-0.5">
<div class="tw:flex tw:justify-between tw:items-start tw:mb-1.5">
<div class="tw:text-xs tw:font-medium tw:text-gray-700">Отправлено</div>
<div
class="tw:w-8 tw:h-8 tw:rounded-lg tw:bg-gradient-to-br tw:from-green-400 tw:to-green-600 tw:flex tw:items-center tw:justify-center tw:transition-transform tw:duration-200 tw:group-hover:scale-110">
<i class="fa fa-check-circle tw:text-white tw:text-xs"></i>
</div>
</div>
<div class="tw:text-3xl tw:font-bold tw:text-gray-800 tw:mb-0.5">{{
stats.sent
}}
</div>
<div class="tw:text-xs tw:text-gray-500">Успешно доставлено</div>
</div>
<div class="tw:group tw:bg-white tw:rounded-lg tw:shadow tw:p-3 tw:relative tw:flex-1 tw:transition-all tw:duration-200 tw:cursor-default tw:hover:shadow-md tw:hover:-translate-y-0.5">
<div class="tw:flex tw:justify-between tw:items-start tw:mb-1.5">
<div class="tw:text-xs tw:font-medium tw:text-gray-700">Ошибки</div>
<div
class="tw:w-8 tw:h-8 tw:rounded-lg tw:bg-gradient-to-br tw:from-red-400 tw:to-red-600 tw:flex tw:items-center tw:justify-center tw:transition-transform tw:duration-200 tw:group-hover:scale-110">
<i class="fa fa-exclamation-circle tw:text-white tw:text-xs"></i>
</div>
</div>
<div class="tw:text-3xl tw:font-bold tw:text-gray-800 tw:mb-0.5">{{
stats.failed
}}
</div>
<div class="tw:text-xs tw:text-gray-500">Требуют внимания</div>
</div>
</div>
</div>
</template>
<template #help>
Статистика обновляется 1 раз в час
</template>
</SettingsItem>
<ItemInput label="API ключ"
v-model="settings.items.pulse.api_key"
placeholder="AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE"
>
Используется для обмена информацией по кампаниям, рассылкам, сбору метрик.
</ItemInput>
<ItemInput label="Размер пакета обработки"
v-model.number="settings.items.pulse.batch_size"
type="number"
placeholder="50"
>
Определяет, сколько событий отправляется в TeleCart Pulse за один запуск фоновой задачи.
При большом значении события обрабатываются быстрее, но увеличивается нагрузка на сервер.
При малом значении нагрузка ниже, но обработка занимает больше времени.
Рекомендуемое значение: 50.
</ItemInput>
</div>
</template> </template>
<script setup> <script setup>
import {onMounted, ref} from "vue";
import {useSettingsStore} from "@/stores/settings.js"; import {useSettingsStore} from "@/stores/settings.js";
import ItemInput from "@/components/Settings/ItemInput.vue"; import ItemInput from "@/components/Settings/ItemInput.vue";
import {apiGet} from "@/utils/http.js";
import SettingsItem from "@/components/SettingsItem.vue";
const settings = useSettingsStore(); const settings = useSettingsStore();
const stats = ref(null);
const loadStats = async () => {
const response = await apiGet('getTeleCartPulseStats');
if (response.success) {
stats.value = response.data;
}
};
onMounted(() => {
loadStats();
});
</script> </script>

View File

@@ -20,14 +20,16 @@ export const usePulseStore = defineStore('pulse', {
}, },
ingest(event, eventData = {}) { ingest(event, eventData = {}) {
const idempotencyKey = crypto.randomUUID();
ingest({ ingest({
event: event, event: event,
idempotency_key: idempotencyKey,
payload: { payload: {
webapp: window.Telegram.WebApp, webapp: window.Telegram.WebApp,
eventData: eventData, eventData: eventData,
}, },
}) })
.then(() => console.debug('[Pulse] Event Ingested', event, eventData)) .then(() => console.debug('[Pulse] Event Ingested', event, eventData, idempotencyKey))
.catch(err => console.error('Ingest failed:', err)); .catch(err => console.error('Ingest failed:', err));
}, },

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.2 KiB

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,33 @@
<?php
namespace Bastion\Handlers;
use Openguru\OpenCartFramework\Cache\CacheInterface;
use Openguru\OpenCartFramework\TeleCartPulse\TeleCartEvent;
use Symfony\Component\HttpFoundation\JsonResponse;
class TeleCartPulseStatsHandler
{
private TeleCartEvent $eventModel;
private CacheInterface $cache;
private const CACHE_KEY = 'telecart_pulse_stats';
private const CACHE_TTL = 3600; // 1 час
public function __construct(TeleCartEvent $eventModel, CacheInterface $cache)
{
$this->eventModel = $eventModel;
$this->cache = $cache;
}
public function getStats(): JsonResponse
{
$stats = $this->cache->get(self::CACHE_KEY);
if ($stats === null) {
$stats = $this->eventModel->getStats();
$this->cache->set(self::CACHE_KEY, $stats, self::CACHE_TTL);
}
return new JsonResponse(['data' => $stats]);
}
}

View File

@@ -0,0 +1,176 @@
<?php
namespace Bastion\ScheduledTasks;
use GuzzleHttp\Exception\GuzzleException;
use Openguru\OpenCartFramework\Cache\CacheInterface;
use Openguru\OpenCartFramework\Config\Settings;
use Openguru\OpenCartFramework\Scheduler\TaskInterface;
use Openguru\OpenCartFramework\Settings\UserSettingsInterface;
use Openguru\OpenCartFramework\TeleCartPulse\TeleCartEvent;
use Openguru\OpenCartFramework\TeleCartPulse\TeleCartPulseEventsSender;
use Psr\Log\LoggerInterface;
use Throwable;
class TeleCartPulseSendEventsTask implements TaskInterface
{
private TeleCartEvent $eventModel;
private TeleCartPulseEventsSender $eventsSender;
private LoggerInterface $logger;
private CacheInterface $cache;
private Settings $settings;
private int $maxAttempts;
private int $batchSize;
public function __construct(
Settings $settings,
TeleCartEvent $eventModel,
TeleCartPulseEventsSender $eventsSender,
LoggerInterface $logger,
CacheInterface $cache
) {
$this->settings = $settings;
$this->eventModel = $eventModel;
$this->eventsSender = $eventsSender;
$this->logger = $logger;
$this->cache = $cache;
// Получаем конфигурацию из настроек пользователя
$this->maxAttempts = (int) $this->settings->get('pulse.max_attempts', env('PULSE_MAX_ATTEMPTS', 3));
$this->batchSize = (int) $this->settings->get('pulse.batch_size', env('PULSE_BATCH_SIZE', 50));
}
public function execute(): void
{
try {
// Получаем события со статусом pending
$events = $this->eventModel->findPending($this->batchSize);
if (empty($events)) {
$this->logger->debug('No pending events to send');
return;
}
$count = count($events);
$this->logger->info("Processing pending events: $count", [
'count' => $count,
]);
$processed = 0;
$succeeded = 0;
$failed = 0;
foreach ($events as $event) {
try {
$result = $this->processEvent($event);
$result ? $succeeded++ : $failed++;
} catch (Throwable $e) {
$this->logger->error("Failed to process event {$event['id']}: " . $e->getMessage(), [
'event_id' => $event['id'],
'event' => $event['event'] ?? null,
'payload' => $event['payload'] ?? null,
'exception' => $e,
]);
$failed++;
} finally {
$processed++;
}
}
$this->logger->info("Events processing completed", [
'processed' => $processed,
'succeeded' => $succeeded,
'failed' => $failed,
]);
} catch (Throwable $e) {
$this->logger->error("TeleCartPulseSendEventsTask failed: " . $e->getMessage(), [
'exception' => $e,
]);
} finally {
// Сбрасываем кеш статистики после каждого прогона
$this->clearStatsCache();
}
}
/**
* Обработать одно событие
*
* @param array $event Данные события из БД
* @return bool true если событие успешно отправлено, false если требуется повторная попытка
* @throws Throwable
*/
private function processEvent(array $event): bool
{
$eventId = (int) $event['id'];
$attemptsCount = (int) $event['attempts_count'];
try {
// Пытаемся отправить событие
$success = $this->eventsSender->sendEvent($event);
if ($success) {
// Успешная отправка
$this->eventModel->updateStatus($eventId, 'sent');
$this->logger->debug("Event {$eventId} sent successfully", [
'event_id' => $eventId,
'event' => $event['event'],
]);
return true;
}
// TeleCart Pulse не вернул подтверждение
$errorReason = 'No confirmation received from TeleCart Pulse';
$this->handleFailedAttempt($eventId, $attemptsCount, $errorReason);
} catch (GuzzleException $e) {
// Ошибка HTTP запроса
$errorReason = 'HTTP error: ' . $e->getMessage();
$this->handleFailedAttempt($eventId, $attemptsCount, $errorReason);
} catch (Throwable $e) {
// Другие ошибки (валидация, подпись и т.д.)
$errorReason = 'Error: ' . $e->getMessage();
$this->handleFailedAttempt($eventId, $attemptsCount, $errorReason);
}
return false;
}
/**
* Обработать неудачную попытку отправки
*
* @param int $eventId ID события
* @param int $currentAttempts Текущее количество попыток
* @param string $errorReason Причина ошибки
*/
private function handleFailedAttempt(int $eventId, int $currentAttempts, string $errorReason): void
{
$newAttempts = $currentAttempts + 1;
if ($newAttempts >= $this->maxAttempts) {
// Превышен лимит попыток - переводим в failed
$this->eventModel->updateStatus($eventId, 'failed', $errorReason);
$this->logger->warning("Event {$eventId} marked as failed after {$newAttempts} attempts", [
'event_id' => $eventId,
'attempts' => $newAttempts,
'error' => $errorReason,
]);
return;
}
// Увеличиваем счетчик попыток, оставляем статус pending
$this->eventModel->incrementAttempts($eventId);
$this->logger->debug("Event {$eventId} attempt failed, will retry", [
'event_id' => $eventId,
'attempts' => $newAttempts,
'max_attempts' => $this->maxAttempts,
'error' => $errorReason,
]);
}
/**
* Сбросить кеш статистики
*/
private function clearStatsCache(): void
{
$this->cache->delete('telecart_pulse_stats');
}
}

View File

@@ -8,6 +8,7 @@ use Bastion\Handlers\LogsHandler;
use Bastion\Handlers\SendMessageHandler; use Bastion\Handlers\SendMessageHandler;
use Bastion\Handlers\SettingsHandler; use Bastion\Handlers\SettingsHandler;
use Bastion\Handlers\StatsHandler; use Bastion\Handlers\StatsHandler;
use Bastion\Handlers\TeleCartPulseStatsHandler;
use Bastion\Handlers\TelegramCustomersHandler; use Bastion\Handlers\TelegramCustomersHandler;
use Bastion\Handlers\TelegramHandler; use Bastion\Handlers\TelegramHandler;
@@ -30,4 +31,5 @@ return [
'sendMessageToCustomer' => [SendMessageHandler::class, 'sendMessage'], 'sendMessageToCustomer' => [SendMessageHandler::class, 'sendMessage'],
'testTgMessage' => [TelegramHandler::class, 'testTgMessage'], 'testTgMessage' => [TelegramHandler::class, 'testTgMessage'],
'tgGetMe' => [TelegramHandler::class, 'tgGetMe'], 'tgGetMe' => [TelegramHandler::class, 'tgGetMe'],
'getTeleCartPulseStats' => [TeleCartPulseStatsHandler::class, 'getStats'],
]; ];

View File

@@ -64,6 +64,12 @@ TEXT,
'order_default_status_id' => 1, 'order_default_status_id' => 1,
], ],
'pulse' => [
'api_key' => '',
'batch_size' => 50,
'max_attempts' => 3,
],
'mainpage_blocks' => [ 'mainpage_blocks' => [
[ [
'type' => 'products_feed', 'type' => 'products_feed',

View File

@@ -15,3 +15,7 @@
// Example: Custom cron expression // Example: Custom cron expression
// $scheduler->add(\My\Task\Class::class)->at('0 12 * * *'); // $scheduler->add(\My\Task\Class::class)->at('0 12 * * *');
use Bastion\ScheduledTasks\TeleCartPulseSendEventsTask;
$scheduler->add(TeleCartPulseSendEventsTask::class, 'telecart_pulse_send_events')->everyTenMinutes();

View File

@@ -0,0 +1,33 @@
<?php
use Openguru\OpenCartFramework\Migrations\Migration;
return new class extends Migration {
public function up(): void
{
$tableName = 'telecart_events';
$sql = <<<SQL
CREATE TABLE IF NOT EXISTS `{$tableName}` (
`id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`event` VARCHAR(255) NOT NULL,
`payload` TEXT NOT NULL,
`idempotency_key` VARCHAR(64) NOT NULL,
`event_time` DATETIME NOT NULL,
`status` VARCHAR(50) NOT NULL DEFAULT 'pending',
`attempts_count` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`error_reason` TEXT DEFAULT NULL,
`created_at` DATETIME NOT NULL,
`updated_at` DATETIME NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unique_idempotency_key` (`idempotency_key`),
KEY `idx_status` (`status`),
KEY `idx_event_time` (`event_time`),
KEY `idx_updated_at` (`updated_at`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE `utf8mb4_unicode_ci` ENGINE = InnoDB;
SQL;
$this->database->statement($sql);
}
};

View File

@@ -125,7 +125,7 @@ class SchedulerService
$duration = microtime(true) - $startTime; $duration = microtime(true) - $startTime;
$this->updateLastRun($id); $this->updateLastRun($id);
$this->logger->info("Job executed: {$name}", ['duration' => $duration]); $this->logger->debug("Job executed: {$name}", ['duration' => $duration]);
$result->addExecuted($name, $duration); $result->addExecuted($name, $duration);
} catch (Throwable $e) { } catch (Throwable $e) {
$this->updateLastFailure($id, $e->getMessage()); $this->updateLastFailure($id, $e->getMessage());

View File

@@ -0,0 +1,234 @@
<?php
declare(strict_types=1);
namespace Openguru\OpenCartFramework\TeleCartPulse;
use Carbon\Carbon;
use Openguru\OpenCartFramework\QueryBuilder\Builder;
use Openguru\OpenCartFramework\QueryBuilder\Connections\ConnectionInterface;
use Openguru\OpenCartFramework\QueryBuilder\RawExpression;
use Ramsey\Uuid\Uuid;
use RuntimeException;
class TeleCartEvent
{
private const TABLE_NAME = 'telecart_events';
private ConnectionInterface $database;
private Builder $builder;
public function __construct(ConnectionInterface $database, Builder $builder)
{
$this->database = $database;
$this->builder = $builder;
}
/**
* Найти запись по ID
*
* @param int $id ID записи
* @return array|null Данные события или null если не найдено
*/
public function findById(int $id): ?array
{
return $this->builder
->newQuery()
->select(['*'])
->from(self::TABLE_NAME)
->where('id', '=', $id)
->firstOrNull();
}
/**
* Создать новое событие
*
* @param string $event Название события
* @param array $payload Payload события (webapp и eventData)
* @param string|null $idempotencyKey Ключ идемпотентности (если не передан - генерируется UUID)
* @return int ID созданной записи
* @throws RuntimeException Если не удалось создать запись
*/
public function create(string $event, array $payload, ?string $idempotencyKey = null): int
{
// Если ключ не передан - генерируем UUID
if ($idempotencyKey === null) {
$idempotencyKey = $this->generateIdempotencyKey();
}
// Проверяем, существует ли уже событие с таким ключом идемпотентности
$existing = $this->findByIdempotencyKey($idempotencyKey);
if ($existing !== null) {
// Возвращаем ID существующего события
return (int) $existing['id'];
}
$eventTime = Carbon::now('UTC');
$now = Carbon::now('UTC');
$payloadJson = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
$data = [
'event' => $event,
'payload' => $payloadJson,
'idempotency_key' => $idempotencyKey,
'event_time' => $eventTime->format('Y-m-d H:i:s'),
'status' => 'pending',
'attempts_count' => 0,
'error_reason' => null,
'created_at' => $now->format('Y-m-d H:i:s'),
'updated_at' => $now->format('Y-m-d H:i:s'),
];
$success = $this->database->insert(self::TABLE_NAME, $data);
if (! $success) {
$error = $this->database->getLastError();
// Обработка race condition: если произошла ошибка уникальности,
// значит событие было создано другим запросом, пытаемся найти его
if ($error && isset($error[1]) && strpos($error[1], 'Duplicate entry') !== false) {
$existing = $this->findByIdempotencyKey($idempotencyKey);
if ($existing !== null) {
return (int) $existing['id'];
}
}
$errorMessage = $error ? $error[1] : 'Unknown error';
throw new RuntimeException("Failed to insert telecart event. Error: {$errorMessage}");
}
return $this->database->lastInsertId();
}
/**
* Генерирует idempotency key как UUID
*
* @return string Idempotency key (UUID v4)
*/
private function generateIdempotencyKey(): string
{
return Uuid::uuid4()->toString();
}
/**
* Найти запись по idempotency key
*
* @param string $idempotencyKey Idempotency key
* @return array|null Данные события или null если не найдено
*/
public function findByIdempotencyKey(string $idempotencyKey): ?array
{
return $this->builder
->newQuery()
->select(['*'])
->from(self::TABLE_NAME)
->where('idempotency_key', '=', $idempotencyKey)
->firstOrNull();
}
/**
* Обновить статус события
*
* @param int $id ID события
* @param string $status Новый статус
* @param string|null $errorReason Причина ошибки (если статус failed)
* @return bool true если обновление успешно
*/
public function updateStatus(int $id, string $status, ?string $errorReason = null): bool
{
$data = [
'status' => $status,
];
if ($errorReason !== null) {
$data['error_reason'] = $errorReason;
}
return $this->updateById($id, $data);
}
/**
* Увеличить счетчик попыток отправки
*
* @param int $id ID события
* @return bool true если обновление успешно
*/
public function incrementAttempts(int $id): bool
{
$now = Carbon::now('UTC')->format('Y-m-d H:i:s');
$table = self::TABLE_NAME;
$sql = "UPDATE `$table` SET `attempts_count` = `attempts_count` + 1, updated_at = ? WHERE id = ?";
return $this->database->statement($sql, [$now, $id]);
}
/**
* Обновить запись по ID
*
* @param int $id ID события
* @param array $data Данные для обновления
* @return bool true если обновление успешно
*/
public function updateById(int $id, array $data): bool
{
// Автоматически обновляем updated_at, если он не указан явно
if (!isset($data['updated_at'])) {
$data['updated_at'] = Carbon::now('UTC')->format('Y-m-d H:i:s');
}
return $this->builder->newQuery()
->where('id', '=', $id)
->update(self::TABLE_NAME, $data);
}
/**
* Найти события со статусом pending
*
* @param int $limit Лимит записей
* @return array Массив событий
*/
public function findPending(int $limit = 100): array
{
return $this->builder
->newQuery()
->select(['*'])
->from(self::TABLE_NAME)
->where('status', '=', 'pending')
->orderBy('event_time', 'ASC')
->limit($limit)
->get();
}
/**
* Получить статистику событий по статусам
*
* @return array Статистика: pending, sent, failed
*/
public function getStats(): array
{
$stats = $this->builder
->newQuery()
->select([
'status',
new RawExpression('COUNT(*) as count'),
])
->from(self::TABLE_NAME)
->groupBy(['status'])
->get();
$result = [
'pending' => 0,
'sent' => 0,
'failed' => 0,
];
foreach ($stats as $stat) {
$status = $stat['status'];
if (isset($result[$status])) {
$result[$status] = (int) $stat['count'];
}
}
return $result;
}
}

View File

@@ -0,0 +1,90 @@
<?php
namespace Openguru\OpenCartFramework\TeleCartPulse;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Openguru\OpenCartFramework\Support\Arr;
use Psr\Log\LoggerInterface;
class TeleCartPulseEventsSender
{
private PayloadSigner $payloadSigner;
private LoggerInterface $logger;
private ?string $apiKey;
public function __construct(
PayloadSigner $payloadSigner,
LoggerInterface $logger,
?string $apiKey = null
) {
$this->payloadSigner = $payloadSigner;
$this->logger = $logger;
$this->apiKey = $apiKey;
}
/**
* Универсальный метод для отправки события из БД
* Payload уже готов в БД в формате для отправки в SaaS
*
* @param array $eventData Данные события из БД (содержит event, payload, и т.д.)
* @return bool true если SaaS принял событие (ответ содержит {"ok": true}), false в противном случае
* @throws GuzzleException
* @throws PayloadSignException
*/
public function sendEvent(array $eventData): bool
{
$payloadJson = $eventData['payload'] ?? '{}';
$payload = json_decode($payloadJson, true);
if (! is_array($payload)) {
throw new \InvalidArgumentException('Invalid payload format');
}
// Проверяем обязательные поля
if (! isset($payload['event'], $payload['campaign_id'], $payload['tracking_id'], $payload['meta'], $payload['timestamp'])) {
throw new \InvalidArgumentException(
'Payload must contain event, campaign_id, tracking_id, meta, and timestamp'
);
}
// Payload уже готов, просто добавляем signature и отправляем
$dataToSend = [
'payload' => $payload,
'signature' => $this->payloadSigner->sign($payload),
];
return $this->executeHttpRequest($dataToSend);
}
/**
* Выполняет HTTP запрос к SaaS-приложению для отправки события
*
* @param array $json Данные для отправки (payload и signature)
* @return bool true если SaaS принял событие (ответ содержит {"ok": true}), false в противном случае
* @throws GuzzleException
*/
private function executeHttpRequest(array $json): bool
{
$this->logger->debug('Pushing event: ' . Arr::get($json, 'payload.event'), [
'data' => $json,
]);
$baseUri = rtrim(env('PULSE_API_HOST', 'http://localhost'), '/') . '/';
$client = new Client([
'base_uri' => $baseUri,
'timeout' => env('PULSE_TIMEOUT', 5.0),
'headers' => [
'Authorization' => 'Bearer ' . $this->apiKey,
'X-TELECART-VERSION' => module_version(),
],
]);
$response = $client->post('events', compact('json'));
$body = $response->getBody()->getContents();
$responseData = json_decode($body, true) ?? [];
// Проверяем, что SaaS принял событие
return isset($responseData['ok']) && $responseData['ok'] === true;
}
}

View File

@@ -4,7 +4,6 @@ namespace Openguru\OpenCartFramework\TeleCartPulse;
use Carbon\Carbon; use Carbon\Carbon;
use GuzzleHttp\Client; use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use GuzzleHttp\Exception\GuzzleException; use GuzzleHttp\Exception\GuzzleException;
use Openguru\OpenCartFramework\Cache\CacheInterface; use Openguru\OpenCartFramework\Cache\CacheInterface;
use Openguru\OpenCartFramework\Support\Arr; use Openguru\OpenCartFramework\Support\Arr;
@@ -17,28 +16,27 @@ use Throwable;
class TeleCartPulseService class TeleCartPulseService
{ {
private TelegramInitDataDecoder $initDataDecoder; private TelegramInitDataDecoder $initDataDecoder;
private PayloadSigner $payloadSigner;
private TelegramService $telegramService; private TelegramService $telegramService;
private CacheInterface $cache; private CacheInterface $cache;
private LoggerInterface $logger; private LoggerInterface $logger;
private TeleCartEvent $eventModel;
private ?string $apiKey; private ?string $apiKey;
private ?PayloadSigner $heartbeatPayloadSigner; private ?PayloadSigner $heartbeatPayloadSigner;
private ?string $moduleVersion = null;
public function __construct( public function __construct(
TelegramInitDataDecoder $initDataDecoder, TelegramInitDataDecoder $initDataDecoder,
PayloadSigner $payloadSigner,
TelegramService $telegramService, TelegramService $telegramService,
CacheInterface $cache, CacheInterface $cache,
LoggerInterface $logger, LoggerInterface $logger,
TeleCartEvent $eventModel,
?string $apiKey = null, ?string $apiKey = null,
?PayloadSigner $heartbeatPayloadSigner = null ?PayloadSigner $heartbeatPayloadSigner = null
) { ) {
$this->initDataDecoder = $initDataDecoder; $this->initDataDecoder = $initDataDecoder;
$this->payloadSigner = $payloadSigner;
$this->telegramService = $telegramService; $this->telegramService = $telegramService;
$this->cache = $cache; $this->cache = $cache;
$this->logger = $logger; $this->logger = $logger;
$this->eventModel = $eventModel;
$this->apiKey = $apiKey; $this->apiKey = $apiKey;
$this->heartbeatPayloadSigner = $heartbeatPayloadSigner; $this->heartbeatPayloadSigner = $heartbeatPayloadSigner;
} }
@@ -68,22 +66,67 @@ class TeleCartPulseService
$startParam = Arr::get($decoded, 'start_param', ''); $startParam = Arr::get($decoded, 'start_param', '');
$deserialized = StartParamSerializer::deserialize($startParam); $deserialized = StartParamSerializer::deserialize($startParam);
if ($event === PulseEvents::WEBAPP_OPEN) { // Сохраняем событие в БД только если есть campaign_id и tracking_id
$this->handleWebAppInit($data, $deserialized); if (isset($deserialized['campaign_id'], $deserialized['tracking_id'])) {
} $webapp = Arr::get($data, 'payload.webapp', []);
$eventData = Arr::get($data, 'payload.eventData', []);
$payload = $this->buildEventPayload($event, $deserialized, $webapp, $eventData);
if ($event === PulseEvents::ORDER_CREATED) { $idempotencyKey = Arr::get($data, 'idempotency_key');
$this->handleOrderCreated($data, $deserialized); $eventId = $this->eventModel->create($event, $payload, $idempotencyKey);
$this->logger->debug("Event {$event} fired", [
'event_id' => $eventId,
'event' => $event,
'campaign_id' => $deserialized['campaign_id'],
'tracking_id' => $deserialized['tracking_id'],
]);
} }
} catch (ClientException $exception) {
$contents = (string)$exception->getResponse()->getBody();
$decoded = json_decode($contents, true);
throw new PulseIngestException('TeleCart Pulse API error: ' . $decoded['error'] ?? '', 0, $exception);
} catch (Throwable $exception) { } catch (Throwable $exception) {
$this->logger->error('Failed to handle ingest: ' . $exception->getMessage(), [
'exception' => $exception,
]);
throw new PulseIngestException('Could not handle ingest: ' . $exception->getMessage(), 0, $exception); throw new PulseIngestException('Could not handle ingest: ' . $exception->getMessage(), 0, $exception);
} }
} }
/**
* Формирует полный payload для отправки в SaaS (готовый к отправке)
*
* @param string $event Название события
* @param array $deserialized Десериализованные start_param данные
* @param array $webapp Данные webapp
* @param array $eventData Данные события
* @return array Готовый payload для отправки в SaaS
*/
private function buildEventPayload(string $event, array $deserialized, array $webapp, array $eventData): array
{
// Общие поля для всех событий
$payload = [
'event' => $event,
'campaign_id' => $deserialized['campaign_id'],
'tracking_id' => $deserialized['tracking_id'],
'meta' => [
'domain' => Utils::getCurrentDomain(),
],
'timestamp' => Carbon::now('UTC')->toJSON(),
];
// Добавляем version и platform в meta (для всех событий)
if (isset($webapp['version'])) {
$payload['meta']['version'] = $webapp['version'];
}
if (isset($webapp['platform'])) {
$payload['meta']['platform'] = $webapp['platform'];
}
// Добавляем все данные из eventData в meta
foreach ($eventData as $key => $value) {
$payload['meta'][$key] = $value;
}
return $payload;
}
/** /**
* @throws GuzzleException * @throws GuzzleException
*/ */
@@ -101,6 +144,7 @@ class TeleCartPulseService
'TeleCart Pulse heartbeat prerequisites failed: ' . $e->getMessage(), 'TeleCart Pulse heartbeat prerequisites failed: ' . $e->getMessage(),
['exception' => $e] ['exception' => $e]
); );
return; return;
} }
@@ -132,6 +176,7 @@ class TeleCartPulseService
'TeleCart Pulse heartbeat signing failed: ' . $exception->getMessage(), 'TeleCart Pulse heartbeat signing failed: ' . $exception->getMessage(),
['exception' => $exception] ['exception' => $exception]
); );
return; return;
} }
@@ -143,58 +188,6 @@ class TeleCartPulseService
$this->pushHeartbeat($dataToSend); $this->pushHeartbeat($dataToSend);
} }
/**
* @throws PayloadSignException
* @throws GuzzleException
*/
private function handleWebAppInit(array $data, array $deserialized): void
{
// Campaign Event
if (isset($deserialized['campaign_id'], $deserialized['tracking_id'])) {
$payload = [
'event' => PulseEvents::WEBAPP_OPEN,
'campaign_id' => $deserialized['campaign_id'],
'tracking_id' => $deserialized['tracking_id'],
'meta' => [
'domain' => Utils::getCurrentDomain(),
'version' => Arr::get($data, 'webapp.version'),
'platform' => Arr::get($data, 'webapp.platform'),
],
'timestamp' => Carbon::now('UTC')->toJSON(),
];
$dataToSend = [
'payload' => $payload,
'signature' => $this->payloadSigner->sign($payload),
];
$this->pushEvent($dataToSend);
}
}
/**
* @throws GuzzleException
*/
private function pushEvent(array $json): void
{
$this->logger->debug('Pushing event: ' . Arr::get($json, 'payload.event'), [
'data' => $json,
]);
$baseUri = rtrim(env('PULSE_API_HOST', 'http://localhost'), '/') . '/';
$client = new Client([
'base_uri' => $baseUri,
'timeout' => env('PULSE_TIMEOUT', 5.0),
'headers' => [
'Authorization' => 'Bearer ' . $this->apiKey,
'X-TELECART-VERSION' => module_version(),
],
]);
$client->post('events', compact('json'));
}
/** /**
* @throws GuzzleException * @throws GuzzleException
*/ */
@@ -212,30 +205,4 @@ class TeleCartPulseService
$client->post('heartbeat', compact('json')); $client->post('heartbeat', compact('json'));
} }
private function handleOrderCreated(array $data, array $deserialized): void
{
if (isset($deserialized['campaign_id'], $deserialized['tracking_id'])) {
$payload = [
'event' => PulseEvents::ORDER_CREATED,
'campaign_id' => $deserialized['campaign_id'],
'tracking_id' => $deserialized['tracking_id'],
'meta' => [
'domain' => Utils::getCurrentDomain(),
'version' => Arr::get($data, 'payload.webapp.version'),
'platform' => Arr::get($data, 'payload.webapp.platform'),
'order_id' => Arr::get($data, 'payload.eventData.order_id'),
'currency' => Arr::get($data, 'payload.eventData.currency'),
],
'timestamp' => Carbon::now('UTC')->toJSON(),
];
$dataToSend = [
'payload' => $payload,
'signature' => $this->payloadSigner->sign($payload),
];
$this->pushEvent($dataToSend);
}
}
} }

View File

@@ -25,13 +25,21 @@ class TeleCartPulseServiceProvider extends ServiceProvider
return new TeleCartPulseService( return new TeleCartPulseService(
$app->get(TelegramInitDataDecoder::class), $app->get(TelegramInitDataDecoder::class),
$app->get(PayloadSigner::class),
$app->get(TelegramService::class), $app->get(TelegramService::class),
$app->get(CacheInterface::class), $app->get(CacheInterface::class),
$app->get(LoggerInterface::class), $app->get(LoggerInterface::class),
$app->get(TeleCartEvent::class),
$app->getConfigValue('pulse.api_key'), $app->getConfigValue('pulse.api_key'),
$heartbeatSigner, $heartbeatSigner,
); );
}); });
$this->container->singleton(TeleCartPulseEventsSender::class, function (Container $app) {
return new TeleCartPulseEventsSender(
$app->get(PayloadSigner::class),
$app->get(LoggerInterface::class),
$app->getConfigValue('pulse.api_key'),
);
});
} }
} }

View File

@@ -34,8 +34,6 @@ class SignatureValidator
public function validate(Request $request): void public function validate(Request $request): void
{ {
if ($this->settings->config()->getApp()->isAppDebug()) { if ($this->settings->config()->getApp()->isAppDebug()) {
$this->logger->warning('Dev Mode is enabled. Ignoring Signature Validation.');
return; return;
} }

View File

@@ -283,10 +283,15 @@ class SchedulerServiceTest extends TestCase
$registryMock->shouldReceive('getJobs')->andReturn([$jobMock]); $registryMock->shouldReceive('getJobs')->andReturn([$jobMock]);
$this->loggerMock->shouldReceive('info') $this->loggerMock->shouldReceive('debug')
->with('Job executed: TestJob', Mockery::type('array')) ->with('Job executed: TestJob', Mockery::type('array'))
->once(); ->once();
// Разрешаем любые другие вызовы логгера (на случай неожиданных вызовов)
$this->loggerMock->shouldReceive('error')
->zeroOrMoreTimes()
->andReturn(true);
// Inject registry for testing // Inject registry for testing
$this->scheduler->setRegistry($registryMock); $this->scheduler->setRegistry($registryMock);