API Events
Ledger is a self-loop on Kafka: the api role produces to ledger.generate, and the worker role consumes it. No other service publishes to or subscribes to this topic. There are no BullMQ queues.
1. Inbound — Kafka
| Topic | Constant | Producer | Handler | Idempotency Key | Failure Mode |
|---|---|---|---|---|---|
| ledger.generate | KafkaTopics.LEDGER_GENERATE | This service (LedgerQueueService, RecoveryComponent) | LedgerWorkerService.handleGeneration(ledgerId) | ledgerId (message key) | setRejected + commit; no auto-replay |
Consumer config (src/components/kafka.component.ts):
| Setting | Value |
|---|---|
| Count | APP_ENV_KAFKA_CONSUMER_COUNT consumers |
| autocommit | false; manual message.commit() only after handleGeneration succeeds |
| Deserializer | jsonDeserializer (value already parsed) |
| Start | mode: 'committed', fallbackMode: 'latest' |
| groupId | APP_ENV_KAFKA_GROUP_ID (default SVC-00060-LEDGER_GROUP) |
| Timeouts | requestTimeout 60000, connectTimeout 30000, maxWaitTime 5000 |
| Role gate | Self-skips unless worker role active |
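The manual-commit behaviour and the failure mode listed above can be sketched as follows. This is a minimal illustration, not the service's actual code: the `ConsumedMessage` and `LedgerHandlers` shapes and the `processMessage` function are hypothetical, and only `handleGeneration`, `setRejected`, and `message.commit()` are names taken from this page.

```typescript
// Hypothetical message shape; the real consumer helper's types are not shown on this page.
interface ConsumedMessage {
  key: string; // ledgerId (message key)
  value: { ledgerId: string; enqueueTime: string };
  commit(): Promise<void>; // manual offset commit (autocommit is false)
}

// Hypothetical handler bundle standing in for the worker service.
interface LedgerHandlers {
  handleGeneration(ledgerId: string): Promise<void>;
  setRejected(ledgerId: string, reason: string): Promise<void>;
}

async function processMessage(
  message: ConsumedMessage,
  handlers: LedgerHandlers,
): Promise<'committed' | 'rejected'> {
  const { ledgerId } = message.value;
  try {
    await handlers.handleGeneration(ledgerId);
    // Success path: commit only after the handler has finished.
    await message.commit();
    return 'committed';
  } catch (err) {
    // Failure path: mark the job rejected, then commit so the
    // message is not replayed automatically.
    const reason = err instanceof Error ? err.message : String(err);
    await handlers.setRejected(ledgerId, reason);
    await message.commit();
    return 'rejected';
  }
}
```

Committing on the failure path is what makes "no auto-replay" hold: a rejected job comes back only through a manual retry or the recovery sweep.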
2. Outbound — Kafka
| Topic | Constant | Trigger | Consumers | Payload |
|---|---|---|---|---|
| ledger.generate | KafkaTopics.LEDGER_GENERATE | Enqueue (handleEnqueueGeneration/batch/retry/regenerate) + recovery re-enqueue of stalled jobs | This service's worker role | → §6 |
Producer config:
| Setting | Value |
|---|---|
| Serializer | key stringSerializer, value jsonSerializer (raw objects) |
| requestTimeout / connectTimeout | 60000 / 30000 |
| acks / idempotent | commented out; helper defaults apply |
| Client ID | SVC-00060-LEDGER (override APP_ENV_KAFKA_CLIENT_ID) |
| SASL | APP_ENV_KAFKA_SASL_ENABLE (default mechanism SCRAM-SHA-512) |
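As a rough illustration of what the producer hands to the serializers, the record below uses ledgerId as the string key and a raw object as the value. `ProduceRecord` and `buildGenerateRecord` are hypothetical names introduced for this sketch; the actual send call of the Kafka helper is not shown on this page and is deliberately left out.

```typescript
// Minimal message shape, mirroring the two active fields of the ledger.generate payload.
interface LedgerGenerateMessage {
  ledgerId: string;
  enqueueTime: string; // ISO 8601
}

// Hypothetical record shape: key goes through the string serializer,
// value through the JSON serializer as a raw object.
interface ProduceRecord {
  topic: string;
  key: string;
  value: LedgerGenerateMessage;
}

// Topic name as listed for KafkaTopics.LEDGER_GENERATE.
const LEDGER_GENERATE_TOPIC = 'ledger.generate';

function buildGenerateRecord(ledgerId: string, now: Date = new Date()): ProduceRecord {
  return {
    topic: LEDGER_GENERATE_TOPIC,
    // Using ledgerId as the key keeps all messages for one ledger on one partition.
    key: ledgerId,
    value: { ledgerId, enqueueTime: now.toISOString() },
  };
}
```

Keying by ledgerId is what gives the per-ledger ordering described in the ordering section; messages for different ledgers may still interleave.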
3. Inbound — BullMQ
None. Stalled-job recovery is a setInterval sweep in RecoveryComponent, not BullMQ.
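A setInterval-based sweep of this kind might look like the sketch below. The definition of "stalled" used here (a job that has sat in PROCESSING longer than a threshold) is an assumption, as are the `JobRow` shape and the `selectStalled`, `startRecoverySweep`, `loadJobs`, and `reEnqueue` names; the real RecoveryComponent may select and re-enqueue jobs differently.

```typescript
type SweepJobStatus = 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'REJECTED';

// Hypothetical row shape for illustration only.
interface JobRow {
  ledgerId: string;
  status: SweepJobStatus;
  updatedAtMs: number; // last status change, epoch milliseconds
}

// Pure selection step: jobs that have been PROCESSING longer than the threshold.
function selectStalled(jobs: JobRow[], nowMs: number, stallThresholdMs: number): JobRow[] {
  return jobs.filter(
    (job) => job.status === 'PROCESSING' && nowMs - job.updatedAtMs > stallThresholdMs,
  );
}

// Periodic sweep: load jobs, pick the stalled ones, re-enqueue each.
function startRecoverySweep(
  loadJobs: () => Promise<JobRow[]>,
  reEnqueue: (ledgerId: string) => Promise<void>,
  intervalMs: number,
  stallThresholdMs: number,
): ReturnType<typeof setInterval> {
  return setInterval(async () => {
    try {
      const stalled = selectStalled(await loadJobs(), Date.now(), stallThresholdMs);
      for (const job of stalled) {
        await reEnqueue(job.ledgerId);
      }
    } catch (err) {
      // A failed sweep is logged and retried on the next tick.
      console.error('recovery sweep failed', err);
    }
  }, intervalMs);
}
```

Keeping the selection step pure makes it testable without timers; the caller holds the returned handle and passes it to clearInterval on shutdown.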
4. Outbound — BullMQ
None.
5. WebSocket Emissions
Worker role only: emitted from LedgerNotificationService.notifyJobStatus via the Redis-backed WebSocketEmitter. Best-effort; emit failures are logged and never block the pipeline.
| Topic | Room | Trigger | Payload |
|---|---|---|---|
| observation/ledger/job/status (LedgerEventChannels.LEDGER_JOB_STATUS) | LedgerRooms.getProcessRoom({ merchantId }) → wr:ledger/<merchantId>/process | Job enters PROCESSING / COMPLETED / REJECTED during handleGeneration | → §6 |
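The best-effort behaviour can be illustrated with a small wrapper that swallows emit errors. The `Emitter` interface, the synchronous `emit` signature, and the `log` parameter are assumptions made for this sketch; the real WebSocketEmitter API is not shown on this page. The room format and topic string follow the table above.

```typescript
// Hypothetical emitter interface; synchronous here for brevity.
interface Emitter {
  emit(room: string, topic: string, payload: unknown): void;
}

const LEDGER_JOB_STATUS = 'observation/ledger/job/status';

// Room name format as listed in the table: wr:ledger/<merchantId>/process
function getProcessRoom(merchantId: string): string {
  return `wr:ledger/${merchantId}/process`;
}

// Returns true if the emit went through, false if it failed and was logged.
// It never throws, so a WebSocket failure cannot interrupt generation.
function notifyJobStatus(
  emitter: Emitter,
  payload: { merchantId: string; ledgerId: string; jobStatus: string },
  log: (message: string) => void = console.warn,
): boolean {
  try {
    emitter.emit(getProcessRoom(payload.merchantId), LEDGER_JOB_STATUS, payload);
    return true;
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log(`WS emit failed for ledger ${payload.ledgerId}: ${reason}`);
    return false;
  }
}
```

Because delivery is not guaranteed, clients should treat these events as hints and refetch the job status when they reconnect.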
6. Payload Schemas
```ts
// ledger.generate: TQueueLedgerGenerateMessage (core/src/common/kafka/types.ts)
export interface TQueueLedgerGenerateMessage {
  ledgerId: string;    // Snowflake; worker loads full ledger from DB
  enqueueTime: string; // ISO 8601
  // optional, currently unused (commented at producer site):
  // ledgerType?, merchantId?, period?, version?, isRetry?
}

// WS observation/ledger/job/status: TLedgerJobStatusPayloadSchema (core/src/models/payloads/ledger.payload.ts)
export interface TLedgerJobStatusPayloadSchema {
  ledgerId: string;
  merchantId: string;
  type: string; // TLedgerIdentifier
  period: string;
  jobStatus: 'DRAFT' | 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'PARTIAL' | 'REJECTED';
  attemptCount: number;
  failureReason?: {
    default: string;
    en?: string;
    vi?: string;
    errorCode: string; // TLedgerJobErrorCode
  };
}
```
7. Idempotency & Ordering
| Surface | Delivery | Ordering | Recovery |
|---|---|---|---|
| Kafka ledger.generate | at-least-once (committed mode) | per-ledgerId via message key | manual retry (POST /ledgers/:id/retry) or recovery sweep; no auto-replay |
| Job claim | exactly-once effect | atomic UPDATE WHERE status=PENDING/PROCESSING | duplicate deliveries no-op (setProcessing returns null) |
| WebSocket | best-effort | none (broadcast) | client refetches via GET /ledgers/:id/status on reconnect |
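The job-claim row relies on a conditional update acting as a compare-and-set. The sketch below models that with an in-memory object in place of the database. The exact WHERE predicate is not shown on this page, so the default of claiming only from PENDING is an assumption, and the `Job` shape and `claimableFrom` parameter are hypothetical; only `setProcessing` and its null return come from the table above.

```typescript
type ClaimJobStatus = 'DRAFT' | 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'PARTIAL' | 'REJECTED';

interface Job {
  ledgerId: string;
  status: ClaimJobStatus;
}

// In-memory stand-in for a conditional update such as:
//   UPDATE ... SET status = 'PROCESSING' WHERE id = ? AND status IN (...)
// Returns the claimed job, or null when no row matched the predicate.
function setProcessing(
  store: Record<string, Job>,
  ledgerId: string,
  claimableFrom: ClaimJobStatus[] = ['PENDING'], // assumed predicate
): Job | null {
  const job = store[ledgerId];
  if (!job || claimableFrom.indexOf(job.status) === -1) {
    return null; // already claimed, finished, or unknown: the delivery is a no-op
  }
  const claimed: Job = { ...job, status: 'PROCESSING' };
  store[ledgerId] = claimed;
  return claimed;
}
```

With at-least-once delivery the same message can arrive twice; the second call finds the row no longer matches and returns null, so the handler can commit the offset and skip the work.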
8. Related Pages
- Generation Pipeline — full worker flow
- Integration
- Architecture — Runtime Scenarios