Kafka Event Streaming
1. Overview
NX-Seller uses Apache Kafka for asynchronous, event-driven communication between microservices. Events are published when business state changes occur (e.g., payment success, merchant creation) and consumed by downstream services that react to those changes.
All topic names are defined centrally in @nx/core at src/queues/kafka/topics.ts.
2. Topic Registry
| Topic | Description |
|---|---|
| payment.success | Sale payment completed successfully |
| purchase-order.received | Purchase order items received into inventory |
| commerce.initialized | Organizer commerce setup completed |
| merchant.created | New merchant created |
| product-variant.created | New product variant created |
| product-variant.updated | Product variant updated |
| ledger.generate | Ledger document generation request |
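
To illustrate what a central topic registry can look like, here is a minimal sketch. The constant name `KAFKA_TOPICS`, the key names, and the helper `isKafkaTopic` are assumptions made for this example; the actual contents of src/queues/kafka/topics.ts may be organized differently. Only the topic strings come from the table above.

```typescript
// Hypothetical shape for a topic registry; the real file may differ.
const KAFKA_TOPICS = {
  PAYMENT_SUCCESS: "payment.success",
  PURCHASE_ORDER_RECEIVED: "purchase-order.received",
  COMMERCE_INITIALIZED: "commerce.initialized",
  MERCHANT_CREATED: "merchant.created",
  PRODUCT_VARIANT_CREATED: "product-variant.created",
  PRODUCT_VARIANT_UPDATED: "product-variant.updated",
  LEDGER_GENERATE: "ledger.generate",
} as const;

// Union of all topic name literals, derived from the registry above.
type KafkaTopic = (typeof KAFKA_TOPICS)[keyof typeof KAFKA_TOPICS];

// Runtime check for topic names that arrive as plain strings.
function isKafkaTopic(name: string): name is KafkaTopic {
  return Object.keys(KAFKA_TOPICS).some(
    (key) => KAFKA_TOPICS[key as keyof typeof KAFKA_TOPICS] === name,
  );
}
```

Deriving the `KafkaTopic` type from the constant means a producer or consumer that references a misspelled topic fails at compile time rather than silently publishing to a new topic.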
3. Producer & Consumer Matrix
| Topic | Producer(s) | Consumer(s) |
|---|---|---|
| payment.success | Sale / Payment | Finance, Inventory |
| purchase-order.received | Inventory | Finance |
| commerce.initialized | Commerce | Finance, Inventory |
| merchant.created | Commerce | Inventory |
| product-variant.created | Commerce | Search (CDC) |
| product-variant.updated | Commerce | Search (CDC) |
| ledger.generate | Ledger (self) | Ledger (self) |
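
The matrix above can also be read in the other direction, from a service to the topics it subscribes to. The following sketch transcribes the consumer column into a lookup table; `TOPIC_CONSUMERS` and `topicsConsumedBy` are hypothetical names used only for illustration and are not part of the codebase as far as this document states.

```typescript
// Consumer services per topic, transcribed from the matrix above.
const TOPIC_CONSUMERS: Record<string, string[]> = {
  "payment.success": ["Finance", "Inventory"],
  "purchase-order.received": ["Finance"],
  "commerce.initialized": ["Finance", "Inventory"],
  "merchant.created": ["Inventory"],
  "product-variant.created": ["Search (CDC)"],
  "product-variant.updated": ["Search (CDC)"],
  "ledger.generate": ["Ledger"],
};

// Invert the matrix: which topics does a given service subscribe to?
function topicsConsumedBy(service: string): string[] {
  return Object.keys(TOPIC_CONSUMERS)
    .filter((topic) => (TOPIC_CONSUMERS[topic] ?? []).indexOf(service) !== -1)
    .sort();
}
```

A lookup like this is a convenient cross-check that each consumer's configured topic list matches the matrix.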
4. Message Flow Diagram
5. Consumer Groups
| Service | Client ID | Group ID |
|---|---|---|
| Finance | SVC-00040-FINANCE_CONSUMER | SVC-00040-FINANCE_CONSUMER_GROUP |
| Inventory | SVC-00030-INVENTORY_CONSUMER | SVC-00030-INVENTORY_CONSUMER_GROUP |
| Ledger | SVC-00060-LEDGER_CONSUMER | SVC-00060-LEDGER_GROUP |
| Search (CDC) | SVC-00080-SEARCH_CDC_CONSUMER | SVC-00080-SEARCH_CDC_GROUP |
6. Consumer Configurations
6.1. Finance Consumer
| Setting | Value |
|---|---|
| Topics | payment.success, purchase-order.received, commerce.initialized |
| Auto-commit | enabled |
| Fallback Mode | earliest |
| Deserializer | String keys, JSON values |
6.2. Inventory Consumer
| Setting | Value |
|---|---|
| Topics | payment.success, merchant.created, commerce.initialized |
| Fallback Mode | earliest |
6.3. Ledger Consumer
| Setting | Value |
|---|---|
| Topics | ledger.generate |
| Auto-commit | false (manual commit after processing) |
| Consumer Count | Configurable via APP_ENV_KAFKA_CONSUMER_COUNT |
| Acks (Producer) | ALL |
| Idempotent (Producer) | true |
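
With auto-commit disabled, the consumer decides when an offset is committed. In Kafka, the committed offset is the offset of the next message to read, so committing after message N means committing N + 1. The sketch below shows that ordering without depending on any particular client library: `LedgerMessage`, `processWithManualCommit`, `handle`, and `commit` are hypothetical names, offsets are simplified to plain numbers, and the handler is synchronous for brevity. Stopping on the first failure is an assumption of this sketch; the Ledger service's actual failure policy is described under Error Handling.

```typescript
// Hypothetical message shape; offsets are plain numbers here for brevity.
interface LedgerMessage {
  offset: number;
  value: string;
}

// Process messages in order and commit only after each one succeeds.
// `handle` and `commit` are injected stand-ins for the real job handler
// and the Kafka client's commit call.
function processWithManualCommit(
  messages: LedgerMessage[],
  handle: (message: LedgerMessage) => void,
  commit: (nextOffset: number) => void,
): number {
  let processed = 0;
  for (const message of messages) {
    try {
      handle(message);
    } catch {
      // Assumption: stop without committing, so the failed message
      // is delivered again after a restart or rebalance.
      break;
    }
    // Kafka commits the offset of the NEXT message to read.
    commit(message.offset + 1);
    processed += 1;
  }
  return processed;
}
```

Because a message can be redelivered when a crash happens between processing and commit, this pattern gives at-least-once delivery and relies on the handler being idempotent.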
6.4. Search CDC Consumer
| Setting | Value |
|---|---|
| Topics | 7 Debezium CDC topics |
| Auto-commit | false (manual offset management) |
| Max Batch Size | 200 |
| Flush Interval | 2000ms |
| Max Wait Time | 500ms |
| Max Bytes | 5MB |
See CDC / Debezium for details.
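
As a rough illustration of how a batch size limit and a flush interval can work together, the sketch below buffers items and signals a flush when either limit is reached. It assumes that a flush is triggered by whichever limit is hit first, which this document does not confirm for the Search service. `BatchBuffer` and its methods are hypothetical names, and the Max Wait Time and Max Bytes settings are not modeled here.

```typescript
// Values from the Search CDC settings table above.
const MAX_BATCH_SIZE = 200;
const FLUSH_INTERVAL_MS = 2000;

// Buffers items and reports when a flush is due. Time is passed in
// explicitly so the logic stays deterministic and easy to test.
class BatchBuffer<T> {
  private items: T[] = [];
  private lastFlushAt: number;

  constructor(startedAt: number) {
    this.lastFlushAt = startedAt;
  }

  add(item: T): void {
    this.items.push(item);
  }

  // Flush when the batch is full, or when the interval has elapsed
  // and there is at least one buffered item.
  shouldFlush(now: number): boolean {
    if (this.items.length >= MAX_BATCH_SIZE) return true;
    return this.items.length > 0 && now - this.lastFlushAt >= FLUSH_INTERVAL_MS;
  }

  // Hand back the buffered items and reset the timer.
  flush(now: number): T[] {
    const batch = this.items;
    this.items = [];
    this.lastFlushAt = now;
    return batch;
  }
}
```

With manual offset management, offsets for a batch would typically be committed only after the flush has succeeded, so that a failed write does not lose changes.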
7. Event Payload Examples
7.1. Payment Success
```json
{
  "merchantId": "123456789",
  "orderId": "987654321",
  "payment": {
    "amount": 150000,
    "currency": "VND",
    "method": "QR",
    "finance": {
      "use": true,
      "walletId": null
    }
  }
}
```

7.2. Commerce Initialized
```json
{
  "organizerId": "111222333",
  "merchantId": "444555666",
  "merchantName": { "en": "My Store", "vi": "Cửa hàng" }
}
```

7.3. Ledger Generate
```json
{
  "ledgerId": "777888999",
  "ledgerType": "S1a-HKD",
  "merchantId": "444555666",
  "period": "2026-Q1",
  "version": 1,
  "isRetry": false,
  "enqueueTime": 1711785600000
}
```

8. Best Practices
8.1. Idempotency
The consumers listed below apply an idempotency check before processing, so that duplicate deliveries of the same message are handled safely:
| Service | Strategy |
|---|---|
| Finance | Check existing transaction by merchantId + referenceType + referenceId |
| Inventory | Check existing default location by merchantId |
| Ledger | Check existing pending job by ledgerId |
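
The Finance strategy in the table amounts to a composite-key lookup. Here is a minimal in-memory sketch of that idea; `FinanceReference`, `idempotencyKey`, and `ProcessedReferences` are hypothetical names, and the `Set` stands in for the database query (ideally backed by a unique constraint) that a real service would use.

```typescript
// Fields named in the Finance row of the table above.
interface FinanceReference {
  merchantId: string;
  referenceType: string;
  referenceId: string;
}

// Build a composite key. JSON.stringify of an array avoids collisions
// that a plain string join could produce when a field contains the separator.
function idempotencyKey(ref: FinanceReference): string {
  return JSON.stringify([ref.merchantId, ref.referenceType, ref.referenceId]);
}

// In-memory stand-in for the "existing transaction" lookup.
class ProcessedReferences {
  private seen = new Set<string>();

  // Returns true if the reference is new and was recorded,
  // false if it is a duplicate that should be skipped.
  markIfNew(ref: FinanceReference): boolean {
    const key = idempotencyKey(ref);
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}
```

A consumer would call `markIfNew` before creating a transaction and skip the message when it returns `false`.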
8.2. Error Handling
| Strategy | Used By |
|---|---|
| Skip + Log | Finance (logs error, continues processing) |
| DLQ (Dead Letter Queue) | Search CDC (nx.seller.cdc.dlq) |
| Retry via API | Ledger (manual retry endpoint) |
| Startup Recovery | Ledger (re-enqueues stalled jobs) |
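
The first two strategies in the table differ only in what happens to a message whose handler throws. The sketch below makes that difference explicit. `ErrorPolicy`, `handleWithPolicy`, `log`, and `publish` are hypothetical names; `log` and `publish` stand in for a real logger and Kafka producer, and only the DLQ topic name comes from the table.

```typescript
type ErrorPolicy = "skip-and-log" | "dead-letter";

// Topic name taken from the table above.
const CDC_DLQ_TOPIC = "nx.seller.cdc.dlq";

// Run a handler and apply the chosen policy on failure.
// Returns true on success, false when the message was skipped or dead-lettered.
function handleWithPolicy(
  policy: ErrorPolicy,
  rawValue: string,
  handler: (rawValue: string) => void,
  log: (line: string) => void,
  publish: (topic: string, value: string) => void,
): boolean {
  try {
    handler(rawValue);
    return true;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    if (policy === "dead-letter") {
      // Keep the original payload so it can be inspected or replayed.
      publish(CDC_DLQ_TOPIC, rawValue);
    }
    log(`handler failed (${policy}): ${reason}`);
    return false;
  }
}
```

In both cases the consumer keeps moving through the partition; the dead-letter policy additionally preserves the failed payload for later analysis.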
8.3. Serialization
| Direction | Format |
|---|---|
| Producer | JSON serializer (values), string serializer (keys) |
| Consumer | JSON deserializer (values), string deserializer (keys) |
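
JSON deserialization alone does not guarantee that a value has the expected shape, so a consumer may want to validate the parsed object before using it. The following sketch does this for the Payment Success payload shown in section 7.1. The interface and function names are hypothetical, and typing `walletId` as `string | null` is an assumption based on the single example, which only shows `null`.

```typescript
// Shape taken from the Payment Success example in section 7.1.
interface PaymentSuccessEvent {
  merchantId: string;
  orderId: string;
  payment: {
    amount: number;
    currency: string;
    method: string;
    finance: { use: boolean; walletId: string | null };
  };
}

// Producer side: JSON-encode the value.
function serializeValue(event: PaymentSuccessEvent): string {
  return JSON.stringify(event);
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Consumer side: parse the JSON value and check the fields a consumer relies on.
// Returns null instead of throwing so the caller can pick an error policy.
function deserializePaymentSuccess(raw: string): PaymentSuccessEvent | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!isRecord(parsed)) return null;
  const payment = parsed["payment"];
  if (!isRecord(payment)) return null;
  const finance = payment["finance"];
  if (!isRecord(finance)) return null;
  const valid =
    typeof parsed["merchantId"] === "string" &&
    typeof parsed["orderId"] === "string" &&
    typeof payment["amount"] === "number" &&
    typeof payment["currency"] === "string" &&
    typeof payment["method"] === "string" &&
    typeof finance["use"] === "boolean";
  return valid ? (parsed as unknown as PaymentSuccessEvent) : null;
}
```

Returning `null` for malformed values lets each service apply its own error-handling strategy instead of crashing the consumer loop.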
9. Infrastructure
9.1. Development (Docker Compose)
The development environment runs a 3-node Kafka cluster:
- nx-kafka-1:29092
- nx-kafka-2:29092
- nx-kafka-3:29092
9.2. Environment Variables
| Variable | Description |
|---|---|
| APP_ENV_KAFKA_BROKERS | Comma-separated broker addresses |
| APP_ENV_KAFKA_CLIENT_ID | Service-specific client identifier |
| APP_ENV_KAFKA_GROUP_ID | Consumer group identifier |
| APP_ENV_KAFKA_CONSUMER_COUNT | Number of parallel consumers (Ledger) |
| APP_ENV_KAFKA_SASL_MECHANISM | SASL mechanism (e.g., SCRAM-SHA-256) |
| APP_ENV_KAFKA_SASL_USERNAME | SASL username |
| APP_ENV_KAFKA_SASL_PASSWORD | SASL password |
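
As an illustration of how these variables might be read at startup, the sketch below parses the broker list and the basic consumer settings from an environment-like object. `KafkaEnvConfig` and `loadKafkaConfig` are hypothetical names, the default of one consumer when APP_ENV_KAFKA_CONSUMER_COUNT is missing or invalid is an assumption, and the SASL variables are left out to keep the example short.

```typescript
interface KafkaEnvConfig {
  brokers: string[];
  clientId: string;
  groupId: string;
  consumerCount: number;
}

// Read Kafka settings from an environment-like object (for example process.env).
function loadKafkaConfig(env: Record<string, string | undefined>): KafkaEnvConfig {
  // Split the comma-separated list, tolerating spaces and empty entries.
  const brokers = (env["APP_ENV_KAFKA_BROKERS"] ?? "")
    .split(",")
    .map((address) => address.trim())
    .filter((address) => address.length > 0);
  if (brokers.length === 0) {
    throw new Error("APP_ENV_KAFKA_BROKERS must list at least one broker");
  }

  const clientId = env["APP_ENV_KAFKA_CLIENT_ID"];
  const groupId = env["APP_ENV_KAFKA_GROUP_ID"];
  if (!clientId || !groupId) {
    throw new Error("APP_ENV_KAFKA_CLIENT_ID and APP_ENV_KAFKA_GROUP_ID are required");
  }

  // Assumption: fall back to a single consumer when the count is absent or invalid.
  const parsedCount = parseInt(env["APP_ENV_KAFKA_CONSUMER_COUNT"] ?? "1", 10);
  const consumerCount = parsedCount > 0 ? parsedCount : 1;

  return { brokers, clientId, groupId, consumerCount };
}
```

Failing fast on a missing broker list or group ID surfaces configuration mistakes at startup rather than at the first publish or poll.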
10. Related Documentation
| Document | Description |
|---|---|
| CDC / Debezium | Change Data Capture via Kafka |
| Finance Service | Finance event consumers |
| Inventory Service | Inventory event consumers |
| Ledger Service | Ledger generation pipeline |
| Search Service | CDC consumer for Typesense sync |