API Events
Commerce has no inbound Kafka consumer (in API role) and no application-level Kafka produce: ApplicationKafkaComponent binds a producer, but no .send() call site exists in src/. The real integration seam is CDC (Debezium) tailing the Postgres WAL. Internally, product replication uses an in-process EventBus plus a BullMQ queue.
1. Inbound Kafka
| Surface | Status |
|---|---|
| Kafka consumer (API role) | none — APPLICATION_KAFKA_CONSUMER key declared, not wired |
| CDC consumer (WORKER role) | ApplicationCdcComponent from @nx/search syncs commerce DB → Typesense |
2. Outbound Kafka
No application produce. A producer is bound at BindingKeys.APPLICATION_KAFKA_PRODUCER (clientId SVC-00020-COMMERCE_PRODUCER) but is not invoked anywhere in src/. Outbound async to sister services happens via CDC, not service-level produce.
2.1 CDC topics (Debezium) — the integration seam
Debezium tails the commerce Postgres WAL and publishes one topic per table. Topic names come from CDCKafkaTopics/CdcTables (@nx/core). See ADR-0002.
| CDC table | Topic (public.<table>) | Primary consumers | Purpose downstream |
|---|---|---|---|
| Merchant | ...public.Merchant | inventory, invoice, search | seed default location; TaxInfo upsert; index |
| Organizer | ...public.Organizer | search | index tenant |
| Product | ...public.Product | search, taxation | index; tax-group provisioning |
| ProductInfo | ...public.ProductInfo | search | i18n name/description fan-out |
| ProductVariant | ...public.ProductVariant | inventory, pricing, search | seed InventoryItem; fare init; index |
| ProductCategory | ...public.ProductCategory | search | cascade categoryIds (no own collection) |
| ProductBundler | ...public.ProductBundler | search | cascade combo items to lead variant |
| Category | ...public.Category | search | index |
| SaleChannel | ...public.SaleChannel | search | index |
| Device | ...public.Device | search | index |
| MetaLink | ...public.MetaLink | search | cascade images to product/variant |
| Fare / FareSet | pricing.* | search | recompute variant defaultPrice/fareSet |
Consumer wiring lives in the consuming packages (search/inventory/pricing/taxation/invoice), not in commerce. Commerce's only obligation is to write to Postgres.
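To illustrate what "idempotent on PK" means for a downstream consumer, here is a minimal sketch of applying a Debezium change event. The `op`/`before`/`after` fields follow the standard Debezium envelope; the `ProductRow` shape, the in-memory `productIndex`, and `applyProductChange` are hypothetical stand-ins, not code from the consuming packages.

```typescript
// Subset of the Debezium change-event envelope.
// op codes: "c" create, "u" update, "d" delete, "r" snapshot read.
type DebeziumOp = "c" | "u" | "d" | "r";

interface ChangeEvent<Row> {
  op: DebeziumOp;
  before: Row | null;
  after: Row | null;
}

// Hypothetical row shape; the real Product columns are not listed on this page.
interface ProductRow {
  id: string;
  name: string;
}

// In-memory stand-in for a downstream store such as a search collection.
const productIndex: Record<string, ProductRow> = {};

// Applies one event keyed by primary key. Replaying the same event leaves the
// store in the same state, which is what at-least-once delivery requires.
function applyProductChange(event: ChangeEvent<ProductRow>): void {
  if (event.op === "d") {
    if (event.before) delete productIndex[event.before.id];
    return;
  }
  if (event.after) productIndex[event.after.id] = event.after;
}

const created: ChangeEvent<ProductRow> = {
  op: "c",
  before: null,
  after: { id: "p-1", name: "Espresso" },
};
applyProductChange(created);
applyProductChange(created); // redelivery: same end state, still one entry
```

Because writes are keyed by `id`, a replay from an earlier Debezium offset overwrites rows with identical data instead of duplicating them.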
3. Inbound BullMQ
| Queue | Constant | Job names | Worker | Handler |
|---|---|---|---|---|
| @nx/commerce/sync-product-queue | BindingKeys.SYNC_PRODUCT_QUEUE_NAME | <queue>.create (default), <queue>.update | SyncProductWorker | ProductCreateSyncService / ProductUpdateSyncService.syncToAdditionalMerchants() |
| Job | Payload |
|---|---|
| create | { primaryProductId, additionalMerchantIds[], data } |
| update | { primaryProductId, additionalMerchantIds[] } |
The worker is enabled only when listed in APP_ENV_WORKERS (or ALL). Concurrency comes from APP_ENV_BULLMQ_WORKER_CONCURRENCY (default 1). It runs on the BullMQ Redis connection (separate from cache Redis).
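The gating rules above could be expressed roughly as follows. This is a sketch under assumptions: it treats APP_ENV_WORKERS as a comma-separated list of worker names, which this page does not confirm, and both helper functions are hypothetical names.

```typescript
// Assumption: APP_ENV_WORKERS is a comma-separated list of worker names, or "ALL".
function isWorkerEnabled(workersEnv: string | undefined, workerName: string): boolean {
  if (!workersEnv) return false;
  const names = workersEnv
    .split(",")
    .map((n) => n.trim())
    .filter((n) => n.length > 0);
  return names.indexOf("ALL") !== -1 || names.indexOf(workerName) !== -1;
}

// Falls back to the documented default of 1 when the value is missing or invalid.
function resolveConcurrency(raw: string | undefined): number {
  const parsed = parseInt(raw === undefined ? "" : raw, 10);
  return !isNaN(parsed) && parsed > 0 ? parsed : 1;
}

const enabled = isWorkerEnabled("SyncProductWorker, OtherWorker", "SyncProductWorker");
const concurrency = resolveConcurrency(undefined); // falls back to 1
```

Parsing defensively keeps a malformed concurrency value from starting a worker with zero or NaN concurrency.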
4. Outbound BullMQ
| Producer | Queue | When |
|---|---|---|
| ProductAggregateCreatedListener.pushJobToQueue() | SYNC_PRODUCT_QUEUE (.create job) | after product.aggregate.created, if syncMerchantIds.length > 0 |
| ProductAggregateUpdatedListener.pushJobToQueue() | SYNC_PRODUCT_QUEUE (.update job) | after product.aggregate.updated, if sync merchants present |
Enqueue happens via JobDispatcherHelper.publish() from the in-process EventBus listeners (see §6).
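The create-side enqueue condition can be sketched as a pure function that maps the event payload to a job, or to nothing when there are no merchants to replicate to. `buildCreateJob` is a hypothetical helper, `TCreateProductRequest` is stubbed, and expanding `<queue>` to the full queue name is an assumption; the actual dispatch through JobDispatcherHelper.publish() is not shown.

```typescript
// Stub for illustration; the real TCreateProductRequest lives in the commerce package.
type TCreateProductRequest = Record<string, unknown>;

interface ProductAggregateCreatedEventData {
  payloadRequest: TCreateProductRequest;
  productId: string;
  syncMerchantIds?: string[];
}

interface SyncProductCreateJob {
  name: string;
  data: {
    primaryProductId: string;
    additionalMerchantIds: string[];
    data: TCreateProductRequest;
  };
}

const SYNC_PRODUCT_QUEUE_NAME = "@nx/commerce/sync-product-queue";

// Returns null when there is nothing to replicate, mirroring the
// "if syncMerchantIds.length > 0" condition in the table above.
function buildCreateJob(event: ProductAggregateCreatedEventData): SyncProductCreateJob | null {
  const merchantIds = event.syncMerchantIds || [];
  if (merchantIds.length === 0) return null;
  return {
    // Assumption: "<queue>.create" expands to the queue name plus ".create".
    name: SYNC_PRODUCT_QUEUE_NAME + ".create",
    data: {
      primaryProductId: event.productId,
      additionalMerchantIds: merchantIds,
      data: event.payloadRequest,
    },
  };
}

const job = buildCreateJob({
  payloadRequest: { name: "Espresso" },
  productId: "p-1",
  syncMerchantIds: ["m-2", "m-3"],
});
const skipped = buildCreateJob({ payloadRequest: {}, productId: "p-2" });
```

Keeping the mapping pure makes the enqueue condition easy to test without a Redis connection.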
5. WebSocket Emissions
ApplicationWebSocketComponent binds a WebSocketEmitter (identifier: 'commerce-ws-emitter') and registers CommerceSocketEventService (extends @nx/core BaseSocketEventService). Redis-backed (single/cluster). The emitter is the channel for real-time merchant-facing updates (e.g. footer-summary cache invalidation references the socket push path).
| Aspect | Value |
|---|---|
| Component | ApplicationWebSocketComponent (src/components/websocket/) |
| Service | CommerceSocketEventService |
| Emitter binding | CommerceWebSocketBindingKeys.WEBSOCKET_EMITTER |
| Transport | Redis (APP_ENV_WEBSOCKET_REDIS_*), single or cluster |
Topic/room constants are not enumerated in a dedicated websocket.ts (unlike @nx/sale); emission shape follows the BaseSocketEventService room convention.
6. In-process EventBus (eventemitter3)
EventBusComponent wires an EventBus singleton, two listeners, and a registry. Event keys are defined in src/common/event.ts.
| Event key | Constant | Emitted by | Listener actions |
|---|---|---|---|
| product.aggregate.created | EVENT_EMITTER.PRODUCT.AGGREGATE_CREATED | ProductCreateService | handle (mark onboarding step) + pushJobToQueue |
| product.aggregate.updated | EVENT_EMITTER.PRODUCT.AGGREGATE_UPDATED | ProductUpdateService | pushJobToQueue |
The created key is registered twice: once for handle (onboarding) and once for pushJobToQueue (sync enqueue).
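The double registration can be pictured with a small dependency-free bus. `TinyEventBus` below is a hypothetical stand-in for the eventemitter3-backed EventBus, and the order in which the two handlers are registered is an assumption; the point is only that one emit fans out synchronously to both handlers.

```typescript
type Handler<T> = (payload: T) => void;

// Minimal stand-in for an in-process event bus: handlers run synchronously,
// in registration order, inside the emitting call.
class TinyEventBus<T> {
  private handlers: Record<string, Handler<T>[]> = {};

  on(key: string, handler: Handler<T>): void {
    if (!this.handlers[key]) this.handlers[key] = [];
    this.handlers[key].push(handler);
  }

  emit(key: string, payload: T): void {
    (this.handlers[key] || []).forEach((handler) => handler(payload));
  }
}

const PRODUCT_AGGREGATE_CREATED = "product.aggregate.created";
const calls: string[] = [];
const bus = new TinyEventBus<{ productId: string }>();

// Same key registered twice, once per listener action.
bus.on(PRODUCT_AGGREGATE_CREATED, (e) => calls.push("handle:" + e.productId));
bus.on(PRODUCT_AGGREGATE_CREATED, (e) => calls.push("pushJobToQueue:" + e.productId));

bus.emit(PRODUCT_AGGREGATE_CREATED, { productId: "p-1" });
```

Since both handlers run inside the emitting process, a crash between the database commit and the enqueue loses the sync job, which is the gap listed in the recovery column of §8.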
7. Payload Schemas
// IProductAggregateCreatedEventData (src/common/interface)
{
payloadRequest: TCreateProductRequest; // the original aggregate request
productId: string; // newly-created primary product id
syncMerchantIds?: string[]; // additional merchants to replicate to
step?: TMerchantOnboardingStep; // onboarding step to mark (default PRODUCT)
}
// IProductAggregateUpdatedEventData — analogous (productId + sync merchants)
// SYNC_PRODUCT_QUEUE create job
{ primaryProductId: string; additionalMerchantIds: string[]; data: TCreateProductRequest }
// SYNC_PRODUCT_QUEUE update job
{ primaryProductId: string; additionalMerchantIds: string[] }
8. Idempotency & Ordering
| Surface | Delivery | Ordering | Recovery |
|---|---|---|---|
| CDC (Debezium) | at-least-once | per-row (Postgres WAL order, keyed by PK) | Debezium offset replay; consumers idempotent on PK |
| EventBus | in-process, best-effort | synchronous within process | lost on crash before enqueue (aggregate already committed) |
| BullMQ SYNC_PRODUCT_QUEUE | at-least-once | per-job | BullMQ retry/backoff; sync re-runs safely (upsert semantics) |
| WebSocket | best-effort | none (broadcast) | clients refetch from REST on reconnect |
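As a rough illustration of why a retried sync job is safe under upsert semantics, the sketch below writes one replica per merchant under a deterministic key, so a second run overwrites instead of duplicating. The `replicas` store, the key format, and `upsertReplicas` are hypothetical; the real logic lives in syncToAdditionalMerchants() and is not reproduced here.

```typescript
interface SyncUpdateJob {
  primaryProductId: string;
  additionalMerchantIds: string[];
}

interface ReplicaRow {
  merchantId: string;
  primaryProductId: string;
  attempt: number;
}

// In-memory stand-in for replicated product rows.
const replicas: Record<string, ReplicaRow> = {};

// Upsert keyed by (merchantId, primaryProductId): re-running the same job
// replaces existing rows rather than inserting new ones.
function upsertReplicas(job: SyncUpdateJob, attempt: number): void {
  job.additionalMerchantIds.forEach((merchantId) => {
    const key = merchantId + ":" + job.primaryProductId;
    replicas[key] = { merchantId, primaryProductId: job.primaryProductId, attempt };
  });
}

const updateJob: SyncUpdateJob = {
  primaryProductId: "p-1",
  additionalMerchantIds: ["m-2", "m-3"],
};
upsertReplicas(updateJob, 1);
upsertReplicas(updateJob, 2); // retry after a failure: row count is unchanged
```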
9. Related Pages
- Integration — CDC consumer contracts per sister service
- Architecture — Runtime Scenarios
- Configuration — Kafka / BullMQ / WebSocket env
- Decisions