
Generation Pipeline

1. Overview

| Property | Value |
|---|---|
| Status | Stable (S1a real data; S2a–S2e fixture) |
| Owner | ledger-team |
| Depends on | Kafka ledger.generate, @nx/asset (S3), Typst, ExcelJS |

The generation pipeline is a Kafka-driven async workflow that transforms business data into encrypted PDF and XLSX documents stored in S3.

HTTP Request
    → LedgerQueueService (enqueue / idempotency check)
    → Kafka topic: ledger.generate
    → LedgerWorkerService (fetch → validate → generate → encrypt → upload)
    → MetaLinks + Job COMPLETED  (LedgerJob.status only — never Ledger.status)

2. Pipeline Architecture

3. Enqueue Phase

3.1. Single Generation

LedgerQueueService.handleEnqueueGeneration() — idempotent by { merchantId, type, period }.

| State of existing record | Behavior | Response action |
|---|---|---|
| Does not exist | Create Ledger (PENDING) + LedgerJob (PENDING), send to Kafka | created |
| Exists, job REJECTED | resetToPending() + re-enqueue to Kafka | retried |
| Exists, job PENDING / PROCESSING / COMPLETED | Log skip, return existing record | skipped |
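As a rough illustration of the idempotency rules in this table, the sketch below models them with an in-memory map. InMemoryLedgerQueue, setJobStatus and the JSON-encoded composite key are hypothetical; the real service persists Ledger and LedgerJob rows and sends to Kafka instead of appending to an array.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";
type EnqueueAction = "created" | "retried" | "skipped";

interface LedgerRecord {
  ledgerId: string;
  jobStatus: JobStatus;
}

// Hypothetical in-memory model of the enqueue decision; not the real LedgerQueueService.
class InMemoryLedgerQueue {
  private readonly records = new Map<string, LedgerRecord>();
  private nextId = 1;
  /** ledgerIds that would have been sent to ledger.generate, in send order. */
  readonly sent: string[] = [];

  private static key(merchantId: string, type: string, period: string): string {
    // Idempotency key mirrors { merchantId, type, period }.
    return JSON.stringify([merchantId, type, period]);
  }

  enqueue(merchantId: string, type: string, period: string): EnqueueAction {
    const key = InMemoryLedgerQueue.key(merchantId, type, period);
    const existing = this.records.get(key);

    if (existing === undefined) {
      // Create Ledger + LedgerJob (both PENDING), then send to Kafka.
      const record: LedgerRecord = { ledgerId: String(this.nextId++), jobStatus: "PENDING" };
      this.records.set(key, record);
      this.sent.push(record.ledgerId);
      return "created";
    }
    if (existing.jobStatus === "REJECTED") {
      // resetToPending(), then re-enqueue the same ledgerId.
      existing.jobStatus = "PENDING";
      this.sent.push(existing.ledgerId);
      return "retried";
    }
    // PENDING / PROCESSING / COMPLETED: log skip, return existing, no Kafka send.
    return "skipped";
  }

  /** Hypothetical test hook standing in for a worker changing the job status. */
  setJobStatus(merchantId: string, type: string, period: string, status: JobStatus): void {
    const record = this.records.get(InMemoryLedgerQueue.key(merchantId, type, period));
    if (record !== undefined) record.jobStatus = status;
  }
}
```

Calling enqueue twice with the same triple returns created then skipped; after the job is marked REJECTED, a third call returns retried and re-sends the original ledgerId rather than creating a new ledger.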

3.2. Batch Generation

LedgerQueueService.handleEnqueueBatchGeneration() expands year + periodType + types into one task per (type, period) pair and runs them through executePromiseWithLimit with a concurrency limit of 6.

Period expansion (_buildPeriodValues):

| periodType | Year is current year | Year is a past year |
|---|---|---|
| YEARLY | No expansion (single task per type) | No expansion |
| QUARTERLY | Q1 to last completed quarter | Q1–Q4 |
| MONTHLY | M1 to last completed month | M1–M12 |

Example: In April 2026, a quarterly batch for 2026 generates only Q1. A quarterly batch for 2025 generates Q1–Q4.
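A minimal sketch of the expansion rules, assuming periods are rendered in the same form as the S3 path segment (2026-Q1, 2026-M3, 2026-Y) and that "current year" and "last completed" are evaluated in UTC. buildPeriodValues and expandTasks are hypothetical names; the real _buildPeriodValues signature and return type are not documented here.

```typescript
type PeriodType = "YEARLY" | "QUARTERLY" | "MONTHLY";

// Hypothetical re-implementation of the expansion table; UTC evaluation is an assumption.
function buildPeriodValues(year: number, periodType: PeriodType, now: Date): string[] {
  if (periodType === "YEARLY") {
    // No expansion: a single period per ledger type.
    return [`${year}-Y`];
  }
  const isCurrentYear = year === now.getUTCFullYear();
  const currentMonth = now.getUTCMonth() + 1; // 1..12

  if (periodType === "QUARTERLY") {
    // Quarters fully completed before the current month (April -> 1, i.e. Q1 only).
    const lastQuarter = isCurrentYear ? Math.floor((currentMonth - 1) / 3) : 4;
    return Array.from({ length: lastQuarter }, (_, i) => `${year}-Q${i + 1}`);
  }

  // MONTHLY: months fully completed before the current month.
  const lastMonth = isCurrentYear ? currentMonth - 1 : 12;
  return Array.from({ length: lastMonth }, (_, i) => `${year}-M${i + 1}`);
}

// One task per (ledger type, period) pair.
function expandTasks(
  year: number,
  periodType: PeriodType,
  types: string[],
  now: Date,
): Array<{ type: string; period: string }> {
  const periods = buildPeriodValues(year, periodType, now);
  return types.flatMap((type) => periods.map((period) => ({ type, period })));
}
```

Under these assumptions, a current-year QUARTERLY or MONTHLY batch run in January expands to no periods at all; the document does not say how the real service handles that case.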

Response shape:

```typescript
{
  total: number;
  created: number;
  skipped: number;
  retried: number;
  failed: number;  // individual task errors are caught and counted, not thrown
}
```
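The sketch below shows one way a concurrency-limited runner can produce this response shape: each task's error is caught individually and counted as failed, so one bad task never aborts the batch. runWithLimit, runBatch and the Settled type are hypothetical stand-ins for executePromiseWithLimit and the service's tally logic.

```typescript
type TaskResult = "created" | "skipped" | "retried";
type Settled<T> = { ok: true; value: T } | { ok: false; error: unknown };

interface BatchSummary {
  total: number;
  created: number;
  skipped: number;
  retried: number;
  failed: number;
}

// Hypothetical stand-in for executePromiseWithLimit: at most `limit` tasks in flight.
async function runWithLimit<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<Settled<T>[]> {
  const results: Settled<T>[] = new Array(tasks.length);
  let next = 0;

  // Each lane claims the next unstarted index synchronously, so no task runs twice.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const i = next++;
      try {
        results[i] = { ok: true, value: await tasks[i]() };
      } catch (error) {
        results[i] = { ok: false, error }; // recorded, never rethrown
      }
    }
  }

  const laneCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Tallies settled results into the documented response shape.
async function runBatch(tasks: Array<() => Promise<TaskResult>>, limit = 6): Promise<BatchSummary> {
  const summary: BatchSummary = { total: tasks.length, created: 0, skipped: 0, retried: 0, failed: 0 };
  for (const r of await runWithLimit(tasks, limit)) {
    if (r.ok) summary[r.value] += 1;
    else summary.failed += 1;
  }
  return summary;
}
```

The lane-based design keeps exactly `limit` workers busy until the queue drains, instead of waiting for a whole chunk of 6 to finish before starting the next.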

3.3. Kafka Message Payload

Topic: ledger.generate

```typescript
{
  ledgerId: string;      // Snowflake ID — used to load full ledger from DB
  enqueueTime: string;   // ISO 8601 timestamp
}
```

Message key is ledgerId (string) — ensures consistent partition routing per ledger.
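A sketch of building and parsing this message, assuming the value is JSON-encoded. buildGenerateRecord and parseGeneratePayload are hypothetical; the { key, value } record shape is the form Kafka clients such as kafkajs accept in a producer's messages array for the ledger.generate topic.

```typescript
interface LedgerGeneratePayload {
  ledgerId: string;    // Snowflake ID, kept as a string
  enqueueTime: string; // ISO 8601 timestamp
}

interface KafkaRecord {
  key: string;
  value: string;
}

// Hypothetical producer-side helper; JSON encoding of the value is an assumption.
function buildGenerateRecord(ledgerId: string, now: Date = new Date()): KafkaRecord {
  const payload: LedgerGeneratePayload = { ledgerId, enqueueTime: now.toISOString() };
  // Keying by ledgerId routes every message for one ledger to the same partition.
  return { key: ledgerId, value: JSON.stringify(payload) };
}

// Hypothetical consumer-side guard: rejects values missing either string field.
function parseGeneratePayload(value: string): LedgerGeneratePayload {
  const parsed: unknown = JSON.parse(value);
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    typeof (parsed as Record<string, unknown>).ledgerId !== "string" ||
    typeof (parsed as Record<string, unknown>).enqueueTime !== "string"
  ) {
    throw new Error("invalid ledger.generate payload");
  }
  const { ledgerId, enqueueTime } = parsed as LedgerGeneratePayload;
  return { ledgerId, enqueueTime };
}
```

Keeping ledgerId as a string also avoids precision loss: 64-bit Snowflake IDs can exceed Number.MAX_SAFE_INTEGER (2^53 − 1), so decoding them as JSON numbers in JavaScript could silently corrupt them.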

4. Processing Phase

4.1. Worker Service

LedgerWorkerService.handleGeneration(ledgerId):

Pre-emption guard: Between upload and finalize, setCompletedIfProcessing uses an atomic UPDATE … WHERE status=PROCESSING so that only one worker finalizes. This matters when RecoveryComponent re-enqueues a stalled job while the original worker is still completing slowly.
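To illustrate why a single conditional update is enough, here is an in-memory sketch of the guard. InMemoryJobTable and finalize are hypothetical; in a relational database the equivalent is one UPDATE … WHERE status=PROCESSING statement whose affected-row count tells the caller whether it won.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";

// Hypothetical stand-in for the LedgerJob table.
class InMemoryJobTable {
  private readonly statuses = new Map<string, JobStatus>();

  set(jobId: string, status: JobStatus): void {
    this.statuses.set(jobId, status);
  }

  get(jobId: string): JobStatus | undefined {
    return this.statuses.get(jobId);
  }

  /** Returns the number of rows "updated", like a conditional UPDATE would. */
  private updateStatusWhere(jobId: string, expected: JobStatus, next: JobStatus): number {
    if (this.statuses.get(jobId) !== expected) return 0;
    this.statuses.set(jobId, next);
    return 1;
  }

  /** Check and set happen in one synchronous step, so no other caller can interleave. */
  setCompletedIfProcessing(jobId: string): boolean {
    return this.updateStatusWhere(jobId, "PROCESSING", "COMPLETED") === 1;
  }
}

// Hypothetical finalize step: only the worker that wins the guard proceeds.
function finalize(table: InMemoryJobTable, jobId: string, worker: string, log: string[]): void {
  if (table.setCompletedIfProcessing(jobId)) {
    log.push(`${worker}: finalized`);
  } else {
    log.push(`${worker}: pre-empted, finalize skipped`);
  }
}
```

With two workers racing on the same job, the first call flips PROCESSING to COMPLETED and the second sees zero affected rows, matching the "Pre-empted completion" row in the retry table.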

4.2. S3 Object Path

ledgers/{period}/{merchantId}/{version}/{ledgerType}.pdf
ledgers/{period}/{merchantId}/{version}/{ledgerType}.xlsx

Example: ledgers/2026-Q1/merchant_abc/1/S1a-HKD.pdf

  • period — formatted string: 2026-Q1, 2026-M3, 2026-Y
  • version — integer from Ledger.version column
  • ledgerType — e.g. S1a-HKD
  • Files are stored with mimetype text/plain; the encrypted binary is stored as a generic blob.
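The key layout can be expressed as a pair of small helpers. formatPeriod, buildObjectKey and STORED_MIME_TYPE are hypothetical names; only the path template, the period formats and the text/plain mimetype come from this section.

```typescript
type PeriodType = "YEARLY" | "QUARTERLY" | "MONTHLY";
type LedgerFormat = "pdf" | "xlsx";

// Documented storage mimetype for the encrypted objects.
const STORED_MIME_TYPE = "text/plain";

// Hypothetical helper producing 2026-Y, 2026-Q1, 2026-M3 (no zero padding, per the examples).
function formatPeriod(year: number, periodType: PeriodType, index?: number): string {
  if (periodType === "YEARLY") return `${year}-Y`;
  if (index === undefined) throw new Error(`${periodType} periods need an index`);
  return periodType === "QUARTERLY" ? `${year}-Q${index}` : `${year}-M${index}`;
}

// Hypothetical helper for ledgers/{period}/{merchantId}/{version}/{ledgerType}.{ext}
function buildObjectKey(
  period: string,
  merchantId: string,
  version: number,
  ledgerType: string,
  format: LedgerFormat,
): string {
  // version comes from the integer Ledger.version column.
  if (!Number.isInteger(version)) throw new Error(`version must be an integer: ${version}`);
  return `ledgers/${period}/${merchantId}/${version}/${ledgerType}.${format}`;
}
```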

4.3. Error Handling

On any exception during steps 4–9:

  1. MetaLinkService.cleanupOrphanedLinks(ledgerId) — hard-deletes any partially-created MetaLinks
  2. LedgerJobService.setRejected(jobId, error.message) — records failure reason
  3. The error is rethrown. The consumer catches it in onMessageError, which only logs it, and message.commit() is then called in the finally block of onMessage.

The Kafka message is always committed — no automatic replay. Re-processing requires an explicit retry via POST /ledgers/{id}/retry.
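A minimal sketch of the failure path and the always-commit consumer, assuming promise-based dependencies. GenerationDeps, findJobId and runPipeline are hypothetical; runPipeline stands in for the fetch → validate → generate → encrypt → upload → finalize steps.

```typescript
// Hypothetical dependency surface; the real services are MetaLinkService, LedgerJobService, etc.
interface GenerationDeps {
  findJobId(ledgerId: string): Promise<string>;
  runPipeline(ledgerId: string): Promise<void>;
  cleanupOrphanedLinks(ledgerId: string): Promise<void>;
  setRejected(jobId: string, reason: string): Promise<void>;
}

interface ConsumedMessage {
  ledgerId: string;
  commit(): Promise<void>;
}

async function handleGeneration(ledgerId: string, deps: GenerationDeps): Promise<void> {
  // Resolved before the guarded section, so a lookup failure skips cleanup/rejection.
  const jobId = await deps.findJobId(ledgerId);
  try {
    await deps.runPipeline(ledgerId);
  } catch (err) {
    await deps.cleanupOrphanedLinks(ledgerId); // hard-delete partial MetaLinks
    await deps.setRejected(jobId, err instanceof Error ? err.message : String(err));
    throw err; // surfaced to the consumer
  }
}

async function onMessage(
  message: ConsumedMessage,
  deps: GenerationDeps,
  onMessageError: (err: unknown) => void,
): Promise<void> {
  try {
    await handleGeneration(message.ledgerId, deps);
  } catch (err) {
    onMessageError(err); // logs only: no rethrow, so no automatic replay
  } finally {
    await message.commit(); // committed on success and on failure
  }
}
```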

5. Download Phase

GET /ledgers/{id}/download/{format}?disposition=attachment|inline, where format is pdf or xlsx.

Filename format: {ledgerType}_{period}_v{version}.{format} — e.g. S1a-HKD_2026-Q1_v1.pdf
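A sketch of the filename and response-header logic. The helper names are hypothetical, as is defaulting disposition to attachment when the query parameter is omitted; the Content-Disposition syntax follows RFC 6266 and this simple quoting assumes filenames contain no quotes or non-ASCII characters.

```typescript
type LedgerFormat = "pdf" | "xlsx";
type Disposition = "attachment" | "inline";

const isLedgerFormat = (v: string): v is LedgerFormat => v === "pdf" || v === "xlsx";
const isDisposition = (v: string): v is Disposition => v === "attachment" || v === "inline";

// Hypothetical validation of the path/query parameters; the default is an assumption.
function parseDownloadParams(
  format: string,
  disposition = "attachment",
): { format: LedgerFormat; disposition: Disposition } {
  if (!isLedgerFormat(format)) throw new Error(`unsupported format: ${format}`);
  if (!isDisposition(disposition)) throw new Error(`unsupported disposition: ${disposition}`);
  return { format, disposition };
}

// {ledgerType}_{period}_v{version}.{format}
function buildDownloadFilename(ledgerType: string, period: string, version: number, format: LedgerFormat): string {
  return `${ledgerType}_${period}_v${version}.${format}`;
}

// Simple quoted filename parameter; no RFC 5987 filename* encoding.
function buildContentDisposition(disposition: Disposition, filename: string): string {
  return `${disposition}; filename="${filename}"`;
}
```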

6. Recovery Mechanism

RecoveryComponent activates only in worker role. It runs an initial sweep during application startup (binding() phase), then continues on a configurable interval.

Stall detection uses a configurable cutoff (APP_ENV_STALL_THRESHOLD_MS, default 180 s):

  • Only PROCESSING jobs whose processStartAt is older than the cutoff are re-enqueued.
  • Jobs currently being processed (within the threshold) are left alone.
  • Pure PENDING jobs are never touched by RecoveryComponent.

Sweep interval (APP_ENV_SWEEP_INTERVAL_MS, default 300 s): the sweep is not just a startup action — it runs continuously, catching jobs that stall after startup (e.g., worker crash mid-pipeline). Errors in a sweep are logged but do not stop the interval.
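A sketch of the selection rule and the self-healing interval, assuming jobs are available as plain rows. selectStalledJobs, startSweepLoop and JobRow are hypothetical; the defaults mirror the documented APP_ENV_STALL_THRESHOLD_MS and APP_ENV_SWEEP_INTERVAL_MS values.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";

interface JobRow {
  id: string;
  status: JobStatus;
  processStartAt: Date | null;
}

const DEFAULT_STALL_THRESHOLD_MS = 180_000; // APP_ENV_STALL_THRESHOLD_MS default
const DEFAULT_SWEEP_INTERVAL_MS = 300_000;  // APP_ENV_SWEEP_INTERVAL_MS default

// Only PROCESSING jobs that started before the cutoff qualify; PENDING jobs are never touched.
function selectStalledJobs(jobs: JobRow[], now: Date, stallThresholdMs = DEFAULT_STALL_THRESHOLD_MS): JobRow[] {
  const cutoff = now.getTime() - stallThresholdMs;
  return jobs.filter(
    (job) => job.status === "PROCESSING" && job.processStartAt !== null && job.processStartAt.getTime() < cutoff,
  );
}

// Runs one sweep immediately, then on every tick; a failed sweep is logged and the interval continues.
function startSweepLoop(
  sweep: () => Promise<void>,
  intervalMs = DEFAULT_SWEEP_INTERVAL_MS,
  log: (msg: string, err: unknown) => void = (msg, err) => console.error(msg, err),
): () => void {
  const runOnce = (): void => {
    sweep().catch((err) => log("recovery sweep failed", err));
  };
  runOnce(); // startup sweep
  const timer = setInterval(runOnce, intervalMs);
  return () => clearInterval(timer); // stop function
}
```

With the defaults, a job whose worker dies right after setProcessing() becomes eligible 180 s after processStartAt and is re-enqueued on the next tick, so recovery can take up to roughly 180 s + 300 s = 480 s. In this sketch a sweep that runs longer than the interval could overlap the next one; whether the real component guards against that is not stated here.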

RecoveryComponent mounts before KafkaConsumerComponent — recovery messages land in Kafka before consumers begin polling, preventing re-enqueued jobs from sitting in the queue until the next sweep.

7. Encryption

7.1. Algorithm

| Property | Value |
|---|---|
| Algorithm | AES-256-GCM (from ignis-helpers AES class) |
| Key source | APP_ENV_LEDGER_ENCRYPTION_KEY, loaded at service construction; throws if missing |
| Input encoding | base64 (buffer → base64 string before encrypt) |
| Output encoding | base64 (ciphertext decoded back to Buffer) |

7.2. Encrypt/Decrypt Flow

encrypt:
  Buffer → .toString('base64') → AES.encrypt(base64, key) → Buffer.from(ciphertext, 'base64')

decrypt:
  Buffer → .toString('base64') → AES.decrypt(base64, key) → Buffer.from(plaintext, 'base64')

The ignis-helpers AES implementation handles IV generation and embedding internally.
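The round trip can be sketched with Node's built-in crypto module standing in for the ignis-helpers AES class, which is not reproduced here. Everything about the stand-in is an assumption: the SHA-256 key derivation, the 12-byte IV, and the iv | tag | ciphertext layout. The outer Buffer ↔ base64 steps and the missing-key check follow sections 7.1 and 7.2.

```typescript
import { createCipheriv, createDecipheriv, createHash, randomBytes } from "node:crypto";

const IV_LEN = 12;  // assumption: standard 96-bit GCM nonce
const TAG_LEN = 16; // GCM authentication tag length

// Mirrors "throws if missing" for APP_ENV_LEDGER_ENCRYPTION_KEY.
function loadEncryptionKey(env: Record<string, string | undefined>): string {
  const key = env.APP_ENV_LEDGER_ENCRYPTION_KEY;
  if (!key) throw new Error("APP_ENV_LEDGER_ENCRYPTION_KEY is not set");
  return key;
}

// Assumption: derive a 32-byte AES-256 key by hashing the configured secret.
function deriveKey(secret: string): Buffer {
  return createHash("sha256").update(secret, "utf8").digest();
}

// Stand-in for AES.encrypt(text, key): returns base64(iv | tag | ciphertext).
function aesEncrypt(text: string, secret: string): string {
  const iv = randomBytes(IV_LEN);
  const cipher = createCipheriv("aes-256-gcm", deriveKey(secret), iv);
  const ct = Buffer.concat([cipher.update(text, "utf8"), cipher.final()]);
  return Buffer.concat([iv, cipher.getAuthTag(), ct]).toString("base64");
}

// Stand-in for AES.decrypt(payload, key); final() throws if the tag does not verify.
function aesDecrypt(payload: string, secret: string): string {
  const raw = Buffer.from(payload, "base64");
  const iv = raw.subarray(0, IV_LEN);
  const tag = raw.subarray(IV_LEN, IV_LEN + TAG_LEN);
  const ct = raw.subarray(IV_LEN + TAG_LEN);
  const decipher = createDecipheriv("aes-256-gcm", deriveKey(secret), iv);
  decipher.setAuthTag(tag);
  return Buffer.concat([decipher.update(ct), decipher.final()]).toString("utf8");
}

// Section 7.2: Buffer → base64 → encrypt → Buffer.from(ciphertext, 'base64')
function encryptFile(file: Buffer, secret: string): Buffer {
  return Buffer.from(aesEncrypt(file.toString("base64"), secret), "base64");
}

// Section 7.2: Buffer → base64 → decrypt → Buffer.from(plaintext, 'base64')
function decryptFile(stored: Buffer, secret: string): Buffer {
  return Buffer.from(aesDecrypt(stored.toString("base64"), secret), "base64");
}
```

Because the base64 text, not the raw bytes, is what gets encrypted, the stored object is about a third larger than the original file under this flow.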

8. Retry Behavior

| Scenario | Mechanism | Re-enqueue? |
|---|---|---|
| Manual retry via POST /ledgers/{id}/retry | handleRetry() → resetToPending() → Kafka send | Yes |
| Batch retry via POST /ledgers/retry/batch | Same as above, for each REJECTED job in the batch | Yes |
| Crash / stall during processing | RecoveryComponent periodic sweep re-enqueues PROCESSING jobs whose processStartAt is older than APP_ENV_STALL_THRESHOLD_MS | Yes, on the next sweep tick |
| Job timeout (APP_ENV_JOB_TIMEOUT_MS exceeded) | _jobTimeout rejects → catch → cleanupOrphanedLinks + setRejected | No; requires manual retry |
| Parse validation failure | setRejected() with Zod error message | No; requires manual retry |
| Duplicate enqueue (same type/period) | Returns existing record without Kafka send | N/A |
| Pre-empted completion (concurrent workers) | setCompletedIfProcessing returns false; finalize skipped | N/A |
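The timeout row can be illustrated with a Promise.race sketch. withTimeout and JobTimeoutError are hypothetical; the document only states that _jobTimeout rejects and the catch path runs cleanupOrphanedLinks and setRejected.

```typescript
// Hypothetical error type for an exceeded APP_ENV_JOB_TIMEOUT_MS.
class JobTimeoutError extends Error {
  constructor(ms: number) {
    super(`job exceeded ${ms} ms`);
    this.name = "JobTimeoutError";
  }
}

// Rejects with JobTimeoutError if `work` has not settled within timeoutMs.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new JobTimeoutError(timeoutMs)), timeoutMs);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // avoid a dangling timer once the race is decided
  }
}
```

Promise.race does not cancel the losing promise: the generation work keeps running after the timeout fires. If setRejected moves the job out of PROCESSING, a late finish would then fail the setCompletedIfProcessing guard rather than overwrite the rejection.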

attemptCount increments on each setProcessing() call and is not reset on retry — it tracks total lifetime attempts.

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.