Skip to content

API Events

Commerce không có consumer Kafka inbound (ở vai trò API) và không produce Kafka cấp ứng dụngApplicationKafkaComponent bind một producer nhưng không tồn tại call site .send() trong src/. Seam tích hợp thực sự là CDC (Debezium) đọc Postgres WAL. Bên trong, sao chép sản phẩm dùng EventBus in-process + một hàng đợi BullMQ.

1. Kafka Inbound

Bề mặtTrạng thái
Kafka consumer (vai trò API)không có — khóa APPLICATION_KAFKA_CONSUMER được khai báo, không đấu nối
CDC consumer (vai trò WORKER)ApplicationCdcComponent từ @nx/search đồng bộ DB commerce → Typesense

2. Kafka Outbound

Không produce cấp ứng dụng. Một producer được bind tại BindingKeys.APPLICATION_KAFKA_PRODUCER (clientId SVC-00020-COMMERCE_PRODUCER) nhưng không được gọi ở bất cứ đâu trong src/. Async outbound tới các dịch vụ chị em xảy ra qua CDC, không phải produce cấp dịch vụ.

2.1 CDC topics (Debezium) — seam tích hợp

Debezium đọc Postgres WAL của commerce và publish một topic cho mỗi bảng. Tên topic từ CDCKafkaTopics / CdcTables (@nx/core). Xem ADR-0002.

Bảng CDCTopic (public.<table>)Consumer chínhMục đích downstream
Merchant...public.Merchantinventory, invoice, searchseed location mặc định; upsert TaxInfo; index
Organizer...public.Organizersearchindex tenant
Product...public.Productsearch, taxationindex; cấp phát nhóm thuế
ProductInfo...public.ProductInfosearchfan-out name/description i18n
ProductVariant...public.ProductVariantinventory, pricing, searchseed InventoryItem; init fare; index
ProductCategory...public.ProductCategorysearchcascade categoryIds (không có collection riêng)
ProductBundler...public.ProductBundlersearchcascade item combo tới lead variant
Category...public.Categorysearchindex
SaleChannel...public.SaleChannelsearchindex
Device...public.Devicesearchindex
MetaLink...public.MetaLinksearchcascade hình ảnh tới product/variant
Fare / FareSetpricing.*searchtính lại defaultPrice/fareSet của variant

Đấu nối consumer nằm trong các package tiêu thụ (search/inventory/pricing/taxation/invoice), không phải trong commerce. Nghĩa vụ duy nhất của commerce là ghi vào Postgres.

3. BullMQ Inbound

Hàng đợiHằng sốTên jobWorkerHandler
@nx/commerce/sync-product-queueBindingKeys.SYNC_PRODUCT_QUEUE_NAME<queue>.create (mặc định), <queue>.updateSyncProductWorkerProductCreateSyncService / ProductUpdateSyncService.syncToAdditionalMerchants()
JobPayload
create{ primaryProductId, additionalMerchantIds[], data }
update{ primaryProductId, additionalMerchantIds[] }

Worker chỉ được bật khi liệt kê trong APP_ENV_WORKERS (hoặc ALL). Concurrency từ APP_ENV_BULLMQ_WORKER_CONCURRENCY (mặc định 1). Chạy trên kết nối BullMQ Redis (riêng với cache Redis).

4. BullMQ Outbound

ProducerHàng đợiKhi nào
ProductAggregateCreatedListener.pushJobToQueue()SYNC_PRODUCT_QUEUE (job .create)sau product.aggregate.created, nếu syncMerchantIds.length > 0
ProductAggregateUpdatedListener.pushJobToQueue()SYNC_PRODUCT_QUEUE (job .update)sau product.aggregate.updated, nếu có sync merchants

Đưa vào hàng đợi xảy ra qua JobDispatcherHelper.publish() từ các listener EventBus in-process (xem §6).

5. Phát WebSocket

ApplicationWebSocketComponent bind một WebSocketEmitter (identifier: 'commerce-ws-emitter') và đăng ký CommerceSocketEventService (kế thừa BaseSocketEventService của @nx/core). Nền Redis (single/cluster). Emitter là kênh cho cập nhật thời gian thực hướng merchant (ví dụ làm mất hiệu lực cache footer-summary tham chiếu đường push socket).

Khía cạnhGiá trị
ComponentApplicationWebSocketComponent (src/components/websocket/)
ServiceCommerceSocketEventService
Emitter bindingCommerceWebSocketBindingKeys.WEBSOCKET_EMITTER
TransportRedis (APP_ENV_WEBSOCKET_REDIS_*), single hoặc cluster

Hằng số topic/room không được liệt kê trong một websocket.ts riêng (khác với @nx/sale); hình dạng phát tuân theo quy ước room của BaseSocketEventService.

6. EventBus in-process (eventemitter3)

EventBusComponent đấu nối một singleton EventBus, hai listener và một registry. Khóa sự kiện trong src/common/event.ts.

Khóa sự kiệnHằng sốPhát bởiHành động listener
product.aggregate.createdEVENT_EMITTER.PRODUCT.AGGREGATE_CREATEDProductCreateServicehandle (đánh dấu bước onboarding) + pushJobToQueue
product.aggregate.updatedEVENT_EMITTER.PRODUCT.AGGREGATE_UPDATEDProductUpdateServicepushJobToQueue

Khóa created được đăng ký hai lần — một cho handle (onboarding) và một cho pushJobToQueue (đưa sync vào hàng đợi).

7. Schema Payload

ts
// IProductAggregateCreatedEventData (src/common/interface)
{
  payloadRequest: TCreateProductRequest;  // request aggregate gốc
  productId: string;                        // id product chính vừa tạo
  syncMerchantIds?: string[];               // merchant bổ sung để sao chép tới
  step?: TMerchantOnboardingStep;           // bước onboarding cần đánh dấu (mặc định PRODUCT)
}

// IProductAggregateUpdatedEventData — tương tự (productId + sync merchants)

// SYNC_PRODUCT_QUEUE create job
{ primaryProductId: string; additionalMerchantIds: string[]; data: TCreateProductRequest }

// SYNC_PRODUCT_QUEUE update job
{ primaryProductId: string; additionalMerchantIds: string[] }

8. Idempotency & Thứ tự

Bề mặtPhân phốiThứ tựPhục hồi
CDC (Debezium)at-least-oncetheo từng hàng (thứ tự Postgres WAL, khóa theo PK)replay offset Debezium; consumer idempotent theo PK
EventBusin-process, best-effortđồng bộ trong tiến trìnhmất khi crash trước khi đưa vào hàng đợi (aggregate đã commit)
BullMQ SYNC_PRODUCT_QUEUEat-least-oncetheo từng jobretry/backoff BullMQ; sync chạy lại an toàn (ngữ nghĩa upsert)
WebSocketbest-effortkhông (broadcast)client refetch từ REST khi kết nối lại

9. Trang liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.