Kafka Event Streaming
1. Tổng quan
NX-Seller dùng Apache Kafka cho giao tiếp bất đồng bộ, hướng sự kiện giữa các microservice. Sự kiện được publish khi có thay đổi trạng thái nghiệp vụ (ví dụ thanh toán thành công, tạo merchant) và được consume bởi các dịch vụ phía dưới phản ứng với những thay đổi đó.
Tất cả tên topic được định nghĩa tập trung trong @nx/core tại src/queues/kafka/topics.ts.
2. Danh mục Topic
| Topic | Mô tả |
|---|---|
payment.success | Thanh toán bán hàng hoàn tất thành công |
purchase-order.received | Các mục đơn mua được nhận vào kho |
commerce.initialized | Cài đặt commerce của organizer hoàn tất |
merchant.created | Merchant mới được tạo |
product-variant.created | Biến thể sản phẩm mới được tạo |
product-variant.updated | Biến thể sản phẩm được cập nhật |
ledger.generate | Yêu cầu tạo tài liệu ledger |
3. Ma trận Producer & Consumer
| Topic | Producer | Consumer |
|---|---|---|
payment.success | Sale / Payment | Finance, Inventory |
purchase-order.received | Inventory | Finance |
commerce.initialized | Commerce | Finance, Inventory |
merchant.created | Commerce | Inventory |
product-variant.created | Commerce | Search (CDC) |
product-variant.updated | Commerce | Search (CDC) |
ledger.generate | Ledger (tự thân) | Ledger (tự thân) |
4. Sơ đồ Luồng Message
5. Consumer Group
| Dịch vụ | Client ID | Group ID |
|---|---|---|
| Finance | SVC-00040-FINANCE_CONSUMER | SVC-00040-FINANCE_CONSUMER_GROUP |
| Inventory | SVC-00030-INVENTORY_CONSUMER | SVC-00030-INVENTORY_CONSUMER_GROUP |
| Ledger | SVC-00060-LEDGER_CONSUMER | SVC-00060-LEDGER_GROUP |
| Search (CDC) | SVC-00080-SEARCH_CDC_CONSUMER | SVC-00080-SEARCH_CDC_GROUP |
6. Cấu hình Consumer
6.1. Consumer Finance
| Cài đặt | Giá trị |
|---|---|
| Topic | payment.success, purchase-order.received, commerce.initialized |
| Auto-commit | bật |
| Fallback Mode | earliest |
| Deserializer | Key dạng String, Value dạng JSON |
6.2. Consumer Inventory
| Cài đặt | Giá trị |
|---|---|
| Topic | payment.success, merchant.created, commerce.initialized |
| Fallback Mode | earliest |
6.3. Consumer Ledger
| Cài đặt | Giá trị |
|---|---|
| Topic | ledger.generate |
| Auto-commit | false (commit thủ công sau khi xử lý) |
| Số lượng Consumer | Có thể cấu hình qua APP_ENV_KAFKA_CONSUMER_COUNT |
| Acks (Producer) | ALL |
| Idempotent (Producer) | true |
6.4. Consumer Search CDC
| Cài đặt | Giá trị |
|---|---|
| Topic | 7 topic CDC Debezium |
| Auto-commit | false (quản lý offset thủ công) |
| Max Batch Size | 200 |
| Flush Interval | 2000ms |
| Max Wait Time | 500ms |
| Max Bytes | 5MB |
Xem CDC / Debezium để biết chi tiết.
7. Ví dụ Payload Sự kiện
7.1. Payment Success
json
{
"merchantId": "123456789",
"orderId": "987654321",
"payment": {
"amount": 150000,
"currency": "VND",
"method": "QR",
"finance": {
"use": true,
"walletId": null
}
}
}7.2. Commerce Initialized
json
{
"organizerId": "111222333",
"merchantId": "444555666",
"merchantName": { "en": "My Store", "vi": "Cửa hàng" }
}7.3. Ledger Generate
json
{
"ledgerId": "777888999",
"ledgerType": "S1a-HKD",
"merchantId": "444555666",
"period": "2026-Q1",
"version": 1,
"isRetry": false,
"enqueueTime": 1711785600000
}8. Thực hành Tốt nhất
8.1. Idempotency
Tất cả consumer đều thực hiện kiểm tra idempotency để xử lý an toàn các message trùng lặp:
| Dịch vụ | Chiến lược |
|---|---|
| Finance | Kiểm tra giao dịch hiện có theo merchantId + referenceType + referenceId |
| Inventory | Kiểm tra vị trí mặc định hiện có theo merchantId |
| Ledger | Kiểm tra tác vụ đang chờ hiện có theo ledgerId |
8.2. Xử lý Lỗi
| Chiến lược | Được dùng bởi |
|---|---|
| Bỏ qua + Log | Finance (log lỗi, tiếp tục xử lý) |
| DLQ (Dead Letter Queue) | Search CDC (nx.seller.cdc.dlq) |
| Retry qua API | Ledger (endpoint retry thủ công) |
| Phục hồi khi khởi động | Ledger (đưa lại các tác vụ bị treo vào hàng đợi) |
8.3. Serialization
| Hướng | Định dạng |
|---|---|
| Producer | JSON serializer (value), string serializer (key) |
| Consumer | JSON deserializer (value), string deserializer (key) |
9. Hạ tầng
9.1. Phát triển (Docker Compose)
Cluster Kafka 3 node:
nx-kafka-1:29092nx-kafka-2:29092nx-kafka-3:29092
9.2. Biến Môi trường
| Biến | Mô tả |
|---|---|
APP_ENV_KAFKA_BROKERS | Địa chỉ broker cách nhau bằng dấu phẩy |
APP_ENV_KAFKA_CLIENT_ID | Định danh client riêng cho dịch vụ |
APP_ENV_KAFKA_GROUP_ID | Định danh consumer group |
APP_ENV_KAFKA_CONSUMER_COUNT | Số consumer song song (Ledger) |
APP_ENV_KAFKA_SASL_MECHANISM | Cơ chế SASL (ví dụ SCRAM-SHA-256) |
APP_ENV_KAFKA_SASL_USERNAME | Username SASL |
APP_ENV_KAFKA_SASL_PASSWORD | Password SASL |
10. Tài liệu Liên quan
| Tài liệu | Mô tả |
|---|---|
| CDC / Debezium | Change Data Capture qua Kafka |
| Finance Service | Consumer sự kiện Finance |
| Inventory Service | Consumer sự kiện Inventory |
| Ledger Service | Pipeline tạo Ledger |
| Search Service | Consumer CDC để đồng bộ Typesense |