Generation Pipeline
1. Overview
| Property | Value |
|---|---|
| Status | Stable (S1a real data; S2a–S2e fixture) |
| Owner | ledger-team |
| Depends on | Kafka ledger.generate, @nx/asset (S3), Typst, ExcelJS |
The generation pipeline is a Kafka-driven async workflow that transforms business data into encrypted PDF and XLSX documents stored in S3.
HTTP Request
→ LedgerQueueService (enqueue / idempotency check)
→ Kafka topic: ledger.generate
→ LedgerWorkerService (fetch → validate → generate → encrypt → upload)
→ MetaLinks + Job COMPLETED (LedgerJob.status only, never Ledger.status)
2. Pipeline Architecture
3. Enqueue Phase
3.1. Single Generation
LedgerQueueService.handleEnqueueGeneration() — idempotent by { merchantId, type, period }.
| State of Existing Record | Behavior | Response action |
|---|---|---|
| Does not exist | Create Ledger (PENDING) + LedgerJob (PENDING), send to Kafka | created |
| Exists, job REJECTED | resetToPending() + re-enqueue to Kafka | retried |
| Exists, job PENDING / PROCESSING / COMPLETED | Log skip, return existing record | skipped |
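The decision table above can be read as a small pure function. The following is a minimal sketch, not the service's actual code: `decideEnqueueAction` is a hypothetical helper name, and `undefined` is assumed to stand for "no record exists for this { merchantId, type, period }".

```typescript
// Job statuses named in the table above.
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";
type EnqueueAction = "created" | "retried" | "skipped";

// Hypothetical helper: maps the state of the existing job to the response action.
// `undefined` means no Ledger exists yet for { merchantId, type, period }.
function decideEnqueueAction(existing: JobStatus | undefined): EnqueueAction {
  if (existing === undefined) return "created"; // create Ledger + LedgerJob, send to Kafka
  if (existing === "REJECTED") return "retried"; // resetToPending() + re-enqueue to Kafka
  return "skipped"; // PENDING / PROCESSING / COMPLETED: return the existing record
}
```

Only the REJECTED branch produces a second Kafka message for an existing record, which is what keeps repeated requests idempotent.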
3.2. Batch Generation
LedgerQueueService.handleEnqueueBatchGeneration() — expands year + periodType + types into individual tasks and runs them with executePromiseWithLimit(limit: 6).
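As an illustration of bounded concurrency with per-task error counting, here is a minimal sketch. `runBatchWithLimit` is a hypothetical stand-in for executePromiseWithLimit, whose real signature is not shown on this page; the summary fields mirror the batch response shape.

```typescript
type TaskAction = "created" | "retried" | "skipped";

interface BatchSummary {
  total: number;
  created: number;
  skipped: number;
  retried: number;
  failed: number;
}

// Hypothetical stand-in for executePromiseWithLimit: runs tasks with at most
// `limit` in flight and tallies outcomes instead of throwing.
async function runBatchWithLimit(
  tasks: Array<() => Promise<TaskAction>>,
  limit = 6,
): Promise<BatchSummary> {
  const summary: BatchSummary = { total: tasks.length, created: 0, skipped: 0, retried: 0, failed: 0 };
  let next = 0;

  // Each worker pulls the next unstarted task until none remain.
  const worker = async (): Promise<void> => {
    while (next < tasks.length) {
      const task = tasks[next++];
      if (!task) break;
      try {
        const action = await task();
        summary[action] += 1;
      } catch {
        summary.failed += 1; // individual errors are counted, not rethrown
      }
    }
  };

  await Promise.all(Array.from({ length: Math.min(limit, tasks.length) }, worker));
  return summary;
}
```

Because each worker catches its own task errors, one failing period does not abort the rest of the batch.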
Period expansion (_buildPeriodValues):
| periodType | Year is current year | Year is past year |
|---|---|---|
| YEARLY | No expansion (single task per type) | No expansion |
| QUARTERLY | Q1 to last completed quarter | Q1–Q4 |
| MONTHLY | M1 to last completed month | M1–M12 |
Example: In April 2026, a quarterly batch for 2026 generates only Q1. A quarterly batch for 2025 generates Q1–Q4.
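The expansion rules can be sketched as follows. This is an illustrative reimplementation, not the actual _buildPeriodValues: the label format is borrowed from the S3 path examples on this page, dates are assumed to be evaluated in UTC, and a future year is assumed to yield no quarterly or monthly periods.

```typescript
type PeriodType = "YEARLY" | "QUARTERLY" | "MONTHLY";

// Sketch of the expansion table above. Labels follow the 2026-Q1 / 2026-M3 /
// 2026-Y format. Assumptions: UTC calendar; future years expand to nothing
// for QUARTERLY and MONTHLY.
function buildPeriodValues(year: number, periodType: PeriodType, now: Date): string[] {
  if (periodType === "YEARLY") return [`${year}-Y`]; // no expansion

  const currentYear = now.getUTCFullYear();
  if (year > currentYear) return [];

  // For the current year, the 0-based month index equals the number of
  // fully completed months (April -> 3).
  const completedMonths = year < currentYear ? 12 : now.getUTCMonth();
  const count = periodType === "MONTHLY" ? completedMonths : Math.floor(completedMonths / 3);
  const prefix = periodType === "MONTHLY" ? "M" : "Q";
  return Array.from({ length: count }, (_, i) => `${year}-${prefix}${i + 1}`);
}
```

With `now` set to 15 April 2026, a QUARTERLY request for 2026 returns `["2026-Q1"]`, and the same request for 2025 returns all four quarters.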
Response shape:
{
total: number;
created: number;
skipped: number;
retried: number;
failed: number; // individual task errors are caught and counted, not thrown
}
3.3. Kafka Message Payload
Topic: ledger.generate
{
ledgerId: string; // Snowflake ID — used to load full ledger from DB
enqueueTime: string; // ISO 8601 timestamp
}
The message key is ledgerId (string), which ensures consistent partition routing per ledger.
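A minimal sketch of building such a message is shown below. The `KeyedMessage` shape and `buildGenerateMessage` helper are hypothetical, and JSON serialization of the value is an assumption; the page only specifies the payload fields and the key.

```typescript
interface LedgerGeneratePayload {
  ledgerId: string;    // Snowflake ID
  enqueueTime: string; // ISO 8601 timestamp
}

// Generic key/value pair of the kind most Kafka producers accept; illustrative only.
interface KeyedMessage {
  key: string;
  value: string;
}

const LEDGER_GENERATE_TOPIC = "ledger.generate";

// Hypothetical helper. Assumption: the value is serialized as JSON.
function buildGenerateMessage(ledgerId: string, now: Date = new Date()): KeyedMessage {
  const payload: LedgerGeneratePayload = { ledgerId, enqueueTime: now.toISOString() };
  // Keying by ledgerId keeps every message for one ledger on the same partition.
  return { key: ledgerId, value: JSON.stringify(payload) };
}
```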
4. Processing Phase
4.1. Worker Service
LedgerWorkerService.handleGeneration(ledgerId):
Pre-emption guard: Between upload and finalize, setCompletedIfProcessing uses an atomic UPDATE … WHERE status=PROCESSING to ensure only one worker finalizes — relevant when a stalled job is re-enqueued by RecoveryComponent while the original worker completes slowly.
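The guard behaves like a compare-and-set on the job status. The sketch below simulates it with an in-memory map so the outcome is easy to follow; the real guard is a single database statement, and the table and column names in the comment are assumptions.

```typescript
type GuardedStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";

// In-memory stand-in for the LedgerJob table. The real guard is one SQL
// statement along the lines of:
//   UPDATE ledger_job SET status = 'COMPLETED' WHERE id = $1 AND status = 'PROCESSING'
// (table and column names are assumptions).
const jobs = new Map<string, GuardedStatus>();

// Returns true only for the caller whose update matched a PROCESSING row.
function setCompletedIfProcessing(jobId: string): boolean {
  if (jobs.get(jobId) !== "PROCESSING") return false; // zero rows affected
  jobs.set(jobId, "COMPLETED");
  return true;
}

jobs.set("job-1", "PROCESSING");
const first = setCompletedIfProcessing("job-1");  // original worker finalizes
const second = setCompletedIfProcessing("job-1"); // re-enqueued worker is pre-empted
console.log(first, second); // true false
```

The second caller sees `false` and skips finalization, so the job is finalized exactly once even if two workers finish the upload.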
4.2. S3 Object Path
ledgers/{period}/{merchantId}/{version}/{ledgerType}.pdf
ledgers/{period}/{merchantId}/{version}/{ledgerType}.xlsx
Example: ledgers/2026-Q1/merchant_abc/1/S1a-HKD.pdf
- period: formatted string, e.g. 2026-Q1, 2026-M3, 2026-Y
- version: integer from the Ledger.version column
- ledgerType: e.g. S1a-HKD
- Files are stored with mimetype: text/plain (encrypted binary stored as a generic blob)
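The path template can be expressed as a small helper. `buildLedgerObjectKey` and `LedgerKeyParts` are hypothetical names used only for illustration.

```typescript
type LedgerFormat = "pdf" | "xlsx";

interface LedgerKeyParts {
  period: string;     // e.g. "2026-Q1"
  merchantId: string;
  version: number;    // Ledger.version
  ledgerType: string; // e.g. "S1a-HKD"
}

// Hypothetical helper that fills in the object path template shown above.
function buildLedgerObjectKey(p: LedgerKeyParts, format: LedgerFormat): string {
  return `ledgers/${p.period}/${p.merchantId}/${p.version}/${p.ledgerType}.${format}`;
}
```

For `{ period: "2026-Q1", merchantId: "merchant_abc", version: 1, ledgerType: "S1a-HKD" }` and format `pdf`, this yields `ledgers/2026-Q1/merchant_abc/1/S1a-HKD.pdf`.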
4.3. Error Handling
On any exception during steps 4–9:
- MetaLinkService.cleanupOrphanedLinks(ledgerId): hard-deletes any partially created MetaLinks
- LedgerJobService.setRejected(jobId, error.message): records the failure reason
- The error is rethrown: the consumer catches it in onMessageError (logs only), then message.commit() is called in the onMessage finally path
The Kafka message is always committed — no automatic replay. Re-processing requires an explicit retry via POST /ledgers/{id}/retry.
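The control flow described above can be sketched as two nested try blocks. Method names follow this section, but the `GenerationDeps` interface, the function signatures, and the `generate` step are assumptions made for illustration.

```typescript
// Hypothetical dependency shape; method names follow the bullets above,
// signatures are assumptions.
interface GenerationDeps {
  generate(ledgerId: string): Promise<void>;
  cleanupOrphanedLinks(ledgerId: string): Promise<void>;
  setRejected(jobId: string, reason: string): Promise<void>;
}

// Worker side: clean up, record the failure, then rethrow.
async function handleGeneration(deps: GenerationDeps, ledgerId: string, jobId: string): Promise<void> {
  try {
    await deps.generate(ledgerId);
  } catch (error) {
    await deps.cleanupOrphanedLinks(ledgerId); // hard-delete partial MetaLinks
    await deps.setRejected(jobId, error instanceof Error ? error.message : String(error));
    throw error; // let the consumer log it
  }
}

// Consumer side: errors are only logged, and the offset is committed either way.
async function onMessage(
  run: () => Promise<void>,
  commit: () => Promise<void>,
  onMessageError: (e: unknown) => void,
): Promise<void> {
  try {
    await run();
  } catch (e) {
    onMessageError(e);
  } finally {
    await commit();
  }
}
```

Because `commit` sits in `finally`, a failed job never causes the broker to redeliver the message; the REJECTED status is the only record that a retry is needed.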
5. Download Phase
GET /ledgers/{id}/download/{format}?disposition=attachment|inline — format is pdf or xlsx.
Filename format: {ledgerType}_{period}_v{version}.{format} — e.g. S1a-HKD_2026-Q1_v1.pdf
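A minimal sketch of the filename rule follows. Both helpers are hypothetical, and the Content-Disposition header construction is an assumption about how the `disposition` query parameter is applied; the page itself only documents the filename format.

```typescript
type Disposition = "attachment" | "inline";

// Hypothetical helper implementing {ledgerType}_{period}_v{version}.{format}.
function buildDownloadFilename(
  ledgerType: string,
  period: string,
  version: number,
  format: "pdf" | "xlsx",
): string {
  return `${ledgerType}_${period}_v${version}.${format}`;
}

// Assumption: the endpoint maps ?disposition= onto a standard
// Content-Disposition response header.
function contentDisposition(disposition: Disposition, filename: string): string {
  return `${disposition}; filename="${filename}"`;
}
```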
6. Recovery Mechanism
RecoveryComponent activates only in worker role. It runs an initial sweep during application startup (binding() phase), then continues on a configurable interval.
Stall detection uses a configurable cutoff (APP_ENV_STALL_THRESHOLD_MS, default 180 s):
- Only PROCESSING jobs whose processStartAt is older than the cutoff are re-enqueued.
- Jobs currently being processed (within the threshold) are left alone.
- Pure PENDING jobs are never touched by RecoveryComponent.
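The stall rules above amount to a filter over job rows. The sketch below is illustrative: `JobRow` and `findStalledJobs` are hypothetical names, and the real component presumably applies the same condition in a database query rather than in memory.

```typescript
interface JobRow {
  id: string;
  status: "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";
  processStartAt: Date | null;
}

const DEFAULT_STALL_THRESHOLD_MS = 180_000; // 180 s default from above

// Hypothetical helper: selects PROCESSING jobs that started before the cutoff.
function findStalledJobs(
  rows: JobRow[],
  now: Date,
  thresholdMs: number = DEFAULT_STALL_THRESHOLD_MS,
): JobRow[] {
  const cutoff = now.getTime() - thresholdMs;
  return rows.filter(
    (j) => j.status === "PROCESSING" && j.processStartAt !== null && j.processStartAt.getTime() < cutoff,
  );
}
```

A job that started 10 minutes ago and is still PROCESSING is selected; one that started 1 minute ago is not, and PENDING jobs are ignored regardless of age.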
Sweep interval (APP_ENV_SWEEP_INTERVAL_MS, default 300 s): the sweep is not just a startup action — it runs continuously, catching jobs that stall after startup (e.g., worker crash mid-pipeline). Errors in a sweep are logged but do not stop the interval.
RecoveryComponent mounts before KafkaConsumerComponent, so recovery messages land in Kafka before consumers begin polling. This prevents re-enqueued jobs from sitting in the queue until the next sweep.
7. Encryption
7.1. Algorithm
| Property | Value |
|---|---|
| Algorithm | AES-256-GCM (from ignis-helpers AES class) |
| Key source | APP_ENV_LEDGER_ENCRYPTION_KEY — loaded at service construction; throws if missing |
| Input encoding | base64 (buffer → base64 string before encrypt) |
| Output encoding | base64 (ciphertext decoded back to Buffer) |
7.2. Encrypt/Decrypt Flow
encrypt:
Buffer → .toString('base64') → AES.encrypt(base64, key) → Buffer.from(ciphertext, 'base64')
decrypt:
Buffer → .toString('base64') → AES.decrypt(base64, key) → Buffer.from(plaintext, 'base64')
The ignis-helpers AES implementation handles IV generation and embedding internally.
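To make the encoding round trip concrete, here is a sketch that uses Node's built-in crypto module in place of the ignis-helpers AES class, whose API is not shown on this page. The `aesEncrypt` and `aesDecrypt` functions and the byte layout (12-byte IV, 16-byte auth tag, then ciphertext) are assumptions chosen for the example, not a description of the library's actual format.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from "node:crypto";

// Stand-in for the ignis-helpers AES class (its real API is not shown here).
// Layout assumption: base64( iv[12] | authTag[16] | ciphertext ).
function aesEncrypt(plain: string, key: Buffer): string {
  const iv = randomBytes(12);
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const body = Buffer.concat([cipher.update(plain, "utf8"), cipher.final()]);
  return Buffer.concat([iv, cipher.getAuthTag(), body]).toString("base64");
}

function aesDecrypt(encoded: string, key: Buffer): string {
  const raw = Buffer.from(encoded, "base64");
  const decipher = createDecipheriv("aes-256-gcm", key, raw.subarray(0, 12));
  decipher.setAuthTag(raw.subarray(12, 28));
  return Buffer.concat([decipher.update(raw.subarray(28)), decipher.final()]).toString("utf8");
}

// The two flows from this section: file bytes are base64-encoded before
// encryption, and the base64 ciphertext is decoded back to raw bytes for storage.
const encryptBuffer = (data: Buffer, key: Buffer): Buffer =>
  Buffer.from(aesEncrypt(data.toString("base64"), key), "base64");
const decryptBuffer = (data: Buffer, key: Buffer): Buffer =>
  Buffer.from(aesDecrypt(data.toString("base64"), key), "base64");
```

With a 32-byte key, `decryptBuffer(encryptBuffer(file, key), key)` returns the original bytes, and a tampered ciphertext makes `decipher.final()` throw because the GCM tag no longer verifies.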
8. Retry Behavior
| Scenario | Mechanism | Re-enqueue? |
|---|---|---|
| Manual retry via POST /ledgers/{id}/retry | handleRetry() → resetToPending() → Kafka send | Yes |
| Batch retry via POST /ledgers/retry/batch | Same as above, for each REJECTED in batch | Yes |
| Crash / stall during processing | RecoveryComponent periodic sweep — jobs older than APP_ENV_STALL_THRESHOLD_MS re-enqueued | Yes — on next sweep tick |
| Job timeout (APP_ENV_JOB_TIMEOUT_MS exceeded) | _jobTimeout rejects → catch → cleanupOrphanedLinks + setRejected | No, requires manual retry |
| Parse validation failure | setRejected() with Zod error message | No — requires manual retry |
| Duplicate enqueue (same merchantId/type/period) | Returns existing without Kafka send | N/A |
| Pre-empted completion (concurrent workers) | setCompletedIfProcessing returns false — finalize skipped | N/A |
attemptCount increments on each setProcessing() call and is not reset on retry — it tracks total lifetime attempts.
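The attemptCount behavior can be illustrated with a few pure state transitions. This is a sketch: the `JobState` shape is an assumption, and the functions only mirror the method names used in this section.

```typescript
interface JobState {
  status: "PENDING" | "PROCESSING" | "COMPLETED" | "REJECTED";
  attemptCount: number;
}

// Every transition into PROCESSING counts as one attempt.
function setProcessing(job: JobState): JobState {
  return { status: "PROCESSING", attemptCount: job.attemptCount + 1 };
}

function setRejected(job: JobState): JobState {
  return { ...job, status: "REJECTED" };
}

// Retry resets the status only; attemptCount is deliberately preserved.
function resetToPending(job: JobState): JobState {
  return { ...job, status: "PENDING" };
}

let job: JobState = { status: "PENDING", attemptCount: 0 };
job = setRejected(setProcessing(job)); // first attempt fails
job = setProcessing(resetToPending(job)); // manual retry, second attempt
console.log(job.attemptCount); // 2
```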
9. Related Pages
- API Events — Kafka/WS payload reference
- HKD Templates — PDF/XLSX rendering detail
- Architecture — Runtime Scenarios
- Configuration
- REST endpoints: live OpenAPI at /v1/api/ledger/doc/openapi.json