API Events

1. Inbound — Kafka

| Topic | Constant | Producer | Handler | Failure Mode |
| --- | --- | --- | --- | --- |
| signal.activity-notification | KafkaTopics.SIGNAL_ACTIVITY_NOTIFICATION | Sale / Payment (on payment success) | ActivityNotificationWorkerService.handleActivityNotification | logged on error, no retry/DLQ; offset committed per-message after handling |

Consumer config (NotificationKafkaComponent):

| Setting | Value |
| --- | --- |
| autocommit | false (manual message.commit() after each handler) |
| fallbackMode | latest |
| clientId | APP_ENV_KAFKA_CLIENT_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER |
| groupId | APP_ENV_KAFKA_GROUP_ID ?? ${ServiceCodes.SIGNAL}_NOTIF_CONSUMER_GROUP |
| deserializers | key: string, value: json |
| shutdown | SIGTERM / SIGINT → graceful consumer.close() |
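
The failure mode and the manual-commit setting above together imply a handle-then-commit step per message. The following is a minimal sketch of that step, not the service's actual code. The `CommittableMessage` shape, `consumeOne`, and the `logError` callback are hypothetical names; only `message.commit()` is named in this document. It also assumes the offset is committed after a logged failure, which "no retry/DLQ" suggests but this page does not spell out.

```typescript
// Hypothetical message shape: only commit() is named in this document.
interface CommittableMessage<T> {
  value: T;
  commit(): Promise<void>;
}

type Handler<T> = (value: T) => Promise<void>;
type ErrorLogger = (message: string, error: unknown) => void;

// Handle one message, log any failure, then commit the offset either way.
// Assumption: no retry and no dead-letter queue, so a failed message is skipped.
async function consumeOne<T>(
  message: CommittableMessage<T>,
  handler: Handler<T>,
  logError: ErrorLogger,
): Promise<boolean> {
  let handled = true;
  try {
    await handler(message.value);
  } catch (error) {
    handled = false;
    logError('activity notification handler failed', error);
  }
  // Manual commit, one offset per message (autocommit is false).
  await message.commit();
  return handled;
}
```

Under these assumptions a handler failure costs exactly one notification: the error is logged and the consumer moves on to the next offset.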

2. Outbound — Kafka

None. Signal consumes only; it never produces Kafka events.

3. Inbound — BullMQ

None.

4. Outbound — BullMQ

None.

5. WebSocket Emissions

| Topic | Room | Trigger | Emitted by |
| --- | --- | --- | --- |
| observation/signal/notification/created | signal/notification/{recipientId} | After persisting notification rows | NotificationSocketEventService.notifyActivityNotificationCreated |
| (caller-defined) | broadcast (all) | POST /socket/websocket/clients/broadcast | SignalEventService.broadcast |
| (caller-defined) | {roomName} | POST /socket/websocket/clients/rooms/{roomName}/send | SignalEventService.sendToRoom |
| (caller-defined) | single client | POST /socket/websocket/clients/{clientId}/send or cross-service WebSocketEmitter.toClient | SignalEventService.sendToClient |

Topic/room builders: NotificationWebSocketTopics (topics.ts) and NotificationWebSocketRooms (rooms.ts), both built via WebSocketTopics.build / WebSocketRooms.build from @nx/core.
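
To illustrate how the per-recipient room name in the table resolves, here is a small sketch that fills the `{recipientId}` placeholder. `fillRoomTemplate` and `notificationRoomFor` are hypothetical helpers written for this example; they are not the `WebSocketRooms.build` API from @nx/core, whose signature this page does not show.

```typescript
// Hypothetical helper: substitutes {placeholders} in a room template.
// Not the @nx/core WebSocketRooms.build API.
function fillRoomTemplate(template: string, params: Record<string, string>): string {
  return template.replace(/\{(\w+)\}/g, (_match: string, key: string) => {
    const value = params[key];
    if (value === undefined) {
      throw new Error(`Missing room parameter: ${key}`);
    }
    return value;
  });
}

// Room template and topic string copied from the emissions table.
const NOTIFICATION_ROOM_TEMPLATE = 'signal/notification/{recipientId}';
const NOTIFICATION_CREATED_TOPIC = 'observation/signal/notification/created';

// Room that receives notification.created events for one recipient.
function notificationRoomFor(recipientId: string): string {
  return fillRoomTemplate(NOTIFICATION_ROOM_TEMPLATE, { recipientId });
}
```

A client interested in its own notifications would join `notificationRoomFor(recipientId)` and listen for `NOTIFICATION_CREATED_TOPIC`.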

6. Payload Schemas

Inbound Kafka message, TActivityNotificationMessage (packages/core/src/common/kafka/types.ts):

```ts
type TActivityNotificationMessage = {
  eventType: TActivityNotificationTypes;          // 'PAYMENT_SUCCESS'
  recipientScope: 'org' | 'merchant' | 'users';
  actorId: string;
  actor: { id: string; name: string; type: string };
  organizerId: string;
  merchantId?: string;
  recipientIds?: string[];                         // used when scope = 'users'
  data: Record<string, unknown>;                   // e.g. { payment: { subject } }
  occurredAt: string;
};
```
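
Because the value deserializer is plain JSON, a message is untyped at runtime until something checks it. The sketch below is one possible structural check against the fields listed in the type above. `looksLikeActivityNotification` and its helpers are hypothetical, and nothing on this page says the service performs such validation.

```typescript
const RECIPIENT_SCOPES: readonly string[] = ['org', 'merchant', 'users'];

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

function isStringArray(value: unknown): boolean {
  return Array.isArray(value) && value.every((item) => typeof item === 'string');
}

// Hypothetical structural check; field names follow TActivityNotificationMessage.
// eventType is only checked to be a string, since the full
// TActivityNotificationTypes union is not listed in this document.
function looksLikeActivityNotification(value: unknown): boolean {
  if (!isRecord(value)) return false;
  const actor = value.actor;
  return (
    typeof value.eventType === 'string' &&
    typeof value.recipientScope === 'string' &&
    RECIPIENT_SCOPES.indexOf(value.recipientScope) !== -1 &&
    typeof value.actorId === 'string' &&
    isRecord(actor) &&
    typeof actor.id === 'string' &&
    typeof actor.name === 'string' &&
    typeof actor.type === 'string' &&
    typeof value.organizerId === 'string' &&
    (value.merchantId === undefined || typeof value.merchantId === 'string') &&
    (value.recipientIds === undefined || isStringArray(value.recipientIds)) &&
    isRecord(value.data) &&
    typeof value.occurredAt === 'string'
  );
}
```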

Outbound WebSocket payload (notification.created):

```ts
interface NotificationCreatedPayload {
  id: string;
  type: string;                  // ActivityNotification.type
  organizerId: string | null;
  content: string;
  actionUrl: string | null;
  data: Record<string, unknown>;
  isRead: boolean;
  createdAt: string;
}
```
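
As a rough illustration of the persist-then-push step, the sketch below maps a stored row to this payload. The `NotificationRow` shape and `toCreatedPayload` are hypothetical: the real ActivityNotification entity is not shown on this page, and serializing `createdAt` as an ISO 8601 string is an assumption.

```typescript
interface NotificationCreatedPayload {
  id: string;
  type: string;
  organizerId: string | null;
  content: string;
  actionUrl: string | null;
  data: Record<string, unknown>;
  isRead: boolean;
  createdAt: string;
}

// Hypothetical persisted row; the real entity may differ.
interface NotificationRow {
  id: string;
  type: string;
  organizerId: string | null;
  content: string;
  actionUrl: string | null;
  data: Record<string, unknown> | null;
  isRead: boolean;
  createdAt: Date;
}

// Build the WebSocket payload from a row that has already been saved.
function toCreatedPayload(row: NotificationRow): NotificationCreatedPayload {
  return {
    id: row.id,
    type: row.type,
    organizerId: row.organizerId,
    content: row.content,
    actionUrl: row.actionUrl,
    data: row.data ?? {},                     // assumption: null data becomes {}
    isRead: row.isRead,
    createdAt: row.createdAt.toISOString(),   // assumption: ISO 8601 timestamp
  };
}
```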

7. Idempotency & Ordering

| Topic | Delivery | Ordering | Recovery |
| --- | --- | --- | --- |
| signal.activity-notification | at-least-once | per-partition | no dedup: a re-delivered message creates duplicate ActivityNotification rows; manual cleanup required |

See ADR-0002 for the persist-then-push design and the known idempotency gap.
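
For context on the gap, the sketch below shows one way a re-delivered message could be recognized: derive a deterministic key per message and recipient, then skip keys already seen. This is not what the service does today (the table above states there is no dedup), and `dedupKey` and `SeenKeys` are hypothetical names. It assumes that two distinct events never share the same event type, organizer, actor, and `occurredAt` value.

```typescript
// Subset of TActivityNotificationMessage fields used to build the key.
interface DedupFields {
  eventType: string;
  organizerId: string;
  actorId: string;
  occurredAt: string;
}

// Deterministic key: the same message and recipient always yield the same string.
function dedupKey(message: DedupFields, recipientId: string): string {
  return JSON.stringify([
    message.eventType,
    message.organizerId,
    message.actorId,
    message.occurredAt,
    recipientId,
  ]);
}

// In-memory tracker, for illustration only; it does not survive a restart.
class SeenKeys {
  private readonly seen = new Set<string>();

  // Returns true the first time a key is offered, false on re-delivery.
  firstSeen(key: string): boolean {
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}
```

A durable variant would more likely store the key on the notification row under a unique constraint, so that a second insert for the same key is rejected by the database.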

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.