Event Bus
Tổng quan
Event Bus là một trừu tượng hóa nhắn tin publish-subscribe trong @nx/core giúp tách rời bên tạo sự kiện khỏi bên tiêu thụ. Nó định nghĩa giao diện IEventBus không phụ thuộc transport và cung cấp triển khai RedisPubSubAdapter dựa trên Redis Pub/Sub.
Các sự kiện được bọc trong phong bì IEventBusMessage tiêu chuẩn bao gồm Snowflake ID, tên kênh, dấu thời gian và dữ liệu payload.
| Thuộc tính | Giá trị |
|---|---|
| Giao diện | packages/core/src/helpers/event-bus/types.ts |
| Redis Adapter | packages/core/src/helpers/event-bus/adapters/redis-pubsub.adapter.ts |
| Các Adapter Đã định nghĩa | redis-pubsub (đã triển khai), kafka (đã định nghĩa), rabbitmq (đã định nghĩa) |
Kiến trúc
Giao diện IEventBus
Trừu tượng hóa cốt lõi mà tất cả adapter triển khai.
interface IEventBus {
publish<T>(opts: { channel: string; data: T }): Promise<void>;
subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void>;
unsubscribe(opts: { channel: string }): Promise<void>;
disconnect(): Promise<void>;
}Các Phương thức
| Phương thức | Tham số | Mô tả |
|---|---|---|
publish | { channel, data } | Bọc dữ liệu trong phong bì và xuất bản đến kênh |
subscribe | { channel, handler } | Đăng ký handler cho thông điệp trên kênh |
unsubscribe | { channel } | Xóa handler và hủy đăng ký khỏi kênh |
disconnect | -- | Hủy đăng ký tất cả kênh và đóng kết nối |
IEventBusMessage
Mỗi thông điệp được xuất bản đều được bọc trong phong bì tiêu chuẩn trước khi truyền tải.
interface IEventBusMessage<T = unknown> {
id: string; // Snowflake ID (via IdGenerator)
type: string; // Channel name
publishedAt: string; // ISO 8601 timestamp
data: T; // The actual payload
metadata?: { [key: string | symbol]: AnyType };
}Các Trường Phong bì
| Trường | Kiểu | Nguồn | Mô tả |
|---|---|---|---|
id | string | IdGenerator.getInstance().nextId() | Snowflake ID duy nhất toàn cục |
type | string | Tên kênh truyền vào publish() | Định danh loại sự kiện |
publishedAt | string | dayjs().toISOString() | Dấu thời gian ISO 8601 khi thông điệp được xuất bản |
data | T | Do người gọi cung cấp | Dữ liệu payload sự kiện |
metadata | Record<string | symbol, any> | Tùy chọn | Ngữ cảnh bổ sung (không được adapter sử dụng) |
RedisPubSubAdapter
Adapter production triển khai IEventBus sử dụng hai kết nối Redis riêng biệt: một cho xuất bản và một cho đăng ký. Nó kế thừa BaseService từ IGNIS để ghi log có phạm vi.
Constructor
constructor(options: IEventBusRedisAdapterOptions)interface IEventBusRedisAdapterOptions {
publisher: RedisHelper; // Redis connection for PUBLISH commands
subscriber: RedisHelper; // Redis connection for SUBSCRIBE commands
}Yêu cầu Hai Kết nối Riêng biệt
Redis yêu cầu kết nối riêng biệt cho xuất bản và đăng ký. Một kết nối Redis đơn ở chế độ subscriber không thể thực thi lệnh thông thường. Luôn tạo hai instance RedisHelper.
Hành vi Nội bộ
Khi khởi tạo, adapter thiết lập bốn event listener trên kết nối subscriber:
| Sự kiện | Hành vi |
|---|---|
message | Phân tích phong bì JSON, tìm handler theo kênh, gọi handler với data |
error | Ghi log lỗi |
connect | Ghi log kết nối đã thiết lập |
ready | Ghi log subscriber sẵn sàng nhận thông điệp |
Luồng Xuất bản
Luồng Đăng ký
Các Phương thức Bổ sung
Adapter cung cấp ba phương thức nội quan không nằm trên giao diện IEventBus:
| Phương thức | Trả về | Mô tả |
|---|---|---|
getSubscriptions() | string[] | Tất cả tên kênh đang đăng ký |
getHandlerCount() | number | Số lượng handler đã đăng ký |
isSubscribed(channel) | boolean | Kiểm tra một kênh cụ thể có đang đăng ký không |
Xử lý Lỗi
Redis Pub/Sub hoạt động theo kiểu fire-and-forget. Adapter xử lý lỗi như sau:
| Tình huống | Hành vi |
|---|---|
| Handler thông điệp ném lỗi | Lỗi được ghi log; thông điệp không được thử lại. Không có cơ chế NACK trong Redis Pub/Sub. |
| Phân tích JSON thất bại | Lỗi được ghi log; thông điệp bị loại bỏ. |
| Không có handler cho kênh | Cảnh báo được ghi log; thông điệp bị loại bỏ. |
| Xuất bản thất bại | Lỗi được ghi log và ném lại cho người gọi. |
| Đăng ký thất bại | Lỗi được ghi log và ném lại cho người gọi. |
| Đăng ký trùng lặp | Cảnh báo được ghi log; không tạo đăng ký trùng. |
Logic Thử lại
Nếu bạn cần đảm bảo gửi với thử lại, hãy triển khai logic thử lại bên trong hàm handler. Để xử lý hàng đợi đáng tin cậy, sử dụng BullMQ thông qua thành phần Queue thay thế.
Ngắt kết nối
Gọi disconnect() thực hiện các bước sau:
- Hủy đăng ký tất cả kênh trên kết nối subscriber
- Ngắt kết nối cả publisher và subscriber Redis client
- Xóa các map handler và đăng ký nội bộ
Các Adapter Khả dụng
| Adapter | Trạng thái | Transport |
|---|---|---|
redis-pubsub | Đã triển khai | Redis Pub/Sub (fire-and-forget) |
kafka | Đã định nghĩa kiểu | Apache Kafka (chưa triển khai) |
rabbitmq | Đã định nghĩa kiểu | RabbitMQ (chưa triển khai) |
Kiểu adapter được định nghĩa như sau:
type TEventBusAdapter = 'redis-pubsub' | 'kafka' | 'rabbitmq';Ví dụ Sử dụng
Thiết lập Event Bus
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
// Create two separate Redis connections
const publisher = RedisConnectionFactory.create({
name: 'event-bus-publisher',
host: 'localhost',
port: 6379,
database: 0,
});
const subscriber = RedisConnectionFactory.create({
name: 'event-bus-subscriber',
host: 'localhost',
port: 6379,
database: 0,
});
// Connect both
await publisher.connect();
await subscriber.connect();
// Create the event bus
const eventBus = new RedisPubSubAdapter({ publisher, subscriber });Xuất bản Sự kiện
await eventBus.publish({
channel: 'payment.success',
data: {
orderId: '12345',
amount: 50000,
provider: 'VNPAY_QR_MMS',
},
});Đăng ký Sự kiện
await eventBus.subscribe({
channel: 'payment.success',
handler: async (data) => {
// data is the unwrapped payload (not the full envelope)
console.log('Payment succeeded for order:', data.orderId);
await financeService.recordIncome(data);
},
});Binding trong Component
Trong BANA, event bus thường được tạo và bind bên trong một BaseComponent:
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
import { BaseComponent, CoreBindings, inject } from '@venizia/ignis';
export class EventBusComponent extends BaseComponent {
constructor(
@inject({ key: CoreBindings.APPLICATION_INSTANCE })
protected application: BaseApplication,
) {
super({
scope: EventBusComponent.name,
initDefault: { enable: true, container: application },
bindings: {},
});
}
override async binding(): Promise<void> {
const publisher = RedisConnectionFactory.create({
name: 'pubsub-publisher',
host: process.env.APP_ENV_CACHE_REDIS_HOST,
port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
});
const subscriber = RedisConnectionFactory.create({
name: 'pubsub-subscriber',
host: process.env.APP_ENV_CACHE_REDIS_HOST,
port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
});
await publisher.connect();
await subscriber.connect();
const eventBus = new RedisPubSubAdapter({ publisher, subscriber });
this.application.bind({ key: BindingKeys.EVENT_BUS }).toValue(eventBus);
// Subscribe to channels
await eventBus.subscribe({
channel: 'commerce.initialized',
handler: async (data) => {
// Handle commerce initialization
},
});
}
}Tài liệu Liên quan
- IGNIS Redis Helper -- Các phương thức publish/subscribe của
RedisHelper - Tham chiếu IGNIS Services --
BaseServicemàRedisPubSubAdapterkế thừa - RedisConnectionFactory -- Tạo kết nối Redis cho publisher và subscriber
- IdGenerator -- Tạo Snowflake ID sử dụng cho ID thông điệp