API Events
1. Inbound — Kafka
| Topic | Constant | Producer | Handler | Failure Mode |
|---|---|---|---|---|
| signal.activity-notification | KafkaTopics.SIGNAL_ACTIVITY_NOTIFICATION | Sale / Payment (on payment success) | ActivityNotificationWorkerService.handleActivityNotification | logged on error, no retry/DLQ; offset committed per-message after handling |
Consumer config (NotificationKafkaComponent):
| Setting | Value |
|---|---|
| autocommit | false (manual message.commit() after each handler) |
| fallbackMode | latest |
| clientId | APP_ENV_KAFKA_CLIENT_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER |
| groupId | APP_ENV_KAFKA_GROUP_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER_GROUP |
| deserializers | key: string, value: json |
| shutdown | SIGTERM / SIGINT → graceful consumer.close() |
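The failure mode and the manual-commit setting above can be sketched as a small wrapper. This is a minimal illustration, not the service's actual code: `CommittableMessage`, `resolveId`, and `handleThenCommit` are hypothetical names, and the sketch assumes the offset is committed even when the handler throws, which the "no retry/DLQ" wording implies but does not state outright.

```ts
// Hypothetical minimal shape of a consumed message; the real client type will differ.
interface CommittableMessage<T> {
  value: T;
  commit(): Promise<void>;
}

// Resolve an ID the way the config table describes: env override first, then a default.
// `serviceCode` stands in for ServiceCodes.SIGNAL.
function resolveId(envValue: string | undefined, serviceCode: string, suffix: string): string {
  return envValue ?? `${serviceCode}_${suffix}`;
}

// Run the handler, log any failure, then commit the offset either way.
// With no retry and no DLQ, a message whose handler failed is not processed again
// once its offset has been committed.
async function handleThenCommit<T>(
  message: CommittableMessage<T>,
  handler: (value: T) => Promise<void>,
  logError: (err: unknown) => void = console.error,
): Promise<boolean> {
  let ok = true;
  try {
    await handler(message.value);
  } catch (err) {
    ok = false;
    logError(err);
  }
  await message.commit();
  return ok;
}
```

Under this reading, a handler failure costs exactly one notification: the error reaches the log, the offset advances, and nothing re-queues the message.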
2. Outbound — Kafka
None. Signal consumes only; it never produces Kafka events.
3. Inbound — BullMQ
None.
4. Outbound — BullMQ
None.
5. WebSocket Emissions
| Topic | Room | Trigger | Emitted by |
|---|---|---|---|
| observation/signal/notification/created | signal/notification/{recipientId} | After persisting notification rows | NotificationSocketEventService.notifyActivityNotificationCreated |
| (caller-defined) | broadcast (all) | POST /socket/websocket/clients/broadcast | SignalEventService.broadcast |
| (caller-defined) | {roomName} | POST /socket/websocket/clients/rooms/{roomName}/send | SignalEventService.sendToRoom |
| (caller-defined) | single client | POST /socket/websocket/clients/{clientId}/send or cross-service WebSocketEmitter.toClient | SignalEventService.sendToClient |
Topic/room builders:
NotificationWebSocketTopics (topics.ts) and NotificationWebSocketRooms (rooms.ts), both built via WebSocketTopics.build / WebSocketRooms.build from @nx/core.
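To make the naming scheme concrete, here is a rough sketch of how the topic and room strings in the table could be assembled. The `buildPath` and `notificationRoom` helpers are hypothetical stand-ins; the real WebSocketTopics.build / WebSocketRooms.build signatures in @nx/core are not shown in this document and may differ.

```ts
// Hypothetical stand-in for the @nx/core builders: join path segments with "/",
// trimming stray slashes and dropping empty segments.
function buildPath(...segments: string[]): string {
  return segments
    .map((s) => s.replace(/^\/+|\/+$/g, ""))
    .filter((s) => s.length > 0)
    .join("/");
}

// Topic string taken from the emissions table.
const NOTIFICATION_CREATED_TOPIC = buildPath("observation", "signal", "notification", "created");

// Per-recipient room, matching the signal/notification/{recipientId} pattern.
function notificationRoom(recipientId: string): string {
  if (recipientId.length === 0) {
    throw new Error("recipientId must be non-empty");
  }
  return buildPath("signal", "notification", recipientId);
}
```

Keeping the room name a pure function of recipientId means each recipient's notifications go to that recipient's own room rather than to a broadcast.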
6. Payload Schemas
Inbound Kafka message — TActivityNotificationMessage (packages/core/src/common/kafka/types.ts):
```ts
type TActivityNotificationMessage = {
  eventType: TActivityNotificationTypes; // 'PAYMENT_SUCCESS'
  recipientScope: 'org' | 'merchant' | 'users';
  actorId: string;
  actor: { id: string; name: string; type: string };
  organizerId: string;
  merchantId?: string;
  recipientIds?: string[]; // used when scope = 'users'
  data: Record<string, unknown>; // e.g. { payment: { subject } }
  occurredAt: string;
};
```

Outbound WebSocket payload (notification.created):
```ts
interface NotificationCreatedPayload {
  id: string;
  type: string; // ActivityNotification.type
  organizerId: string | null;
  content: string;
  actionUrl: string | null;
  data: Record<string, unknown>;
  isRead: boolean;
  createdAt: string;
}
```

7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
| signal.activity-notification | at-least-once | per-partition | no dedup — a re-delivered message creates duplicate ActivityNotification rows; manual cleanup required |
See ADR-0002 for the persist-then-push design and the known idempotency gap.
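For illustration only, the gap described above could in principle be narrowed with a deterministic per-recipient key derived from fields already present in TActivityNotificationMessage. Nothing here is taken from ADR-0002 or the current code: `dedupKey` and `SeenKeys` are hypothetical, and the in-memory set merely stands in for something durable such as a unique constraint.

```ts
// Only the message fields that feed the key (a subset of TActivityNotificationMessage).
type DedupInput = {
  eventType: string;
  actorId: string;
  organizerId: string;
  occurredAt: string;
};

// Build a deterministic key: the same message and recipient always map to the same string.
// JSON.stringify over an array avoids ambiguity from separators inside field values.
function dedupKey(msg: DedupInput, recipientId: string): string {
  return JSON.stringify([msg.eventType, msg.actorId, msg.organizerId, msg.occurredAt, recipientId]);
}

// In-memory stand-in for a durable uniqueness check (assumption: a real system
// would enforce this in the database, not in process memory).
class SeenKeys {
  private readonly keys = new Set<string>();

  // Returns true the first time a key is seen, false on a re-delivery.
  markIfNew(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }
}
```

One caveat of this sketch: two distinct events that share the same actor, organizer, event type, and occurredAt timestamp would collapse into one key, so a producer-supplied event ID would be a safer basis if one were available.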