Skip to content

API Sự kiện

1. Inbound — Kafka Topics

Subscribe trong src/components/kafka/component.ts:74-80 (hằng số SUBSCRIBED_TOPICS). Consumer là autocommit=false — commit thủ công sau khi handler trả về.

TopicHằng sốProducerHandlerIdempotency Key
payment.successKafkaTopics.PAYMENT_SUCCESS@nx/sale (sau khi MQ-Pay attempt success)InventoryWorkerService.handlePaymentSuccess(referenceType=SALE_ORDER, referenceId=saleOrderId, inventoryStockId) qua lookup InventoryTracking
kitchen-ticket-item.status-changedKafkaTopics.KITCHEN_TICKET_ITEM_STATUS_CHANGED@nx/sale (KitchenTicketItemService)MaterialWorkerService.handleKitchenTicketItemStatusChangedtheo (saleOrderId, kitchenTicketItemId, materialId)
material.transferredKafkaTopics.MATERIAL_TRANSFERRED@nx/inventory (self-loop cho cross-location)InventoryWorkerService.handleMaterialTransferredtheo (transferId, fromStockId, toStockId)
nx.seller.public.merchantCDCKafkaTopics.MERCHANTDebezium (PostgreSQL WAL)InventoryWorkerService.handleMerchantCDCensureDefaultLocation(merchantId) là idempotent
nx.seller.public.product_variantCDCKafkaTopics.PRODUCT_VARIANTDebezium (PostgreSQL WAL)InventoryWorkerService.handleProductVariantCDCensureInventoryItem là idempotent

Failure mode

Tình huốngHành vi
Handler ném lỗiLog qua onMessageError, bỏ qua commit → re-deliver ở lần poll kế tiếp
Block do forceNonNegative (oversell)Ghi tracking row với note OVERSELL_BLOCKED; bỏ qua trừ stock im lặng; commit thành công
Trúng idempotency (key đã xử lý)Handler short-circuit, commit thành công
Topic không xác địnhLog warning ở nhánh default của _dispatchMessage

2. Outbound — Kafka Topics

TopicHằng sốTriggerVị trí phátConsumers
purchase-order.receivedKafkaTopics.PURCHASE_ORDER_RECEIVEDSau khi PO chuyển sang RECEIVED/COMPLETED, chỉ khi payments[] không rỗngpurchase-order.service.ts (post-transaction-commit, fire-and-forget)@nx/finance (ghi nhận chân COGS / EXPENSE)
material.stock-changedKafkaTopics.MATERIAL_STOCK_CHANGEDSau khi material được tiêu thụ (kitchen ticket READY)material-worker.service.ts@nx/sale (đồng bộ UI kitchen ticket)
material.transferredKafkaTopics.MATERIAL_TRANSFERREDSau khi chuyển material giữa các locationmaterial-worker.service.tsself (re-consume cho location nhận)

Cấu hình Producer (src/components/kafka/component.ts:140-159)

Cài đặtGiá trị
acksKafkaAcks.ALL
idempotenttrue
compressionlz4
requestTimeout60_000ms
connectTimeout30_000ms

3. Inbound — BullMQ

N/A. Inventory chỉ dùng Kafka cho async cross-service; không có BullMQ queue.

4. Outbound — BullMQ

N/A.

5. WebSocket Emissions

src/common/websocket.ts. Một topic, ba loại room.

TopicHằng sốNguồnTrigger
observation/inventory/inventory-stockInventoryWebSocketTopics.INVENTORY_STOCKInventorySocketEventServiceThay đổi stock (sau khi ghi adjustStock)

Rooms (InventoryWebSocketRooms)

HelperĐường dẫn roomSubscriber
getMerchantInventoryStockRoom(merchantId)merchants/<merchantId>/inventory-stocksListener cấp merchant (admin, ops)
getInventoryLocationRoom(locationId)inventory-locations/<locationId>Dashboard theo location
getInventoryItemRoom(itemId)inventory-items/<itemId>Trang chi tiết theo item

Cả ba room được phát song song cho mỗi thay đổi stock đơn lẻ (getAllInventoryRooms()).

6. Payload Schemas

Định nghĩa nguồn-sự-thật nằm trong @nx/core/src/models/messages/.

ts
// PAYMENT_SUCCESS — TSalePaymentSuccess
export interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy?: string;
  modifiedBy?: string;
  payment: { total: number; paid: number; status: string };
  items: Array<{
    itemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    quantity: number;
    unitPrice: number;
    tax: number;
    discount: number;
    subtotal: number;
    total: number;
  }>;
}

// KITCHEN_TICKET_ITEM_STATUS_CHANGED — TKitchenTicketItemStatusChangedMessage
export interface TKitchenTicketItemStatusChangedMessage {
  kitchenTicketItemId: string;
  kitchenTicketId: string;
  saleOrderId: string;
  merchantId: string;
  status: 'PENDING' | 'COOKING' | 'READY' | 'SERVED' | 'VOIDED';
  item: {
    saleOrderItemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    itemId: string;
    quantity: number;
  };
}

// PURCHASE_ORDER_RECEIVED — TPurchaseOrderReceived
export interface TPurchaseOrderReceived {
  purchaseOrderId: string;
  merchantId: string;
  payments: Array<{
    financeWalletId: string;
    amount: number;
    currency: string;
    exchangeRate: number;
  }>;
  items: Array<{
    inventoryItemId: string;
    quantity: number;
    unitCost: number;
    uomId?: string;
    multiplier?: number;
    effectiveAt: string; // ISO
  }>;
}

// MATERIAL_STOCK_CHANGED — TMaterialStockChangedMessage
export interface TMaterialStockChangedMessage {
  materialId: string;
  merchantId: string;
  inventoryStockId: string;
  quantityBefore: number;
  quantityAfter: number;
  delta: number;
  referenceType: string;
  referenceId: string;
}

// CDC envelope — TDebeziumMessage<T>
export interface TDebeziumMessage<T> {
  payload: T;
  // Debezium also sends schema/op fields; agents typically only consume `payload`
}

7. Idempotency & Thứ tự

TopicDeliveryThứ tựRecovery
PAYMENT_SUCCESSat-least-oncetheo saleOrderId qua Kafka keyre-deliver kích hoạt idempotency lookup → skip
KITCHEN_TICKET_ITEM_STATUS_CHANGEDat-least-oncetheo saleOrderIdre-deliver → skip qua tracking lookup
MATERIAL_TRANSFERREDat-least-oncetheo transferIdadjustStock idempotent + tracking
MERCHANT CDCat-least-oncetheo rowensureDefaultLocation idempotent
PRODUCT_VARIANT CDCat-least-oncetheo rowensureInventoryItem idempotent

Quy tắc emit outbound: chỉ sau khi commit transaction. Service giữ lời gọi producer cho đến khi DB transaction thành công; nếu emit lỗi, log + retry async (không rollback dual-write).

8. Trang liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.