Event Bus
Overview
The Event Bus is a publish-subscribe messaging abstraction in @nx/core that decouples event producers from consumers. It defines a transport-agnostic IEventBus interface and provides a RedisPubSubAdapter implementation backed by Redis Pub/Sub.
Events are wrapped in a standardized IEventBusMessage envelope that includes a Snowflake ID, channel name, timestamp, and the payload data.
| Property | Value |
|---|---|
| Interface | packages/core/src/helpers/event-bus/types.ts |
| Redis Adapter | packages/core/src/helpers/event-bus/adapters/redis-pubsub.adapter.ts |
| Defined Adapters | redis-pubsub (implemented), kafka (type defined only), rabbitmq (type defined only) |
Architecture
IEventBus Interface
The core abstraction that all adapters implement.
interface IEventBus {
  publish<T>(opts: { channel: string; data: T }): Promise<void>;
  subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void>;
  unsubscribe(opts: { channel: string }): Promise<void>;
  disconnect(): Promise<void>;
}
Methods
| Method | Parameters | Description |
|---|---|---|
| publish | { channel, data } | Wraps data in an envelope and publishes to the channel |
| subscribe | { channel, handler } | Registers a handler for messages on the channel |
| unsubscribe | { channel } | Removes the handler and unsubscribes from the channel |
| disconnect | -- | Unsubscribes from all channels and closes connections |
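
Because the interface is transport-agnostic, code that depends on IEventBus can be exercised without Redis. The following is a minimal sketch of an in-process implementation for tests. `InMemoryEventBus` is a hypothetical name and is not part of @nx/core. Unlike the Redis adapter, it calls the handler directly with the data and skips the envelope and JSON serialization.

```typescript
// Interface restated from the section above so the sketch is self-contained.
interface IEventBus {
  publish<T>(opts: { channel: string; data: T }): Promise<void>;
  subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void>;
  unsubscribe(opts: { channel: string }): Promise<void>;
  disconnect(): Promise<void>;
}

// Hypothetical in-process implementation (assumption: one handler per channel).
class InMemoryEventBus implements IEventBus {
  private handlers = new Map<string, (data: unknown) => Promise<void>>();

  async publish<T>(opts: { channel: string; data: T }): Promise<void> {
    const handler = this.handlers.get(opts.channel);
    // No subscriber: the message is simply dropped.
    if (handler) await handler(opts.data);
  }

  async subscribe<T>(opts: { channel: string; handler: (data: T) => Promise<void> }): Promise<void> {
    // A duplicate subscribe is ignored rather than replacing the handler.
    if (this.handlers.has(opts.channel)) return;
    this.handlers.set(opts.channel, (data) => opts.handler(data as T));
  }

  async unsubscribe(opts: { channel: string }): Promise<void> {
    this.handlers.delete(opts.channel);
  }

  async disconnect(): Promise<void> {
    this.handlers.clear();
  }
}
```

In this sketch a handler error propagates to the caller of publish(), which differs from the Redis adapter, where publisher and subscriber are decoupled.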
IEventBusMessage
Every published message is wrapped in a standardized envelope before transmission.
interface IEventBusMessage<T = unknown> {
  id: string; // Snowflake ID (via IdGenerator)
  type: string; // Channel name
  publishedAt: string; // ISO 8601 timestamp
  data: T; // The actual payload
  metadata?: { [key: string | symbol]: AnyType };
}
Envelope Fields
| Field | Type | Source | Description |
|---|---|---|---|
| id | string | IdGenerator.getInstance().nextId() | Globally unique Snowflake ID |
| type | string | Channel name passed to publish() | Identifies the event type |
| publishedAt | string | dayjs().toISOString() | ISO 8601 timestamp of when the message was published |
| data | T | Caller-provided | The event payload |
| metadata | `Record<string \| symbol, any>` | Optional | Additional context (not used by the adapter itself) |
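
To make the field sources concrete, here is a minimal sketch of how an envelope could be assembled and round-tripped through JSON. `buildEnvelope` and its `nextId` parameter are hypothetical: `nextId` stands in for IdGenerator.getInstance().nextId(), and `Date` replaces dayjs to keep the example dependency-free. The adapter's actual code may differ.

```typescript
// Envelope shape restated from the interface above; metadata is typed loosely here.
interface IEventBusMessage<T = unknown> {
  id: string;
  type: string;
  publishedAt: string;
  data: T;
  metadata?: Record<string | symbol, unknown>;
}

// Hypothetical helper, not part of @nx/core.
function buildEnvelope<T>(opts: {
  channel: string;
  data: T;
  nextId: () => string;
  metadata?: Record<string | symbol, unknown>;
}): IEventBusMessage<T> {
  return {
    id: opts.nextId(),
    type: opts.channel,
    // Date#toISOString yields the same ISO 8601 UTC format as dayjs().toISOString().
    publishedAt: new Date().toISOString(),
    data: opts.data,
    ...(opts.metadata ? { metadata: opts.metadata } : {}),
  };
}

const traceKey = Symbol('trace');
const envelope = buildEnvelope({
  channel: 'payment.success',
  data: { orderId: '12345', amount: 50000 },
  nextId: () => '1', // placeholder ID for the example
  metadata: { source: 'checkout', [traceKey]: 'abc' },
});

// Round trip through JSON, as a Pub/Sub transport would.
const decoded = JSON.parse(JSON.stringify(envelope)) as IEventBusMessage<{
  orderId: string;
  amount: number;
}>;
console.log(decoded.type, decoded.data.orderId);
```

One consequence worth noting: JSON.stringify skips symbol-keyed properties, so any symbol keys in metadata would not survive a JSON-serialized transport.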
RedisPubSubAdapter
The production adapter that implements IEventBus using two dedicated Redis connections: one for publishing and one for subscribing. It extends BaseService from IGNIS for scoped logging.
Constructor
constructor(options: IEventBusRedisAdapterOptions)
interface IEventBusRedisAdapterOptions {
  publisher: RedisHelper; // Redis connection for PUBLISH commands
  subscriber: RedisHelper; // Redis connection for SUBSCRIBE commands
}
Two Separate Connections Required
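Redis Pub/Sub needs separate connections for publishing and subscribing. With the RESP2 protocol, a connection in subscriber mode accepts only subscription-related commands (such as SUBSCRIBE, UNSUBSCRIBE, and PING) and rejects regular commands like PUBLISH. Always create two RedisHelper instances.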
Internal Behavior
On construction, the adapter sets up four event listeners on the subscriber connection:
| Event | Behavior |
|---|---|
| message | Parses JSON envelope, looks up handler by channel, invokes handler with data |
| error | Logs the error |
| connect | Logs connection established |
| ready | Logs subscriber is ready to receive messages |
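Publish Flow
publish() wraps the caller's data in an IEventBusMessage envelope (new Snowflake ID, channel name as type, current timestamp), serializes it to JSON, and sends it on the publisher connection. If publishing fails, the error is logged and re-thrown.
Subscribe Flow
subscribe() registers the handler for the channel and subscribes on the subscriber connection; a duplicate subscribe only logs a warning. When a message arrives, the adapter parses the JSON envelope, looks up the handler by channel, and invokes it with the unwrapped data.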
Additional Methods
The adapter provides three introspection methods not on the IEventBus interface:
| Method | Returns | Description |
|---|---|---|
| getSubscriptions() | string[] | All currently subscribed channel names |
| getHandlerCount() | number | Number of registered handlers |
| isSubscribed(channel) | boolean | Whether a specific channel is subscribed |
Error Handling
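Redis Pub/Sub is fire-and-forget: messages are not persisted, and a subscriber that is disconnected when a message is published never receives it. The adapter handles errors as follows: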
| Scenario | Behavior |
|---|---|
| Message handler throws | Error is logged; message is not retried. No NACK mechanism in Redis Pub/Sub. |
| JSON parse fails | Error is logged; message is discarded. |
| No handler for channel | Warning is logged; message is discarded. |
| Publish fails | Error is logged and re-thrown to the caller. |
| Subscribe fails | Error is logged and re-thrown to the caller. |
| Duplicate subscribe | Warning is logged; no duplicate subscription is created. |
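
The receive-side rows of the table can be sketched as a single dispatch function. This is an illustration under assumptions, not the adapter's source: `dispatch`, `ChannelHandler`, and the returned `Outcome` value are hypothetical names, and the return value exists only to make each branch observable.

```typescript
type ChannelHandler = (data: unknown) => Promise<void>;
type Outcome = 'handled' | 'parse-error' | 'no-handler' | 'handler-error';

// Hypothetical stand-in for the adapter's "message" listener.
async function dispatch(
  handlers: Map<string, ChannelHandler>,
  channel: string,
  raw: string,
  log: { warn: (msg: string) => void; error: (msg: string) => void } = console,
): Promise<Outcome> {
  let envelope: { data?: unknown } | null;
  try {
    envelope = JSON.parse(raw);
  } catch {
    // JSON parse fails: log and discard.
    log.error(`[${channel}] invalid JSON, message discarded`);
    return 'parse-error';
  }

  const handler = handlers.get(channel);
  if (!handler) {
    // No handler for channel: warn and discard.
    log.warn(`[${channel}] no handler registered, message discarded`);
    return 'no-handler';
  }

  try {
    // The handler receives the unwrapped payload, not the full envelope.
    await handler(envelope?.data);
    return 'handled';
  } catch (err) {
    // Handler throws: log only. Pub/Sub has no NACK, so nothing is retried.
    log.error(`[${channel}] handler failed: ${String(err)}`);
    return 'handler-error';
  }
}
```

Note that no branch re-throws: on the receive side there is no caller to propagate to, which is why a failed message is simply lost.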
Retry Logic
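The adapter does not retry failed handlers. To recover from transient processing failures, implement retry logic inside your handler function. Retries cannot recover messages published while the subscriber was disconnected, so for guaranteed, queue-based processing use BullMQ via the Queue component instead.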
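
One way to add handler-level retries is a wrapper that re-invokes the handler with exponential backoff. The sketch below assumes nothing about @nx/core beyond the handler signature; `withRetry`, `EventHandler`, and the option names are hypothetical.

```typescript
type EventHandler<T> = (data: T) => Promise<void>;

// Hypothetical helper, not part of @nx/core.
function withRetry<T>(
  handler: EventHandler<T>,
  opts: { attempts: number; baseDelayMs: number },
): EventHandler<T> {
  const attempts = Math.max(1, opts.attempts); // always try at least once
  return async (data: T) => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        await handler(data);
        return; // success, stop retrying
      } catch (err) {
        lastError = err;
        if (attempt < attempts) {
          // Exponential backoff: baseDelayMs, then 2x, 4x, ...
          const delay = opts.baseDelayMs * 2 ** (attempt - 1);
          await new Promise<void>((resolve) => setTimeout(resolve, delay));
        }
      }
    }
    // All attempts failed: surface the last error so the adapter can log it.
    throw lastError;
  };
}
```

The wrapped function has the same shape as a plain handler, so it could be passed as `handler: withRetry(myHandler, { attempts: 3, baseDelayMs: 200 })` in a subscribe() call. Handlers wrapped this way should be idempotent, since a partially completed attempt may run again.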
Disconnect
Calling disconnect() performs the following steps:
- Unsubscribes from all channels on the subscriber connection
- Disconnects both publisher and subscriber Redis clients
- Clears internal handler and subscription maps
Available Adapters
| Adapter | Status | Transport |
|---|---|---|
| redis-pubsub | Implemented | Redis Pub/Sub (fire-and-forget) |
| kafka | Type defined | Apache Kafka (not yet implemented) |
| rabbitmq | Type defined | RabbitMQ (not yet implemented) |
The adapter type is defined as:
type TEventBusAdapter = 'redis-pubsub' | 'kafka' | 'rabbitmq';
Usage Example
Setting Up the Event Bus
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
// Create two separate Redis connections
const publisher = RedisConnectionFactory.create({
  name: 'event-bus-publisher',
  host: 'localhost',
  port: 6379,
  database: 0,
});
const subscriber = RedisConnectionFactory.create({
  name: 'event-bus-subscriber',
  host: 'localhost',
  port: 6379,
  database: 0,
});
// Connect both
await publisher.connect();
await subscriber.connect();
// Create the event bus
const eventBus = new RedisPubSubAdapter({ publisher, subscriber });
Publishing Events
await eventBus.publish({
  channel: 'payment.success',
  data: {
    orderId: '12345',
    amount: 50000,
    provider: 'VNPAY_QR_MMS',
  },
});
Subscribing to Events
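// Pass the payload type explicitly; without it, data is inferred as unknown
await eventBus.subscribe<{ orderId: string; amount: number; provider: string }>({
  channel: 'payment.success',
  handler: async (data) => {
    // data is the unwrapped payload (not the full envelope)
    console.log('Payment succeeded for order:', data.orderId);
    // financeService is assumed to be an application service already in scope
    await financeService.recordIncome(data);
  },
});
Binding in a Component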
In BANA, the event bus is typically created and bound inside a BaseComponent:
import { RedisConnectionFactory, RedisPubSubAdapter } from '@nx/core';
import { BaseComponent, CoreBindings, inject } from '@venizia/ignis';
// BaseApplication and BindingKeys must also be imported from wherever your project defines them
export class EventBusComponent extends BaseComponent {
  constructor(
    @inject({ key: CoreBindings.APPLICATION_INSTANCE })
    protected application: BaseApplication,
  ) {
    super({
      scope: EventBusComponent.name,
      initDefault: { enable: true, container: application },
      bindings: {},
    });
  }
  override async binding(): Promise<void> {
    // Two connections: one for PUBLISH, one dedicated to SUBSCRIBE
    const publisher = RedisConnectionFactory.create({
      name: 'pubsub-publisher',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });
    const subscriber = RedisConnectionFactory.create({
      name: 'pubsub-subscriber',
      host: process.env.APP_ENV_CACHE_REDIS_HOST,
      port: Number(process.env.APP_ENV_CACHE_REDIS_PORT),
    });
    await publisher.connect();
    await subscriber.connect();
    const eventBus = new RedisPubSubAdapter({ publisher, subscriber });
    // Expose the bus to the rest of the application
    this.application.bind({ key: BindingKeys.EVENT_BUS }).toValue(eventBus);
    // Subscribe to channels
    await eventBus.subscribe({
      channel: 'commerce.initialized',
      handler: async (data) => {
        // Handle commerce initialization
      },
    });
  }
}
Related Resources
- IGNIS Redis Helper -- RedisHelper publish/subscribe methods
- IGNIS Services Reference -- BaseService that RedisPubSubAdapter extends
- RedisConnectionFactory -- Creating Redis connections for publisher and subscriber
- IdGenerator -- Snowflake ID generation used for message IDs