API Events
Finance is consumer-first. The Kafka component registers an idempotent producer (acks=ALL, lz4) for future use, but nothing is published today. There is no BullMQ. A WebSocket emitter is registered (finance-ws-emitter), but no domain topics are wired yet.
1. Inbound — Kafka
Subscribed in src/components/kafka/component.ts (SUBSCRIBED_TOPICS). Consumer: autocommit=false, commits per message on success, fallback latest. Group: SVC-00040-FINANCE_CONSUMER_GROUP.
| Topic | Constant | Producer | Handler | Idempotency Key | Failure Mode |
|---|---|---|---|---|---|
| payment.success | KafkaTopics.PAYMENT_SUCCESS | @nx/sale | FinanceWorkerService.handlePaymentSuccess | (SALE_ORDER, sourceEventUid=attempt.uid) | throw → offset not committed → redelivered |
| purchase-order.received | KafkaTopics.PURCHASE_ORDER_RECEIVED | @nx/inventory | handlePurchaseOrderReceived | (merchantId, PAYMENT, PURCHASE_ORDER, sourceId=purchaseOrderId) | throw → redelivered |
| inventory.issued-for-sale | KafkaTopics.INVENTORY_ISSUED_FOR_SALE | @nx/inventory | handleInventoryIssuedForSale | (SALE_ORDER, sourceEventUid=attemptUid) | throw → redelivered |
| inventory.adjusted | KafkaTopics.INVENTORY_ADJUSTED | @nx/inventory | handleInventoryAdjusted | (INVENTORY_ADJUSTMENT, sourceEventUid=inventoryTrackingId) | throw → redelivered |
| cdc.public.Merchant | CDCKafkaTopics.MERCHANT | Debezium (commerce) | handleMerchantCDC | reconciliation is idempotent (existence checks) | throw → redelivered |
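The commit-on-success behaviour in the Failure Mode column can be illustrated with a small in-memory sketch. It is not the service's consumer code and uses no real Kafka client: `Message`, `Handler`, and `consumeBatch` are hypothetical names, the handlers are synchronous for brevity, and committing the offset of a message with no registered handler is an assumption made here.

```typescript
// Hypothetical in-memory stand-ins; this is not a real Kafka client API.
type Message = { topic: string; offset: number; value: string };
type Handler = (msg: Message) => void;

// Processes messages in order and commits each offset only after its handler returns.
// Returns the last committed offset (-1 when nothing has been committed).
function consumeBatch(
  messages: Message[],
  handlers: Record<string, Handler>,
  committed: number = -1,
): number {
  for (const msg of messages) {
    if (msg.offset <= committed) continue; // already committed, skip on redelivery
    const handler = handlers[msg.topic];
    try {
      if (handler) handler(msg); // assumption: messages without a handler are committed as-is
    } catch {
      // Handler threw: leave the offset uncommitted so the message is delivered again.
      return committed;
    }
    committed = msg.offset; // per-message commit on success
  }
  return committed;
}

// Demo: the handler fails once on offset 1, then succeeds on redelivery.
let failOnce = true;
const seen: number[] = [];
const handlers: Record<string, Handler> = {
  "payment.success": (msg) => {
    if (msg.offset === 1 && failOnce) {
      failOnce = false;
      throw new Error("transient failure");
    }
    seen.push(msg.offset);
  },
};
const batch: Message[] = [0, 1, 2].map((offset) => ({
  topic: "payment.success",
  offset,
  value: "{}",
}));
const firstPass = consumeBatch(batch, handlers); // stops before committing offset 1
const secondPass = consumeBatch(batch, handlers, firstPass); // redelivery resumes at offset 1
```

Because the failing message is retried from the last committed offset, each handler has to tolerate seeing the same message more than once, which is why every row above names an idempotency key.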
Handler effects
| Handler | Voucher produced | Notes |
|---|---|---|
| handlePaymentSuccess | RECEIPT (1 DEBIT line) | Skips (INFO) when payment.attempt.finance.source.id absent; category falls back to 000_SALE |
| handlePurchaseOrderReceived | PAYMENT (party VENDOR) | CREDIT cash legs; adds DEBIT INVENTORY asset leg when inventoryValue>0 and control account exists |
| handleInventoryIssuedForSale | ADJUSTMENT (reason cogs) | DEBIT COGS / CREDIT INVENTORY by totalCostBasis; skips when <= 0 |
| handleInventoryAdjusted | ADJUSTMENT (reason inventory_adjustment) | Single line: DEBIT if valueDelta>0, CREDIT if <0; skips when 0 |
| handleMerchantCDC (op c/r) | none | Seeds default accounts + INVENTORY/COGS control accounts; marks onboarding step FINANCE_ACCOUNT |
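As a rough illustration of the two inventory rows above, the following sketch derives voucher lines from the event amounts. The `VoucherLine` shape and the function names are hypothetical, not the service's actual types. Posting the adjustment line against an INVENTORY account and using the absolute value as the line amount are assumptions; the table only states the DEBIT/CREDIT direction.

```typescript
// Hypothetical shapes for illustration only.
type Side = "DEBIT" | "CREDIT";
type VoucherLine = { side: Side; account: string; amount: number };

// inventory.issued-for-sale: DEBIT COGS / CREDIT INVENTORY by totalCostBasis.
// An empty array means "skip" (totalCostBasis <= 0).
function cogsLines(totalCostBasis: number): VoucherLine[] {
  if (totalCostBasis <= 0) return [];
  return [
    { side: "DEBIT", account: "COGS", amount: totalCostBasis },
    { side: "CREDIT", account: "INVENTORY", amount: totalCostBasis },
  ];
}

// inventory.adjusted: a single line whose side follows the sign of valueDelta.
// null means "skip" (valueDelta is 0).
function adjustmentLine(valueDelta: number): VoucherLine | null {
  if (valueDelta === 0) return null;
  return {
    side: valueDelta > 0 ? "DEBIT" : "CREDIT",
    account: "INVENTORY", // assumption: the target account is not stated in the table
    amount: Math.abs(valueDelta), // assumption: amounts are stored unsigned
  };
}

const saleLines = cogsLines(12.5);
const writeDown = adjustmentLine(-5);
const noChange = adjustmentLine(0);
```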
2. Outbound — Kafka
None. The producer is initialized and bound (BindingKeys.APPLICATION_KAFKA_PRODUCER) but no code path publishes. Reserved for future finance-originated events.
3. Inbound — BullMQ
None. Finance does not run a queue.
4. Outbound — BullMQ
None.
5. WebSocket Emissions
ApplicationWebSocketComponent binds a WebSocketEmitter (finance-ws-emitter, Redis single/cluster) and registers FinanceSocketEventService (a thin BaseSocketEventService). No finance-specific topics or rooms are emitted in the current code — the plumbing is in place for future real-time account/voucher broadcasts.
6. Payload Schemas
Source of truth: packages/core/src/common/kafka/types.ts.
// payment.success: TSalePaymentSuccess (finance reads payment.attempt.finance)
export type TSalePaymentSuccess = {
  saleOrderId: string;
  saleOrderNumber: string;
  saleOrderStatus: string;
  merchantId: string;
  saleChannelId: string;
  createdBy: string;
  modifiedBy: string;
  items: Array<{ id: string; itemType: string; itemId: string; quantity: number; mode: string; recipeId?: string }>;
  payment: {
    total: number; paid: number; currency: string; isFullyPaid: boolean; paidAt: string; sessionId?: string;
    attempt?: {
      uid: string; // idempotency key
      amount: number; // this tender's amount
      finance?: {
        name: string; code: string;
        source: { type: string; id: string }; // → FinanceAccount.id
        category?: { name: string; code: string; source: { type: string; id: string } };
      };
    };
  };
};

// purchase-order.received: TPurchaseOrderReceived
export type TPurchaseOrderReceived = {
  merchantId: string;
  purchaseOrderId: string;
  receivedAt: string;
  payments: Array<{ financeAccountId: string; amount: string; currency?: string; transactionDate?: string; exchangeRate?: string; note?: string | null }>;
  items?: Array<{ inventoryItemId: string; inventoryLocationId: string; quantity: string; unitCost: string; uom: string; effectiveAt: string }>;
};

// inventory.issued-for-sale: TInventoryIssuedForSale
export type TInventoryIssuedForSale = {
  saleOrderId: string;
  saleOrderNumber: string;
  merchantId: string;
  attemptUid: string; // idempotency key
  currency: string;
  transactionDate: string;
  totalCostBasis: number;
  items: Array<{ inventoryItemId: string; inventoryLocationId: string; inventoryTrackingId: string; quantity: number; unitCost: number; costBasis: number; costingMethod: 'AVERAGE' | 'ZERO' }>;
};

// inventory.adjusted: TInventoryAdjusted
export type TInventoryAdjusted = {
  merchantId: string;
  inventoryStockId: string;
  inventoryItemId: string;
  inventoryLocationId: string;
  inventoryTrackingId: string; // idempotency key
  reason: string;
  quantityBefore: string; quantityChange: string; quantityAfter: string;
  oldAverageCost: number; newAverageCost: number;
  valueDelta: number; // sign drives DEBIT/CREDIT
  note?: string;
  adjustedAt: string;
};

// cdc.public.Merchant: TDebeziumMessage<TMerchantPgRow> (payload.after = merchant row)

7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
| payment.success | at-least-once | per-key (saleOrder) | dedup on (SALE_ORDER, attempt.uid) → existing voucher replayed |
| purchase-order.received | at-least-once | per-key | dedup on (merchantId, PAYMENT, PURCHASE_ORDER, purchaseOrderId) |
| inventory.issued-for-sale | at-least-once | per-key | dedup on (SALE_ORDER, attemptUid) |
| inventory.adjusted | at-least-once | per-key | dedup on (INVENTORY_ADJUSTMENT, inventoryTrackingId) |
| cdc.public.Merchant | at-least-once | per-key (merchant id) | reconciliation re-runs harmlessly (account existence checks + idempotent onboarding mark) |
On any handler exception the offset is not committed, so the message is redelivered. Because every posting path is idempotent, redelivery is safe.
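The dedup-then-replay pattern can be sketched with an in-memory store keyed on (sourceType, sourceEventUid). This is a minimal illustration under stated assumptions, not the service's implementation: `VoucherStore`, `postOnce`, and the `Voucher` shape are hypothetical, and a real service would need durable storage to make the check survive restarts.

```typescript
// Hypothetical voucher shape and store, for illustration only.
type Voucher = { id: string; sourceType: string; sourceEventUid: string; amount: number };

class VoucherStore {
  private byKey = new Map<string, Voucher>();
  private nextId = 1;

  // Posts a voucher unless one already exists for the same dedup key,
  // in which case the existing voucher is returned unchanged.
  postOnce(sourceType: string, sourceEventUid: string, amount: number): { voucher: Voucher; created: boolean } {
    const key = `${sourceType}:${sourceEventUid}`;
    const existing = this.byKey.get(key);
    if (existing) return { voucher: existing, created: false };
    const voucher: Voucher = { id: `V-${this.nextId++}`, sourceType, sourceEventUid, amount };
    this.byKey.set(key, voucher);
    return { voucher, created: true };
  }

  count(): number {
    return this.byKey.size;
  }
}

// Demo: the same payment.success attempt is delivered twice.
const store = new VoucherStore();
const firstDelivery = store.postOnce("SALE_ORDER", "attempt-123", 50);
const redelivery = store.postOnce("SALE_ORDER", "attempt-123", 50);
```

The second call returns the voucher created by the first, so a redelivered message leaves the ledger unchanged.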