API Events
Ledger là một self-loop trên Kafka: role
apiproduce tớiledger.generate, roleworkerconsume nó. Không service nào khác publish hay subscribe topic này. Không có queue BullMQ.
1. Inbound — Kafka
| Topic | Constant | Producer | Handler | Idempotency Key | Failure Mode |
|---|---|---|---|---|---|
ledger.generate | KafkaTopics.LEDGER_GENERATE | Service này (LedgerQueueService, RecoveryComponent) | LedgerWorkerService.handleGeneration(ledgerId) | ledgerId (message key) | setRejected + commit; không auto-replay |
Config consumer (src/components/kafka.component.ts):
| Cài đặt | Giá trị |
|---|---|
| Count | APP_ENV_KAFKA_CONSUMER_COUNT consumer |
autocommit | false — message.commit() thủ công chỉ sau khi handleGeneration thành công |
| Deserializer | jsonDeserializer (value đã parse) |
| Start | mode: 'committed', fallbackMode: 'latest' |
groupId | APP_ENV_KAFKA_GROUP_ID (mặc định SVC-00060-LEDGER_GROUP) |
| Timeouts | requestTimeout 60000, connectTimeout 30000, maxWaitTime 5000 |
| Role gate | Tự bỏ qua trừ khi role worker active |
2. Outbound — Kafka
| Topic | Constant | Trigger | Consumers | Payload |
|---|---|---|---|---|
ledger.generate | KafkaTopics.LEDGER_GENERATE | Enqueue (handleEnqueueGeneration/batch/retry/regenerate) + recovery re-enqueue job kẹt | Role worker của service này | → §6 |
Config producer:
| Cài đặt | Giá trị |
|---|---|
| Serializer | key stringSerializer, value jsonSerializer (object thô) |
requestTimeout / connectTimeout | 60000 / 30000 |
acks / idempotent | đã comment — mặc định của helper |
| Client ID | SVC-00060-LEDGER (override APP_ENV_KAFKA_CLIENT_ID) |
| SASL | APP_ENV_KAFKA_SASL_ENABLE (cơ chế mặc định SCRAM-SHA-512) |
3. Inbound — BullMQ
Không. Phục hồi job kẹt là một quét setInterval trong RecoveryComponent, không phải BullMQ.
4. Outbound — BullMQ
Không.
5. WebSocket Emissions
Chỉ role worker — emit từ
LedgerNotificationService.notifyJobStatusquaWebSocketEmitterdựa trên Redis. Best-effort; lỗi emit được log, không bao giờ chặn pipeline.
| Topic | Room | Trigger | Payload |
|---|---|---|---|
observation/ledger/job/status (LedgerEventChannels.LEDGER_JOB_STATUS) | LedgerRooms.getProcessRoom({ merchantId }) → wr:ledger/<merchantId>/process | Job vào PROCESSING / COMPLETED / REJECTED trong handleGeneration | → §6 |
6. Payload Schemas
ts
// ledger.generate — TQueueLedgerGenerateMessage (core/src/common/kafka/types.ts)
export interface TQueueLedgerGenerateMessage {
ledgerId: string; // Snowflake — worker nạp full ledger từ DB
enqueueTime: string; // ISO 8601
// tuỳ chọn, hiện chưa dùng (comment tại producer):
// ledgerType?, merchantId?, period?, version?, isRetry?
}
// WS observation/ledger/job/status — TLedgerJobStatusPayloadSchema (core/src/models/payloads/ledger.payload.ts)
export interface TLedgerJobStatusPayloadSchema {
ledgerId: string;
merchantId: string;
type: string; // TLedgerIdentifier
period: string;
jobStatus: 'DRAFT' | 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'PARTIAL' | 'REJECTED';
attemptCount: number;
failureReason?: {
default: string;
en?: string;
vi?: string;
errorCode: string; // TLedgerJobErrorCode
};
}7. Idempotency & Thứ tự
| Bề mặt | Delivery | Thứ tự | Phục hồi |
|---|---|---|---|
Kafka ledger.generate | at-least-once (chế độ committed) | theo ledgerId qua message key | retry thủ công (POST /ledgers/:id/retry) hoặc quét recovery — không auto-replay |
| Job claim | hiệu ứng exactly-once | UPDATE WHERE status=PENDING/PROCESSING nguyên tử | delivery trùng lặp no-op (setProcessing trả null) |
| WebSocket | best-effort | không (broadcast) | client refetch qua GET /ledgers/:id/status khi reconnect |
8. Trang liên quan
- Generation Pipeline — luồng worker đầy đủ
- Integration
- Architecture — Kịch bản runtime