Pipeline Tạo báo cáo
1. Tổng quan
Pipeline tạo báo cáo là một workflow bất đồng bộ điều khiển bằng Kafka chuyển đổi dữ liệu nghiệp vụ thành tài liệu PDF và XLSX đã mã hóa được lưu trữ trong S3.
HTTP Request
→ LedgerQueueService (enqueue / kiểm tra idempotency)
→ Kafka topic: ledger.generate
→ LedgerWorkerService (fetch → validate → generate → encrypt → upload)
→ MetaLinks + Job COMPLETED2. Kiến trúc Pipeline
3. Pha Enqueue
3.1. Tạo đơn
LedgerQueueService.handleEnqueueGeneration() — idempotent theo { merchantId, type, period }.
| Trạng thái Record Tồn tại | Hành vi | action Response |
|---|---|---|
| Chưa tồn tại | Tạo Ledger (PENDING) + LedgerJob (PENDING), gửi đến Kafka | created |
| Tồn tại, job REJECTED | resetToPending() + re-enqueue đến Kafka | retried |
| Tồn tại, job PENDING / PROCESSING / COMPLETED | Log skip, trả về record có sẵn | skipped |
3.2. Tạo Batch
LedgerQueueService.handleEnqueueBatchGeneration() — mở rộng year + periodType + types thành các task riêng biệt và chạy chúng với executePromiseWithLimit(limit: 6).
Mở rộng period (_buildPeriodValues):
periodType | Năm là năm hiện tại | Năm là năm trước |
|---|---|---|
| YEARLY | Không mở rộng (một task trên mỗi type) | Không mở rộng |
| QUARTERLY | Q1 đến quý hoàn tất cuối cùng | Q1–Q4 |
| MONTHLY | M1 đến tháng hoàn tất cuối cùng | M1–M12 |
Ví dụ: Vào tháng 4 năm 2026, một batch quarterly cho 2026 chỉ tạo Q1. Một batch quarterly cho 2025 tạo Q1–Q4.
Hình dạng Response:
{
total: number;
created: number;
skipped: number;
retried: number;
failed: number; // individual task errors are caught and counted, not thrown
}3.3. Payload Message Kafka
Topic: ledger.generate
{
ledgerId: string; // Snowflake ID — used to load full ledger from DB
enqueueTime: string; // ISO 8601 timestamp
}Message key là ledgerId (string) — đảm bảo định tuyến partition nhất quán cho mỗi ledger.
4. Pha Xử lý
4.1. Worker Service
LedgerWorkerService.handleGeneration(ledgerId):
Bảo vệ chiếm trước: Giữa upload và hoàn tất, setCompletedIfProcessing dùng UPDATE … WHERE status=PROCESSING nguyên tử để đảm bảo chỉ một worker hoàn tất — liên quan khi RecoveryComponent re-enqueue một job treo trong khi worker gốc hoàn thành chậm.
4.2. S3 Object Path
ledgers/{period}/{merchantId}/{version}/{ledgerType}.pdf
ledgers/{period}/{merchantId}/{version}/{ledgerType}.xlsxVí dụ: ledgers/2026-Q1/merchant_abc/1/S1a-HKD.pdf
period— chuỗi định dạng:2026-Q1,2026-M3,2026-Yversion— integer từ cộtLedger.versionledgerType— vd:S1a-HKD- Files được lưu với
mimetype: text/plain(binary đã mã hóa lưu dưới dạng blob chung)
4.3. Xử lý Lỗi
Khi có exception trong các bước 3–9:
MetaLinkService.cleanupOrphanedLinks(ledgerId)— hard-delete bất kỳ MetaLink nào được tạo một phầnLedgerJobService.setRejected(jobId, error.message)— ghi lại lý do thất bại- Lỗi được rethrow — consumer catch nó trong
onMessageError(chỉ log), sau đómessage.commit()được gọi trong đườngfinallycủaonMessage
Message Kafka luôn được commit — không có replay tự động. Việc xử lý lại yêu cầu retry rõ ràng qua POST /operations/{id}/retry.
5. Pha Tải về
GET /operations/{id}/download?format=pdf|xlsx&disposition=attachment|inline
Định dạng tên file: {ledgerType}_{period}_v{version}.{format} — vd S1a-HKD_2026-Q1_v1.pdf
6. Cơ chế Recovery
RecoveryComponent chỉ hoạt động trong role worker. Nó chạy sweep ban đầu trong quá trình khởi động application (pha binding()), sau đó tiếp tục theo interval có thể cấu hình.
Phát hiện job treo dùng cutoff có thể cấu hình (APP_ENV_STALL_THRESHOLD_MS, mặc định 180 giây):
- Chỉ job PROCESSING có
processStartAtcũ hơn cutoff mới được re-enqueue. - Job đang được xử lý (trong threshold) được giữ nguyên.
- Job PENDING thuần không bao giờ bị
RecoveryComponentchạm vào.
Sweep interval (APP_ENV_SWEEP_INTERVAL_MS, mặc định 300 giây): sweep không chỉ là hành động lúc khởi động — nó chạy liên tục, bắt các job bị treo sau khởi động (vd: worker crash giữa pipeline). Lỗi trong sweep được log nhưng không dừng interval.
RecoveryComponentmount trướcKafkaConsumerComponent— message recovery nằm trong Kafka trước khi consumer bắt đầu poll, ngăn job được re-enqueue phải đợi đến lần sweep tiếp theo.
7. Mã hóa
7.1. Thuật toán
| Thuộc tính | Giá trị |
|---|---|
| Thuật toán | AES-256-GCM (từ class AES của ignis-helpers) |
| Nguồn khóa | APP_ENV_LEDGER_ENCRYPTION_KEY — load tại construction service; throws nếu thiếu |
| Encoding đầu vào | base64 (buffer → chuỗi base64 trước khi mã hóa) |
| Encoding đầu ra | base64 (ciphertext decode lại thành Buffer) |
7.2. Luồng Encrypt/Decrypt
encrypt:
Buffer → .toString('base64') → AES.encrypt(base64, key) → Buffer.from(ciphertext, 'base64')
decrypt:
Buffer → .toString('base64') → AES.decrypt(base64, key) → Buffer.from(plaintext, 'base64')Implementation AES của ignis-helpers xử lý sinh và embed IV bên trong.
8. Hành vi Retry
| Tình huống | Cơ chế | Re-enqueue? |
|---|---|---|
Retry thủ công qua POST /operations/{id}/retry | handleRetry() → resetToPending() → Kafka send | Có |
Retry batch qua POST /ledgers/retry/batch | Như trên, cho mỗi REJECTED trong batch | Có |
| Crash / treo trong khi xử lý | Sweep định kỳ RecoveryComponent — job cũ hơn APP_ENV_STALL_THRESHOLD_MS được re-enqueue | Có — ở lần sweep tiếp theo |
Timeout job (vượt quá APP_ENV_JOB_TIMEOUT_MS) | _jobTimeout reject → catch → cleanupOrphanedLinks + setRejected | Không — yêu cầu retry thủ công |
| Thất bại validation parse | setRejected() với thông báo lỗi Zod | Không — yêu cầu retry thủ công |
| Enqueue trùng (cùng type/period) | Trả về cái có sẵn mà không gửi Kafka | N/A |
| Hoàn thành bị chiếm trước (worker đồng thời) | setCompletedIfProcessing trả về false — bỏ qua hoàn tất | N/A |
attemptCount tăng trên mỗi lần gọi setProcessing() và không reset khi retry — nó theo dõi tổng số lần thử trong vòng đời.
9. Trang liên quan
- API Events — tham chiếu payload Kafka/WS
- HKD Templates — chi tiết render PDF/XLSX
- Architecture — Kịch bản runtime
- Configuration
- REST endpoints — OpenAPI trực tiếp tại
/v1/api/ledger/doc/openapi.json