Skip to content

Event Bus

Tổng quan

Event Bus là một trừu tượng hóa nhắn tin publish-subscribe trong @nx/core giúp tách rời bên tạo sự kiện khỏi bên tiêu thụ. Nó định nghĩa giao diện IEventBus không phụ thuộc transport và cung cấp triển khai RedisPubSubAdapter dựa trên Redis Pub/Sub.

Các sự kiện được bọc trong phong bì IEventBusMessage tiêu chuẩn bao gồm Snowflake ID, tên kênh, dấu thời gian và dữ liệu payload.

Thuộc tínhGiá trị
Giao diệnpackages/core/src/helpers/event-bus/types.ts
Redis Adapterpackages/core/src/helpers/event-bus/adapters/redis-pubsub.adapter.ts
Các Adapter Đã định nghĩaredis-pubsub (đã triển khai), kafka (đã định nghĩa), rabbitmq (đã định nghĩa)

Kiến trúc

Giao diện IEventBus

Trừu tượng hóa cốt lõi mà tất cả adapter triển khai.

typescript
interface IEventBus {
  publish<T>(opts: { channel: string; data: T }): Promise<void>;
  subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void>;
  unsubscribe(opts: { channel: string }): Promise<void>;
  disconnect(): Promise<void>;
}

Các Phương thức

Phương thứcTham sốMô tả
publish{ channel, data }Bọc dữ liệu trong phong bì và xuất bản đến kênh
subscribe{ channel, handler }Đăng ký handler cho thông điệp trên kênh
unsubscribe{ channel }Xóa handler và hủy đăng ký khỏi kênh
disconnect--Hủy đăng ký tất cả kênh và đóng kết nối

IEventBusMessage

Mỗi thông điệp được xuất bản đều được bọc trong phong bì tiêu chuẩn trước khi truyền tải.

typescript
interface IEventBusMessage<T = unknown> {
  id: string;          // Snowflake ID (via IdGenerator)
  type: string;        // Channel name
  publishedAt: string; // ISO 8601 timestamp
  data: T;             // The actual payload
  metadata?: { [key: string | symbol]: AnyType };
}

Các Trường Phong bì

TrườngKiểuNguồnMô tả
idstringIdGenerator.getInstance().nextId()Snowflake ID duy nhất toàn cục
typestringTên kênh truyền vào publish()Định danh loại sự kiện
publishedAtstringdayjs().toISOString()Dấu thời gian ISO 8601 khi thông điệp được xuất bản
dataTDo người gọi cung cấpDữ liệu payload sự kiện
metadataRecord<string | symbol, any>Tùy chọnNgữ cảnh bổ sung (không được adapter sử dụng)

RedisPubSubAdapter

Adapter production triển khai IEventBus sử dụng hai kết nối Redis riêng biệt: một cho xuất bản và một cho đăng ký. Nó kế thừa BaseService từ IGNIS để ghi log có phạm vi.

Constructor

typescript
constructor(options: IEventBusRedisAdapterOptions)
typescript
interface IEventBusRedisAdapterOptions {
  publisher: RedisHelper;   // Redis connection for PUBLISH commands
  subscriber: RedisHelper;  // Redis connection for SUBSCRIBE commands
}

Yêu cầu Hai Kết nối Riêng biệt

Redis yêu cầu kết nối riêng biệt cho xuất bản và đăng ký. Một kết nối Redis đơn ở chế độ subscriber không thể thực thi lệnh thông thường. Luôn tạo hai instance RedisHelper.

Hành vi Nội bộ

Khi khởi tạo, adapter thiết lập bốn event listener trên kết nối subscriber:

Sự kiệnHành vi
messagePhân tích phong bì JSON, tìm handler theo kênh, gọi handler với data
errorGhi log lỗi
connectGhi log kết nối đã thiết lập
readyGhi log subscriber sẵn sàng nhận thông điệp

Luồng Xuất bản

Luồng Đăng ký

Các Phương thức Bổ sung

Adapter cung cấp ba phương thức nội quan không nằm trên giao diện IEventBus:

Phương thứcTrả vềMô tả
getSubscriptions()string[]Tất cả tên kênh đang đăng ký
getHandlerCount()numberSố lượng handler đã đăng ký
isSubscribed(channel)booleanKiểm tra một kênh cụ thể có đang đăng ký không

Xử lý Lỗi

Redis Pub/Sub hoạt động theo kiểu fire-and-forget. Adapter xử lý lỗi như sau:

Tình huốngHành vi
Handler thông điệp ném lỗiLỗi được ghi log; thông điệp không được thử lại. Không có cơ chế NACK trong Redis Pub/Sub.
Phân tích JSON thất bạiLỗi được ghi log; thông điệp bị loại bỏ.
Không có handler cho kênhCảnh báo được ghi log; thông điệp bị loại bỏ.
Xuất bản thất bạiLỗi được ghi log và ném lại cho người gọi.
Đăng ký thất bạiLỗi được ghi log và ném lại cho người gọi.
Đăng ký trùng lặpCảnh báo được ghi log; không tạo đăng ký trùng.

Logic Thử lại

Nếu bạn cần đảm bảo gửi với thử lại, hãy triển khai logic thử lại bên trong hàm handler. Để xử lý hàng đợi đáng tin cậy, sử dụng BullMQ thông qua thành phần Queue thay thế.

Ngắt kết nối

Gọi disconnect() thực hiện các bước sau:

  1. Hủy đăng ký tất cả kênh trên kết nối subscriber
  2. Ngắt kết nối cả publisher và subscriber Redis client
  3. Xóa các map handler và đăng ký nội bộ

Các Adapter Khả dụng

AdapterTrạng tháiTransport
redis-pubsubĐã triển khaiRedis Pub/Sub (fire-and-forget)
kafkaĐã định nghĩa kiểuApache Kafka (chưa triển khai)
rabbitmqĐã định nghĩa kiểuRabbitMQ (chưa triển khai)

Kiểu adapter được định nghĩa như sau:

typescript
type TEventBusAdapter = 'redis-pubsub' | 'kafka' | 'rabbitmq';

Ví dụ Sử dụng

Thiết lập Event Bus

typescript
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';

// Create two separate Redis connections
const publisher = RedisConnectionFactory.create({
  name: 'event-bus-publisher',
  host: 'localhost',
  port: 6379,
  database: 0,
});

const subscriber = RedisConnectionFactory.create({
  name: 'event-bus-subscriber',
  host: 'localhost',
  port: 6379,
  database: 0,
});

// Connect both
await publisher.connect();
await subscriber.connect();

// Create the event bus
const eventBus = new RedisPubSubAdapter({ publisher, subscriber });

Xuất bản Sự kiện

typescript
await eventBus.publish({
  channel: 'payment.success',
  data: {
    orderId: '12345',
    amount: 50000,
    provider: 'VNPAY_QR_MMS',
  },
});

Đăng ký Sự kiện

typescript
await eventBus.subscribe({
  channel: 'payment.success',
  handler: async (data) => {
    // data is the unwrapped payload (not the full envelope)
    console.log('Payment succeeded for order:', data.orderId);
    await financeService.recordIncome(data);
  },
});

Binding trong Component

Trong BANA, event bus thường được tạo và bind bên trong một BaseComponent:

typescript
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
import { BaseComponent, CoreBindings, inject } from '@venizia/ignis';

export class EventBusComponent extends BaseComponent {
  constructor(
    @inject({ key: CoreBindings.APPLICATION_INSTANCE })
    protected application: BaseApplication,
  ) {
    super({
      scope: EventBusComponent.name,
      initDefault: { enable: true, container: application },
      bindings: {},
    });
  }

  override async binding(): Promise<void> {
    const publisher = RedisConnectionFactory.create({
      name: 'pubsub-publisher',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });

    const subscriber = RedisConnectionFactory.create({
      name: 'pubsub-subscriber',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });

    await publisher.connect();
    await subscriber.connect();

    const eventBus = new RedisPubSubAdapter({ publisher, subscriber });

    this.application.bind({ key: BindingKeys.EVENT_BUS }).toValue(eventBus);

    // Subscribe to channels
    await eventBus.subscribe({
      channel: 'commerce.initialized',
      handler: async (data) => {
        // Handle commerce initialization
      },
    });
  }
}

Tài liệu Liên quan

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.