API Events
Finance ưu tiên consumer. Kafka component đăng ký một producer idempotent (
acks=ALL, lz4) cho tương lai, nhưng hiện chưa publish gì. Không có BullMQ. Một WebSocket emitter được đăng ký (finance-ws-emitter) nhưng chưa nối domain topic nào.
1. Inbound — Kafka
Subscribe trong
src/components/kafka/component.ts(SUBSCRIBED_TOPICS). Consumerautocommit=false, commit theo từng message khi thành công, fallbacklatest. GroupSVC-00040-FINANCE_CONSUMER_GROUP.
| Topic | Constant | Producer | Handler | Idempotency Key | Failure Mode |
|---|---|---|---|---|---|
payment.success | KafkaTopics.PAYMENT_SUCCESS | @nx/sale | FinanceWorkerService.handlePaymentSuccess | (SALE_ORDER, sourceEventUid=attempt.uid) | throw → offset không commit → redeliver |
purchase-order.received | KafkaTopics.PURCHASE_ORDER_RECEIVED | @nx/inventory | handlePurchaseOrderReceived | (merchantId, PAYMENT, PURCHASE_ORDER, sourceId=purchaseOrderId) | throw → redeliver |
inventory.issued-for-sale | KafkaTopics.INVENTORY_ISSUED_FOR_SALE | @nx/inventory | handleInventoryIssuedForSale | (SALE_ORDER, sourceEventUid=attemptUid) | throw → redeliver |
inventory.adjusted | KafkaTopics.INVENTORY_ADJUSTED | @nx/inventory | handleInventoryAdjusted | (INVENTORY_ADJUSTMENT, sourceEventUid=inventoryTrackingId) | throw → redeliver |
cdc.public.Merchant | CDCKafkaTopics.MERCHANT | Debezium (commerce) | handleMerchantCDC | đối soát là idempotent (kiểm tra tồn tại) | throw → redeliver |
Tác động của handler
| Handler | Phiếu tạo ra | Ghi chú |
|---|---|---|
handlePaymentSuccess | RECEIPT (1 dòng DEBIT) | Skip (INFO) khi thiếu payment.attempt.finance.source.id; category fallback 000_SALE |
handlePurchaseOrderReceived | PAYMENT (party VENDOR) | Leg cash CREDIT; thêm leg tài sản INVENTORY DEBIT khi inventoryValue>0 và tài khoản kiểm soát tồn tại |
handleInventoryIssuedForSale | ADJUSTMENT (reason cogs) | DEBIT COGS / CREDIT INVENTORY theo totalCostBasis; skip khi <= 0 |
handleInventoryAdjusted | ADJUSTMENT (reason inventory_adjustment) | Một dòng: DEBIT nếu valueDelta>0, CREDIT nếu <0; skip khi 0 |
handleMerchantCDC (op c/r) | không | Seed tài khoản mặc định + tài khoản kiểm soát INVENTORY/COGS; đánh dấu bước onboarding FINANCE_ACCOUNT |
2. Outbound — Kafka
Không. Producer được khởi tạo và bind (BindingKeys.APPLICATION_KAFKA_PRODUCER) nhưng không có đường dẫn code nào publish. Dự trữ cho các sự kiện do finance khởi nguồn trong tương lai.
3. Inbound — BullMQ
Không. Finance không chạy queue.
4. Outbound — BullMQ
Không.
5. WebSocket Emissions
ApplicationWebSocketComponent bind một WebSocketEmitter (finance-ws-emitter, Redis single/cluster) và đăng ký FinanceSocketEventService (một BaseSocketEventService mỏng). Không có topic hay room riêng cho finance được emit trong code hiện tại — phần đường ống đã sẵn sàng cho broadcast account/voucher real-time tương lai.
6. Payload Schemas
Nguồn chân lý:
packages/core/src/common/kafka/types.ts.
// payment.success — TSalePaymentSuccess (finance đọc payment.attempt.finance)
export type TSalePaymentSuccess = {
saleOrderId: string;
saleOrderNumber: string;
saleOrderStatus: string;
merchantId: string;
saleChannelId: string;
createdBy: string;
modifiedBy: string;
items: Array<{ id: string; itemType: string; itemId: string; quantity: number; mode: string; recipeId?: string }>;
payment: {
total: number; paid: number; currency: string; isFullyPaid: boolean; paidAt: string; sessionId?: string;
attempt?: {
uid: string; // idempotency key
amount: number; // số tiền của tender này
finance?: {
name: string; code: string;
source: { type: string; id: string }; // → FinanceAccount.id
category?: { name: string; code: string; source: { type: string; id: string } };
};
};
};
};
// purchase-order.received — TPurchaseOrderReceived
export type TPurchaseOrderReceived = {
merchantId: string;
purchaseOrderId: string;
receivedAt: string;
payments: Array<{ financeAccountId: string; amount: string; currency?: string; transactionDate?: string; exchangeRate?: string; note?: string | null }>;
items?: Array<{ inventoryItemId: string; inventoryLocationId: string; quantity: string; unitCost: string; uom: string; effectiveAt: string }>;
};
// inventory.issued-for-sale — TInventoryIssuedForSale
export type TInventoryIssuedForSale = {
saleOrderId: string;
saleOrderNumber: string;
merchantId: string;
attemptUid: string; // idempotency key
currency: string;
transactionDate: string;
totalCostBasis: number;
items: Array<{ inventoryItemId: string; inventoryLocationId: string; inventoryTrackingId: string; quantity: number; unitCost: number; costBasis: number; costingMethod: 'AVERAGE' | 'ZERO' }>;
};
// inventory.adjusted — TInventoryAdjusted
export type TInventoryAdjusted = {
merchantId: string;
inventoryStockId: string;
inventoryItemId: string;
inventoryLocationId: string;
inventoryTrackingId: string; // idempotency key
reason: string;
quantityBefore: string; quantityChange: string; quantityAfter: string;
oldAverageCost: number; newAverageCost: number;
valueDelta: number; // dấu quyết định DEBIT/CREDIT
note?: string;
adjustedAt: string;
};
// cdc.public.Merchant — TDebeziumMessage<TMerchantPgRow> (payload.after = dòng merchant)7. Idempotency & Ordering
| Topic | Delivery | Ordering | Recovery |
|---|---|---|---|
payment.success | at-least-once | per-key (saleOrder) | dedup trên (SALE_ORDER, attempt.uid) → replay phiếu hiện có |
purchase-order.received | at-least-once | per-key | dedup trên (merchantId, PAYMENT, PURCHASE_ORDER, purchaseOrderId) |
inventory.issued-for-sale | at-least-once | per-key | dedup trên (SALE_ORDER, attemptUid) |
inventory.adjusted | at-least-once | per-key | dedup trên (INVENTORY_ADJUSTMENT, inventoryTrackingId) |
cdc.public.Merchant | at-least-once | per-key (merchant id) | đối soát chạy lại vô hại (kiểm tra tồn tại tài khoản + đánh dấu onboarding idempotent) |
Khi handler ném ngoại lệ thì offset không được commit, nên message được redeliver. Vì mọi đường dẫn hạch toán đều idempotent, redeliver là an toàn.