Events API
1. Inbound — Kafka Topics
Subscribed in src/components/kafka/component.ts:74-80 (the SUBSCRIBED_TOPICS constant). The consumer runs with autocommit=false: offsets are committed manually after the handler returns.
| Topic | Constant | Producer | Handler | Idempotency Key |
|---|---|---|---|---|
| payment.success | KafkaTopics.PAYMENT_SUCCESS | @nx/sale (after the MQ-Pay attempt succeeds) | InventoryWorkerService.handlePaymentSuccess | (referenceType=SALE_ORDER, referenceId=saleOrderId, inventoryStockId) via InventoryTracking lookup |
| kitchen-ticket-item.status-changed | KafkaTopics.KITCHEN_TICKET_ITEM_STATUS_CHANGED | @nx/sale (KitchenTicketItemService) | MaterialWorkerService.handleKitchenTicketItemStatusChanged | by (saleOrderId, kitchenTicketItemId, materialId) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | @nx/inventory (self-loop for cross-location transfers) | InventoryWorkerService.handleMaterialTransferred | by (transferId, fromStockId, toStockId) |
| nx.seller.public.merchant | CDCKafkaTopics.MERCHANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleMerchantCDC | ensureDefaultLocation(merchantId) is idempotent |
| nx.seller.public.product_variant | CDCKafkaTopics.PRODUCT_VARIANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleProductVariantCDC | ensureInventoryItem is idempotent |
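The manual-commit behavior described above can be illustrated with a minimal in-memory sketch. Everything here (ManualCommitConsumer, InboundMessage, the handler map) is hypothetical and only stands in for the real consumer in component.ts, which this page does not show. Handlers are synchronous to keep the example short; the real ones are presumably async.

```ts
// Hypothetical shapes for illustration; the real consumer API is not shown on this page.
type InboundMessage = { topic: string; offset: number; value: string };
type Handler = (msg: InboundMessage) => void;

class ManualCommitConsumer {
  committedOffset = -1; // in-memory stand-in for the offset stored on the broker
  warnings: string[] = [];
  private handlers: Map<string, Handler>;

  constructor(handlers: Map<string, Handler>) {
    this.handlers = handlers;
  }

  // Returns true when the offset was committed.
  process(msg: InboundMessage): boolean {
    const handler = this.handlers.get(msg.topic);
    if (handler === undefined) {
      // Unknown topic: warn only. Not committing here is an assumption of this sketch.
      this.warnings.push(`unknown topic: ${msg.topic}`);
      return false;
    }
    try {
      handler(msg);
    } catch {
      // Handler threw: skip the commit so the message can be delivered again.
      return false;
    }
    this.committedOffset = msg.offset; // commit only after the handler returned
    return true;
  }
}

const consumer = new ManualCommitConsumer(
  new Map<string, Handler>([
    ["payment.success", () => {}],
    ["material.transferred", () => { throw new Error("boom"); }],
  ]),
);
```

The point of the design is that a failed handler never advances the committed offset, which is what makes at-least-once redelivery possible.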
Failure mode
| Scenario | Behavior |
|---|---|
| Handler throws | Logged via onMessageError; the commit is skipped, so the message is redelivered on the next poll |
| Blocked by forceNonNegative (oversell) | Writes a tracking row with note OVERSELL_BLOCKED; silently skips the stock deduction; commits successfully |
| Idempotency hit (key already processed) | Handler short-circuits; commits successfully |
| Unknown topic | Logs a warning in the default branch of _dispatchMessage |
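The oversell row above can be sketched as follows. Only forceNonNegative and the OVERSELL_BLOCKED note come from the table; the Stock and TrackingRow shapes, the deductStock helper, and the delta of 0 on a blocked attempt are assumptions made for illustration.

```ts
// Hypothetical shapes; the real entities are not shown on this page.
type Stock = { id: string; quantity: number };
type TrackingRow = { inventoryStockId: string; delta: number; note?: string };

function deductStock(
  stock: Stock,
  quantity: number,
  forceNonNegative: boolean,
  tracking: TrackingRow[],
): void {
  const next = stock.quantity - quantity;
  if (forceNonNegative && next < 0) {
    // Oversell: record the attempt, leave the stock untouched, and return normally
    // so the caller still commits the offset. delta: 0 is an assumption.
    tracking.push({ inventoryStockId: stock.id, delta: 0, note: "OVERSELL_BLOCKED" });
    return;
  }
  stock.quantity = next;
  tracking.push({ inventoryStockId: stock.id, delta: -quantity });
}
```

Returning normally instead of throwing is what separates this case from the "handler throws" row: the message is not redelivered.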
2. Outbound — Kafka Topics
| Topic | Constant | Trigger | Emitted from | Consumers |
|---|---|---|---|---|
| purchase-order.received | KafkaTopics.PURCHASE_ORDER_RECEIVED | After the PO transitions to RECEIVED/COMPLETED, only when payments[] is non-empty | purchase-order.service.ts (post-transaction-commit, fire-and-forget) | @nx/finance (records the COGS / EXPENSE leg) |
| material.stock-changed | KafkaTopics.MATERIAL_STOCK_CHANGED | After material is consumed (kitchen ticket READY) | material-worker.service.ts | @nx/sale (syncs the kitchen ticket UI) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | After material is transferred between locations | material-worker.service.ts | self (re-consumed for the receiving location) |
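The trigger condition for purchase-order.received can be expressed as a small predicate. This is a sketch under stated assumptions: the PurchaseOrder shape, the DRAFT status, and the shouldEmitPurchaseOrderReceived name are hypothetical; only RECEIVED, COMPLETED, and the non-empty payments[] requirement come from the table.

```ts
// Hypothetical minimal PO shape; DRAFT is an assumed placeholder for any other status.
type PurchaseOrderStatus = "DRAFT" | "RECEIVED" | "COMPLETED";
type PurchaseOrder = {
  id: string;
  status: PurchaseOrderStatus;
  payments: Array<{ financeWalletId: string; amount: number }>;
};

// Emit only for received/completed POs that carry at least one payment.
function shouldEmitPurchaseOrderReceived(po: PurchaseOrder): boolean {
  const received = po.status === "RECEIVED" || po.status === "COMPLETED";
  return received && po.payments.length > 0;
}
```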
Producer Configuration (src/components/kafka/component.ts:140-159)
| Setting | Value |
|---|---|
| acks | KafkaAcks.ALL |
| idempotent | true |
| compression | lz4 |
| requestTimeout | 60_000 ms |
| connectTimeout | 30_000 ms |
3. Inbound — BullMQ
N/A. Inventory uses Kafka only for asynchronous cross-service messaging; there are no BullMQ queues.
4. Outbound — BullMQ
N/A.
5. WebSocket Emissions
Defined in src/common/websocket.ts. One topic, three room types.
| Topic | Constant | Source | Trigger |
|---|---|---|---|
| observation/inventory/inventory-stock | InventoryWebSocketTopics.INVENTORY_STOCK | InventorySocketEventService | Stock change (after the adjustStock write) |
Rooms (InventoryWebSocketRooms)
| Helper | Room path | Subscriber |
|---|---|---|
| getMerchantInventoryStockRoom(merchantId) | merchants/<merchantId>/inventory-stocks | Merchant-level listeners (admin, ops) |
| getInventoryLocationRoom(locationId) | inventory-locations/<locationId> | Per-location dashboards |
| getInventoryItemRoom(itemId) | inventory-items/<itemId> | Per-item detail pages |

All three rooms are emitted to in parallel for every single stock change (getAllInventoryRooms()).
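As a rough illustration of the room layout, the helpers might look like the sketch below. The helper names and room paths come from the table; the function bodies and the argument shape of getAllInventoryRooms are assumptions, since the source file is not reproduced on this page.

```ts
// Room path builders; paths follow the Rooms table.
function getMerchantInventoryStockRoom(merchantId: string): string {
  return `merchants/${merchantId}/inventory-stocks`;
}

function getInventoryLocationRoom(locationId: string): string {
  return `inventory-locations/${locationId}`;
}

function getInventoryItemRoom(itemId: string): string {
  return `inventory-items/${itemId}`;
}

// Assumed signature: collects the three rooms one stock change fans out to.
function getAllInventoryRooms(ids: {
  merchantId: string;
  locationId: string;
  itemId: string;
}): string[] {
  return [
    getMerchantInventoryStockRoom(ids.merchantId),
    getInventoryLocationRoom(ids.locationId),
    getInventoryItemRoom(ids.itemId),
  ];
}
```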
6. Payload Schemas
The source-of-truth definitions live in @nx/core/src/models/messages/.
```ts
// PAYMENT_SUCCESS: TSalePaymentSuccess
export interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy?: string;
  modifiedBy?: string;
  payment: { total: number; paid: number; status: string };
  items: Array<{
    itemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    quantity: number;
    unitPrice: number;
    tax: number;
    discount: number;
    subtotal: number;
    total: number;
  }>;
}

// KITCHEN_TICKET_ITEM_STATUS_CHANGED: TKitchenTicketItemStatusChangedMessage
export interface TKitchenTicketItemStatusChangedMessage {
  kitchenTicketItemId: string;
  kitchenTicketId: string;
  saleOrderId: string;
  merchantId: string;
  status: 'PENDING' | 'COOKING' | 'READY' | 'SERVED' | 'VOIDED';
  item: {
    saleOrderItemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    itemId: string;
    quantity: number;
  };
}

// PURCHASE_ORDER_RECEIVED: TPurchaseOrderReceived
export interface TPurchaseOrderReceived {
  purchaseOrderId: string;
  merchantId: string;
  payments: Array<{
    financeWalletId: string;
    amount: number;
    currency: string;
    exchangeRate: number;
  }>;
  items: Array<{
    inventoryItemId: string;
    quantity: number;
    unitCost: number;
    uomId?: string;
    multiplier?: number;
    effectiveAt: string; // ISO
  }>;
}

// MATERIAL_STOCK_CHANGED: TMaterialStockChangedMessage
export interface TMaterialStockChangedMessage {
  materialId: string;
  merchantId: string;
  inventoryStockId: string;
  quantityBefore: number;
  quantityAfter: number;
  delta: number;
  referenceType: string;
  referenceId: string;
}

// CDC envelope: TDebeziumMessage<T>
export interface TDebeziumMessage<T> {
  payload: T;
  // Debezium also sends schema/op fields; agents typically only consume `payload`
}
```
7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
| PAYMENT_SUCCESS | at-least-once | per saleOrderId via Kafka key | redelivery triggers the idempotency lookup → skip |
| KITCHEN_TICKET_ITEM_STATUS_CHANGED | at-least-once | per saleOrderId | redelivery → skip via tracking lookup |
| MATERIAL_TRANSFERRED | at-least-once | per transferId | adjustStock idempotent + tracking |
| MERCHANT CDC | at-least-once | per row | ensureDefaultLocation idempotent |
| PRODUCT_VARIANT CDC | at-least-once | per row | ensureInventoryItem idempotent |
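The "redelivery → skip" recovery can be sketched with an in-memory stand-in for the tracking lookup. The key fields follow the PAYMENT_SUCCESS idempotency key (referenceType, referenceId, inventoryStockId); InMemoryTracking and applyOnce are hypothetical names, and the real lookup would presumably query the InventoryTracking table rather than a Set.

```ts
// Key fields mirror the PAYMENT_SUCCESS idempotency key.
type TrackingKey = { referenceType: string; referenceId: string; inventoryStockId: string };

// In-memory stand-in for the tracking lookup (hypothetical).
class InMemoryTracking {
  private seen = new Set<string>();

  private static toKey(k: TrackingKey): string {
    return JSON.stringify([k.referenceType, k.referenceId, k.inventoryStockId]);
  }

  has(k: TrackingKey): boolean {
    return this.seen.has(InMemoryTracking.toKey(k));
  }

  record(k: TrackingKey): void {
    this.seen.add(InMemoryTracking.toKey(k));
  }
}

// Runs `apply` at most once per key; a redelivered message short-circuits.
function applyOnce(
  tracking: InMemoryTracking,
  key: TrackingKey,
  apply: () => void,
): "applied" | "skipped" {
  if (tracking.has(key)) return "skipped"; // caller still commits the offset
  apply();
  tracking.record(key);
  return "applied";
}
```

In a real service the side effect and the tracking write would need to share one DB transaction; the sketch leaves that out.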
Outbound emit rule: emit only after the transaction commits. The service holds the producer call until the DB transaction succeeds; if the emit fails, it logs the error and retries asynchronously (the dual-write is not rolled back).
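A minimal sketch of the emit-after-commit rule follows. All names (commitThenEmit, EmitDeps, the retry queue) are hypothetical stand-ins, and the flow is synchronous for brevity, whereas the real producer call is presumably async. What it shows is the ordering: a failed transaction emits nothing, and a failed emit leaves the committed write in place.

```ts
// Hypothetical dependencies for illustration only.
type EmitDeps = {
  emit: (topic: string, payload: unknown) => void;
  log: (message: string) => void;
  retryQueue: Array<() => void>; // drained later by some background retry loop
};

function commitThenEmit<T>(runInTransaction: () => T, topic: string, deps: EmitDeps): T {
  // 1. Commit first. If this throws, nothing below runs, so nothing is emitted.
  const result = runInTransaction();

  // 2. Emit only after the commit succeeded.
  const send = (): void => deps.emit(topic, result);
  try {
    send();
  } catch (err) {
    // 3. Emit failed: log and schedule a retry; the committed write is kept.
    deps.log(`emit to ${topic} failed: ${String(err)}`);
    deps.retryQueue.push(send);
  }
  return result;
}
```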
8. Related Pages
- Integrations
- Architecture: Runtime Scenarios
- Inventory Tracking: idempotency keys