Skip to content

API Events

Ledger is a self-loop on Kafka: the api role produces to ledger.generate, the worker role consumes it. No other service publishes to or subscribes to this topic. There are no BullMQ queues.

1. Inbound — Kafka

TopicConstantProducerHandlerIdempotency KeyFailure Mode
ledger.generateKafkaTopics.LEDGER_GENERATEThis service (LedgerQueueService, RecoveryComponent)LedgerWorkerService.handleGeneration(ledgerId)ledgerId (message key)setRejected + commit; no auto-replay

Consumer config (src/components/kafka.component.ts):

SettingValue
CountAPP_ENV_KAFKA_CONSUMER_COUNT consumers
autocommitfalse — manual message.commit() only after handleGeneration succeeds
DeserializerjsonDeserializer (value already parsed)
Startmode: 'committed', fallbackMode: 'latest'
groupIdAPP_ENV_KAFKA_GROUP_ID (default SVC-00060-LEDGER_GROUP)
TimeoutsrequestTimeout 60000, connectTimeout 30000, maxWaitTime 5000
Role gateSelf-skips unless worker role active

2. Outbound — Kafka

TopicConstantTriggerConsumersPayload
ledger.generateKafkaTopics.LEDGER_GENERATEEnqueue (handleEnqueueGeneration/batch/retry/regenerate) + recovery re-enqueue of stalled jobsThis service's worker role→ §6

Producer config:

SettingValue
Serializerkey stringSerializer, value jsonSerializer (raw objects)
requestTimeout / connectTimeout60000 / 30000
acks / idempotentcommented out — helper defaults
Client IDSVC-00060-LEDGER (override APP_ENV_KAFKA_CLIENT_ID)
SASLAPP_ENV_KAFKA_SASL_ENABLE (default mechanism SCRAM-SHA-512)

3. Inbound — BullMQ

None. Stalled-job recovery is a setInterval sweep in RecoveryComponent, not BullMQ.

4. Outbound — BullMQ

None.

5. WebSocket Emissions

Worker role only — emitted from LedgerNotificationService.notifyJobStatus via the Redis-backed WebSocketEmitter. Best-effort; emit failures are logged, never block the pipeline.

TopicRoomTriggerPayload
observation/ledger/job/status (LedgerEventChannels.LEDGER_JOB_STATUS)LedgerRooms.getProcessRoom({ merchantId })wr:ledger/<merchantId>/processJob enters PROCESSING / COMPLETED / REJECTED during handleGeneration→ §6

6. Payload Schemas

ts
// ledger.generate — TQueueLedgerGenerateMessage (core/src/common/kafka/types.ts)
export interface TQueueLedgerGenerateMessage {
  ledgerId: string;       // Snowflake — worker loads full ledger from DB
  enqueueTime: string;    // ISO 8601
  // optional, currently unused (commented at producer site):
  // ledgerType?, merchantId?, period?, version?, isRetry?
}

// WS observation/ledger/job/status — TLedgerJobStatusPayloadSchema (core/src/models/payloads/ledger.payload.ts)
export interface TLedgerJobStatusPayloadSchema {
  ledgerId: string;
  merchantId: string;
  type: string;           // TLedgerIdentifier
  period: string;
  jobStatus: 'DRAFT' | 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'PARTIAL' | 'REJECTED';
  attemptCount: number;
  failureReason?: {
    default: string;
    en?: string;
    vi?: string;
    errorCode: string;    // TLedgerJobErrorCode
  };
}

7. Idempotency & Ordering

SurfaceDeliveryOrderingRecovery
Kafka ledger.generateat-least-once (committed mode)per-ledgerId via message keymanual retry (POST /ledgers/:id/retry) or recovery sweep — no auto-replay
Job claimexactly-once effectatomic UPDATE WHERE status=PENDING/PROCESSINGduplicate deliveries no-op (setProcessing returns null)
WebSocketbest-effortnone (broadcast)client refetches via GET /ledgers/:id/status on reconnect

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.