Chapter 11: Events
HTTP requests need fast responses. But the work triggered by a request — sending emails, updating search indexes, generating reports, syncing with third-party APIs — often takes longer than a user should wait. Events decouple the “acknowledge the request” from the “do the work,” letting your API respond in milliseconds while background processing happens asynchronously.
OriJS provides a type-safe event system built on the provider pattern. BullMQ is the default event provider, giving you persistent, retryable, Redis-backed queues out of the box. But BullMQ is a provider, not a requirement. If you need RabbitMQ, Kafka, SQS, or even a simple in-memory queue for testing, you write a provider that implements the EventProvider interface and plug it in. Your event definitions, handlers, and business logic stay exactly the same.
Why Events?
Loose Coupling
Without events, a user registration endpoint might look like this:
async createUser(data: CreateUserInput) {
const user = await this.userRepository.create(data);
await this.emailService.sendWelcome(user.email); // What if email is down?
await this.analyticsService.trackSignup(user.uuid); // What if analytics is slow?
await this.searchService.indexUser(user); // What if search is rebuilding?
return user;
}
Every downstream service is a direct dependency. If email is down, user creation fails. If analytics is slow, the user waits. If you add a new integration, you modify the user service.
With events:
async createUser(ctx: RequestContext, data: CreateUserInput) {
const user = await this.userRepository.create(data);
ctx.events.emit(UserCreated, { userId: user.uuid, email: user.email });
return user; // Responds immediately
}
The user service doesn’t know or care who handles UserCreated. Email, analytics, and search each subscribe independently. Adding a new subscriber requires zero changes to the user service.
Fast Responses
The emit call returns immediately. The actual processing happens asynchronously, in a background worker. Your API response time is decoupled from the time it takes to send emails or update indexes.
Resilience
If the email service is temporarily down, the event sits in the queue. BullMQ retries it with exponential backoff. The user’s account was already created successfully. When the email service recovers, the welcome email goes out. No data loss, no manual intervention.
Extensibility
A year from now, you need to send a Slack notification when a user signs up. You add a new consumer for UserCreated. The existing code doesn’t change. The user service doesn’t even know the Slack consumer exists.
Event.define() — Type-Safe Event Definitions
Events are defined using Event.define() with TypeBox schemas for the data and result:
import { Event } from '@orijs/core';
import { Type } from '@orijs/validation';
const UserCreated = Event.define({
name: 'user.created',
data: Type.Object({
userId: Type.String(),
email: Type.String(),
displayName: Type.Optional(Type.String()),
}),
result: Type.Object({
welcomeEmailSent: Type.Boolean(),
}),
});
This creates a frozen, immutable event definition with:
- name: A unique event identifier using dot notation (entity.action in past tense)
- data: A TypeBox schema defining the event payload
- result: A TypeBox schema defining the handler's return value (use Type.Void() for fire-and-forget)
Why Type Carriers Over Class-Based Events
Many frameworks define events as classes:
// NestJS-style class events
class UserCreatedEvent {
constructor(
public userId: string,
public email: string,
) {}
}
OriJS uses the “type carrier” pattern instead. Here’s why:
Serialization: Events are sent between processes, across queues, through Redis. Classes don’t serialize well — you lose methods, prototypes, and instanceof checks. A plain object with a TypeBox schema serializes perfectly and validates at the boundary.
Validation: TypeBox schemas validate at runtime. When an event arrives from a queue, you can verify its shape before processing. Classes give you no runtime validation — you trust that the sender constructed the object correctly.
No Inheritance: Class-based events tend to accumulate inheritance hierarchies (BaseEvent -> DomainEvent -> UserEvent -> UserCreatedEvent). Type carriers are flat, composable definitions. No prototype chains, no diamond inheritance, no hidden behavior.
Type Extraction: The _data and _result fields are undefined at runtime but carry the TypeScript type for compile-time extraction:
import type { Data, Result } from '@orijs/core';
type UserCreatedData = Data<typeof UserCreated>;
// { userId: string; email: string; displayName?: string }
type UserCreatedResult = Result<typeof UserCreated>;
// { welcomeEmailSent: boolean }
Event Naming Conventions
Follow the entity.action pattern in past tense:
user.created -- Good: entity + past tense action
order.placed -- Good
payment.processed -- Good
monitor.check.failed -- Good: multi-level for sub-actions
createUser -- Bad: imperative
user-created -- Bad: use dots, not hyphens
USER_CREATED -- Bad: avoid SCREAMING_SNAKE
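Everything except tense can be checked mechanically. A small, hypothetical helper (not part of OriJS) that rejects camelCase, hyphens, and SCREAMING_SNAKE might look like this:

```typescript
// Hypothetical helper (not part of OriJS): checks the dot-separated,
// lowercase shape of event names. Past tense still needs human review.
const EVENT_NAME_PATTERN = /^[a-z]+(\.[a-z]+)+$/;

function isValidEventName(name: string): boolean {
  return EVENT_NAME_PATTERN.test(name);
}

console.log(isValidEventName('user.created'));         // true
console.log(isValidEventName('monitor.check.failed')); // true
console.log(isValidEventName('createUser'));           // false (camelCase, no dots)
console.log(isValidEventName('user-created'));         // false (hyphens)
console.log(isValidEventName('USER_CREATED'));         // false (uppercase)
```

A check like this fits naturally in a unit test that iterates over every registered event definition.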
Implementing Event Consumers
Event consumers handle incoming events. A consumer is a class that implements IEventConsumer:
import type { IEventConsumer, EventContext, Data, Result } from '@orijs/core';
class UserCreatedConsumer implements IEventConsumer<
Data<typeof UserCreated>,
Result<typeof UserCreated>
> {
constructor(
private emailService: EmailService,
private analyticsService: AnalyticsService,
) {}
// Must be an arrow function property for correct `this` binding
onEvent = async (ctx: EventContext<Data<typeof UserCreated>>) => {
const { userId, email, displayName } = ctx.data;
ctx.log.info('Processing user.created', { userId });
// Send welcome email
const sent = await this.emailService.sendWelcome(email, displayName);
// Track analytics
await this.analyticsService.track('signup', { userId });
return { welcomeEmailSent: sent };
};
// Optional: called after onEvent succeeds
onSuccess = async (ctx: EventContext<Data<typeof UserCreated>>, result: Result<typeof UserCreated>) => {
ctx.log.info('user.created handled successfully', {
userId: ctx.data.userId,
emailSent: result.welcomeEmailSent,
});
};
// Optional: called when onEvent throws
onError = async (ctx: EventContext<Data<typeof UserCreated>>, error: Error) => {
ctx.log.error('user.created handler failed', {
userId: ctx.data.userId,
error: error.message,
});
};
}
Why Arrow Function Properties?
The consumer’s onEvent is an arrow function property, not a method. This is intentional:
// Arrow function -- `this` is captured at definition time
onEvent = async (ctx) => {
await this.emailService.send(ctx.data.email); // `this` works correctly
};
// Regular method -- `this` would be undefined when framework calls handler
async onEvent(ctx) {
await this.emailService.send(ctx.data.email); // `this` is undefined!
}
When the framework invokes the handler, it detaches the function from the instance. Arrow functions capture this lexically, so dependency access always works.
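The difference is visible in plain TypeScript with no framework involved. Class bodies always run in strict mode, so a detached regular method sees an undefined this:

```typescript
// Demonstrates why detached regular methods lose `this` while arrow
// function properties keep it. Class bodies always run in strict mode.
class Greeter {
  private name = 'ori';
  arrow = () => this.name;       // `this` captured at construction time
  method() { return this.name; } // `this` resolved at call time
}

const g = new Greeter();
const detachedArrow = g.arrow;   // simulates the framework detaching the handler
const detachedMethod = g.method;

console.log(detachedArrow()); // 'ori'
try {
  detachedMethod();            // `this` is undefined in strict mode
} catch {
  console.log('TypeError: cannot read property of undefined');
}
```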
EventContext
Every handler receives an EventContext with:
| Property | Type | Description |
|---|---|---|
| eventId | string | Unique ID for this event instance (for idempotency) |
| data | TPayload | The typed event payload |
| log | Logger | Logger with propagated correlation context |
| emit | function | Emit chained events with automatic causation tracking |
| correlationId | string | Request-response correlation ID |
| causationId | string? | Parent event ID (for event chains) |
| eventName | string | The event name being handled |
| timestamp | number | When the event was emitted |
Multiple Consumers per Event
Multiple consumers can handle the same event independently:
// Consumer 1: Send welcome email
class WelcomeEmailConsumer implements IEventConsumer<Data<typeof UserCreated>, void> {
onEvent = async (ctx) => {
await this.emailService.sendWelcome(ctx.data.email);
};
}
// Consumer 2: Index user for search
class SearchIndexConsumer implements IEventConsumer<Data<typeof UserCreated>, void> {
onEvent = async (ctx) => {
await this.searchService.indexUser(ctx.data.userId);
};
}
// Consumer 3: Track analytics
class AnalyticsConsumer implements IEventConsumer<Data<typeof UserCreated>, void> {
onEvent = async (ctx) => {
await this.analytics.track('user.created', ctx.data);
};
}
// Register all three
app
.event(UserCreated).consumer(WelcomeEmailConsumer, [EmailService])
.event(UserCreated).consumer(SearchIndexConsumer, [SearchService])
.event(UserCreated).consumer(AnalyticsConsumer, [AnalyticsService]);
Each consumer gets its own queue and processes independently. If the search indexer is slow, it doesn’t affect email delivery.
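Conceptually, the dispatch is a fan-out: one event name maps to a list of independent handlers. A framework-agnostic sketch of that shape (illustrative only; the actual OriJS/BullMQ implementation gives each consumer its own queue and worker):

```typescript
// Minimal fan-out sketch: every subscriber to an event name receives the
// event independently; adding a subscriber does not touch the others.
type Handler = (data: unknown) => void;
const subscribers = new Map<string, Handler[]>();

function subscribe(eventName: string, handler: Handler): void {
  subscribers.set(eventName, [...(subscribers.get(eventName) ?? []), handler]);
}

function publish(eventName: string, data: unknown): void {
  for (const handler of subscribers.get(eventName) ?? []) handler(data);
}

const handled: string[] = [];
subscribe('user.created', () => handled.push('email'));
subscribe('user.created', () => handled.push('search'));
subscribe('user.created', () => handled.push('analytics'));

publish('user.created', { userId: 'usr-123' });
console.log(handled); // ['email', 'search', 'analytics']
```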
Emitting Events
Events are emitted from controllers and consumers (entry points only — services should never emit events directly):
class UserController {
constructor(private userService: UserClientService) {}
configure(r: RouteBuilder) {
r.post('/users', this.createUser);
}
private createUser = async (ctx: RequestContext) => {
const data = await ctx.json<CreateUserInput>();
const user = await this.userService.createUser(data);
// Emit event -- returns immediately
ctx.events.emit(UserCreated, {
userId: user.uuid,
email: user.email,
displayName: user.displayName,
});
return Response.json(user, { status: 201 });
};
}
Emitting from Event Handlers (Chained Events)
Event handlers can emit additional events, creating event chains. The causationId is automatically set to the current event’s correlation ID, enabling distributed tracing:
class OrderPlacedConsumer implements IEventConsumer<OrderData, void> {
onEvent = async (ctx) => {
// Process the order
const processed = await this.orderService.process(ctx.data.orderId);
// Emit chained event -- causationId automatically set
ctx.emit('inventory.reserved', {
orderId: ctx.data.orderId,
items: processed.reservedItems,
});
// Emit delayed event (send receipt email after 5 minutes)
ctx.emit('receipt.email.send', {
orderId: ctx.data.orderId,
}, { delay: 5 * 60 * 1000 }); // 5 minutes
};
}
The BullMQ Provider
BullMQ is the default event provider for production use. It provides persistent, Redis-backed queues with automatic retries, backoff, rate limiting, and job lifecycle management.
Configuration
import { BullMQEventProvider } from '@orijs/bullmq';
const eventProvider = new BullMQEventProvider({
connection: {
host: config.secrets.SECRET_REDIS_HOST,
port: 6379,
},
// Retry configuration
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: { age: 3600 }, // Keep completed jobs for 1 hour
removeOnFail: false, // Keep failed jobs for inspection
},
// Worker configuration
defaultWorkerOptions: {
concurrency: 10, // Process 10 events in parallel
limiter: { max: 100, duration: 1000 }, // Max 100 per second
},
});
// Pass to application
const app = Ori.create()
.eventProvider(eventProvider)
// ...
.listen(8001);
Per-Event-Type Queues
BullMQ creates a separate queue for each event type. This is a deliberate architectural decision:
Queue: event.user.created -> Worker (concurrency: 10)
Queue: event.order.placed -> Worker (concurrency: 5)
Queue: event.email.send -> Worker (concurrency: 20)
Queue: event.report.generate -> Worker (concurrency: 1)
Why separate queues?
No Head-of-Line Blocking: If report.generate takes 5 minutes per event, it doesn’t block email.send which takes 200ms. Each queue processes independently.
Independent Scaling: You can set different concurrency per event type. Email sending might have concurrency 20, while report generation has concurrency 1 to avoid overloading the database.
Isolation: A failing handler for one event type doesn’t affect other types. If order.placed is consistently failing due to a downstream service outage, user.created keeps processing normally.
Monitoring: Separate queues mean separate metrics. You can monitor backlog, processing time, and error rates per event type.
Scheduled and Delayed Events
BullMQ supports delayed delivery and recurring schedules:
// Delayed event -- process after 30 seconds (via provider directly)
eventProvider.emit('user.created', payload, meta, { delay: 30000 });
// Scheduled recurring event (via provider directly)
await eventProvider.scheduleEvent('cleanup.run', {
scheduleId: 'daily-cleanup',
cron: '0 0 * * *', // Every day at midnight
payload: { maxAge: 30 }, // Delete records older than 30 days
});
// Remove a schedule
await eventProvider.unscheduleEvent('cleanup.run', 'daily-cleanup');
Distributed Tracing
When an event is emitted, the current request’s trace context is automatically captured from AsyncLocalStorage and propagated through the queue:
[HTTP Request] correlationId: "req-abc-123"
-> emit('user.created') -- correlationId propagated to job data
-> [Worker] EventContext.correlationId: "req-abc-123"
-> ctx.log.info('Processing') -- log includes correlationId
-> ctx.emit('email.send') -- causationId: "req-abc-123"
-> [Worker] EventContext.causationId: "req-abc-123"
Every log line, error, and chained event carries the original request’s correlation ID. When debugging a failed email, you can trace it back to the exact HTTP request that triggered it.
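The capture mechanism is AsyncLocalStorage, available in both Node and Bun. A minimal sketch of the pattern, with a simplified stand-in for emit that reads the ambient store:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Sketch of trace-context capture: code running inside als.run() sees the
// correlation ID without it being threaded through every function signature.
const als = new AsyncLocalStorage<{ correlationId: string }>();

function emitSketch(eventName: string): { eventName: string; correlationId?: string } {
  // A real emit implementation can read the ambient trace context here
  // and attach it to the job payload before queueing.
  return { eventName, correlationId: als.getStore()?.correlationId };
}

als.run({ correlationId: 'req-abc-123' }, () => {
  const job = emitSketch('user.created');
  console.log(job.correlationId); // 'req-abc-123'
});
```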
Error Handling and Idempotency
Automatic Retries
BullMQ automatically retries failed events based on your configuration:
defaultJobOptions: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000,
},
}
With attempts: 5, the job runs up to five times in total: the initial attempt plus four retries, spaced by exponential backoff delays of 2s, 4s, 8s, and 16s.
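BullMQ's documented formula for exponential backoff is delay * 2^(attemptsMade - 1). A quick check of the schedule:

```typescript
// BullMQ exponential backoff: delay * 2^(attemptsMade - 1).
// With attempts: 5 there are 4 retries (the first attempt is not a retry).
function backoffDelayMs(baseDelayMs: number, attemptsMade: number): number {
  return baseDelayMs * 2 ** (attemptsMade - 1);
}

const retryDelays = [1, 2, 3, 4].map((n) => backoffDelayMs(2000, n));
console.log(retryDelays); // [2000, 4000, 8000, 16000]
```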
Idempotency
Because events can be retried, handlers should be idempotent — processing the same event twice should have the same effect as processing it once. The EventIdempotency helper tracks processed event IDs:
import { EventIdempotency } from '@orijs/events';
class UserCreatedConsumer implements IEventConsumer<UserData, void> {
private idempotency = new EventIdempotency();
onEvent = async (ctx) => {
const result = await this.idempotency.processOnce(ctx.eventId, async () => {
await this.db.insertUser(ctx.data);
await this.emailService.sendWelcome(ctx.data.email);
return { processed: true };
});
if (!result.executed) {
ctx.log.info('Skipped duplicate event', { eventId: ctx.eventId });
}
};
}
The in-memory EventIdempotency is suitable for single-process deployments. For distributed systems, use Redis-backed idempotency or leverage BullMQ’s built-in jobId-based deduplication:
// Emit with idempotency key -- BullMQ ignores duplicate jobIds (via provider directly)
eventProvider.emit('user.created', payload, meta, {
idempotencyKey: `user-created-${userId}`,
});
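To make the guarantee concrete, here is roughly what an in-memory once-guard can look like. This is an illustrative sketch, not the actual EventIdempotency implementation:

```typescript
// Illustrative in-memory once-guard: remembers seen event IDs, skips repeats.
class ProcessOnceGuard {
  private seen = new Set<string>();

  async processOnce<T>(
    eventId: string,
    fn: () => Promise<T>,
  ): Promise<{ executed: boolean; value?: T }> {
    if (this.seen.has(eventId)) return { executed: false };
    // Marking before running means a crashed fn will not be re-run by this
    // guard; a production helper must decide how to handle that tradeoff.
    this.seen.add(eventId);
    return { executed: true, value: await fn() };
  }
}

const guard = new ProcessOnceGuard();
async function demo() {
  const first = await guard.processOnce('evt-123', async () => 'welcome email sent');
  const second = await guard.processOnce('evt-123', async () => 'welcome email sent');
  console.log(first.executed, second.executed); // true false
}
demo();
```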
Dead Letter Queue Pattern
Keep failed jobs for inspection instead of discarding them:
defaultJobOptions: {
attempts: 5,
removeOnFail: false, // Failed jobs stay in queue for inspection
}
You can then inspect failed jobs through BullMQ’s dashboard or programmatically query the queue for failed jobs, examine their error messages, fix the issue, and retry them.
Event TTL (Time-to-Live)
BullMQ has no native expiration for jobs in the “waiting” state. When events are emitted without a consumer — during phased rollouts, after removing a consumer, or for scheduled health checks — jobs accumulate in Redis indefinitely. For time-sensitive events, stale jobs are useless and waste resources.
Declaring TTL
Add a ttl field (in milliseconds) to your event definition:
const OnCallCheck = Event.define({
name: 'oncall.check',
data: Type.Object({ version: Type.Number() }),
result: Type.Void(),
ttl: 120_000, // Jobs stale after 2 minutes
});
Events without ttl retain current behavior — no expiration, no cleanup.
What TTL Controls
| Job State | Mechanism | Details |
|---|---|---|
| Waiting (no consumer) | Periodic sweep via queue.clean() | Main problem being solved |
| Failed (after retries) | Per-event removeOnFail: { age: ttl/1000 } | Belt-and-suspenders with sweep |
| Completed | Unchanged | Governed by existing removeOnComplete config |
| Active | Not affected | Active jobs should finish, not be killed |
BullMQ Provider Configuration
The BullMQ provider accepts two optional settings for the sweep:
addBullMQEvents(app, {
connection: { host: 'redis', port: 6379 },
sweepIntervalMs: 60_000, // How often to sweep (default: 60s)
sweepBatchLimit: 1000, // Max jobs cleaned per sweep per event (default: 1000)
});
The sweep runs on a setInterval with .unref() so it doesn’t keep your process alive during shutdown.
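The unref pattern is plain Node/Bun timer behavior. A sketch, assuming a sweep callback that would call queue.clean() internally:

```typescript
// Sketch of an unref'd periodic sweep. unref() tells the runtime that this
// timer alone should not keep the process alive, so shutdown is not delayed.
function startSweep(sweep: () => void, intervalMs: number): NodeJS.Timeout {
  const timer = setInterval(sweep, intervalMs);
  timer.unref();
  return timer;
}

let sweeps = 0;
const timer = startSweep(() => { sweeps += 1; }, 10);

// Something else (here, a plain ref'd timeout) keeps the process alive,
// during which the sweep keeps firing.
setTimeout(() => {
  clearInterval(timer);
  console.log(sweeps > 0); // true
}, 50);
```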
Startup Warnings
OriJS logs a warning at startup for any events registered without consumers:
WARN: Events registered without consumers: [oncall.check, oncall.escalation].
Jobs for these events will accumulate in queues with no processor.
Set a TTL on the event definition to auto-clean stale jobs.
This catches the common case of phased rollouts where events are defined but consumers haven’t been deployed yet.
Testing Events
Unit Testing Consumers
Test consumer handlers in isolation by creating a mock EventContext:
import { describe, test, expect, mock } from 'bun:test';
describe('UserCreatedConsumer', () => {
test('should send welcome email', async () => {
const mockEmailService = {
sendWelcome: mock(() => Promise.resolve(true)),
};
const consumer = new UserCreatedConsumer(mockEmailService as EmailService);
const ctx = {
eventId: 'evt-123',
data: { userId: 'usr-456', email: 'alice@test.com' },
log: { info: mock(), error: mock() },
emit: mock(),
correlationId: 'corr-789',
eventName: 'user.created',
timestamp: Date.now(),
};
const result = await consumer.onEvent(ctx);
expect(mockEmailService.sendWelcome).toHaveBeenCalledWith('alice@test.com', undefined);
expect(result).toEqual({ welcomeEmailSent: true });
});
});
Testing Event Emission
Verify that your controllers emit the correct events:
test('should emit user.created event on signup', async () => {
const emittedEvents: Array<{ event: unknown; data: unknown }> = [];
// Mock the events emitter
const mockEvents = {
emit: mock((event, data) => {
emittedEvents.push({ event, data });
}),
};
const controller = new UserController(mockUserService);
const ctx = createMockRequestContext({ events: mockEvents });
await controller.createUser(ctx);
expect(emittedEvents).toHaveLength(1);
expect(emittedEvents[0].event).toBe(UserCreated);
expect(emittedEvents[0].data).toEqual({
userId: 'usr-123',
email: 'alice@test.com',
});
});
Integration Testing with InProcessEventProvider
For integration tests that verify the full event flow (emit -> queue -> handler), use the InProcessEventProvider:
import { InProcessEventProvider } from '@orijs/events';
const provider = new InProcessEventProvider();
const events = createEventSystem(registry, { provider });
// Register handler
events.onEvent<UserData>('user.created', async (ctx) => {
await userService.handleCreated(ctx.data);
return { handled: true };
});
await events.start();
// Emit and wait for result
const subscription = events.emit('user.created', { userId: 'usr-123' });
// InProcessEventProvider processes synchronously for testing
The in-process provider executes handlers synchronously in the same process, making it perfect for tests that need to verify the entire event flow without Redis.
Writing a Custom Event Provider
The EventProvider interface extends two focused interfaces:
// What services see (emit + subscribe)
interface EventEmitter<TEventNames extends string = string> {
emit<TReturn = void>(
eventName: TEventNames,
payload: unknown,
meta?: PropagationMeta,
options?: EmitOptions,
): EventSubscription<TReturn>;
subscribe<TPayload = unknown, TReturn = void>(
eventName: TEventNames,
handler: EventHandlerFn<TPayload, TReturn>,
): void | Promise<void>;
}
// What the framework manages (start + stop)
interface EventLifecycle {
start(): Promise<void>;
stop(): Promise<void>;
}
// Full provider interface (combines both)
interface EventProvider<TEventNames extends string = string>
extends EventEmitter<TEventNames>, EventLifecycle {}
Example: Simple In-Memory Provider
import type {
EventProvider,
EventHandlerFn,
EmitOptions,
EventMessage,
EventSubscription,
PropagationMeta,
} from '@orijs/events';
import { createSubscription, EVENT_MESSAGE_VERSION } from '@orijs/events';
class SimpleEventProvider implements EventProvider {
private handlers = new Map<string, EventHandlerFn[]>();
emit<TReturn = void>(
eventName: string,
payload: unknown,
meta?: PropagationMeta,
options?: EmitOptions,
): EventSubscription<TReturn> {
const subscription = createSubscription<TReturn>();
const message: EventMessage = {
version: EVENT_MESSAGE_VERSION,
eventId: crypto.randomUUID(),
eventName,
payload,
meta: meta ?? {},
correlationId: subscription.correlationId,
causationId: options?.causationId,
timestamp: Date.now(),
};
// Process asynchronously
setTimeout(async () => {
const handlers = this.handlers.get(eventName) ?? [];
for (const handler of handlers) {
try {
const result = await handler(message);
subscription._resolve(result as TReturn);
} catch (error) {
subscription._reject(error as Error);
}
}
}, options?.delay ?? 0);
return subscription;
}
subscribe<TPayload = unknown, TReturn = void>(
eventName: string,
handler: EventHandlerFn<TPayload, TReturn>,
): void {
const existing = this.handlers.get(eventName) ?? [];
existing.push(handler as EventHandlerFn);
this.handlers.set(eventName, existing);
}
async start(): Promise<void> {
// Nothing to initialize for in-memory
}
async stop(): Promise<void> {
this.handlers.clear();
}
}
Using a Custom Provider
const app = Ori.create()
.eventProvider(new SimpleEventProvider()) // Swap in your custom provider
.event(UserCreated).consumer(UserCreatedConsumer, [EmailService])
.listen(8001);
No other code changes. Event definitions, consumers, and emission code work identically regardless of which provider is used. That’s the power of the provider pattern.
Provider Design Considerations
When writing a production-quality provider, consider:
- Persistence: In-memory providers lose events on restart. Production providers should persist to a durable store (Redis, RabbitMQ, Kafka).
- Retry Logic: Implement retry with backoff for failed handlers.
- Concurrency Control: Limit how many events are processed simultaneously.
- Graceful Shutdown: Wait for in-flight handlers to complete before stopping.
- Context Propagation: Carry PropagationMeta through your transport layer so correlation IDs survive the journey from emitter to handler.
- Request-Response: The EventSubscription returned by emit() needs to be resolved when the handler completes. For distributed providers, this requires a completion notification mechanism (BullMQ uses QueueEvents).
Event Registry (Advanced)
For applications that build event systems programmatically (not through Event.define()), the lower-level EventRegistry provides compile-time type safety for event names:
import { EventRegistry, createEventSystem } from '@orijs/events';
const Events = EventRegistry.create()
.event('user.created')
.event('order.placed')
.event('payment.processed')
.build();
// Type-safe: only registered event names are allowed
const events = createEventSystem(Events);
events.emit('user.created', payload); // OK
events.emit('user.deleted', payload); // TypeScript error!
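The compile-time safety comes from the builder accumulating a union of string literal types. A condensed sketch of the pattern (illustrative, not the actual EventRegistry source):

```typescript
// Each .event('name') widens the TNames union; methods that accept TNames
// then reject, at compile time, any string that was never registered.
class RegistrySketch<TNames extends string = never> {
  private names = new Set<string>();

  event<N extends string>(name: N): RegistrySketch<TNames | N> {
    this.names.add(name);
    return this as unknown as RegistrySketch<TNames | N>;
  }

  has(name: TNames): boolean {
    return this.names.has(name);
  }
}

const reg = new RegistrySketch()
  .event('user.created')
  .event('order.placed');

console.log(reg.has('user.created')); // true
// reg.has('user.deleted'); // TypeScript error: not in the registered union
```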
The registry supports modular composition with .use():
function addUserEvents<T extends string>(reg: EventRegistryBuilder<T>) {
return reg
.event('user.created')
.event('user.updated')
.event('user.deleted');
}
function addOrderEvents<T extends string>(reg: EventRegistryBuilder<T>) {
return reg
.event('order.placed')
.event('order.shipped')
.event('order.cancelled');
}
const Events = EventRegistry.create()
.use(addUserEvents)
.use(addOrderEvents)
.build();
This is useful for large applications where event definitions are spread across multiple modules.
Summary
OriJS events solve the fundamental problem of decoupling request handling from background processing:
- Event.define() creates type-safe event definitions with TypeBox schemas and the type carrier pattern
- IEventConsumer provides a clean interface for handling events with lifecycle hooks
- BullMQ provider gives you production-ready persistent queues with per-event-type isolation, retries, and scheduled events
- Distributed tracing automatically propagates correlation IDs through event chains
- EventIdempotency prevents duplicate processing in retry scenarios
- EventProvider interface lets you swap BullMQ for any other queue technology
The key insight is that BullMQ is a provider, not the event system itself. The event definitions, consumers, and emission code are provider-agnostic. Today you use BullMQ with Redis. Tomorrow you might use Kafka for high-throughput streams. You swap the provider and everything else stays the same.