Skip to content

API Events

Source: src/components/kafka-consumer/component.ts, src/common/queues.ts, src/services/invoice-issuance-queue.service.ts, src/common/websocket.ts.

1. Inbound — Kafka

TopicProducerHandlerIdempotency KeyFailure Mode
KafkaTopics.PAYMENT_SUCCESS@nx/saleInvoiceWorkerService.handlePaymentSuccess(sourceType, sourceId) partial-unique invoiceskip if not COMPLETED / active invoice exists; log + return
CDCKafkaTopics.MERCHANT (Debezium)@nx/commerce (CDC)InvoiceWorkerService.handleMerchantCDC(principalType, principalId) on TaxInfoop c/r reconcile, u diff-and-skip, d ignored

Consumer: autocommit: true, fallbackMode: 'earliest', default group SVC-00150-INVOICE_CONSUMER_GROUP. Single consumer dispatches by message.topic switch.

2. Outbound — Kafka

None. The service does not produce Kafka topics. Downstream notification is via WebSocket and provider/webhook callbacks only.

3. Inbound — BullMQ

3 partitions per queue type; partition chosen by getPartitionByKey(orderId) (Java hashCode mod 3 — deterministic per order).

Queue typeJobProducerHandlerConcurrency
issuance (@nx/invoice/queue/issuance-{01,02,03})invoice-issuance (jobType 000_ISSUANCE)worker / cron / retry / claim-expiryInvoiceIssuanceQueueService.processIssuanceJobissueOrderAPP_ENV_INVOICE_ISSUANCE_WORKER_CONCURRENCY (default 10)
issuanceinvoice-issuance (jobType 100_ADJUSTMENT)adjustment flowprocessIssuanceJob_processAdjustmentJob(same worker)
claim-expiry (@nx/invoice/queue/claim-expiry-{01,02,03})claim-expirypayment success (BUYER_SELF_SERVICE)processClaimExpiryJobdefault 3

Job IDs (idempotency):

JobjobId
IssuanceorderId
Retry${orderId}:retry:${n}
Adjustmentadjust:${invoiceId} (removeOnComplete/Fail: true)
Claim expiryexpiry:${orderId} (delayed to deadline)

4. Outbound — BullMQ

Self-enqueued by the queue service (no external BullMQ consumer).

TriggerJobTarget
REAL_TIME payment / cron / claim resolveenqueueIssuanceissuance partition
Transient issuance failureenqueueIssuance (delay [5,15,60]min)issuance partition
Adjustment requestedenqueueAdjustmentissuance partition
BUYER_SELF_SERVICE paymentenqueueClaimExpiry (delay = deadline − now)claim-expiry partition

5. WebSocket Emissions

TopicRoomsTriggerPayload
observation/invoice/invoicemerchants/{merchantId}, merchants/{merchantId}/invoices, sale-orders/{saleOrderId}/invoicesissuance status change→ §6

Emitted by InvoiceSocketEventService.notifyInvoiceUpdate (best-effort, Promise.allSettled; no-op if emitter not ready).

6. Payload Schemas

ts
// Inbound: KafkaTopics.PAYMENT_SUCCESS (TSalePaymentSuccess, from @nx/core)
interface TSalePaymentSuccess {
  saleOrderId: string;
  saleOrderNumber: string;
  merchantId: string;
  saleChannelId: string;
  createdBy: string;
  // ...additional sale fields
}

// Inbound: CDCKafkaTopics.MERCHANT (Debezium envelope)
interface TDebeziumMessage<TMerchantPgRow> {
  payload: {
    op: 'c' | 'r' | 'u' | 'd';
    before: TMerchantPgRow | null;
    after: TMerchantPgRow | null; // metadata.tax read here
  };
}

// BullMQ issuance job (src/common/queues.ts)
interface IInvoiceIssuanceJob {
  orderId: string;
  merchantId: string;
  triggeredBy: string;
  isRetry?: boolean;
  jobType?: '000_ISSUANCE' | '100_ADJUSTMENT';
  invoiceId?: string; // required for ADJUSTMENT
}

// BullMQ claim-expiry job
interface IClaimExpiryJob {
  orderId: string;
  merchantId: string;
}

// WebSocket: observation/invoice/invoice (IInvoiceSocketEventPayload)
interface IInvoiceSocketEventPayload {
  invoiceId: string;
  sourceId: string;
  sourceNumber: string;
  issuanceStatus: string;
  invoiceNumber: string | null;
  providerStatus: string | null;
  taxAuthorityStatus: number | null;
  eventSource: string;
}

7. Idempotency & Ordering

ChannelDeliveryOrderingRecovery
PAYMENT_SUCCESSat-least-onceper Kafka partitionactive-invoice guard makes re-delivery a no-op
Merchant CDCat-least-onceper Kafka partitionop=u diffs vs persisted TaxInfo → unchanged skipped
Issuance jobper jobId = orderIdper BullMQ partition (stable per order)retry with backoff; DLQ → FAILED + audit
Claim-expiry jobper jobId = expiry:${orderId}delayedre-fire safe (EXPIRED short-circuits)

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.