Điều phối Webhook
1. Tổng quan
Hệ thống webhook dispatch nhận sự kiện MQ-Pay qua
WebhookEventHandlerHelper(implementIPaymentEventHandler), truy vấnWebhookConfigRepositorytìm webhook đăng ký active khớp với loại sự kiện, và gửi request HTTP POST quaWebhookDispatcherServicevới cơ chế exponential backoff retry có cấu hình. Dispatch là fire-and-forget — không có dead-letter queue.
2. Luồng Sự kiện
3. Sự kiện Được hỗ trợ
Tất cả 9 sự kiện đều active trong MQ-Pay. Các sự kiện refund được định nghĩa nhưng bị comment out trong mã nguồn.
Sự kiện Active
| Sự kiện | Hằng số | Kiểu Payload | Webhook Dispatch | WebSocket | Khi nào |
|---|---|---|---|---|---|
mq-pay:transaction.created | TRANSACTION_CREATED | TTransactionEventPayload | Bỏ qua | Có | Transaction + attempt mới được tạo |
mq-pay:transaction.settled | TRANSACTION_SETTLED | TTransactionEventPayload | Có | Có | Transaction hoàn tất thanh toán |
mq-pay:transaction.cancelled | TRANSACTION_CANCELLED | TTransactionEventPayload | Có | Có | Tất cả attempt bị hủy trên transaction NEW |
mq-pay:attempt.created | ATTEMPT_CREATED | TAttemptEventPayload | Bỏ qua | Có | Payment attempt mới được tạo |
mq-pay:attempt.sent | ATTEMPT_SENT | TAttemptEventPayload | Bỏ qua | Có | Attempt gửi đến provider (VD: QR có sẵn) |
mq-pay:attempt.success | ATTEMPT_SUCCESS | TAttemptEventPayload | Có | Có | IPN xác nhận thanh toán thành công |
mq-pay:attempt.failed | ATTEMPT_FAILED | TAttemptEventPayload | Có | Có | Gọi nhà cung cấp thất bại |
mq-pay:attempt.expired | ATTEMPT_EXPIRED | TAttemptEventPayload | Có | Có | QR hết hạn, không nhận được thanh toán |
mq-pay:attempt.cancelled | ATTEMPT_CANCELLED | TAttemptEventPayload | Có | Có | Attempt bị hủy |
Sự kiện creation/sent (
TRANSACTION_CREATED,ATTEMPT_CREATED,ATTEMPT_SENT) chỉ gửi qua WebSocket — bỏ qua HTTP webhook dispatch để tránh nhiễu từ sự kiện lifecycle tần suất cao.
Bị Comment Out (Không Active)
| Sự kiện | Trạng thái |
|---|---|
mq-pay:refund.success | Bị comment out |
mq-pay:refund.failed | Bị comment out |
Kiểu Payload
// Trường cơ sở chung cho tất cả event payloads
interface IBaseEventFields {
timestamp: string; // Chuỗi ISO timestamp
source: string; // Định danh nguồn
}
// Sự kiện transaction — discriminated union với trường `event`
type TTransactionEventPayload =
| ITransactionCreatedPayload // { event, transaction, attempt }
| ITransactionSettledPayload // { event, transaction, attempt }
| ITransactionCancelledPayload // { event, transaction }
// Sự kiện attempt — discriminated union với trường `event`
type TAttemptEventPayload =
| IAttemptCreatedPayload // { event, transaction, attempt }
| IAttemptSentPayload // { event, transaction, attempt }
| IAttemptSuccessPayload // { event, transaction, attempt }
| IAttemptFailedPayload // { event, attempt } (không có transaction)
| IAttemptExpiredPayload // { event, attempt } (không có transaction)
| IAttemptCancelledPayload // { event, attempt } (không có transaction)
// Union tất cả sự kiện
type TMQPayEventPayload = TTransactionEventPayload | TAttemptEventPayload;NOTE
IAttemptFailedPayload, IAttemptExpiredPayload, và IAttemptCancelledPayload không bao gồm trường transaction. Chỉ payload creation/sent/success mới có cả transaction và attempt.
4. WebhookEventHandlerHelper
Nguồn: src/helpers/webhook-event-handler/helper.tsKế thừa: BaseHelperImplement: IPaymentEventHandler (từ @nx/mq-pay)
4.1. Khởi tạo
Không được quản lý bởi DI. Khởi tạo thủ công trong ApplicationPaymentComponent.setupMQPay():
const eventHandler = new WebhookEventHandlerHelper({
webhookConfigRepository, // WebhookConfigRepository từ DI
dispatcherService, // WebhookDispatcherService từ DI
socketEventService, // PaymentSocketEventService từ DI
});4.2. Interface IPaymentEventHandler
| Phương thức | Trả về | Mô tả |
|---|---|---|
handle(opts: { payload: TMQPayEventPayload }) | Promise<void> | Nhận sự kiện, dispatch webhooks + thông báo WS |
isReady() | boolean | Luôn trả về true |
getName() | string | Trả về 'WebhookEventHandlerHelper' |
4.3. Triển khai handle()
Phương thức handle() là bộ điều phối mỏng ủy quyền cho hai phương thức private:
async handle(opts: { payload: TMQPayEventPayload }): Promise<void> {
const { payload } = opts;
this.logger.for(this.handle.name).info('Received event | type: %s', payload.event);
await this._dispatchWebhooks({ payload });
this._notifySocket({ payload });
}4.4. _dispatchWebhooks() — Fan-out Webhook HTTP
private async _dispatchWebhooks(opts: { payload: TMQPayEventPayload }): Promise<void> {
const { payload } = opts;
const eventType = payload.event;
// Sự kiện creation/sent chỉ gửi WS — bỏ qua webhook dispatch
if (
eventType === MQPayEventTypes.TRANSACTION_CREATED ||
eventType === MQPayEventTypes.ATTEMPT_CREATED ||
eventType === MQPayEventTypes.ATTEMPT_SENT
) {
return;
}
// Truy vấn webhook active đăng ký loại sự kiện này
const webhooks = await this._webhookConfigRepository.find<TWebhookConfig>({
filter: {
where: {
status: WebhookConfigStatuses.ACTIVATED,
eventTypes: { contains: [eventType] },
} as TWhere<TWebhookConfig>,
},
});
if (webhooks.length === 0) {
logger.warn('No webhooks subscribed to event | type: %s', eventType);
return;
}
// Fan out đến mỗi webhook (fire-and-forget)
for (const webhookConfig of webhooks) {
this._dispatcherService.dispatch({ webhookConfig, eventType, payload });
}
}4.5. _notifySocket() — Broadcast WebSocket
private _notifySocket(opts: { payload: TMQPayEventPayload }): void {
const { payload } = opts;
const eventType = payload.event;
switch (eventType) {
case MQPayEventTypes.TRANSACTION_CREATED:
case MQPayEventTypes.TRANSACTION_SETTLED:
case MQPayEventTypes.TRANSACTION_CANCELLED: {
this._socketEventService.notifyTransactionUpdate({
payload: payload as TTransactionEventPayload,
});
return;
}
case MQPayEventTypes.ATTEMPT_CREATED:
case MQPayEventTypes.ATTEMPT_SENT:
case MQPayEventTypes.ATTEMPT_SUCCESS:
case MQPayEventTypes.ATTEMPT_FAILED:
case MQPayEventTypes.ATTEMPT_EXPIRED:
case MQPayEventTypes.ATTEMPT_CANCELLED: {
this._socketEventService.notifyAttemptUpdate({
payload: payload as TAttemptEventPayload,
});
return;
}
default: {
this.logger.for(this._notifySocket.name)
.warn('Unhandled event type for WS notification | type: %s', eventType);
}
}
}NOTE
Truy vấn webhook sử dụng eventTypes: { contains: [eventType] } dịch sang kiểm tra PostgreSQL array containment (@> operator). Một webhook với eventTypes: ['mq-pay:attempt.success', 'mq-pay:transaction.settled'] sẽ khớp cả hai loại sự kiện.
5. WebhookDispatcherService
Nguồn: packages/core/src/services/webhook-dispatcher.service.ts (do @nx/core cung cấp, không local trong payment) Kế thừa: BaseServicePhụ thuộc: Không (không có constructor injection — service độc lập) HTTP Client: axios
5.1. Phương thức dispatch()
dispatch<PayloadType>(opts: {
webhookConfig: TWebhookConfig;
eventType: string;
payload: PayloadType;
}): voidCác bước:
- Parse
webhookConfig.metadataquaWebhookConfigMetadataSchema.parse()để lấytimeoutMsvàmaxRetriesvới Zod defaults - Xây dựng body:
{ eventType, timestamp: Date.now(), payload } - Xây dựng headers:
Content-Type,X-Webhook-Timestamp,X-Webhook-Event-Type, cộng headers tùy chỉnh từwebhookConfig.headers - Gọi
sendWithRetry()từ attempt 0 (không chặn)
5.2. Logic Thử lại
Chiến lược: Exponential backoff với jitter, không chặn qua setTimeout.
Công thức backoff:
delay = 2^attempt * 1000 + random(0, 500)ms| Attempt | Khoảng Delay | Tích lũy |
|---|---|---|
| 0 (ban đầu) | Ngay lập tức | 0ms |
| 1 (thử lại 1) | 1,000 – 1,500ms | ~1.25s |
| 2 (thử lại 2) | 2,000 – 2,500ms | ~3.5s |
| 3 (thử lại 3) | 4,000 – 4,500ms | ~7.75s |
Xử lý thất bại:
private handleFailure(opts: { url, maxRetries, attempt, error, ... }): void {
const nextAttempt = attempt + 1;
// Hết retry → ghi log lỗi và dừng
if (nextAttempt > maxRetries) {
logger.error('Webhook failed | attempted: %d | url: %s | error: %s', nextAttempt, url, error);
return;
}
// Lên lịch retry (không chặn)
const delay = Math.pow(2, attempt) * 1000 + Math.random() * 500;
setTimeout(() => {
this.sendWithRetry({ ...opts, attempt: nextAttempt });
}, delay);
}Tiêu chí thành công: Mã trạng thái HTTP 200–299 (kiểm tra qua response.status >= 200 && response.status < 300).
Trích xuất thông báo lỗi:
| Điều kiện | Định dạng |
|---|---|
| Timeout kết nối | Timeout after {timeout}ms |
| HTTP error response | HTTP {status}: {statusText} |
| Lỗi mạng/khác | error.message |
WARNING
Webhook dispatch là fire-and-forget. Phương thức dispatch() trả về void ngay lập tức. Không có dead-letter queue, không có trạng thái retry persistent, và không có đảm bảo giao hàng. Các lần gửi thất bại sau max retries chỉ được ghi log. Nếu dịch vụ Payment khởi động lại, các retry đang chờ sẽ bị mất.
5.3. Định dạng Payload
{
"eventType": "mq-pay:attempt.success",
"timestamp": 1735689600000,
"payload": {
"attempt": {
"id": "123456789",
"type": "100_MAKE_PAYMENT",
"status": "302_SUCCESS",
"provider": "VNPAY_QR_MMS",
"amount": 150000
},
"transaction": {
"id": "987654321",
"uid": "TXN-2024-001",
"totalAmount": 150000,
"paidAmount": 150000,
"status": "304_SETTLED"
},
"timestamp": "2024-12-31T12:00:00.000Z",
"source": "mq-pay"
}
}5.4. Headers Request
| Header | Giá trị | Nguồn |
|---|---|---|
Content-Type | application/json | Hardcoded qua HTTP.HeaderValues.APPLICATION_JSON |
X-Webhook-Timestamp | String(Date.now()) | Tạo mỗi lần dispatch |
X-Webhook-Event-Type | Chuỗi loại sự kiện | Từ opts.eventType |
| (tùy chỉnh) | (thay đổi) | Merge từ webhookConfig.headers (JSONB) |
NOTE
Headers tùy chỉnh từ webhookConfig.headers được spread sau headers tiêu chuẩn, nên có thể ghi đè Content-Type nếu cần. Object headers có kiểu Record<string, string>.
6. Schema WebhookConfig (Chi tiết)
PostgreSQL schema: paymentTên bảng: WebhookConfigRepository: DefaultCRUDRepository (không có soft delete — DELETE là vĩnh viễn)
6.1. Cột
| Cột | DB Column | Kiểu Drizzle | Mặc định | Ràng buộc | Ghi chú |
|---|---|---|---|---|---|
id | id | text | Snowflake | PK | Tạo bởi generateIdColumnDefs |
name | name | text | — | NOT NULL | Tên hiển thị |
url | url | text | — | NOT NULL | URL endpoint đích |
eventTypes | event_types | text[] (array) | — | NOT NULL | Mảng chuỗi loại sự kiện MQ-Pay |
status | status | text | ACTIVATED | — | Enum TWebhookConfigStatus |
headers | headers | jsonb | {} | — | Record<string, string> — HTTP headers tùy chỉnh |
metadata | metadata | jsonb | {timeoutMs: 30000, maxRetries: 3} | — | Cấu hình dispatch |
createdAt | created_at | timestamp | Tự động | — | Từ generateTzColumnDefs |
modifiedAt | modified_at | timestamp | Tự động | — | Từ generateTzColumnDefs |
6.2. Indexes
| Tên Index | Cột | Loại |
|---|---|---|
IDX_WebhookConfig_status | status | B-tree |
6.3. Schema Metadata (Zod)
Định nghĩa trong WebhookConfigMetadataSchema:
const WebhookConfigMetadataSchema = z.object({
timeoutMs: z.number().int().min(1000).max(300_000).default(30_000),
maxRetries: z.number().int().min(0).max(10).default(3),
});| Trường | Kiểu | Mặc định | Min | Max | Sử dụng bởi |
|---|---|---|---|---|---|
timeoutMs | integer | 30,000 | 1,000 | 300,000 | Tham số timeout axios.post() |
maxRetries | integer | 3 | 0 | 10 | Kiểm tra số lần retry handleFailure() |
Schema Zod được gọi trong WebhookDispatcherService.dispatch() qua WebhookConfigMetadataSchema.parse(webhookConfig.metadata ?? {}) — metadata thiếu hoặc không đầy đủ sẽ được áp dụng giá trị mặc định.
6.4. Giá trị Trạng thái
Định nghĩa trong WebhookConfigStatuses (từ @nx/core):
| Trạng thái | Hằng số Ignis | Hiệu lực |
|---|---|---|
ACTIVATED | Statuses.ACTIVATED | Nhận sự kiện (được bao gồm trong truy vấn webhook) |
DEACTIVATED | Statuses.DEACTIVATED | Tạm vô hiệu (loại trừ khỏi truy vấn) |
ARCHIVED | Statuses.ARCHIVED | Vô hiệu vĩnh viễn (loại trừ khỏi truy vấn) |
NOTE
Chỉ webhook ACTIVATED mới được truy vấn bởi WebhookEventHandlerHelper. Thay đổi trạng thái webhook sang DEACTIVATED hoặc ARCHIVED ngay lập tức dừng gửi sự kiện mà không cần xóa cấu hình.
7. PaymentSocketEventService
Song song với webhook dispatch, sự kiện thanh toán cũng được phát sóng qua WebSocket cho consumer thời gian thực (VD: máy POS).
Nguồn: src/services/payment-socket-event.service.tsKế thừa: BaseSocketEventService (từ @nx/core) Binding key emitter: @nx/payment/websocket-emitter
7.1. Phương thức
| Phương thức | Chữ ký | Mô tả |
|---|---|---|
notifyTransactionUpdate | ({ payload: TTransactionEventPayload }) → void | Trích xuất merchantId, tính toán phòng, gửi dữ liệu transaction (fire-and-forget) |
notifyAttemptUpdate | ({ payload: TAttemptEventPayload }) → void | Trích xuất merchantId, tính toán phòng, gửi dữ liệu attempt (fire-and-forget) |
sendToRoom | ({ room, topic, data }) → Promise<void> | Gửi đến phòng WebSocket cụ thể (kế thừa) |
isReady | () → boolean | Kiểm tra emitter có sẵn (kế thừa) |
7.2. Định tuyến Phòng
Sự kiện transaction → PaymentWebSocketRooms.getTransactionRooms():
| Phòng | Pattern |
|---|---|
| Merchant | wr:observation/merchants/{merchantId} |
| Transaction merchant | wr:observation/merchants/{merchantId}/transactions |
| Transaction cụ thể | wr:observation/transactions/{transactionId} |
Sự kiện attempt → PaymentWebSocketRooms.getPaymentAttemptRooms():
| Phòng | Pattern |
|---|---|
| Merchant | wr:observation/merchants/{merchantId} |
| Attempt merchant | wr:observation/merchants/{merchantId}/payment-attempts |
| Attempt cụ thể | wr:observation/payment-attempts/{attemptId} |
| Attempt transaction | wr:observation/transactions/{transactionId}/payment-attempts (tùy chọn) |
7.3. Topics
| Hằng số | Giá trị | Sử dụng cho |
|---|---|---|
PaymentWebSocketTopics.TRANSACTION | ws:observation.payment.transaction | Tất cả sự kiện vòng đời transaction |
PaymentWebSocketTopics.ATTEMPT | ws:observation.payment.payment-attempt | Tất cả sự kiện vòng đời attempt |
7.4. Resolve Emitter
BaseSocketEventService resolve WebSocketEmitter lazy từ DI container sử dụng emitterBindingKey truyền trong constructor. Emitter được bind bởi ApplicationWebSocketComponent trong quá trình khởi tạo component.
export class PaymentSocketEventService extends BaseSocketEventService {
constructor(
@inject({ key: CoreBindings.APPLICATION_INSTANCE })
application: BaseApplication,
) {
super(application, {
scope: PaymentSocketEventService.name,
emitterBindingKey: PaymentWebSocketBindingKeys.WEBSOCKET_EMITTER,
});
}
}8. Tài liệu Liên quan
| Tài liệu | Mô tả |
|---|---|
| Tổng quan & Thiết lập | Kiến trúc, mô hình dữ liệu, components, services, biến môi trường |
| Cấu hình & Credential | Mã hóa AES-256-GCM, tra cứu credential qua Drizzle, dữ liệu khởi tạo |
| Triển khai | Định tuyến Traefik, cấu hình Docker, viết lại đường dẫn webhook |
| Gói Core | Schema WebhookConfig, Configuration, CryptoUtility |