Skip to content

Event Bus

Overview

The Event Bus is a publish-subscribe messaging abstraction in @nx/core that decouples event producers from consumers. It defines a transport-agnostic IEventBus interface and provides a RedisPubSubAdapter implementation backed by Redis Pub/Sub.

Events are wrapped in a standardized IEventBusMessage envelope that includes a Snowflake ID, channel name, timestamp, and the payload data.

PropertyValue
Interfacepackages/core/src/helpers/event-bus/types.ts
Redis Adapterpackages/core/src/helpers/event-bus/adapters/redis-pubsub.adapter.ts
Defined Adaptersredis-pubsub (implemented), kafka (defined), rabbitmq (defined)

Architecture

IEventBus Interface

The core abstraction that all adapters implement.

typescript
interface IEventBus {
  publish<T>(opts: { channel: string; data: T }): Promise<void>;
  subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void>;
  unsubscribe(opts: { channel: string }): Promise<void>;
  disconnect(): Promise<void>;
}

Methods

MethodParametersDescription
publish{ channel, data }Wraps data in an envelope and publishes to the channel
subscribe{ channel, handler }Registers a handler for messages on the channel
unsubscribe{ channel }Removes the handler and unsubscribes from the channel
disconnect--Unsubscribes from all channels and closes connections

IEventBusMessage

Every published message is wrapped in a standardized envelope before transmission.

typescript
interface IEventBusMessage<T = unknown> {
  id: string;          // Snowflake ID (via IdGenerator)
  type: string;        // Channel name
  publishedAt: string; // ISO 8601 timestamp
  data: T;             // The actual payload
  metadata?: { [key: string | symbol]: AnyType };
}

Envelope Fields

FieldTypeSourceDescription
idstringIdGenerator.getInstance().nextId()Globally unique Snowflake ID
typestringChannel name passed to publish()Identifies the event type
publishedAtstringdayjs().toISOString()ISO 8601 timestamp of when the message was published
dataTCaller-providedThe event payload
metadataRecord<string | symbol, any>OptionalAdditional context (not used by the adapter itself)

RedisPubSubAdapter

The production adapter that implements IEventBus using two dedicated Redis connections: one for publishing and one for subscribing. It extends BaseService from IGNIS for scoped logging.

Constructor

typescript
constructor(options: IEventBusRedisAdapterOptions)
typescript
interface IEventBusRedisAdapterOptions {
  publisher: RedisHelper;   // Redis connection for PUBLISH commands
  subscriber: RedisHelper;  // Redis connection for SUBSCRIBE commands
}

Two Separate Connections Required

Redis requires separate connections for publishing and subscribing. A single Redis connection in subscriber mode cannot issue regular commands. Always create two RedisHelper instances.

Internal Behavior

On construction, the adapter sets up four event listeners on the subscriber connection:

EventBehavior
messageParses JSON envelope, looks up handler by channel, invokes handler with data
errorLogs the error
connectLogs connection established
readyLogs subscriber is ready to receive messages

Publish Flow

Subscribe Flow

Additional Methods

The adapter provides three introspection methods not on the IEventBus interface:

MethodReturnsDescription
getSubscriptions()string[]All currently subscribed channel names
getHandlerCount()numberNumber of registered handlers
isSubscribed(channel)booleanWhether a specific channel is subscribed

Error Handling

Redis Pub/Sub is fire-and-forget. The adapter handles errors as follows:

ScenarioBehavior
Message handler throwsError is logged; message is not retried. No NACK mechanism in Redis Pub/Sub.
JSON parse failsError is logged; message is discarded.
No handler for channelWarning is logged; message is discarded.
Publish failsError is logged and re-thrown to the caller.
Subscribe failsError is logged and re-thrown to the caller.
Duplicate subscribeWarning is logged; no duplicate subscription is created.

Retry Logic

If you need guaranteed delivery with retries, implement retry logic inside your handler function. For queue-based reliable processing, use BullMQ via the Queue component instead.

Disconnect

Calling disconnect() performs the following steps:

  1. Unsubscribes from all channels on the subscriber connection
  2. Disconnects both publisher and subscriber Redis clients
  3. Clears internal handler and subscription maps

Available Adapters

AdapterStatusTransport
redis-pubsubImplementedRedis Pub/Sub (fire-and-forget)
kafkaType definedApache Kafka (not yet implemented)
rabbitmqType definedRabbitMQ (not yet implemented)

The adapter type is defined as:

typescript
type TEventBusAdapter = 'redis-pubsub' | 'kafka' | 'rabbitmq';

Usage Example

Setting Up the Event Bus

typescript
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';

// Create two separate Redis connections
const publisher = RedisConnectionFactory.create({
  name: 'event-bus-publisher',
  host: 'localhost',
  port: 6379,
  database: 0,
});

const subscriber = RedisConnectionFactory.create({
  name: 'event-bus-subscriber',
  host: 'localhost',
  port: 6379,
  database: 0,
});

// Connect both
await publisher.connect();
await subscriber.connect();

// Create the event bus
const eventBus = new RedisPubSubAdapter({ publisher, subscriber });

Publishing Events

typescript
await eventBus.publish({
  channel: 'payment.success',
  data: {
    orderId: '12345',
    amount: 50000,
    provider: 'VNPAY_QR_MMS',
  },
});

Subscribing to Events

typescript
await eventBus.subscribe({
  channel: 'payment.success',
  handler: async (data) => {
    // data is the unwrapped payload (not the full envelope)
    console.log('Payment succeeded for order:', data.orderId);
    await financeService.recordIncome(data);
  },
});

Binding in a Component

In BANA, the event bus is typically created and bound inside a BaseComponent:

typescript
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
import { BaseComponent, CoreBindings, inject } from '@venizia/ignis';

export class EventBusComponent extends BaseComponent {
  constructor(
    @inject({ key: CoreBindings.APPLICATION_INSTANCE })
    protected application: BaseApplication,
  ) {
    super({
      scope: EventBusComponent.name,
      initDefault: { enable: true, container: application },
      bindings: {},
    });
  }

  override async binding(): Promise<void> {
    const publisher = RedisConnectionFactory.create({
      name: 'pubsub-publisher',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });

    const subscriber = RedisConnectionFactory.create({
      name: 'pubsub-subscriber',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });

    await publisher.connect();
    await subscriber.connect();

    const eventBus = new RedisPubSubAdapter({ publisher, subscriber });

    this.application.bind({ key: BindingKeys.EVENT_BUS }).toValue(eventBus);

    // Subscribe to channels
    await eventBus.subscribe({
      channel: 'commerce.initialized',
      handler: async (data) => {
        // Handle commerce initialization
      },
    });
  }
}

Proprietary and Confidential. Unauthorized copying, distribution, or use of this software is strictly prohibited.