API Events
1. Inbound — Kafka Topics
Subscribed in src/components/kafka/component.ts:74-80 (the SUBSCRIBED_TOPICS const). The consumer runs with autocommit=false and commits offsets manually after the handler returns.
| Topic | Constant | Producer | Handler | Idempotency Key |
|---|---|---|---|---|
| payment.success | KafkaTopics.PAYMENT_SUCCESS | @nx/sale (after MQ-Pay attempt success) | InventoryWorkerService.handlePaymentSuccess | (referenceType=SALE_ORDER, referenceId=saleOrderId, inventoryStockId) via InventoryTracking lookup |
| kitchen-ticket-item.status-changed | KafkaTopics.KITCHEN_TICKET_ITEM_STATUS_CHANGED | @nx/sale (KitchenTicketItemService) | MaterialWorkerService.handleKitchenTicketItemStatusChanged | per (saleOrderId, kitchenTicketItemId, materialId) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | @nx/inventory (self-loop for cross-location) | InventoryWorkerService.handleMaterialTransferred | per (transferId, fromStockId, toStockId) |
| nx.seller.public.merchant | CDCKafkaTopics.MERCHANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleMerchantCDC | ensureDefaultLocation(merchantId) is idempotent |
| nx.seller.public.product_variant | CDCKafkaTopics.PRODUCT_VARIANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleProductVariantCDC | ensureInventoryItem is idempotent |
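
The idempotency key for payment.success in the table above can be illustrated with a minimal sketch. It assumes an in-memory stand-in for the InventoryTracking lookup; `TrackingKey`, `InMemoryTrackingStore`, and `deductOnce` are hypothetical names for illustration, not the service's actual API.

```typescript
// Hypothetical key shape: mirrors (referenceType, referenceId, inventoryStockId).
interface TrackingKey {
  referenceType: string;
  referenceId: string;
  inventoryStockId: string;
}

// In-memory stand-in for the InventoryTracking table (assumption for this sketch).
class InMemoryTrackingStore {
  private readonly seen = new Set<string>();

  // JSON-encode the tuple so IDs containing separators cannot collide.
  private static encode(k: TrackingKey): string {
    return JSON.stringify([k.referenceType, k.referenceId, k.inventoryStockId]);
  }

  has(k: TrackingKey): boolean {
    return this.seen.has(InMemoryTrackingStore.encode(k));
  }

  record(k: TrackingKey): void {
    this.seen.add(InMemoryTrackingStore.encode(k));
  }
}

// Runs the deduction at most once per key; returns true when work was done.
function deductOnce(
  store: InMemoryTrackingStore,
  key: TrackingKey,
  deduct: () => void,
): boolean {
  if (store.has(key)) {
    return false; // re-delivered message: short-circuit, caller still commits
  }
  deduct();
  store.record(key);
  return true;
}
```

In a real handler the lookup and the write would need to share one database transaction; the sketch only shows the short-circuit on a repeated key.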
Failure mode
| Outcome | Behavior |
|---|---|
| Handler throws | Logged via onMessageError, commit skipped → re-delivered on next poll |
| forceNonNegative block (oversell) | Tracking row with note OVERSELL_BLOCKED written; deduction skipped silently; commit succeeds |
| Idempotency hit (already-processed key) | Handler short-circuits, commit succeeds |
| Unknown topic | Logged warning in _dispatchMessage default branch |
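
The commit behavior in the table above can be sketched as a single dispatch step. This is a simplified model, not the component's code: `processMessage`, `DispatchDeps`, and the message shape are hypothetical, and committing after an unknown-topic warning is an assumption the table does not state.

```typescript
// All names below are hypothetical stand-ins for illustration.
type Handler = (value: unknown) => Promise<void>;

interface InboundMessage {
  topic: string;
  offset: number;
  value: unknown;
}

interface DispatchDeps {
  handlers: Map<string, Handler>;
  commit: (msg: InboundMessage) => Promise<void>;
  warn: (text: string) => void;
  onMessageError: (err: unknown, msg: InboundMessage) => void;
}

type Outcome = "committed" | "commit-skipped";

async function processMessage(msg: InboundMessage, deps: DispatchDeps): Promise<Outcome> {
  const handler = deps.handlers.get(msg.topic);
  try {
    if (handler === undefined) {
      // Default branch: log a warning and fall through to the commit
      // (assumption: unknown topics do not block the partition).
      deps.warn(`unknown topic: ${msg.topic}`);
    } else {
      await handler(msg.value);
    }
  } catch (err) {
    // Handler threw: log and skip the commit so the message is re-delivered.
    deps.onMessageError(err, msg);
    return "commit-skipped";
  }
  // Manual commit only after the handler has returned.
  await deps.commit(msg);
  return "committed";
}
```

Oversell blocks and idempotency hits are ordinary successful returns from the handler, so they follow the commit path.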
2. Outbound — Kafka Topics
| Topic | Constant | Trigger | Producer Site | Consumers |
|---|---|---|---|---|
| purchase-order.received | KafkaTopics.PURCHASE_ORDER_RECEIVED | After PO transitions to RECEIVED/COMPLETED, only when payments[] non-empty | purchase-order.service.ts (post-transaction-commit, fire-and-forget) | @nx/finance (record COGS / EXPENSE legs) |
| material.stock-changed | KafkaTopics.MATERIAL_STOCK_CHANGED | After material consumed (kitchen ticket READY) | material-worker.service.ts | @nx/sale (kitchen ticket UI sync) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | After cross-location material movement | material-worker.service.ts | self (re-consume for receiving location) |
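
The trigger condition for purchase-order.received can be written as a small guard. This is a sketch under the assumption that the status is available as a plain string; `shouldEmitPurchaseOrderReceived` is a hypothetical helper name.

```typescript
// Hypothetical guard: emit only for RECEIVED/COMPLETED orders that carry payments.
function shouldEmitPurchaseOrderReceived(
  status: string,
  payments: readonly unknown[],
): boolean {
  const reachedReceipt = status === "RECEIVED" || status === "COMPLETED";
  return reachedReceipt && payments.length > 0;
}
```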
Producer config (src/components/kafka/component.ts:140-159)
| Setting | Value |
|---|---|
| acks | KafkaAcks.ALL |
| idempotent | true |
| compression | lz4 |
| requestTimeout | 60_000 ms |
| connectTimeout | 30_000 ms |
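
As a rough illustration, the settings above can be expressed as a typed object with a sanity check. The `KafkaAcks` object here is a local stand-in (its values are assumptions, not the real constant), and `ProducerSettings` and `validateProducerSettings` are hypothetical. The check encodes one Kafka rule: an idempotent producer requires acks=all.

```typescript
// Local stand-in for the KafkaAcks constant; the string values are assumptions.
const KafkaAcks = { LEADER: "leader", ALL: "all" } as const;
type Acks = (typeof KafkaAcks)[keyof typeof KafkaAcks];

// Hypothetical settings shape mirroring the table rows.
interface ProducerSettings {
  acks: Acks;
  idempotent: boolean;
  compression: string;
  requestTimeoutMs: number;
  connectTimeoutMs: number;
}

const producerSettings: ProducerSettings = {
  acks: KafkaAcks.ALL,
  idempotent: true,
  compression: "lz4",
  requestTimeoutMs: 60_000,
  connectTimeoutMs: 30_000,
};

// Returns a list of problems; an empty list means the settings are consistent.
function validateProducerSettings(s: ProducerSettings): string[] {
  const problems: string[] = [];
  if (s.idempotent && s.acks !== KafkaAcks.ALL) {
    problems.push("idempotent producers require acks=all");
  }
  if (s.requestTimeoutMs <= 0 || s.connectTimeoutMs <= 0) {
    problems.push("timeouts must be positive");
  }
  return problems;
}
```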
3. Inbound — BullMQ
N/A. Inventory uses Kafka exclusively for cross-service async; no BullMQ queues.
4. Outbound — BullMQ
N/A.
5. WebSocket Emissions
Defined in src/common/websocket.ts: one topic, three room types.
| Topic | Constant | Source | Trigger |
|---|---|---|---|
| observation/inventory/inventory-stock | InventoryWebSocketTopics.INVENTORY_STOCK | InventorySocketEventService | Stock change (post adjustStock write) |
Rooms (InventoryWebSocketRooms)
| Helper | Room path | Subscriber |
|---|---|---|
| getMerchantInventoryStockRoom(merchantId) | merchants/<merchantId>/inventory-stocks | Merchant-wide listener (admin, ops) |
| getInventoryLocationRoom(locationId) | inventory-locations/<locationId> | Per-location dashboard |
| getInventoryItemRoom(itemId) | inventory-items/<itemId> | Per-item detail page |
Every single stock change is emitted to all three rooms in parallel (getAllInventoryRooms()).
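
A minimal sketch of the room helpers, built only from the room paths listed above. The parameter shape of `getAllInventoryRooms` and the `emitToAllRooms` wrapper are assumptions for illustration; the real signatures may differ.

```typescript
// Room path builders following the paths in the Rooms table.
const InventoryWebSocketRooms = {
  getMerchantInventoryStockRoom: (merchantId: string): string =>
    `merchants/${merchantId}/inventory-stocks`,
  getInventoryLocationRoom: (locationId: string): string =>
    `inventory-locations/${locationId}`,
  getInventoryItemRoom: (itemId: string): string =>
    `inventory-items/${itemId}`,
};

// Hypothetical input shape: the three IDs a stock change is associated with.
interface StockChangeIds {
  merchantId: string;
  locationId: string;
  itemId: string;
}

function getAllInventoryRooms(ids: StockChangeIds): string[] {
  return [
    InventoryWebSocketRooms.getMerchantInventoryStockRoom(ids.merchantId),
    InventoryWebSocketRooms.getInventoryLocationRoom(ids.locationId),
    InventoryWebSocketRooms.getInventoryItemRoom(ids.itemId),
  ];
}

// Emits the same payload to every room concurrently (hypothetical wrapper).
async function emitToAllRooms(
  emit: (room: string, payload: unknown) => Promise<void>,
  ids: StockChangeIds,
  payload: unknown,
): Promise<void> {
  await Promise.all(getAllInventoryRooms(ids).map((room) => emit(room, payload)));
}
```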
6. Payload Schemas
Source-of-truth definitions live in @nx/core/src/models/messages/.
```ts
// PAYMENT_SUCCESS: TSalePaymentSuccess
export interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy?: string;
  modifiedBy?: string;
  payment: { total: number; paid: number; status: string };
  items: Array<{
    itemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    quantity: number;
    unitPrice: number;
    tax: number;
    discount: number;
    subtotal: number;
    total: number;
  }>;
}

// KITCHEN_TICKET_ITEM_STATUS_CHANGED: TKitchenTicketItemStatusChangedMessage
export interface TKitchenTicketItemStatusChangedMessage {
  kitchenTicketItemId: string;
  kitchenTicketId: string;
  saleOrderId: string;
  merchantId: string;
  status: 'PENDING' | 'COOKING' | 'READY' | 'SERVED' | 'VOIDED';
  item: {
    saleOrderItemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    itemId: string;
    quantity: number;
  };
}

// PURCHASE_ORDER_RECEIVED: TPurchaseOrderReceived
export interface TPurchaseOrderReceived {
  purchaseOrderId: string;
  merchantId: string;
  payments: Array<{
    financeWalletId: string;
    amount: number;
    currency: string;
    exchangeRate: number;
  }>;
  items: Array<{
    inventoryItemId: string;
    quantity: number;
    unitCost: number;
    uomId?: string;
    multiplier?: number;
    effectiveAt: string; // ISO
  }>;
}

// MATERIAL_STOCK_CHANGED: TMaterialStockChangedMessage
export interface TMaterialStockChangedMessage {
  materialId: string;
  merchantId: string;
  inventoryStockId: string;
  quantityBefore: number;
  quantityAfter: number;
  delta: number;
  referenceType: string;
  referenceId: string;
}

// CDC envelope: TDebeziumMessage<T>
export interface TDebeziumMessage<T> {
  payload: T;
  // Debezium also sends schema/op fields; agents typically only consume `payload`
}
```
7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
| PAYMENT_SUCCESS | at-least-once | per-saleOrderId via Kafka key | re-deliver triggers idempotency lookup → skip |
| KITCHEN_TICKET_ITEM_STATUS_CHANGED | at-least-once | per-saleOrderId | re-deliver → skip via tracking lookup |
| MATERIAL_TRANSFERRED | at-least-once | per-transferId | idempotent adjustStock + tracking |
| MERCHANT CDC | at-least-once | per-row | ensureDefaultLocation idempotent |
| PRODUCT_VARIANT CDC | at-least-once | per-row | ensureInventoryItem idempotent |
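
Per-key ordering in the table above relies on a general Kafka property: messages with the same key land on the same partition, and a partition is consumed in order. The sketch below illustrates that mapping with an FNV-1a hash chosen for brevity. It is not the hash Kafka's default partitioner uses, and `partitionFor` is a hypothetical helper.

```typescript
// 32-bit FNV-1a hash over UTF-16 code units (illustrative choice only).
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Deterministically maps a message key (e.g. a saleOrderId) to a partition index.
function partitionFor(key: string, partitionCount: number): number {
  if (!Number.isInteger(partitionCount) || partitionCount <= 0) {
    throw new RangeError("partitionCount must be a positive integer");
  }
  return fnv1a(key) % partitionCount;
}
```

Because the mapping is deterministic, every event for one saleOrderId shares a partition, so a re-delivered message is handled after the earlier events for that order and can be skipped by the idempotency lookup.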
Outbound emission rule: emit only after the transaction commits. The service holds the producer call until the DB transaction succeeds; if the emit fails, the error is logged and the emit is retried asynchronously (no dual-write rollback).
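
The emission rule can be sketched as follows. This is a simplified model under stated assumptions: `commitThenEmit`, `EmitDeps`, and the retry scheduling hook are hypothetical names, and the real service may structure its retries differently.

```typescript
// Hypothetical dependencies; none of these are the service's real interfaces.
interface EmitDeps<T> {
  runInTransaction: (work: () => Promise<T>) => Promise<T>;
  send: (topic: string, payload: T) => Promise<void>;
  logError: (err: unknown) => void;
  scheduleRetry: (task: () => Promise<void>) => void;
}

async function commitThenEmit<T>(
  topic: string,
  work: () => Promise<T>,
  deps: EmitDeps<T>,
): Promise<T> {
  // If the transaction throws, control never reaches the producer call.
  const result = await deps.runInTransaction(work);

  const attempt = (): Promise<void> => deps.send(topic, result);

  // Fire-and-forget: the caller does not wait for the broker.
  // On failure, log and hand the same attempt to an async retry;
  // the committed DB state is never rolled back.
  attempt().catch((err: unknown) => {
    deps.logError(err);
    deps.scheduleRetry(attempt);
  });

  return result;
}
```

The trade-off is that a crash between commit and emit can lose the event unless the retry is durable; the sketch does not address that case.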
8. Related Pages
- Integration
- Architecture — Runtime Scenarios
- Inventory Tracking — idempotency keys