API Events
1. Inbound — Kafka
| Topic | Constant | Producer | Handler | Failure Mode |
|---|---|---|---|---|
| signal.activity-notification | KafkaTopics.SIGNAL_ACTIVITY_NOTIFICATION | Sale / Payment (on successful payment) | ActivityNotificationWorkerService.handleActivityNotification | Logs on error, no retry/DLQ; the offset is committed per message after processing |
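The failure mode in the table above can be sketched as follows. This is a minimal illustration, not the service's actual code: `ConsumedMessage`, `ErrorLogger`, and `processOnce` are hypothetical names, and the sketch assumes the offset is committed even when the handler throws, which is one reading of "no retry/DLQ".

```typescript
// Hypothetical stand-in for a consumed Kafka message; only the parts used here.
interface ConsumedMessage<T> {
  value: T;
  commit(): Promise<void>;
}

// Hypothetical logger shape.
interface ErrorLogger {
  error(message: string, cause: unknown): void;
}

// Runs the handler once, logs a failure, and commits the offset either way.
// Assumption: committing after a failed handler means the message is not
// redelivered, so a failed notification is dropped (no retry, no DLQ).
async function processOnce<T>(
  message: ConsumedMessage<T>,
  handler: (value: T) => Promise<void>,
  logger: ErrorLogger,
): Promise<boolean> {
  let succeeded = true;
  try {
    await handler(message.value);
  } catch (cause) {
    succeeded = false;
    logger.error('activity notification handler failed', cause);
  }
  await message.commit();
  return succeeded;
}
```

Under this assumption a handler error costs one notification but never blocks the partition.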
Consumer config (NotificationKafkaComponent):
| Setting | Value |
|---|---|
| autocommit | false (manual message.commit() after each handler) |
| fallbackMode | latest |
| clientId | APP_ENV_KAFKA_CLIENT_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER |
| groupId | APP_ENV_KAFKA_GROUP_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER_GROUP |
| deserializers | key: string, value: json |
| shutdown | SIGTERM / SIGINT → graceful consumer.close() |
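The clientId and groupId fallbacks above can be sketched as a small resolver. This is an illustration only: `resolveConsumerIdentity` is a hypothetical helper, and the value of `ServiceCodes.SIGNAL` is not given in this document, so `'SIGNAL'` is a placeholder.

```typescript
// Placeholder for the real ServiceCodes constant; the actual value of
// ServiceCodes.SIGNAL is an assumption here.
const ServiceCodes = { SIGNAL: 'SIGNAL' } as const;

interface ConsumerIdentity {
  clientId: string;
  groupId: string;
}

// Hypothetical helper mirroring the `??` fallbacks in the table.
// `??` falls back only on undefined or null, so an empty string is kept as-is.
function resolveConsumerIdentity(
  env: Record<string, string | undefined>,
): ConsumerIdentity {
  return {
    clientId:
      env['APP_ENV_KAFKA_CLIENT_ID'] ?? `${ServiceCodes.SIGNAL}_NOTIF_CONSUMER`,
    groupId:
      env['APP_ENV_KAFKA_GROUP_ID'] ?? `${ServiceCodes.SIGNAL}_NOTIF_CONSUMER_GROUP`,
  };
}
```

Because the fallback uses `??`, setting a variable to an empty string would not trigger the default.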
2. Outbound — Kafka
None. Signal only consumes; it never produces Kafka events.
3. Inbound — BullMQ
None.
4. Outbound — BullMQ
None.
5. WebSocket Emissions
| Topic | Room | Trigger | Emitted by |
|---|---|---|---|
| observation/signal/notification/created | signal/notification/{recipientId} | After the notification row is saved | NotificationSocketEventService.notifyActivityNotificationCreated |
| (caller-defined) | broadcast (all) | POST /socket/websocket/clients/broadcast | SignalEventService.broadcast |
| (caller-defined) | {roomName} | POST /socket/websocket/clients/rooms/{roomName}/send | SignalEventService.sendToRoom |
| (caller-defined) | single client | POST /socket/websocket/clients/{clientId}/send or cross-service WebSocketEmitter.toClient | SignalEventService.sendToClient |
Topic/room builders:
NotificationWebSocketTopics (topics.ts) and NotificationWebSocketRooms (rooms.ts), both built via WebSocketTopics.build / WebSocketRooms.build from @nx/core.
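To make the topic and room naming concrete, here is a minimal sketch that reproduces the strings from the table. It does not use the real `WebSocketTopics.build` / `WebSocketRooms.build` from @nx/core, whose signatures are not shown in this document; `notificationRoom` and `buildCreatedEmission` are hypothetical local helpers.

```typescript
// Topic string taken from the emissions table above.
const NOTIFICATION_CREATED_TOPIC = 'observation/signal/notification/created';

// Hypothetical helper: builds the per-recipient room name
// `signal/notification/{recipientId}`.
function notificationRoom(recipientId: string): string {
  return `signal/notification/${recipientId}`;
}

interface Emission<P> {
  topic: string;
  room: string;
  payload: P;
}

// Hypothetical helper: describes the emission sent after a row is saved.
function buildCreatedEmission<P>(recipientId: string, payload: P): Emission<P> {
  return {
    topic: NOTIFICATION_CREATED_TOPIC,
    room: notificationRoom(recipientId),
    payload,
  };
}
```

Each recipient gets a dedicated room, so a client that joins only its own room receives only its own notifications.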
6. Payload Schemas
Inbound Kafka message, TActivityNotificationMessage (packages/core/src/common/kafka/types.ts):
```ts
type TActivityNotificationMessage = {
  eventType: TActivityNotificationTypes; // 'PAYMENT_SUCCESS'
  recipientScope: 'org' | 'merchant' | 'users';
  actorId: string;
  actor: { id: string; name: string; type: string };
  organizerId: string;
  merchantId?: string;
  recipientIds?: string[]; // used when recipientScope = 'users'
  data: Record<string, unknown>; // e.g. { payment: { subject } }
  occurredAt: string;
};
```
Outbound WebSocket payload (notification.created):
```ts
interface NotificationCreatedPayload {
  id: string;
  type: string; // ActivityNotification.type
  organizerId: string | null;
  content: string;
  actionUrl: string | null;
  data: Record<string, unknown>;
  isRead: boolean;
  createdAt: string;
}
```
7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
| signal.activity-notification | at-least-once | per partition | No dedup: a redelivered message creates a duplicate ActivityNotification row; manual cleanup is required |
See ADR-0002 for the persist-then-push design and the known idempotency gap.
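The gap can be illustrated with an in-memory sketch. It models the documented behaviour (every delivery inserts a row) next to one possible mitigation. The mitigation is not what the service does today, and everything here is hypothetical: `NotificationTable`, `dedupKey`, and the choice of key fields are assumptions, since the message carries no explicit event id.

```typescript
interface IncomingEvent {
  eventType: string;
  actorId: string;
  occurredAt: string;
}

interface NotificationRow {
  recipientId: string;
  eventType: string;
  occurredAt: string;
}

// Hypothetical dedup key; the field choice is an assumption, since the real
// message has no dedicated event id.
function dedupKey(event: IncomingEvent, recipientId: string): string {
  return [event.eventType, event.actorId, event.occurredAt, recipientId].join('|');
}

class NotificationTable {
  readonly rows: NotificationRow[] = [];
  private readonly keys = new Set<string>();

  // Documented behaviour: every delivery inserts a row, so a redelivery duplicates it.
  insert(event: IncomingEvent, recipientId: string): void {
    this.rows.push({
      recipientId,
      eventType: event.eventType,
      occurredAt: event.occurredAt,
    });
  }

  // Possible mitigation: skip the insert when the key was already seen.
  // Returns true when a row was written, false when the delivery was a duplicate.
  insertOnce(event: IncomingEvent, recipientId: string): boolean {
    const key = dedupKey(event, recipientId);
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    this.insert(event, recipientId);
    return true;
  }
}
```

In a real database the same idea would more likely be a unique constraint on the key than an in-memory set; ADR-0002 remains the reference for the actual design.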