Skip to content

API Events

Commerce has no inbound Kafka consumer (in API role) and no application-level Kafka produceApplicationKafkaComponent binds a producer but no .send() call site exists in src/. The real integration seam is CDC (Debezium) tailing the Postgres WAL. Internally, product replication uses an in-process EventBus + a BullMQ queue.

1. Inbound Kafka

SurfaceStatus
Kafka consumer (API role)noneAPPLICATION_KAFKA_CONSUMER key declared, not wired
CDC consumer (WORKER role)ApplicationCdcComponent from @nx/search syncs commerce DB → Typesense

2. Outbound Kafka

No application produce. A producer is bound at BindingKeys.APPLICATION_KAFKA_PRODUCER (clientId SVC-00020-COMMERCE_PRODUCER) but is not invoked anywhere in src/. Outbound async to sister services happens via CDC, not service-level produce.

2.1 CDC topics (Debezium) — the integration seam

Debezium tails the commerce Postgres WAL and publishes one topic per table. Topic names from CDCKafkaTopics / CdcTables (@nx/core). See ADR-0002.

CDC tableTopic (public.<table>)Primary consumersPurpose downstream
Merchant...public.Merchantinventory, invoice, searchseed default location; TaxInfo upsert; index
Organizer...public.Organizersearchindex tenant
Product...public.Productsearch, taxationindex; tax-group provisioning
ProductInfo...public.ProductInfosearchi18n name/description fan-out
ProductVariant...public.ProductVariantinventory, pricing, searchseed InventoryItem; fare init; index
ProductCategory...public.ProductCategorysearchcascade categoryIds (no own collection)
ProductBundler...public.ProductBundlersearchcascade combo items to lead variant
Category...public.Categorysearchindex
SaleChannel...public.SaleChannelsearchindex
Device...public.Devicesearchindex
MetaLink...public.MetaLinksearchcascade images to product/variant
Fare / FareSetpricing.*searchrecompute variant defaultPrice/fareSet

Consumer wiring lives in the consuming packages (search/inventory/pricing/taxation/invoice), not in commerce. Commerce's only obligation is to write to Postgres.

3. Inbound BullMQ

QueueConstantJob namesWorkerHandler
@nx/commerce/sync-product-queueBindingKeys.SYNC_PRODUCT_QUEUE_NAME<queue>.create (default), <queue>.updateSyncProductWorkerProductCreateSyncService / ProductUpdateSyncService.syncToAdditionalMerchants()
JobPayload
create{ primaryProductId, additionalMerchantIds[], data }
update{ primaryProductId, additionalMerchantIds[] }

Worker enabled only when listed in APP_ENV_WORKERS (or ALL). Concurrency from APP_ENV_BULLMQ_WORKER_CONCURRENCY (default 1). Runs on the BullMQ Redis connection (separate from cache Redis).

4. Outbound BullMQ

ProducerQueueWhen
ProductAggregateCreatedListener.pushJobToQueue()SYNC_PRODUCT_QUEUE (.create job)after product.aggregate.created, if syncMerchantIds.length > 0
ProductAggregateUpdatedListener.pushJobToQueue()SYNC_PRODUCT_QUEUE (.update job)after product.aggregate.updated, if sync merchants present

Enqueue happens via JobDispatcherHelper.publish() from the in-process EventBus listeners (see §6).

5. WebSocket Emissions

ApplicationWebSocketComponent binds a WebSocketEmitter (identifier: 'commerce-ws-emitter') and registers CommerceSocketEventService (extends @nx/core BaseSocketEventService). Redis-backed (single/cluster). The emitter is the channel for real-time merchant-facing updates (e.g. footer-summary cache invalidation references the socket push path).

AspectValue
ComponentApplicationWebSocketComponent (src/components/websocket/)
ServiceCommerceSocketEventService
Emitter bindingCommerceWebSocketBindingKeys.WEBSOCKET_EMITTER
TransportRedis (APP_ENV_WEBSOCKET_REDIS_*), single or cluster

Topic/room constants are not enumerated in a dedicated websocket.ts (unlike @nx/sale); emission shape follows the BaseSocketEventService room convention.

6. In-process EventBus (eventemitter3)

EventBusComponent wires an EventBus singleton, two listeners, and a registry. Event keys in src/common/event.ts.

Event keyConstantEmitted byListener actions
product.aggregate.createdEVENT_EMITTER.PRODUCT.AGGREGATE_CREATEDProductCreateServicehandle (mark onboarding step) + pushJobToQueue
product.aggregate.updatedEVENT_EMITTER.PRODUCT.AGGREGATE_UPDATEDProductUpdateServicepushJobToQueue

The created key is registered twice — once for handle (onboarding) and once for pushJobToQueue (sync enqueue).

7. Payload Schemas

ts
// IProductAggregateCreatedEventData (src/common/interface)
{
  payloadRequest: TCreateProductRequest;  // the original aggregate request
  productId: string;                        // newly-created primary product id
  syncMerchantIds?: string[];               // additional merchants to replicate to
  step?: TMerchantOnboardingStep;           // onboarding step to mark (default PRODUCT)
}

// IProductAggregateUpdatedEventData — analogous (productId + sync merchants)

// SYNC_PRODUCT_QUEUE create job
{ primaryProductId: string; additionalMerchantIds: string[]; data: TCreateProductRequest }

// SYNC_PRODUCT_QUEUE update job
{ primaryProductId: string; additionalMerchantIds: string[] }

8. Idempotency & Ordering

SurfaceDeliveryOrderingRecovery
CDC (Debezium)at-least-onceper-row (Postgres WAL order, keyed by PK)Debezium offset replay; consumers idempotent on PK
EventBusin-process, best-effortsynchronous within processlost on crash before enqueue (aggregate already committed)
BullMQ SYNC_PRODUCT_QUEUEat-least-onceper-jobBullMQ retry/backoff; sync re-runs safely (upsert semantics)
WebSocketbest-effortnone (broadcast)clients refetch from REST on reconnect

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.