Skip to content

Kafka Event Streaming

1. Overview

NX-Seller uses Apache Kafka for asynchronous, event-driven communication between microservices. Events are published when business state changes occur (e.g., payment success, merchant creation) and consumed by downstream services that react to those changes.

All topic names are defined centrally in @nx/core at src/queues/kafka/topics.ts.

2. Topic Registry

TopicDescription
payment.successSale payment completed successfully
purchase-order.receivedPurchase order items received into inventory
commerce.initializedOrganizer commerce setup completed
merchant.createdNew merchant created
product-variant.createdNew product variant created
product-variant.updatedProduct variant updated
ledger.generateLedger document generation request

3. Producer & Consumer Matrix

TopicProducer(s)Consumer(s)
payment.successSale / PaymentFinance, Inventory
purchase-order.receivedInventoryFinance
commerce.initializedCommerceFinance, Inventory
merchant.createdCommerceInventory
product-variant.createdCommerceSearch (CDC)
product-variant.updatedCommerceSearch (CDC)
ledger.generateLedger (self)Ledger (self)

4. Message Flow Diagram

5. Consumer Groups

ServiceClient IDGroup ID
FinanceSVC-00040-FINANCE_CONSUMERSVC-00040-FINANCE_CONSUMER_GROUP
InventorySVC-00030-INVENTORY_CONSUMERSVC-00030-INVENTORY_CONSUMER_GROUP
LedgerSVC-00060-LEDGER_CONSUMERSVC-00060-LEDGER_GROUP
Search (CDC)SVC-00080-SEARCH_CDC_CONSUMERSVC-00080-SEARCH_CDC_GROUP

6. Consumer Configurations

6.1. Finance Consumer

SettingValue
Topicspayment.success, purchase-order.received, commerce.initialized
Auto-commitenabled
Fallback Modeearliest
DeserializerString keys, JSON values

6.2. Inventory Consumer

SettingValue
Topicspayment.success, merchant.created, commerce.initialized
Fallback Modeearliest

6.3. Ledger Consumer

SettingValue
Topicsledger.generate
Auto-commitfalse (manual commit after processing)
Consumer CountConfigurable via APP_ENV_KAFKA_CONSUMER_COUNT
Acks (Producer)ALL
Idempotent (Producer)true

6.4. Search CDC Consumer

SettingValue
Topics7 Debezium CDC topics
Auto-commitfalse (manual offset management)
Max Batch Size200
Flush Interval2000ms
Max Wait Time500ms
Max Bytes5MB

See CDC / Debezium for details.

7. Event Payload Examples

7.1. Payment Success

json
{
  "merchantId": "123456789",
  "orderId": "987654321",
  "payment": {
    "amount": 150000,
    "currency": "VND",
    "method": "QR",
    "finance": {
      "use": true,
      "walletId": null
    }
  }
}

7.2. Commerce Initialized

json
{
  "organizerId": "111222333",
  "merchantId": "444555666",
  "merchantName": { "en": "My Store", "vi": "Cửa hàng" }
}

7.3. Ledger Generate

json
{
  "ledgerId": "777888999",
  "ledgerType": "S1a-HKD",
  "merchantId": "444555666",
  "period": "2026-Q1",
  "version": 1,
  "isRetry": false,
  "enqueueTime": 1711785600000
}

8. Best Practices

8.1. Idempotency

All consumers implement idempotency checks to safely handle duplicate messages:

ServiceStrategy
FinanceCheck existing transaction by merchantId + referenceType + referenceId
InventoryCheck existing default location by merchantId
LedgerCheck existing pending job by ledgerId

8.2. Error Handling

StrategyUsed By
Skip + LogFinance (logs error, continues processing)
DLQ (Dead Letter Queue)Search CDC (nx.seller.cdc.dlq)
Retry via APILedger (manual retry endpoint)
Startup RecoveryLedger (re-enqueues stalled jobs)

8.3. Serialization

DirectionFormat
ProducerJSON serializer (values), string serializer (keys)
ConsumerJSON deserializer (values), string deserializer (keys)

9. Infrastructure

9.1. Development (Docker Compose)

3-node Kafka cluster:

  • nx-kafka-1:29092
  • nx-kafka-2:29092
  • nx-kafka-3:29092

9.2. Environment Variables

VariableDescription
APP_ENV_KAFKA_BROKERSComma-separated broker addresses
APP_ENV_KAFKA_CLIENT_IDService-specific client identifier
APP_ENV_KAFKA_GROUP_IDConsumer group identifier
APP_ENV_KAFKA_CONSUMER_COUNTNumber of parallel consumers (Ledger)
APP_ENV_KAFKA_SASL_MECHANISMSASL mechanism (e.g., SCRAM-SHA-256)
APP_ENV_KAFKA_SASL_USERNAMESASL username
APP_ENV_KAFKA_SASL_PASSWORDSASL password
DocumentDescription
CDC / DebeziumChange Data Capture via Kafka
Finance ServiceFinance event consumers
Inventory ServiceInventory event consumers
Ledger ServiceLedger generation pipeline
Search ServiceCDC consumer for Typesense sync

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.