Skip to content

API Events

Ledger là một self-loop trên Kafka: role api produce tới ledger.generate, role worker consume nó. Không service nào khác publish hay subscribe topic này. Không có queue BullMQ.

1. Inbound — Kafka

TopicConstantProducerHandlerIdempotency KeyFailure Mode
ledger.generateKafkaTopics.LEDGER_GENERATEService này (LedgerQueueService, RecoveryComponent)LedgerWorkerService.handleGeneration(ledgerId)ledgerId (message key)setRejected + commit; không auto-replay

Config consumer (src/components/kafka.component.ts):

Cài đặtGiá trị
CountAPP_ENV_KAFKA_CONSUMER_COUNT consumer
autocommitfalsemessage.commit() thủ công chỉ sau khi handleGeneration thành công
DeserializerjsonDeserializer (value đã parse)
Startmode: 'committed', fallbackMode: 'latest'
groupIdAPP_ENV_KAFKA_GROUP_ID (mặc định SVC-00060-LEDGER_GROUP)
TimeoutsrequestTimeout 60000, connectTimeout 30000, maxWaitTime 5000
Role gateTự bỏ qua trừ khi role worker active

2. Outbound — Kafka

TopicConstantTriggerConsumersPayload
ledger.generateKafkaTopics.LEDGER_GENERATEEnqueue (handleEnqueueGeneration/batch/retry/regenerate) + recovery re-enqueue job kẹtRole worker của service này→ §6

Config producer:

Cài đặtGiá trị
Serializerkey stringSerializer, value jsonSerializer (object thô)
requestTimeout / connectTimeout60000 / 30000
acks / idempotentđã comment — mặc định của helper
Client IDSVC-00060-LEDGER (override APP_ENV_KAFKA_CLIENT_ID)
SASLAPP_ENV_KAFKA_SASL_ENABLE (cơ chế mặc định SCRAM-SHA-512)

3. Inbound — BullMQ

Không. Phục hồi job kẹt là một quét setInterval trong RecoveryComponent, không phải BullMQ.

4. Outbound — BullMQ

Không.

5. WebSocket Emissions

Chỉ role worker — emit từ LedgerNotificationService.notifyJobStatus qua WebSocketEmitter dựa trên Redis. Best-effort; lỗi emit được log, không bao giờ chặn pipeline.

TopicRoomTriggerPayload
observation/ledger/job/status (LedgerEventChannels.LEDGER_JOB_STATUS)LedgerRooms.getProcessRoom({ merchantId })wr:ledger/<merchantId>/processJob vào PROCESSING / COMPLETED / REJECTED trong handleGeneration→ §6

6. Payload Schemas

ts
// ledger.generate — TQueueLedgerGenerateMessage (core/src/common/kafka/types.ts)
export interface TQueueLedgerGenerateMessage {
  ledgerId: string;       // Snowflake — worker nạp full ledger từ DB
  enqueueTime: string;    // ISO 8601
  // tuỳ chọn, hiện chưa dùng (comment tại producer):
  // ledgerType?, merchantId?, period?, version?, isRetry?
}

// WS observation/ledger/job/status — TLedgerJobStatusPayloadSchema (core/src/models/payloads/ledger.payload.ts)
export interface TLedgerJobStatusPayloadSchema {
  ledgerId: string;
  merchantId: string;
  type: string;           // TLedgerIdentifier
  period: string;
  jobStatus: 'DRAFT' | 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'PARTIAL' | 'REJECTED';
  attemptCount: number;
  failureReason?: {
    default: string;
    en?: string;
    vi?: string;
    errorCode: string;    // TLedgerJobErrorCode
  };
}

7. Idempotency & Thứ tự

Bề mặtDeliveryThứ tựPhục hồi
Kafka ledger.generateat-least-once (chế độ committed)theo ledgerId qua message keyretry thủ công (POST /ledgers/:id/retry) hoặc quét recovery — không auto-replay
Job claimhiệu ứng exactly-onceUPDATE WHERE status=PENDING/PROCESSING nguyên tửdelivery trùng lặp no-op (setProcessing trả null)
WebSocketbest-effortkhông (broadcast)client refetch qua GET /ledgers/:id/status khi reconnect

8. Trang liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.