Skip to content

API Events

Finance is consumer-first. The Kafka component registers an idempotent producer (acks=ALL, lz4) for future use, but nothing is published today. There is no BullMQ. A WebSocket emitter is registered (finance-ws-emitter) but no domain topics are wired yet.

1. Inbound — Kafka

Subscribed in src/components/kafka/component.ts (SUBSCRIBED_TOPICS). Consumer autocommit=false, commits per message on success, fallback latest. Group SVC-00040-FINANCE_CONSUMER_GROUP.

TopicConstantProducerHandlerIdempotency KeyFailure Mode
payment.successKafkaTopics.PAYMENT_SUCCESS@nx/saleFinanceWorkerService.handlePaymentSuccess(SALE_ORDER, sourceEventUid=attempt.uid)throw → offset not committed → redelivered
purchase-order.receivedKafkaTopics.PURCHASE_ORDER_RECEIVED@nx/inventoryhandlePurchaseOrderReceived(merchantId, PAYMENT, PURCHASE_ORDER, sourceId=purchaseOrderId)throw → redelivered
inventory.issued-for-saleKafkaTopics.INVENTORY_ISSUED_FOR_SALE@nx/inventoryhandleInventoryIssuedForSale(SALE_ORDER, sourceEventUid=attemptUid)throw → redelivered
inventory.adjustedKafkaTopics.INVENTORY_ADJUSTED@nx/inventoryhandleInventoryAdjusted(INVENTORY_ADJUSTMENT, sourceEventUid=inventoryTrackingId)throw → redelivered
cdc.public.MerchantCDCKafkaTopics.MERCHANTDebezium (commerce)handleMerchantCDCreconciliation is idempotent (existence checks)throw → redelivered

Handler effects

HandlerVoucher producedNotes
handlePaymentSuccessRECEIPT (1 DEBIT line)Skips (INFO) when payment.attempt.finance.source.id absent; category falls back to 000_SALE
handlePurchaseOrderReceivedPAYMENT (party VENDOR)CREDIT cash legs; adds DEBIT INVENTORY asset leg when inventoryValue>0 and control account exists
handleInventoryIssuedForSaleADJUSTMENT (reason cogs)DEBIT COGS / CREDIT INVENTORY by totalCostBasis; skips when <= 0
handleInventoryAdjustedADJUSTMENT (reason inventory_adjustment)Single line: DEBIT if valueDelta>0, CREDIT if <0; skips when 0
handleMerchantCDC (op c/r)noneSeeds default accounts + INVENTORY/COGS control accounts; marks onboarding step FINANCE_ACCOUNT

2. Outbound — Kafka

None. The producer is initialized and bound (BindingKeys.APPLICATION_KAFKA_PRODUCER) but no code path publishes. Reserved for future finance-originated events.

3. Inbound — BullMQ

None. Finance does not run a queue.

4. Outbound — BullMQ

None.

5. WebSocket Emissions

ApplicationWebSocketComponent binds a WebSocketEmitter (finance-ws-emitter, Redis single/cluster) and registers FinanceSocketEventService (a thin BaseSocketEventService). No finance-specific topics or rooms are emitted in the current code — the plumbing is in place for future real-time account/voucher broadcasts.

6. Payload Schemas

Source of truth: packages/core/src/common/kafka/types.ts.

ts
// payment.success — TSalePaymentSuccess (finance reads payment.attempt.finance)
export type TSalePaymentSuccess = {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy: string;
  modifiedBy: string;
  items: Array<{ id: string; itemType: string; itemId: string; quantity: number; mode: string; recipeId?: string }>;
  payment: {
    total: number; paid: number; currency: string; isFullyPaid: boolean; paidAt: string; sessionId?: string;
    attempt?: {
      uid: string;            // idempotency key
      amount: number;         // this tender's amount
      finance?: {
        name: string; code: string;
        source: { type: string; id: string };               // → FinanceAccount.id
        category?: { name: string; code: string; source: { type: string; id: string } };
      };
    };
  };
};

// purchase-order.received — TPurchaseOrderReceived
export type TPurchaseOrderReceived = {
  merchantId: string;
  purchaseOrderId: string;
  receivedAt: string;
  payments: Array<{ financeAccountId: string; amount: string; currency?: string; transactionDate?: string; exchangeRate?: string; note?: string | null }>;
  items?: Array<{ inventoryItemId: string; inventoryLocationId: string; quantity: string; unitCost: string; uom: string; effectiveAt: string }>;
};

// inventory.issued-for-sale — TInventoryIssuedForSale
export type TInventoryIssuedForSale = {
  saleOrderId: string;
  saleOrderNumber: string;
  merchantId: string;
  attemptUid: string;     // idempotency key
  currency: string;
  transactionDate: string;
  totalCostBasis: number;
  items: Array<{ inventoryItemId: string; inventoryLocationId: string; inventoryTrackingId: string; quantity: number; unitCost: number; costBasis: number; costingMethod: 'AVERAGE' | 'ZERO' }>;
};

// inventory.adjusted — TInventoryAdjusted
export type TInventoryAdjusted = {
  merchantId: string;
  inventoryStockId: string;
  inventoryItemId: string;
  inventoryLocationId: string;
  inventoryTrackingId: string;   // idempotency key
  reason: string;
  quantityBefore: string; quantityChange: string; quantityAfter: string;
  oldAverageCost: number; newAverageCost: number;
  valueDelta: number;            // sign drives DEBIT/CREDIT
  note?: string;
  adjustedAt: string;
};

// cdc.public.Merchant — TDebeziumMessage<TMerchantPgRow> (payload.after = merchant row)

7. Idempotency & Ordering

TopicDeliveryOrderingRecovery
payment.successat-least-onceper-key (saleOrder)dedup on (SALE_ORDER, attempt.uid) → existing voucher replayed
purchase-order.receivedat-least-onceper-keydedup on (merchantId, PAYMENT, PURCHASE_ORDER, purchaseOrderId)
inventory.issued-for-saleat-least-onceper-keydedup on (SALE_ORDER, attemptUid)
inventory.adjustedat-least-onceper-keydedup on (INVENTORY_ADJUSTMENT, inventoryTrackingId)
cdc.public.Merchantat-least-onceper-key (merchant id)reconciliation re-runs harmlessly (account existence checks + idempotent onboarding mark)

On any handler exception the offset is not committed, so the message is redelivered. Because every posting path is idempotent, redelivery is safe.

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.