API Events
Commerce không có consumer Kafka inbound (ở vai trò API) và không produce Kafka cấp ứng dụng —
ApplicationKafkaComponentbind một producer nhưng không tồn tại call site.send()trongsrc/. Seam tích hợp thực sự là CDC (Debezium) đọc Postgres WAL. Bên trong, sao chép sản phẩm dùng EventBus in-process + một hàng đợi BullMQ.
1. Kafka Inbound
| Bề mặt | Trạng thái |
|---|---|
| Kafka consumer (vai trò API) | không có — khóa APPLICATION_KAFKA_CONSUMER được khai báo, không đấu nối |
| CDC consumer (vai trò WORKER) | ApplicationCdcComponent từ @nx/search đồng bộ DB commerce → Typesense |
2. Kafka Outbound
Không produce cấp ứng dụng. Một producer được bind tại
BindingKeys.APPLICATION_KAFKA_PRODUCER(clientIdSVC-00020-COMMERCE_PRODUCER) nhưng không được gọi ở bất cứ đâu trongsrc/. Async outbound tới các dịch vụ chị em xảy ra qua CDC, không phải produce cấp dịch vụ.
2.1 CDC topics (Debezium) — seam tích hợp
Debezium đọc Postgres WAL của commerce và publish một topic cho mỗi bảng. Tên topic từ
CDCKafkaTopics/CdcTables(@nx/core). Xem ADR-0002.
| Bảng CDC | Topic (public.<table>) | Consumer chính | Mục đích downstream |
|---|---|---|---|
Merchant | ...public.Merchant | inventory, invoice, search | seed location mặc định; upsert TaxInfo; index |
Organizer | ...public.Organizer | search | index tenant |
Product | ...public.Product | search, taxation | index; cấp phát nhóm thuế |
ProductInfo | ...public.ProductInfo | search | fan-out name/description i18n |
ProductVariant | ...public.ProductVariant | inventory, pricing, search | seed InventoryItem; init fare; index |
ProductCategory | ...public.ProductCategory | search | cascade categoryIds (không có collection riêng) |
ProductBundler | ...public.ProductBundler | search | cascade item combo tới lead variant |
Category | ...public.Category | search | index |
SaleChannel | ...public.SaleChannel | search | index |
Device | ...public.Device | search | index |
MetaLink | ...public.MetaLink | search | cascade hình ảnh tới product/variant |
Fare / FareSet | pricing.* | search | tính lại defaultPrice/fareSet của variant |
Đấu nối consumer nằm trong các package tiêu thụ (search/inventory/pricing/taxation/invoice), không phải trong commerce. Nghĩa vụ duy nhất của commerce là ghi vào Postgres.
3. BullMQ Inbound
| Hàng đợi | Hằng số | Tên job | Worker | Handler |
|---|---|---|---|---|
@nx/commerce/sync-product-queue | BindingKeys.SYNC_PRODUCT_QUEUE_NAME | <queue>.create (mặc định), <queue>.update | SyncProductWorker | ProductCreateSyncService / ProductUpdateSyncService.syncToAdditionalMerchants() |
| Job | Payload |
|---|---|
| create | { primaryProductId, additionalMerchantIds[], data } |
| update | { primaryProductId, additionalMerchantIds[] } |
Worker chỉ được bật khi liệt kê trong
APP_ENV_WORKERS(hoặcALL). Concurrency từAPP_ENV_BULLMQ_WORKER_CONCURRENCY(mặc định1). Chạy trên kết nối BullMQ Redis (riêng với cache Redis).
4. BullMQ Outbound
| Producer | Hàng đợi | Khi nào |
|---|---|---|
ProductAggregateCreatedListener.pushJobToQueue() | SYNC_PRODUCT_QUEUE (job .create) | sau product.aggregate.created, nếu syncMerchantIds.length > 0 |
ProductAggregateUpdatedListener.pushJobToQueue() | SYNC_PRODUCT_QUEUE (job .update) | sau product.aggregate.updated, nếu có sync merchants |
Đưa vào hàng đợi xảy ra qua
JobDispatcherHelper.publish()từ các listener EventBus in-process (xem §6).
5. Phát WebSocket
ApplicationWebSocketComponentbind mộtWebSocketEmitter(identifier: 'commerce-ws-emitter') và đăng kýCommerceSocketEventService(kế thừaBaseSocketEventServicecủa@nx/core). Nền Redis (single/cluster). Emitter là kênh cho cập nhật thời gian thực hướng merchant (ví dụ làm mất hiệu lực cache footer-summary tham chiếu đường push socket).
| Khía cạnh | Giá trị |
|---|---|
| Component | ApplicationWebSocketComponent (src/components/websocket/) |
| Service | CommerceSocketEventService |
| Emitter binding | CommerceWebSocketBindingKeys.WEBSOCKET_EMITTER |
| Transport | Redis (APP_ENV_WEBSOCKET_REDIS_*), single hoặc cluster |
Hằng số topic/room không được liệt kê trong một
websocket.tsriêng (khác với@nx/sale); hình dạng phát tuân theo quy ước room củaBaseSocketEventService.
6. EventBus in-process (eventemitter3)
EventBusComponentđấu nối một singletonEventBus, hai listener và một registry. Khóa sự kiện trongsrc/common/event.ts.
| Khóa sự kiện | Hằng số | Phát bởi | Hành động listener |
|---|---|---|---|
product.aggregate.created | EVENT_EMITTER.PRODUCT.AGGREGATE_CREATED | ProductCreateService | handle (đánh dấu bước onboarding) + pushJobToQueue |
product.aggregate.updated | EVENT_EMITTER.PRODUCT.AGGREGATE_UPDATED | ProductUpdateService | pushJobToQueue |
Khóa
createdđược đăng ký hai lần — một chohandle(onboarding) và một chopushJobToQueue(đưa sync vào hàng đợi).
7. Schema Payload
// IProductAggregateCreatedEventData (src/common/interface)
{
payloadRequest: TCreateProductRequest; // request aggregate gốc
productId: string; // id product chính vừa tạo
syncMerchantIds?: string[]; // merchant bổ sung để sao chép tới
step?: TMerchantOnboardingStep; // bước onboarding cần đánh dấu (mặc định PRODUCT)
}
// IProductAggregateUpdatedEventData — tương tự (productId + sync merchants)
// SYNC_PRODUCT_QUEUE create job
{ primaryProductId: string; additionalMerchantIds: string[]; data: TCreateProductRequest }
// SYNC_PRODUCT_QUEUE update job
{ primaryProductId: string; additionalMerchantIds: string[] }8. Idempotency & Thứ tự
| Bề mặt | Phân phối | Thứ tự | Phục hồi |
|---|---|---|---|
| CDC (Debezium) | at-least-once | theo từng hàng (thứ tự Postgres WAL, khóa theo PK) | replay offset Debezium; consumer idempotent theo PK |
| EventBus | in-process, best-effort | đồng bộ trong tiến trình | mất khi crash trước khi đưa vào hàng đợi (aggregate đã commit) |
BullMQ SYNC_PRODUCT_QUEUE | at-least-once | theo từng job | retry/backoff BullMQ; sync chạy lại an toàn (ngữ nghĩa upsert) |
| WebSocket | best-effort | không (broadcast) | client refetch từ REST khi kết nối lại |
9. Trang liên quan
- Tích hợp — hợp đồng CDC consumer theo từng dịch vụ chị em
- Kiến trúc — Kịch bản Runtime
- Cấu hình — env Kafka / BullMQ / WebSocket
- Quyết định