API Events
Source: src/components/kafka-consumer/component.ts, src/common/queues.ts, src/services/invoice-issuance-queue.service.ts, src/common/websocket.ts.
1. Inbound — Kafka
| Topic | Producer | Handler | Idempotency Key | Failure Mode |
|---|---|---|---|---|
| KafkaTopics.PAYMENT_SUCCESS | @nx/sale | InvoiceWorkerService.handlePaymentSuccess | (sourceType, sourceId) partial-unique invoice | skip if not COMPLETED / active invoice exists; log + return |
| CDCKafkaTopics.MERCHANT (Debezium) | @nx/commerce (CDC) | InvoiceWorkerService.handleMerchantCDC | (principalType, principalId) on TaxInfo | op c/r reconcile, u diff-and-skip, d ignored |
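The failure mode listed for the merchant CDC row can be read as a small decision table over the Debezium `op` code. The sketch below is a minimal illustration, not the service's implementation: the `TaxInfo` fields, the in-memory `Map`, and the function name `applyMerchantChange` are all hypothetical stand-ins.

```typescript
type DebeziumOp = 'c' | 'r' | 'u' | 'd';

// Hypothetical shape; the real TaxInfo columns are not listed in this document.
interface TaxInfo {
  taxCode: string;
  companyName: string;
}

type CdcAction = 'reconciled' | 'skipped' | 'ignored';

// In-memory stand-in for the persisted TaxInfo rows, keyed by merchant id.
function applyMerchantChange(
  store: Map<string, TaxInfo>,
  merchantId: string,
  op: DebeziumOp,
  incoming: TaxInfo | null,
): CdcAction {
  // Deletes (and events without an "after" image) are ignored.
  if (op === 'd' || incoming === null) {
    return 'ignored';
  }
  const current = store.get(merchantId);
  // Updates are diffed against the persisted row; unchanged rows are skipped.
  if (
    op === 'u' &&
    current !== undefined &&
    current.taxCode === incoming.taxCode &&
    current.companyName === incoming.companyName
  ) {
    return 'skipped';
  }
  // Creates, snapshot reads, and changed updates reconcile the stored row.
  store.set(merchantId, { taxCode: incoming.taxCode, companyName: incoming.companyName });
  return 'reconciled';
}
```

Because an unchanged update returns early, re-delivering the same CDC event leaves the stored row untouched.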
Consumer:
autocommit: true, fallbackMode: 'earliest', default group SVC-00150-INVOICE_CONSUMER_GROUP. A single consumer dispatches via a switch on message.topic.
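A single consumer that routes on `message.topic` might look roughly like the following. This is a sketch under assumptions: the topic strings, the `IncomingMessage` shape, and the choice to ignore unknown topics are illustrative and not taken from the source files.

```typescript
// Hypothetical topic values; the real ones come from KafkaTopics / CDCKafkaTopics.
const PAYMENT_SUCCESS_TOPIC = 'payment-success';
const MERCHANT_CDC_TOPIC = 'cdc-merchant';

interface IncomingMessage {
  topic: string;
  value: string; // raw JSON payload
}

interface InvoiceWorker {
  handlePaymentSuccess(payload: unknown): Promise<void>;
  handleMerchantCDC(payload: unknown): Promise<void>;
}

type Route = 'payment' | 'merchant-cdc' | 'ignored';

// Routes one message to the handler registered for its topic.
async function dispatch(message: IncomingMessage, worker: InvoiceWorker): Promise<Route> {
  const payload: unknown = JSON.parse(message.value);
  switch (message.topic) {
    case PAYMENT_SUCCESS_TOPIC:
      await worker.handlePaymentSuccess(payload);
      return 'payment';
    case MERCHANT_CDC_TOPIC:
      await worker.handleMerchantCDC(payload);
      return 'merchant-cdc';
    default:
      // Assumption: unknown topics are dropped instead of failing the consumer.
      return 'ignored';
  }
}
```

With autocommit enabled, offsets advance regardless of the handler outcome, which is why the handlers themselves need the idempotency keys listed in the table.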
2. Outbound — Kafka
None. The service does not produce Kafka topics. Downstream notification is via WebSocket and provider/webhook callbacks only.
3. Inbound — BullMQ
3 partitions per queue type; the partition is chosen by getPartitionByKey(orderId) (Java hashCode mod 3, deterministic per order).
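The partition rule can be sketched as below. The hash follows the standard Java `String.hashCode()` definition; how the real `getPartitionByKey` handles negative hashes is not stated here, so folding with `Math.abs` is an assumption.

```typescript
const PARTITION_COUNT = 3;

// Mirrors Java's String.hashCode(): h = 31 * h + codeUnit, wrapped to a signed 32-bit int.
function javaHashCode(key: string): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    // "| 0" truncates to int32, matching Java's integer overflow behaviour.
    hash = (hash * 31 + key.charCodeAt(i)) | 0;
  }
  return hash;
}

// Assumption: negative hashes are made non-negative before taking the modulo.
function getPartitionByKey(key: string): number {
  return Math.abs(javaHashCode(key)) % PARTITION_COUNT;
}
```

Since the hash depends only on the key, every job for a given order lands on the same partition, which is what gives per-order ordering within a queue type.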
| Queue type | Job | Producer | Handler | Concurrency |
|---|---|---|---|---|
| issuance (@nx/invoice/queue/issuance-{01,02,03}) | invoice-issuance (jobType 000_ISSUANCE) | worker / cron / retry / claim-expiry | InvoiceIssuanceQueueService.processIssuanceJob → issueOrder | APP_ENV_INVOICE_ISSUANCE_WORKER_CONCURRENCY (default 10) |
| issuance | invoice-issuance (jobType 100_ADJUSTMENT) | adjustment flow | processIssuanceJob → _processAdjustmentJob | (same worker) |
| claim-expiry (@nx/invoice/queue/claim-expiry-{01,02,03}) | claim-expiry | payment success (BUYER_SELF_SERVICE) | processClaimExpiryJob | default 3 |
Job IDs (idempotency):
| Job | jobId |
|---|---|
| Issuance | orderId |
| Retry | ${orderId}:retry:${n} |
| Adjustment | adjust:${invoiceId} (removeOnComplete/Fail: true) |
| Claim expiry | expiry:${orderId} (delayed to deadline) |
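The job ID formats above can be expressed as small builder functions. The sketch below pairs them with an in-memory stand-in for a queue that ignores a second add with an existing job ID; `DedupQueue` and the builder names are hypothetical and only illustrate why a stable ID gives idempotent enqueueing.

```typescript
// Builders for the job ID formats listed in the table.
function issuanceJobId(orderId: string): string {
  return orderId;
}

function retryJobId(orderId: string, attempt: number): string {
  return `${orderId}:retry:${attempt}`;
}

function adjustmentJobId(invoiceId: string): string {
  return `adjust:${invoiceId}`;
}

function claimExpiryJobId(orderId: string): string {
  return `expiry:${orderId}`;
}

// Hypothetical in-memory queue: adding a job whose ID already exists is a no-op.
class DedupQueue {
  private readonly jobs = new Map<string, unknown>();

  // Returns true if the job was enqueued, false if the ID was already present.
  add(jobId: string, data: unknown): boolean {
    if (this.jobs.has(jobId)) {
      return false;
    }
    this.jobs.set(jobId, data);
    return true;
  }

  get size(): number {
    return this.jobs.size;
  }
}
```

Each retry attempt gets its own ID, so a retry is not swallowed by the original issuance job for the same order.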
4. Outbound — BullMQ
Self-enqueued by the queue service (no external BullMQ consumer).
| Trigger | Job | Target |
|---|---|---|
| REAL_TIME payment / cron / claim resolve | enqueueIssuance | issuance partition |
| Transient issuance failure | enqueueIssuance (delay [5,15,60]min) | issuance partition |
| Adjustment requested | enqueueAdjustment | issuance partition |
| BUYER_SELF_SERVICE payment | enqueueClaimExpiry (delay = deadline − now) | claim-expiry partition |
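The two delay rules in the table reduce to simple arithmetic. The following is a sketch only: the function names, the 1-based attempt numbering, and clamping a past deadline to zero are assumptions rather than documented behaviour.

```typescript
// Retry schedule from the table, in minutes.
const RETRY_DELAYS_MIN: readonly number[] = [5, 15, 60];

// attempt is 1-based (assumption); returns undefined once the schedule is exhausted.
function retryDelayMs(attempt: number): number | undefined {
  if (!Number.isInteger(attempt) || attempt < 1 || attempt > RETRY_DELAYS_MIN.length) {
    return undefined;
  }
  return RETRY_DELAYS_MIN[attempt - 1] * 60 * 1000;
}

// delay = deadline - now, clamped at zero (assumption) so an overdue claim fires immediately.
function claimExpiryDelayMs(deadline: Date, now: Date = new Date()): number {
  return Math.max(0, deadline.getTime() - now.getTime());
}
```

For example, a third retry waits 60 minutes, and a claim whose deadline is 6 seconds away is delayed by 6000 ms.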
5. WebSocket Emissions
| Topic | Rooms | Trigger | Payload |
|---|---|---|---|
| observation/invoice/invoice | merchants/{merchantId}, merchants/{merchantId}/invoices, sale-orders/{saleOrderId}/invoices | issuance status change | → §6 |
Emitted by InvoiceSocketEventService.notifyInvoiceUpdate (best-effort, Promise.allSettled; no-op if the emitter is not ready).
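A best-effort fan-out to the three rooms could be sketched as follows. The `RoomEmitter` interface and the return value (a count of successful emissions) are hypothetical; only the room names, the topic, and the use of `Promise.allSettled` come from the text above.

```typescript
const INVOICE_TOPIC = 'observation/invoice/invoice';

// Hypothetical emitter abstraction; the real one lives in src/common/websocket.ts.
interface RoomEmitter {
  emit(room: string, topic: string, payload: unknown): Promise<void>;
}

function roomsFor(merchantId: string, saleOrderId: string): string[] {
  return [
    `merchants/${merchantId}`,
    `merchants/${merchantId}/invoices`,
    `sale-orders/${saleOrderId}/invoices`,
  ];
}

// Emits to every room and never throws; returns how many emissions succeeded.
async function notifyInvoiceUpdate(
  emitter: RoomEmitter | null,
  merchantId: string,
  saleOrderId: string,
  payload: unknown,
): Promise<number> {
  // No-op when the emitter is not ready.
  if (emitter === null) {
    return 0;
  }
  const results = await Promise.allSettled(
    // The async wrapper turns a synchronous throw into a rejection.
    roomsFor(merchantId, saleOrderId).map(async (room) => emitter.emit(room, INVOICE_TOPIC, payload)),
  );
  return results.filter((result) => result.status === 'fulfilled').length;
}
```

Using `Promise.allSettled` instead of `Promise.all` means one failing room does not prevent delivery to the others, and the caller's issuance flow is never interrupted by a socket error.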
6. Payload Schemas
// Inbound: KafkaTopics.PAYMENT_SUCCESS (TSalePaymentSuccess, from @nx/core)
interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  merchantId: string;
  saleChannelId: string;
  createdBy: string;
  // ...additional sale fields
}

// Inbound: CDCKafkaTopics.MERCHANT (Debezium envelope)
interface TDebeziumMessage<TMerchantPgRow> {
  payload: {
    op: 'c' | 'r' | 'u' | 'd';
    before: TMerchantPgRow | null;
    after: TMerchantPgRow | null; // metadata.tax read here
  };
}

// BullMQ issuance job (src/common/queues.ts)
interface IInvoiceIssuanceJob {
  orderId: string;
  merchantId: string;
  triggeredBy: string;
  isRetry?: boolean;
  jobType?: '000_ISSUANCE' | '100_ADJUSTMENT';
  invoiceId?: string; // required for ADJUSTMENT
}

// BullMQ claim-expiry job
interface IClaimExpiryJob {
  orderId: string;
  merchantId: string;
}

// WebSocket: observation/invoice/invoice (IInvoiceSocketEventPayload)
interface IInvoiceSocketEventPayload {
  invoiceId: string;
  sourceId: string;
  sourceNumber: string;
  issuanceStatus: string;
  invoiceNumber: string | null;
  providerStatus: string | null;
  taxAuthorityStatus: number | null;
  eventSource: string;
}
7. Idempotency & Ordering
| Channel | Delivery | Ordering | Recovery |
|---|---|---|---|
| PAYMENT_SUCCESS | at-least-once | per Kafka partition | active-invoice guard makes re-delivery a no-op |
| Merchant CDC | at-least-once | per Kafka partition | op=u diffs vs persisted TaxInfo → unchanged skipped |
| Issuance job | deduplicated by jobId = orderId | per BullMQ partition (stable per order) | retry with backoff; DLQ → FAILED + audit |
| Claim-expiry job | deduplicated by jobId = expiry:${orderId} | delayed | re-fire safe (EXPIRED short-circuits) |
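The PAYMENT_SUCCESS row relies on an active-invoice guard so that an at-least-once re-delivery does nothing. A minimal sketch of that guard, assuming an in-memory set in place of the partial-unique (sourceType, sourceId) index and a hypothetical `'SALE_ORDER'` source type:

```typescript
type PaymentOutcome = 'created' | 'skipped';

// In-memory stand-in for the partial-unique index on active invoices.
class ActiveInvoiceIndex {
  private readonly keys = new Set<string>();

  // Returns false when an active invoice already exists for the source.
  tryCreate(sourceType: string, sourceId: string): boolean {
    const key = `${sourceType}:${sourceId}`;
    if (this.keys.has(key)) {
      return false;
    }
    this.keys.add(key);
    return true;
  }
}

// Hypothetical handler: skips non-COMPLETED orders and duplicate deliveries.
function handlePaymentSuccess(
  index: ActiveInvoiceIndex,
  saleOrderId: string,
  orderStatus: string,
): PaymentOutcome {
  if (orderStatus !== 'COMPLETED') {
    return 'skipped';
  }
  // 'SALE_ORDER' is an illustrative sourceType value, not taken from the source.
  return index.tryCreate('SALE_ORDER', saleOrderId) ? 'created' : 'skipped';
}
```

Delivering the same event twice yields `'created'` then `'skipped'`, so the consumer can safely log and return on the second pass.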