Skip to content

Kafka Event Streaming

1. Tổng quan

NX-Seller dùng Apache Kafka cho giao tiếp bất đồng bộ, hướng sự kiện giữa các microservice. Sự kiện được publish khi có thay đổi trạng thái nghiệp vụ (ví dụ thanh toán thành công, tạo merchant) và được consume bởi các dịch vụ phía dưới phản ứng với những thay đổi đó.

Tất cả tên topic được định nghĩa tập trung trong @nx/core tại src/queues/kafka/topics.ts.

2. Danh mục Topic

TopicMô tả
payment.successThanh toán bán hàng hoàn tất thành công
purchase-order.receivedCác mục đơn mua được nhận vào kho
commerce.initializedCài đặt commerce của organizer hoàn tất
merchant.createdMerchant mới được tạo
product-variant.createdBiến thể sản phẩm mới được tạo
product-variant.updatedBiến thể sản phẩm được cập nhật
ledger.generateYêu cầu tạo tài liệu ledger

3. Ma trận Producer & Consumer

TopicProducerConsumer
payment.successSale / PaymentFinance, Inventory
purchase-order.receivedInventoryFinance
commerce.initializedCommerceFinance, Inventory
merchant.createdCommerceInventory
product-variant.createdCommerceSearch (CDC)
product-variant.updatedCommerceSearch (CDC)
ledger.generateLedger (tự thân)Ledger (tự thân)

4. Sơ đồ Luồng Message

5. Consumer Group

Dịch vụClient IDGroup ID
FinanceSVC-00040-FINANCE_CONSUMERSVC-00040-FINANCE_CONSUMER_GROUP
InventorySVC-00030-INVENTORY_CONSUMERSVC-00030-INVENTORY_CONSUMER_GROUP
LedgerSVC-00060-LEDGER_CONSUMERSVC-00060-LEDGER_GROUP
Search (CDC)SVC-00080-SEARCH_CDC_CONSUMERSVC-00080-SEARCH_CDC_GROUP

6. Cấu hình Consumer

6.1. Consumer Finance

Cài đặtGiá trị
Topicpayment.success, purchase-order.received, commerce.initialized
Auto-commitbật
Fallback Modeearliest
DeserializerKey dạng String, Value dạng JSON

6.2. Consumer Inventory

Cài đặtGiá trị
Topicpayment.success, merchant.created, commerce.initialized
Fallback Modeearliest

6.3. Consumer Ledger

Cài đặtGiá trị
Topicledger.generate
Auto-commitfalse (commit thủ công sau khi xử lý)
Số lượng ConsumerCó thể cấu hình qua APP_ENV_KAFKA_CONSUMER_COUNT
Acks (Producer)ALL
Idempotent (Producer)true

6.4. Consumer Search CDC

Cài đặtGiá trị
Topic7 topic CDC Debezium
Auto-commitfalse (quản lý offset thủ công)
Max Batch Size200
Flush Interval2000ms
Max Wait Time500ms
Max Bytes5MB

Xem CDC / Debezium để biết chi tiết.

7. Ví dụ Payload Sự kiện

7.1. Payment Success

json
{
  "merchantId": "123456789",
  "orderId": "987654321",
  "payment": {
    "amount": 150000,
    "currency": "VND",
    "method": "QR",
    "finance": {
      "use": true,
      "walletId": null
    }
  }
}

7.2. Commerce Initialized

json
{
  "organizerId": "111222333",
  "merchantId": "444555666",
  "merchantName": { "en": "My Store", "vi": "Cửa hàng" }
}

7.3. Ledger Generate

json
{
  "ledgerId": "777888999",
  "ledgerType": "S1a-HKD",
  "merchantId": "444555666",
  "period": "2026-Q1",
  "version": 1,
  "isRetry": false,
  "enqueueTime": 1711785600000
}

8. Thực hành Tốt nhất

8.1. Idempotency

Tất cả consumer đều thực hiện kiểm tra idempotency để xử lý an toàn các message trùng lặp:

Dịch vụChiến lược
FinanceKiểm tra giao dịch hiện có theo merchantId + referenceType + referenceId
InventoryKiểm tra vị trí mặc định hiện có theo merchantId
LedgerKiểm tra tác vụ đang chờ hiện có theo ledgerId

8.2. Xử lý Lỗi

Chiến lượcĐược dùng bởi
Bỏ qua + LogFinance (log lỗi, tiếp tục xử lý)
DLQ (Dead Letter Queue)Search CDC (nx.seller.cdc.dlq)
Retry qua APILedger (endpoint retry thủ công)
Phục hồi khi khởi độngLedger (đưa lại các tác vụ bị treo vào hàng đợi)

8.3. Serialization

HướngĐịnh dạng
ProducerJSON serializer (value), string serializer (key)
ConsumerJSON deserializer (value), string deserializer (key)

9. Hạ tầng

9.1. Phát triển (Docker Compose)

Cluster Kafka 3 node:

  • nx-kafka-1:29092
  • nx-kafka-2:29092
  • nx-kafka-3:29092

9.2. Biến Môi trường

BiếnMô tả
APP_ENV_KAFKA_BROKERSĐịa chỉ broker cách nhau bằng dấu phẩy
APP_ENV_KAFKA_CLIENT_IDĐịnh danh client riêng cho dịch vụ
APP_ENV_KAFKA_GROUP_IDĐịnh danh consumer group
APP_ENV_KAFKA_CONSUMER_COUNTSố consumer song song (Ledger)
APP_ENV_KAFKA_SASL_MECHANISMCơ chế SASL (ví dụ SCRAM-SHA-256)
APP_ENV_KAFKA_SASL_USERNAMEUsername SASL
APP_ENV_KAFKA_SASL_PASSWORDPassword SASL

10. Tài liệu Liên quan

Tài liệuMô tả
CDC / DebeziumChange Data Capture qua Kafka
Finance ServiceConsumer sự kiện Finance
Inventory ServiceConsumer sự kiện Inventory
Ledger ServicePipeline tạo Ledger
Search ServiceConsumer CDC để đồng bộ Typesense

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.