Skip to content

API Events

Nguồn: src/components/kafka-consumer/component.ts, src/common/queues.ts, src/services/invoice-issuance-queue.service.ts, src/common/websocket.ts.

1. Inbound — Kafka

TopicProducerHandlerKhoá IdempotencyChế độ lỗi
KafkaTopics.PAYMENT_SUCCESS@nx/saleInvoiceWorkerService.handlePaymentSuccesshoá đơn unique-một-phần (sourceType, sourceId)bỏ qua nếu không COMPLETED / đã có hoá đơn active; log + return
CDCKafkaTopics.MERCHANT (Debezium)@nx/commerce (CDC)InvoiceWorkerService.handleMerchantCDC(principalType, principalId) trên TaxInfoop c/r đối chiếu, u diff-và-bỏ-qua, d bị bỏ qua

Consumer: autocommit: true, fallbackMode: 'earliest', group mặc định SVC-00150-INVOICE_CONSUMER_GROUP. Một consumer điều phối bằng switch theo message.topic.

2. Outbound — Kafka

Không có. Service không sản xuất topic Kafka. Thông báo downstream chỉ qua WebSocket và callback nhà cung cấp/webhook.

3. Inbound — BullMQ

3 phân vùng mỗi loại queue; phân vùng chọn bằng getPartitionByKey(orderId) (Java hashCode mod 3 — xác định theo từng đơn).

Loại queueJobProducerHandlerConcurrency
issuance (@nx/invoice/queue/issuance-{01,02,03})invoice-issuance (jobType 000_ISSUANCE)worker / cron / retry / claim-expiryInvoiceIssuanceQueueService.processIssuanceJobissueOrderAPP_ENV_INVOICE_ISSUANCE_WORKER_CONCURRENCY (mặc định 10)
issuanceinvoice-issuance (jobType 100_ADJUSTMENT)luồng adjustmentprocessIssuanceJob_processAdjustmentJob(cùng worker)
claim-expiry (@nx/invoice/queue/claim-expiry-{01,02,03})claim-expirypayment success (BUYER_SELF_SERVICE)processClaimExpiryJobmặc định 3

Job ID (idempotency):

JobjobId
IssuanceorderId
Retry${orderId}:retry:${n}
Adjustmentadjust:${invoiceId} (removeOnComplete/Fail: true)
Claim expiryexpiry:${orderId} (delay đến deadline)

4. Outbound — BullMQ

Tự enqueue bởi queue service (không có consumer BullMQ bên ngoài).

TriggerJobĐích
Thanh toán REAL_TIME / cron / resolve claimenqueueIssuancephân vùng issuance
Lỗi phát hành tạm thờienqueueIssuance (delay [5,15,60]phút)phân vùng issuance
Yêu cầu adjustmentenqueueAdjustmentphân vùng issuance
Thanh toán BUYER_SELF_SERVICEenqueueClaimExpiry (delay = deadline − now)phân vùng claim-expiry

5. Phát WebSocket

TopicRoomsTriggerPayload
observation/invoice/invoicemerchants/{merchantId}, merchants/{merchantId}/invoices, sale-orders/{saleOrderId}/invoicesthay đổi trạng thái phát hành→ §6

Phát bởi InvoiceSocketEventService.notifyInvoiceUpdate (best-effort, Promise.allSettled; no-op nếu emitter chưa sẵn sàng).

6. Payload Schemas

ts
// Inbound: KafkaTopics.PAYMENT_SUCCESS (TSalePaymentSuccess, from @nx/core)
interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  merchantId: string;
  saleChannelId: string;
  createdBy: string;
  // ...additional sale fields
}

// Inbound: CDCKafkaTopics.MERCHANT (Debezium envelope)
interface TDebeziumMessage<TMerchantPgRow> {
  payload: {
    op: 'c' | 'r' | 'u' | 'd';
    before: TMerchantPgRow | null;
    after: TMerchantPgRow | null; // metadata.tax read here
  };
}

// BullMQ issuance job (src/common/queues.ts)
interface IInvoiceIssuanceJob {
  orderId: string;
  merchantId: string;
  triggeredBy: string;
  isRetry?: boolean;
  jobType?: '000_ISSUANCE' | '100_ADJUSTMENT';
  invoiceId?: string; // required for ADJUSTMENT
}

// BullMQ claim-expiry job
interface IClaimExpiryJob {
  orderId: string;
  merchantId: string;
}

// WebSocket: observation/invoice/invoice (IInvoiceSocketEventPayload)
interface IInvoiceSocketEventPayload {
  invoiceId: string;
  sourceId: string;
  sourceNumber: string;
  issuanceStatus: string;
  invoiceNumber: string | null;
  providerStatus: string | null;
  taxAuthorityStatus: number | null;
  eventSource: string;
}

7. Idempotency & Thứ tự

KênhPhân phốiThứ tựPhục hồi
PAYMENT_SUCCESSat-least-oncetheo phân vùng Kafkaguard hoá đơn-active khiến re-delivery thành no-op
Merchant CDCat-least-oncetheo phân vùng Kafkaop=u diff so với TaxInfo đã lưu → không đổi thì bỏ qua
Issuance jobtheo jobId = orderIdtheo phân vùng BullMQ (ổn định theo đơn)retry với backoff; DLQ → FAILED + audit
Claim-expiry jobtheo jobId = expiry:${orderId}delayedbắn lại an toàn (EXPIRED short-circuit)

8. Trang liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.