API Events
Nguồn:
src/components/kafka-consumer/component.ts,src/common/queues.ts,src/services/invoice-issuance-queue.service.ts,src/common/websocket.ts.
1. Inbound — Kafka
| Topic | Producer | Handler | Khoá Idempotency | Chế độ lỗi |
|---|---|---|---|---|
KafkaTopics.PAYMENT_SUCCESS | @nx/sale | InvoiceWorkerService.handlePaymentSuccess | hoá đơn unique-một-phần (sourceType, sourceId) | bỏ qua nếu không COMPLETED / đã có hoá đơn active; log + return |
CDCKafkaTopics.MERCHANT (Debezium) | @nx/commerce (CDC) | InvoiceWorkerService.handleMerchantCDC | (principalType, principalId) trên TaxInfo | op c/r đối chiếu, u diff-và-bỏ-qua, d bị bỏ qua |
Consumer:
autocommit: true,fallbackMode: 'earliest', group mặc địnhSVC-00150-INVOICE_CONSUMER_GROUP. Một consumer điều phối bằng switch theomessage.topic.
2. Outbound — Kafka
Không có. Service không sản xuất topic Kafka. Thông báo downstream chỉ qua WebSocket và callback nhà cung cấp/webhook.
3. Inbound — BullMQ
3 phân vùng mỗi loại queue; phân vùng chọn bằng
getPartitionByKey(orderId)(JavahashCodemod 3 — xác định theo từng đơn).
| Loại queue | Job | Producer | Handler | Concurrency |
|---|---|---|---|---|
issuance (@nx/invoice/queue/issuance-{01,02,03}) | invoice-issuance (jobType 000_ISSUANCE) | worker / cron / retry / claim-expiry | InvoiceIssuanceQueueService.processIssuanceJob → issueOrder | APP_ENV_INVOICE_ISSUANCE_WORKER_CONCURRENCY (mặc định 10) |
issuance | invoice-issuance (jobType 100_ADJUSTMENT) | luồng adjustment | processIssuanceJob → _processAdjustmentJob | (cùng worker) |
claim-expiry (@nx/invoice/queue/claim-expiry-{01,02,03}) | claim-expiry | payment success (BUYER_SELF_SERVICE) | processClaimExpiryJob | mặc định 3 |
Job ID (idempotency):
| Job | jobId |
|---|---|
| Issuance | orderId |
| Retry | ${orderId}:retry:${n} |
| Adjustment | adjust:${invoiceId} (removeOnComplete/Fail: true) |
| Claim expiry | expiry:${orderId} (delay đến deadline) |
4. Outbound — BullMQ
Tự enqueue bởi queue service (không có consumer BullMQ bên ngoài).
| Trigger | Job | Đích |
|---|---|---|
| Thanh toán REAL_TIME / cron / resolve claim | enqueueIssuance | phân vùng issuance |
| Lỗi phát hành tạm thời | enqueueIssuance (delay [5,15,60]phút) | phân vùng issuance |
| Yêu cầu adjustment | enqueueAdjustment | phân vùng issuance |
| Thanh toán BUYER_SELF_SERVICE | enqueueClaimExpiry (delay = deadline − now) | phân vùng claim-expiry |
5. Phát WebSocket
| Topic | Rooms | Trigger | Payload |
|---|---|---|---|
observation/invoice/invoice | merchants/{merchantId}, merchants/{merchantId}/invoices, sale-orders/{saleOrderId}/invoices | thay đổi trạng thái phát hành | → §6 |
Phát bởi
InvoiceSocketEventService.notifyInvoiceUpdate(best-effort,Promise.allSettled; no-op nếu emitter chưa sẵn sàng).
6. Payload Schemas
// Inbound: KafkaTopics.PAYMENT_SUCCESS (TSalePaymentSuccess, from @nx/core)
interface TSalePaymentSuccess {
saleOrderId: string;
saleOrderNumber: string;
merchantId: string;
saleChannelId: string;
createdBy: string;
// ...additional sale fields
}
// Inbound: CDCKafkaTopics.MERCHANT (Debezium envelope)
interface TDebeziumMessage<TMerchantPgRow> {
payload: {
op: 'c' | 'r' | 'u' | 'd';
before: TMerchantPgRow | null;
after: TMerchantPgRow | null; // metadata.tax read here
};
}
// BullMQ issuance job (src/common/queues.ts)
interface IInvoiceIssuanceJob {
orderId: string;
merchantId: string;
triggeredBy: string;
isRetry?: boolean;
jobType?: '000_ISSUANCE' | '100_ADJUSTMENT';
invoiceId?: string; // required for ADJUSTMENT
}
// BullMQ claim-expiry job
interface IClaimExpiryJob {
orderId: string;
merchantId: string;
}
// WebSocket: observation/invoice/invoice (IInvoiceSocketEventPayload)
interface IInvoiceSocketEventPayload {
invoiceId: string;
sourceId: string;
sourceNumber: string;
issuanceStatus: string;
invoiceNumber: string | null;
providerStatus: string | null;
taxAuthorityStatus: number | null;
eventSource: string;
}7. Idempotency & Thứ tự
| Kênh | Phân phối | Thứ tự | Phục hồi |
|---|---|---|---|
PAYMENT_SUCCESS | at-least-once | theo phân vùng Kafka | guard hoá đơn-active khiến re-delivery thành no-op |
Merchant CDC | at-least-once | theo phân vùng Kafka | op=u diff so với TaxInfo đã lưu → không đổi thì bỏ qua |
| Issuance job | theo jobId = orderId | theo phân vùng BullMQ (ổn định theo đơn) | retry với backoff; DLQ → FAILED + audit |
| Claim-expiry job | theo jobId = expiry:${orderId} | delayed | bắn lại an toàn (EXPIRED short-circuit) |