
API Events

1. Inbound — Kafka Topics

Topics are subscribed in src/components/kafka/component.ts:74-80 (the SUBSCRIBED_TOPICS const). The consumer runs with autocommit=false: offsets are committed manually after the handler returns.

| Topic | Constant | Producer | Handler | Idempotency Key |
| --- | --- | --- | --- | --- |
| payment.success | KafkaTopics.PAYMENT_SUCCESS | @nx/sale (after MQ-Pay attempt success) | InventoryWorkerService.handlePaymentSuccess | (referenceType=SALE_ORDER, referenceId=saleOrderId, inventoryStockId) via InventoryTracking lookup |
| kitchen-ticket-item.status-changed | KafkaTopics.KITCHEN_TICKET_ITEM_STATUS_CHANGED | @nx/sale (KitchenTicketItemService) | MaterialWorkerService.handleKitchenTicketItemStatusChanged | per (saleOrderId, kitchenTicketItemId, materialId) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | @nx/inventory (self-loop for cross-location) | InventoryWorkerService.handleMaterialTransferred | per (transferId, fromStockId, toStockId) |
| nx.seller.public.merchant | CDCKafkaTopics.MERCHANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleMerchantCDC | ensureDefaultLocation(merchantId) is idempotent |
| nx.seller.public.product_variant | CDCKafkaTopics.PRODUCT_VARIANT | Debezium (PostgreSQL WAL) | InventoryWorkerService.handleProductVariantCDC | ensureInventoryItem is idempotent |
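
The tracking-lookup idempotency used for payment.success can be sketched as follows. This is a minimal illustration, not the service's code: `TrackingKey`, `InMemoryTrackingStore`, and `deductOnce` are hypothetical names, and an in-memory set stands in for the InventoryTracking table.

```typescript
// Hypothetical shape: the real InventoryTracking model is not shown in this document.
type TrackingKey = {
  referenceType: string; // e.g. 'SALE_ORDER'
  referenceId: string; // e.g. the saleOrderId
  inventoryStockId: string;
};

// In-memory stand-in for the InventoryTracking table.
class InMemoryTrackingStore {
  private readonly seen = new Set<string>();

  private static toKey(k: TrackingKey): string {
    return [k.referenceType, k.referenceId, k.inventoryStockId].join('|');
  }

  has(k: TrackingKey): boolean {
    return this.seen.has(InMemoryTrackingStore.toKey(k));
  }

  record(k: TrackingKey): void {
    this.seen.add(InMemoryTrackingStore.toKey(k));
  }
}

// Applies the deduction at most once per key; returns false on an idempotency hit.
function deductOnce(
  trackingStore: InMemoryTrackingStore,
  trackingKey: TrackingKey,
  deduct: () => void,
): boolean {
  if (trackingStore.has(trackingKey)) {
    return false; // already processed: short-circuit so the caller can still commit
  }
  deduct();
  trackingStore.record(trackingKey);
  return true;
}

// Demo: the second call simulates a re-delivered message.
const trackingStore = new InMemoryTrackingStore();
const paymentKey: TrackingKey = {
  referenceType: 'SALE_ORDER',
  referenceId: 'so-1',
  inventoryStockId: 'stock-1',
};
let stockOnHand = 10;
const firstApplied = deductOnce(trackingStore, paymentKey, () => { stockOnHand -= 2; });
const secondApplied = deductOnce(trackingStore, paymentKey, () => { stockOnHand -= 2; });
```

In a real handler the deduction and the tracking write would presumably share one database transaction, so a crash between them cannot leave the key unrecorded.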

Failure mode

| Outcome | Behavior |
| --- | --- |
| Handler throws | Logged via onMessageError, commit skipped → re-delivered on next poll |
| forceNonNegative block (oversell) | Tracking row with note OVERSELL_BLOCKED written; deduction skipped silently; commit succeeds |
| Idempotency hit (already-processed key) | Handler short-circuits, commit succeeds |
| Unknown topic | Logged warning in _dispatchMessage default branch |
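
The commit rules above can be sketched as a small dispatcher. This is an illustration under assumptions, not the code in component.ts: `InboundMessage`, `dispatchMessage`, and the callback signatures are hypothetical, handlers are synchronous for brevity, and committing the offset for an unknown topic is an assumption (the source only says a warning is logged).

```typescript
// Hypothetical minimal types; the real consumer wiring is not shown in this document.
type InboundMessage = { topic: string; offset: number; value: string };
type Handler = (msg: InboundMessage) => void;
type DispatchResult = 'committed' | 'skipped-commit';

function dispatchMessage(
  msg: InboundMessage,
  handlers: Map<string, Handler>,
  commit: (offset: number) => void,
  log: (line: string) => void,
): DispatchResult {
  const handler = handlers.get(msg.topic);
  if (!handler) {
    // Default branch: warn. ASSUMPTION: the offset is still committed so the
    // consumer does not stall on a message it can never handle.
    log(`warn: unknown topic ${msg.topic}`);
    commit(msg.offset);
    return 'committed';
  }
  try {
    handler(msg);
  } catch (err) {
    // Handler threw: log and leave the offset uncommitted so the message is re-delivered.
    log(`error: handler failed for ${msg.topic}: ${String(err)}`);
    return 'skipped-commit';
  }
  // Handler returned (including idempotency hits and oversell blocks): commit manually.
  commit(msg.offset);
  return 'committed';
}

// Demo with one healthy handler and one failing handler.
const logLines: string[] = [];
const committedOffsets: number[] = [];
const demoHandlers = new Map<string, Handler>([
  ['payment.success', () => { /* processed successfully */ }],
  ['material.transferred', () => { throw new Error('db unavailable'); }],
]);
const commitOffset = (offset: number): void => { committedOffsets.push(offset); };
const writeLog = (line: string): void => { logLines.push(line); };

const okResult = dispatchMessage(
  { topic: 'payment.success', offset: 1, value: '{}' }, demoHandlers, commitOffset, writeLog,
);
const failResult = dispatchMessage(
  { topic: 'material.transferred', offset: 2, value: '{}' }, demoHandlers, commitOffset, writeLog,
);
```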

2. Outbound — Kafka Topics

| Topic | Constant | Trigger | Producer Site | Consumers |
| --- | --- | --- | --- | --- |
| purchase-order.received | KafkaTopics.PURCHASE_ORDER_RECEIVED | After PO transitions to RECEIVED/COMPLETED, only when payments[] non-empty | purchase-order.service.ts (post-transaction-commit, fire-and-forget) | @nx/finance (record COGS / EXPENSE legs) |
| material.stock-changed | KafkaTopics.MATERIAL_STOCK_CHANGED | After material consumed (kitchen ticket READY) | material-worker.service.ts | @nx/sale (kitchen ticket UI sync) |
| material.transferred | KafkaTopics.MATERIAL_TRANSFERRED | After cross-location material movement | material-worker.service.ts | self (re-consume for receiving location) |

Producer config (src/components/kafka/component.ts:140-159)

| Setting | Value |
| --- | --- |
| acks | KafkaAcks.ALL |
| idempotent | true |
| compression | lz4 |
| requestTimeout | 60_000 ms |
| connectTimeout | 30_000 ms |
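
For illustration, the settings above can be written as a typed object with a small sanity check. The `KafkaAcks` object, `ProducerSettings` interface, and `validateProducerSettings` function below are local stand-ins declared for this sketch, not imports from the project or from any Kafka client. The check encodes one general Kafka rule: an idempotent producer requires acks=all.

```typescript
// Local stand-in for the acks constant; the numeric values follow Kafka's convention.
const KafkaAcks = { NONE: 0, LEADER: 1, ALL: -1 } as const;
type KafkaAcksValue = (typeof KafkaAcks)[keyof typeof KafkaAcks];

// Hypothetical settings shape mirroring the table above.
interface ProducerSettings {
  acks: KafkaAcksValue;
  idempotent: boolean;
  compression: 'none' | 'gzip' | 'snappy' | 'lz4' | 'zstd';
  requestTimeout: number; // milliseconds
  connectTimeout: number; // milliseconds
}

const producerSettings: ProducerSettings = {
  acks: KafkaAcks.ALL,
  idempotent: true,
  compression: 'lz4',
  requestTimeout: 60_000,
  connectTimeout: 30_000,
};

// Returns a list of problems; an empty list means the settings are consistent.
function validateProducerSettings(s: ProducerSettings): string[] {
  const problems: string[] = [];
  if (s.idempotent && s.acks !== KafkaAcks.ALL) {
    problems.push('idempotent=true requires acks=ALL');
  }
  if (s.requestTimeout <= 0 || s.connectTimeout <= 0) {
    problems.push('timeouts must be positive');
  }
  return problems;
}

const settingsProblems = validateProducerSettings(producerSettings);
const weakenedProblems = validateProducerSettings({ ...producerSettings, acks: KafkaAcks.LEADER });
```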

3. Inbound — BullMQ

N/A. Inventory uses Kafka exclusively for cross-service async messaging and has no BullMQ queues.

4. Outbound — BullMQ

N/A.

5. WebSocket Emissions

src/common/websocket.ts. One topic, three room types.

| Topic | Constant | Source | Trigger |
| --- | --- | --- | --- |
| observation/inventory/inventory-stock | InventoryWebSocketTopics.INVENTORY_STOCK | InventorySocketEventService | Stock change (post adjustStock write) |

Rooms (InventoryWebSocketRooms)

| Helper | Room path | Subscriber |
| --- | --- | --- |
| getMerchantInventoryStockRoom(merchantId) | merchants/<merchantId>/inventory-stocks | Merchant-wide listener (admin, ops) |
| getInventoryLocationRoom(locationId) | inventory-locations/<locationId> | Per-location dashboard |
| getInventoryItemRoom(itemId) | inventory-items/<itemId> | Per-item detail page |

Every stock change is emitted to all three rooms in parallel (getAllInventoryRooms()).
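
A minimal sketch of the room fan-out, assuming the helpers simply interpolate IDs into the paths listed above. The parameter object passed to `getAllInventoryRooms`, the `Emit` callback type, and `emitStockChange` are hypothetical; the real signatures in src/common/websocket.ts are not shown here.

```typescript
// Path builders mirroring the documented room paths.
const getMerchantInventoryStockRoom = (merchantId: string): string =>
  `merchants/${merchantId}/inventory-stocks`;
const getInventoryLocationRoom = (locationId: string): string =>
  `inventory-locations/${locationId}`;
const getInventoryItemRoom = (itemId: string): string =>
  `inventory-items/${itemId}`;

// ASSUMPTION: the real getAllInventoryRooms() may take different parameters.
type StockChangeIds = { merchantId: string; locationId: string; itemId: string };

function getAllInventoryRooms(ids: StockChangeIds): string[] {
  return [
    getMerchantInventoryStockRoom(ids.merchantId),
    getInventoryLocationRoom(ids.locationId),
    getInventoryItemRoom(ids.itemId),
  ];
}

const INVENTORY_STOCK_TOPIC = 'observation/inventory/inventory-stock';

// Hypothetical transport callback; a real one would wrap the WebSocket server.
type Emit = (room: string, topic: string, payload: unknown) => Promise<void>;

// Sends one stock-change payload to all three rooms concurrently.
async function emitStockChange(emit: Emit, ids: StockChangeIds, payload: unknown): Promise<void> {
  await Promise.all(
    getAllInventoryRooms(ids).map((room) => emit(room, INVENTORY_STOCK_TOPIC, payload)),
  );
}

// Demo: compute the rooms for one stock change and fan out with a no-op transport.
const demoIds: StockChangeIds = { merchantId: 'm-1', locationId: 'loc-9', itemId: 'item-42' };
const demoRooms = getAllInventoryRooms(demoIds);
void emitStockChange(async () => { /* no-op transport */ }, demoIds, { quantityAfter: 5 });
```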

6. Payload Schemas

Source-of-truth definitions live in @nx/core/src/models/messages/.

```ts
// PAYMENT_SUCCESS: TSalePaymentSuccess
export interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy?: string;
  modifiedBy?: string;
  payment: { total: number; paid: number; status: string };
  items: Array<{
    itemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    quantity: number;
    unitPrice: number;
    tax: number;
    discount: number;
    subtotal: number;
    total: number;
  }>;
}

// KITCHEN_TICKET_ITEM_STATUS_CHANGED: TKitchenTicketItemStatusChangedMessage
export interface TKitchenTicketItemStatusChangedMessage {
  kitchenTicketItemId: string;
  kitchenTicketId: string;
  saleOrderId: string;
  merchantId: string;
  status: 'PENDING' | 'COOKING' | 'READY' | 'SERVED' | 'VOIDED';
  item: {
    saleOrderItemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    itemId: string;
    quantity: number;
  };
}

// PURCHASE_ORDER_RECEIVED: TPurchaseOrderReceived
export interface TPurchaseOrderReceived {
  purchaseOrderId: string;
  merchantId: string;
  payments: Array<{
    financeWalletId: string;
    amount: number;
    currency: string;
    exchangeRate: number;
  }>;
  items: Array<{
    inventoryItemId: string;
    quantity: number;
    unitCost: number;
    uomId?: string;
    multiplier?: number;
    effectiveAt: string; // ISO
  }>;
}

// MATERIAL_STOCK_CHANGED: TMaterialStockChangedMessage
export interface TMaterialStockChangedMessage {
  materialId: string;
  merchantId: string;
  inventoryStockId: string;
  quantityBefore: number;
  quantityAfter: number;
  delta: number;
  referenceType: string;
  referenceId: string;
}

// CDC envelope: TDebeziumMessage<T>
export interface TDebeziumMessage<T> {
  payload: T;
  // Debezium also sends schema/op fields; agents typically only consume `payload`
}
```
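
Because these interfaces exist only at compile time, a consumer may want a runtime check before trusting a decoded Kafka value. The sketch below is one way to do that for the kitchen-ticket message; `isRecord`, `isKitchenTicketItemStatusChanged`, and `shouldConsumeMaterial` are hypothetical helpers, and treating READY as the only status that consumes material is an assumption drawn from the outbound trigger description.

```typescript
// Local copy of the documented message shape so the sketch is self-contained.
interface TKitchenTicketItemStatusChangedMessage {
  kitchenTicketItemId: string;
  kitchenTicketId: string;
  saleOrderId: string;
  merchantId: string;
  status: 'PENDING' | 'COOKING' | 'READY' | 'SERVED' | 'VOIDED';
  item: {
    saleOrderItemId: string;
    itemType: 'PRODUCT_VARIANT' | 'MATERIAL';
    itemId: string;
    quantity: number;
  };
}

const KNOWN_STATUSES = new Set<string>(['PENDING', 'COOKING', 'READY', 'SERVED', 'VOIDED']);

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null;
}

// Runtime guard: checks the fields this sketch relies on, not every field.
function isKitchenTicketItemStatusChanged(
  v: unknown,
): v is TKitchenTicketItemStatusChangedMessage {
  if (!isRecord(v) || !isRecord(v.item)) return false;
  return (
    typeof v.kitchenTicketItemId === 'string' &&
    typeof v.saleOrderId === 'string' &&
    typeof v.merchantId === 'string' &&
    typeof v.status === 'string' &&
    KNOWN_STATUSES.has(v.status) &&
    typeof v.item.itemId === 'string' &&
    typeof v.item.quantity === 'number'
  );
}

// ASSUMPTION: material is consumed only when the ticket item becomes READY.
function shouldConsumeMaterial(msg: TKitchenTicketItemStatusChangedMessage): boolean {
  return msg.status === 'READY';
}

// Demo: decode a raw JSON value, validate it, then decide whether to consume material.
const rawValue = JSON.stringify({
  kitchenTicketItemId: 'kti-1',
  kitchenTicketId: 'kt-1',
  saleOrderId: 'so-1',
  merchantId: 'm-1',
  status: 'READY',
  item: { saleOrderItemId: 'soi-1', itemType: 'MATERIAL', itemId: 'mat-1', quantity: 2 },
});
const decoded: unknown = JSON.parse(rawValue);
const decodedIsValid = isKitchenTicketItemStatusChanged(decoded);
const consumeNow = isKitchenTicketItemStatusChanged(decoded) && shouldConsumeMaterial(decoded);
const badStatusIsValid = isKitchenTicketItemStatusChanged({
  ...JSON.parse(rawValue),
  status: 'BURNT',
});
```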

7. Idempotency & Ordering

| Topic | Delivery | Ordering | Recovery |
| --- | --- | --- | --- |
| PAYMENT_SUCCESS | at-least-once | per-saleOrderId via Kafka key | re-deliver triggers idempotency lookup → skip |
| KITCHEN_TICKET_ITEM_STATUS_CHANGED | at-least-once | per-saleOrderId | re-deliver → skip via tracking lookup |
| MATERIAL_TRANSFERRED | at-least-once | per-transferId | idempotent adjustStock + tracking |
| MERCHANT CDC | at-least-once | per-row | ensureDefaultLocation idempotent |
| PRODUCT_VARIANT CDC | at-least-once | per-row | ensureInventoryItem idempotent |

Outbound emission rule: emit only after the transaction commits. The service holds the producer call until the DB transaction succeeds; if the emit fails, it logs the error and retries asynchronously (no dual-write rollback).
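
The emission rule can be sketched as a commit-then-emit helper. Everything here is illustrative: `EmitDeps`, `commitThenEmit`, and `drainRetries` are hypothetical names, the collaborators are synchronous in-memory fakes, and the array-backed retry queue is an assumption about how the asynchronous retry might be modeled.

```typescript
type OutboundEvent = { topic: string; key: string; value: string };

// Hypothetical collaborators, injected so the sketch stays self-contained.
interface EmitDeps {
  runInTransaction: <T>(work: () => T) => T; // throws if the transaction rolls back
  send: (event: OutboundEvent) => void; // throws if the broker rejects the message
  log: (line: string) => void;
  retryQueue: OutboundEvent[]; // drained later by a background retrier
}

function commitThenEmit<T>(
  deps: EmitDeps,
  work: () => T,
  toEvent: (result: T) => OutboundEvent,
): T {
  // 1. The DB write runs first; if it throws, nothing is emitted.
  const result = deps.runInTransaction(work);
  // 2. Only after the commit is the producer called.
  const event = toEvent(result);
  try {
    deps.send(event);
  } catch (err) {
    // 3. The committed write is not rolled back; the event is parked for retry.
    deps.log(`emit failed for ${event.topic}: ${String(err)}`);
    deps.retryQueue.push(event);
  }
  return result;
}

// Re-sends parked events; anything that fails again goes back on the queue.
function drainRetries(deps: EmitDeps): void {
  const pending = deps.retryQueue.splice(0, deps.retryQueue.length);
  for (const event of pending) {
    try {
      deps.send(event);
    } catch {
      deps.retryQueue.push(event);
    }
  }
}

// Demo: the broker is down for the first emit, then recovers.
const parkedEvents: OutboundEvent[] = [];
const sentEvents: OutboundEvent[] = [];
let brokerUp = false;
const emitDeps: EmitDeps = {
  runInTransaction: (work) => work(),
  send: (e) => {
    if (!brokerUp) throw new Error('broker down');
    sentEvents.push(e);
  },
  log: () => { /* discard log lines in the demo */ },
  retryQueue: parkedEvents,
};

const committedPoId = commitThenEmit(emitDeps, () => 'po-1', (id) => ({
  topic: 'purchase-order.received',
  key: id,
  value: JSON.stringify({ purchaseOrderId: id }),
}));
const parkedAfterFailure = parkedEvents.length;

brokerUp = true;
drainRetries(emitDeps);
const sentAfterRetry = sentEvents.length;
```

The design choice this illustrates is that the database remains the source of truth: a failed emit delays the downstream event but never undoes the committed write.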

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.