Skip to content

Pipeline Tạo báo cáo

1. Tổng quan

Pipeline tạo báo cáo là một workflow bất đồng bộ điều khiển bằng Kafka chuyển đổi dữ liệu nghiệp vụ thành tài liệu PDF và XLSX đã mã hóa được lưu trữ trong S3.

HTTP Request
    → LedgerQueueService (enqueue / kiểm tra idempotency)
    → Kafka topic: ledger.generate
    → LedgerWorkerService (fetch → validate → generate → encrypt → upload)
    → MetaLinks + Job COMPLETED

2. Kiến trúc Pipeline

3. Pha Enqueue

3.1. Tạo đơn

LedgerQueueService.handleEnqueueGeneration() — idempotent theo { merchantId, type, period }.

Trạng thái Record Tồn tạiHành viaction Response
Chưa tồn tạiTạo Ledger (PENDING) + LedgerJob (PENDING), gửi đến Kafkacreated
Tồn tại, job REJECTEDresetToPending() + re-enqueue đến Kafkaretried
Tồn tại, job PENDING / PROCESSING / COMPLETEDLog skip, trả về record có sẵnskipped

3.2. Tạo Batch

LedgerQueueService.handleEnqueueBatchGeneration() — mở rộng year + periodType + types thành các task riêng biệt và chạy chúng với executePromiseWithLimit(limit: 6).

Mở rộng period (_buildPeriodValues):

periodTypeNăm là năm hiện tạiNăm là năm trước
YEARLYKhông mở rộng (một task trên mỗi type)Không mở rộng
QUARTERLYQ1 đến quý hoàn tất cuối cùngQ1–Q4
MONTHLYM1 đến tháng hoàn tất cuối cùngM1–M12

Ví dụ: Vào tháng 4 năm 2026, một batch quarterly cho 2026 chỉ tạo Q1. Một batch quarterly cho 2025 tạo Q1–Q4.

Hình dạng Response:

typescript
{
  total: number;
  created: number;
  skipped: number;
  retried: number;
  failed: number;  // individual task errors are caught and counted, not thrown
}

3.3. Payload Message Kafka

Topic: ledger.generate

typescript
{
  ledgerId: string;      // Snowflake ID — used to load full ledger from DB
  enqueueTime: string;   // ISO 8601 timestamp
}

Message key là ledgerId (string) — đảm bảo định tuyến partition nhất quán cho mỗi ledger.

4. Pha Xử lý

4.1. Worker Service

LedgerWorkerService.handleGeneration(ledgerId):

Bảo vệ chiếm trước: Giữa upload và hoàn tất, setCompletedIfProcessing dùng UPDATE … WHERE status=PROCESSING nguyên tử để đảm bảo chỉ một worker hoàn tất — liên quan khi RecoveryComponent re-enqueue một job treo trong khi worker gốc hoàn thành chậm.

4.2. S3 Object Path

ledgers/{period}/{merchantId}/{version}/{ledgerType}.pdf
ledgers/{period}/{merchantId}/{version}/{ledgerType}.xlsx

Ví dụ: ledgers/2026-Q1/merchant_abc/1/S1a-HKD.pdf

  • period — chuỗi định dạng: 2026-Q1, 2026-M3, 2026-Y
  • version — integer từ cột Ledger.version
  • ledgerType — vd: S1a-HKD
  • Files được lưu với mimetype: text/plain (binary đã mã hóa lưu dưới dạng blob chung)

4.3. Xử lý Lỗi

Khi có exception trong các bước 3–9:

  1. MetaLinkService.cleanupOrphanedLinks(ledgerId) — hard-delete bất kỳ MetaLink nào được tạo một phần
  2. LedgerJobService.setRejected(jobId, error.message) — ghi lại lý do thất bại
  3. Lỗi được rethrow — consumer catch nó trong onMessageError (chỉ log), sau đó message.commit() được gọi trong đường finally của onMessage

Message Kafka luôn được commit — không có replay tự động. Việc xử lý lại yêu cầu retry rõ ràng qua POST /operations/{id}/retry.

5. Pha Tải về

GET /operations/{id}/download?format=pdf|xlsx&disposition=attachment|inline

Định dạng tên file: {ledgerType}_{period}_v{version}.{format} — vd S1a-HKD_2026-Q1_v1.pdf

6. Cơ chế Recovery

RecoveryComponent chỉ hoạt động trong role worker. Nó chạy sweep ban đầu trong quá trình khởi động application (pha binding()), sau đó tiếp tục theo interval có thể cấu hình.

Phát hiện job treo dùng cutoff có thể cấu hình (APP_ENV_STALL_THRESHOLD_MS, mặc định 180 giây):

  • Chỉ job PROCESSING có processStartAt cũ hơn cutoff mới được re-enqueue.
  • Job đang được xử lý (trong threshold) được giữ nguyên.
  • Job PENDING thuần không bao giờ bị RecoveryComponent chạm vào.

Sweep interval (APP_ENV_SWEEP_INTERVAL_MS, mặc định 300 giây): sweep không chỉ là hành động lúc khởi động — nó chạy liên tục, bắt các job bị treo sau khởi động (vd: worker crash giữa pipeline). Lỗi trong sweep được log nhưng không dừng interval.

RecoveryComponent mount trước KafkaConsumerComponent — message recovery nằm trong Kafka trước khi consumer bắt đầu poll, ngăn job được re-enqueue phải đợi đến lần sweep tiếp theo.

7. Mã hóa

7.1. Thuật toán

Thuộc tínhGiá trị
Thuật toánAES-256-GCM (từ class AES của ignis-helpers)
Nguồn khóaAPP_ENV_LEDGER_ENCRYPTION_KEY — load tại construction service; throws nếu thiếu
Encoding đầu vàobase64 (buffer → chuỗi base64 trước khi mã hóa)
Encoding đầu rabase64 (ciphertext decode lại thành Buffer)

7.2. Luồng Encrypt/Decrypt

encrypt:
  Buffer → .toString('base64') → AES.encrypt(base64, key) → Buffer.from(ciphertext, 'base64')

decrypt:
  Buffer → .toString('base64') → AES.decrypt(base64, key) → Buffer.from(plaintext, 'base64')

Implementation AES của ignis-helpers xử lý sinh và embed IV bên trong.

8. Hành vi Retry

Tình huốngCơ chếRe-enqueue?
Retry thủ công qua POST /operations/{id}/retryhandleRetry()resetToPending() → Kafka send
Retry batch qua POST /ledgers/retry/batchNhư trên, cho mỗi REJECTED trong batch
Crash / treo trong khi xử lýSweep định kỳ RecoveryComponent — job cũ hơn APP_ENV_STALL_THRESHOLD_MS được re-enqueueCó — ở lần sweep tiếp theo
Timeout job (vượt quá APP_ENV_JOB_TIMEOUT_MS)_jobTimeout reject → catch → cleanupOrphanedLinks + setRejectedKhông — yêu cầu retry thủ công
Thất bại validation parsesetRejected() với thông báo lỗi ZodKhông — yêu cầu retry thủ công
Enqueue trùng (cùng type/period)Trả về cái có sẵn mà không gửi KafkaN/A
Hoàn thành bị chiếm trước (worker đồng thời)setCompletedIfProcessing trả về false — bỏ qua hoàn tấtN/A

attemptCount tăng trên mỗi lần gọi setProcessing()không reset khi retry — nó theo dõi tổng số lần thử trong vòng đời.

9. Trang liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.