@orijs/workflows - Technical Reference
Package: packages/workflows/src/
Overview
@orijs/workflows provides multi-step workflow orchestration with sequential and parallel step execution, result accumulation, rollback support, and timeout handling. Like the event system, it uses Interface Segregation to separate consumer, framework, and implementation concerns.
1. WorkflowProvider Interface
Source: workflow.types.ts
WorkflowExecutor (consumer interface)
The interface that business services inject. Its narrow surface prevents accidental lifecycle calls.
interface WorkflowExecutor {
  execute<TData, TResult>(
    workflow: WorkflowDefinitionLike<TData, TResult>,
    data: TData
  ): Promise<FlowHandle<TResult>>;
  getStatus(flowId: string): Promise<FlowStatus>;
}
WorkflowLifecycle (framework interface)
The interface that the OriJS application drives during startup and shutdown. The TOptions generic allows provider-specific configuration (e.g., BullMQ concurrency, retry config).
interface WorkflowLifecycle<TOptions = unknown> {
  registerDefinitionConsumer?(
    workflowName: string,
    handler: (data: unknown, meta?: unknown, stepResults?: Record<string, unknown>) => Promise<unknown>,
    stepGroups?: readonly StepGroup[],
    stepHandlers?: Record<string, { execute: StepHandler; rollback?: RollbackHandler }>,
    onError?: (data: unknown, meta?: unknown, error?: Error, stepResults?: Record<string, unknown>) => Promise<void>,
    options?: TOptions
  ): void;
  registerEmitterWorkflow?(workflowName: string): void;
  start(): Promise<void>;
  stop(): Promise<void>;
}
registerDefinitionConsumer() has two modes:
- With stepGroups: Provider registers step handlers, creates child jobs/tasks for each step, executes in order, then calls the handler (onComplete) after all steps complete
- Without stepGroups: Handler is called directly (simple workflow without steps)
registerEmitterWorkflow() tracks workflows that this instance can emit to without having a local consumer (for distributed deployments where consumer runs on a different instance).
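The two registerDefinitionConsumer() modes can be sketched as a small dispatcher. This is an illustration under stated assumptions, not the provider's code: runRegistration and Registration are hypothetical names, the step handler signature is simplified (real handlers receive a WorkflowContext), parallel groups are run one step at a time for brevity, and treating an empty stepGroups array like a missing one is an assumption.

```typescript
// Simplified stand-ins for the documented types (assumptions for this sketch).
type SimpleStepHandler = (data: unknown, results: Record<string, unknown>) => Promise<unknown> | unknown;
type SimpleStepGroup = { type: 'sequential' | 'parallel'; definitions: readonly { name: string }[] };
type CompletionHandler = (
  data: unknown,
  meta?: unknown,
  stepResults?: Record<string, unknown>
) => Promise<unknown>;

interface Registration {
  handler: CompletionHandler;
  stepGroups?: readonly SimpleStepGroup[];
  stepHandlers?: Record<string, { execute: SimpleStepHandler }>;
}

// Hypothetical dispatcher showing the two modes side by side.
async function runRegistration(reg: Registration, data: unknown): Promise<unknown> {
  if (!reg.stepGroups || reg.stepGroups.length === 0) {
    // Mode 2: no step groups, so the handler is called directly.
    return reg.handler(data);
  }

  // Mode 1: run every step in order, then call the handler with the
  // accumulated step results (the "onComplete" call described above).
  const stepResults: Record<string, unknown> = {};
  for (const group of reg.stepGroups) {
    for (const definition of group.definitions) {
      const step = reg.stepHandlers?.[definition.name];
      if (!step) throw new Error(`No handler registered for step "${definition.name}"`);
      stepResults[definition.name] = await step.execute(data, stepResults);
    }
  }
  return reg.handler(data, undefined, stepResults);
}
```

A real provider would also create child jobs or tasks per step and invoke onError on failure; both are left out here to keep the contrast between the two modes visible.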
WorkflowProvider (full implementation)
interface WorkflowProvider<TOptions = unknown> extends WorkflowExecutor, WorkflowLifecycle<TOptions> {}
FlowHandle
interface FlowHandle<TResult = unknown> {
  readonly id: string;
  status(): Promise<FlowStatus>;
  result(): Promise<TResult>;
}
Returned from execute(). The result() method returns a Promise that resolves when the workflow completes or rejects on failure/timeout.
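A consumer-side sketch of how a business service might use the narrow interface and the returned handle. The type declarations are local copies of the interfaces in this section so the example stands alone; OrderService, orderWorkflow, and stubExecutor are hypothetical names used only for illustration.

```typescript
// Local copies of the documented interfaces, so the sketch is self-contained.
type FlowStatus = 'pending' | 'running' | 'completed' | 'failed';

interface FlowHandle<TResult = unknown> {
  readonly id: string;
  status(): Promise<FlowStatus>;
  result(): Promise<TResult>;
}

interface WorkflowDefinitionLike<TData = unknown, TResult = unknown> {
  readonly name: string;
  readonly stepGroups: readonly unknown[];
  readonly _data: TData;
  readonly _result: TResult;
}

interface WorkflowExecutor {
  execute<TData, TResult>(
    workflow: WorkflowDefinitionLike<TData, TResult>,
    data: TData
  ): Promise<FlowHandle<TResult>>;
  getStatus(flowId: string): Promise<FlowStatus>;
}

// Hypothetical workflow definition; real definitions come from @orijs/core.
type OrderData = { orderId: string };
type OrderResult = { charged: boolean };
const orderWorkflow: WorkflowDefinitionLike<OrderData, OrderResult> = {
  name: 'process-order',
  stepGroups: [],
  _data: undefined as unknown as OrderData, // phantom field, never read
  _result: undefined as unknown as OrderResult, // phantom field, never read
};

// Hypothetical service. It can start workflows and query status, but it has
// no access to start()/stop() because those are not part of WorkflowExecutor.
class OrderService {
  private readonly workflows: WorkflowExecutor;

  constructor(workflows: WorkflowExecutor) {
    this.workflows = workflows;
  }

  async process(orderId: string): Promise<OrderResult> {
    const handle = await this.workflows.execute(orderWorkflow, { orderId });
    // result() resolves on completion and rejects on failure or timeout.
    return handle.result();
  }
}

// Stub executor, present only so the sketch can be exercised without a provider.
const stubExecutor: WorkflowExecutor = {
  async execute<TData, TResult>(
    _workflow: WorkflowDefinitionLike<TData, TResult>,
    _data: TData
  ): Promise<FlowHandle<TResult>> {
    return {
      id: 'flow-stub-1',
      status: async () => 'completed',
      result: async () => ({ charged: true }) as unknown as TResult,
    };
  },
  getStatus: async () => 'completed',
};
```

Because TData and TResult are inferred from the definition's phantom fields, the call site needs no explicit type arguments.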
FlowStatus
type FlowStatus =
  | 'pending' // Created but not started
  | 'running' // Currently executing steps
  | 'completed' // All steps complete (or handled)
  | 'failed'; // Workflow failed (after onError handling)
Note: individual step failures do not automatically set the workflow to 'failed'. The parent’s onError handler decides whether to continue or fail.
2. InProcessWorkflowProvider
Source: in-process-workflow-provider.ts
Local in-process workflow execution for development and testing.
Configuration
interface WorkflowProviderConfig {
  logger?: Logger;
  defaultTimeout?: number; // Default: 30000 (30 seconds)
  parallelConcurrency?: number; // Default: 10
}
The constructor accepts either a WorkflowProviderConfig or a Logger directly (backward compatibility). When a Logger is passed, defaults are used for timeout and concurrency.
execute()
async execute<TData, TResult>(
  workflow: WorkflowDefinitionLike<TData, TResult>,
  data: TData,
  timeout?: number
): Promise<FlowHandle<TResult>>
- Validates provider is started (throws if not)
- Validates workflow is registered in definitionConsumers map (throws if not)
- Generates a flow ID: flow-${Date.now()}-${random}
- Creates FlowState with status: 'pending'
- Creates a result Promise via new Promise() with stored resolve/reject
- Sets up timeout: setTimeout that rejects with WorkflowTimeoutError if workflow is still pending/running
- Chains .catch().finally() on result promise to clear timeout
- Starts executeDefinitionWorkflowInternal() asynchronously (does not await)
- Returns FlowHandle immediately
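The stored resolve/reject, timeout, and fire-and-forget steps above can be sketched as follows. This is a minimal illustration, not the provider's implementation: startFlow is a hypothetical helper, the WorkflowTimeoutError constructor and message text are assumptions (the reference only lists the two fields), and registration checks and ID generation are omitted.

```typescript
// Assumed constructor shape; the documented class only specifies the fields.
class WorkflowTimeoutError extends Error {
  readonly flowId: string;
  readonly timeoutMs: number;

  constructor(flowId: string, timeoutMs: number) {
    super(`Workflow ${flowId} timed out after ${timeoutMs}ms`);
    this.name = 'WorkflowTimeoutError';
    this.flowId = flowId;
    this.timeoutMs = timeoutMs;
  }
}

type FlowState = { status: 'pending' | 'running' | 'completed' | 'failed' };

// Hypothetical helper: starts `run` without awaiting it and returns a result
// promise guarded by a timeout. A timeoutMs of 0 disables the timeout.
function startFlow<T>(flowId: string, run: () => Promise<T>, timeoutMs: number) {
  const state: FlowState = { status: 'pending' };

  // Result promise with stored resolve/reject.
  let resolve!: (value: T) => void;
  let reject!: (reason: unknown) => void;
  const result = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });

  // The timeout only fires while the flow is still pending or running.
  let timer: ReturnType<typeof setTimeout> | undefined;
  if (timeoutMs > 0) {
    timer = setTimeout(() => {
      if (state.status === 'pending' || state.status === 'running') {
        state.status = 'failed';
        reject(new WorkflowTimeoutError(flowId, timeoutMs));
      }
    }, timeoutMs);
  }

  // Clear the timer once the result settles. The catch() keeps this side
  // chain from producing an unhandled rejection of its own.
  result
    .catch(() => undefined)
    .finally(() => {
      if (timer !== undefined) clearTimeout(timer);
    });

  // Start the work but do not await it. A late completion after a timeout
  // is ignored because the status is no longer 'running'.
  state.status = 'running';
  run().then(
    (value) => {
      if (state.status === 'running') {
        state.status = 'completed';
        resolve(value);
      }
    },
    (error) => {
      if (state.status === 'running') {
        state.status = 'failed';
        reject(error);
      }
    }
  );

  return { id: flowId, state, result };
}
```

Note that the timeout rejects the result promise but does not cancel `run`; the underlying work keeps going, which matches the behavior described under Timeout Handling.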
Sequential Group Execution
executeDefinitionSequentialGroup() iterates step definitions in order:
- Looks up handler from stepHandlers map by step name
- If not found: returns WorkflowStepError immediately
- Creates WorkflowContext via createWorkflowContext()
- Calls executeDefinitionStepSafely() which uses Promise.resolve(handler(ctx)).catch() to convert rejections to resolved marker values (avoiding double-await issues)
- On success: adds result to state.results and rollback to state.completedStepsWithRollback
- On failure: runs rollbacks, returns error
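The "convert rejections to resolved marker values" idea can be sketched like this. The names executeStepSafely, StepOutcome, and runSequential are hypothetical, and the handler signature is simplified. One deliberate difference from the Promise.resolve(handler(ctx)).catch() form named above: the call is made inside a promise executor so that synchronous throws are captured as well.

```typescript
// Marker value: a step never rejects, it resolves to one of these two shapes.
type StepOutcome<T> = { ok: true; value: T } | { ok: false; error: Error };

// Hypothetical helper. Calling the handler inside the executor means both
// synchronous throws and asynchronous rejections end up in the catch branch.
function executeStepSafely<T>(handler: () => Promise<T> | T): Promise<StepOutcome<T>> {
  return new Promise<T>((resolve) => resolve(handler()))
    .then((value): StepOutcome<T> => ({ ok: true, value }))
    .catch(
      (err: unknown): StepOutcome<T> => ({
        ok: false,
        error: err instanceof Error ? err : new Error(String(err)),
      })
    );
}

type SimpleStep = { name: string; execute: (results: Record<string, unknown>) => unknown };

// Sequential loop: stop at the first failed step and return its error.
// The provider described above would run rollbacks before returning.
async function runSequential(
  steps: readonly SimpleStep[],
  results: Record<string, unknown>
): Promise<Error | undefined> {
  for (const step of steps) {
    const outcome = await executeStepSafely(() => step.execute(results));
    if (!outcome.ok) return outcome.error;
    results[step.name] = outcome.value;
  }
  return undefined;
}
```

Because every step resolves to a marker, the loop needs no try/catch and later steps are simply never started after a failure.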
Parallel Group Execution
executeDefinitionParallelGroup() executes steps concurrently with a configurable concurrency limit (default: 10).
The executeWithConcurrencyLimit() async pool implementation:
For each item:
  1. Create executor promise
  2. Wrap in self-removing promise, add to `executing` Set
  3. If executing.size >= limit: await Promise.race(executing)
After loop: await Promise.all(executing)
Results are collected with index-based ordering to preserve deterministic result positions.
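A runnable sketch of the async pool outlined above, including the index-based result ordering. It follows the pseudocode but is not taken from the source file; the signature is an assumption. It also assumes the executor never rejects, which fits the provider's design where step failures come back as marker values rather than rejections.

```typescript
// Runs `executor` over `items` with at most `limit` calls in flight.
// Results are written by index, so output order matches input order
// regardless of which call finishes first.
async function executeWithConcurrencyLimit<T, R>(
  items: readonly T[],
  limit: number,
  executor: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array<R>(items.length);
  const executing = new Set<Promise<void>>();

  for (let index = 0; index < items.length; index++) {
    const item = items[index] as T;

    // Self-removing wrapper: stores the result, then drops itself from the set.
    const tracked: Promise<void> = executor(item, index)
      .then((value) => {
        results[index] = value;
      })
      .finally(() => {
        executing.delete(tracked);
      });
    executing.add(tracked);

    // At capacity: wait for any in-flight call to finish before continuing.
    if (executing.size >= limit) {
      await Promise.race(executing);
    }
  }

  // Drain whatever is still running after the loop.
  await Promise.all(executing);
  return results;
}
```

Writing into a pre-sized array by index is what keeps result positions deterministic; pushing results as they complete would order them by finish time instead.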
After parallel execution:
- Separates successful and failed steps
- Adds successful results to state.results
- Tracks rollbacks for successful steps
- If any failure: runs all rollbacks and returns the first error
Rollback Pattern
runRollbacks() executes rollback handlers in LIFO order (last completed first):
- Reverses completedStepsWithRollback array
- For each step with a rollback handler:
  - Creates a new WorkflowContext with step name suffixed :rollback
  - Executes the rollback handler
  - Logs duration
- Rollback errors are logged but do not stop other rollbacks from running
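The LIFO rollback loop can be sketched as below. RollbackEntry and RollbackLog are simplified, hypothetical shapes (real rollback handlers receive a WorkflowContext), and the sketch copies the array before reversing, which may differ from the source if it reverses in place.

```typescript
type RollbackEntry = { name: string; rollback: () => Promise<void> | void };
type RollbackLog = {
  info(message: string): void;
  error(message: string, error: unknown): void;
};

// Runs rollbacks last-completed-first. A failing rollback is logged and the
// loop continues, so one bad handler cannot block the remaining ones.
async function runRollbacks(completed: readonly RollbackEntry[], log: RollbackLog): Promise<void> {
  // Copy before reversing so the caller's array is left untouched.
  const lifo = [...completed].reverse();

  for (const step of lifo) {
    const startedAt = Date.now();
    try {
      await step.rollback();
      log.info(`${step.name}:rollback finished in ${Date.now() - startedAt}ms`);
    } catch (error) {
      log.error(`${step.name}:rollback failed`, error);
    }
  }
}
```

With steps a, b, c completed in that order, the handlers run as c, b, a; if b's rollback throws, a's rollback still runs.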
Timeout Handling
- Effective timeout: timeout parameter overrides defaultTimeout, 0 disables
- WorkflowTimeoutError is thrown (rejects the result promise)
- Currently executing step continues (no cancellation)
- Rollbacks are NOT triggered on timeout
- Timeout is cleared in .finally() when result promise settles
class WorkflowTimeoutError extends Error {
  public readonly flowId: string;
  public readonly timeoutMs: number;
}
Flow State Cleanup
Completed/failed flow states are cleaned up after FLOW_CLEANUP_DELAY_MS (5 minutes) via setTimeout. This prevents memory leaks while allowing time for status queries after completion. Cleanup timeouts are tracked in a separate Map and cleared on stop().
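A small sketch of delayed cleanup with tracked timers. FlowStateStore and its method names are hypothetical; only the FLOW_CLEANUP_DELAY_MS constant, the use of setTimeout, the separate timer Map, and clearing on stop() come from the description above. Whether stop() also discards the remaining flow states is not stated, so this sketch leaves them in place.

```typescript
const FLOW_CLEANUP_DELAY_MS = 5 * 60 * 1000; // 5 minutes

// Hypothetical store: flow states in one Map, cleanup timers in another.
class FlowStateStore<TState> {
  private readonly flows = new Map<string, TState>();
  private readonly cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();
  private readonly cleanupDelayMs: number;

  constructor(cleanupDelayMs: number = FLOW_CLEANUP_DELAY_MS) {
    this.cleanupDelayMs = cleanupDelayMs;
  }

  set(flowId: string, state: TState): void {
    this.flows.set(flowId, state);
  }

  get(flowId: string): TState | undefined {
    return this.flows.get(flowId);
  }

  // Call once a flow is completed or failed. The state stays queryable
  // until the delay elapses, then both Map entries are removed.
  scheduleCleanup(flowId: string): void {
    const timer = setTimeout(() => {
      this.flows.delete(flowId);
      this.cleanupTimers.delete(flowId);
    }, this.cleanupDelayMs);
    this.cleanupTimers.set(flowId, timer);
  }

  // Cancel pending cleanups so no timers keep the process alive after stop.
  stop(): void {
    for (const timer of this.cleanupTimers.values()) clearTimeout(timer);
    this.cleanupTimers.clear();
  }
}
```

Tracking the timers separately is what makes stop() cheap: it only has to walk the timer Map rather than inspect every flow state.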
3. WorkflowContext
Source: workflow-context.ts
Context passed to workflow step handlers.
Interface
interface WorkflowContext<TData = unknown, TSteps extends Record<string, unknown> = Record<string, unknown>> {
  readonly flowId: string;
  readonly data: TData;
  readonly results: TSteps;
  readonly log: Logger;
  readonly meta: PropagationMeta;
  readonly correlationId: string;
  readonly providerId?: string;
}
Result Accumulation
The results property accumulates results from all completed steps as { stepName: result, ... }. When the workflow definition declares typed steps via StepBuilder, the TSteps generic carries those types through to consumers, providing type-safe access without manual assertions:
const validation = ctx.results['validate'];
// Already typed based on the step's output schema
Results are accumulated mutably in StepExecutionState.results during execution, but each context object handed to a step is frozen.
DefaultWorkflowContext Class
class DefaultWorkflowContext<TData = unknown, TSteps extends Record<string, unknown> = Record<string, unknown>> implements WorkflowContext<TData, TSteps> {
  constructor(
    public readonly flowId: string,
    public readonly data: TData,
    public readonly results: Record<string, unknown>,
    public readonly log: Logger,
    public readonly meta: PropagationMeta,
    public readonly providerId?: string
  )
}
correlationId is extracted from meta.correlationId, falling back to flowId if not present.
createWorkflowContext()
function createWorkflowContext<TData>(
  flowId: string,
  data: TData,
  results: Record<string, unknown>,
  log: Logger,
  meta: PropagationMeta,
  options?: WorkflowContextOptions
): WorkflowContext<TData>
Behavior:
- Fail-fast validation: Throws on invalid flowId (must be a non-empty string), missing log, invalid results (must be plain object), invalid meta (must be plain object)
- Logger enrichment: Adds flowId, workflow, step, and providerId to the logger via log.with()
- Returns Object.freeze(new DefaultWorkflowContext(...)), an immutable context
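The validate, enrich, and freeze sequence, together with the correlationId fallback described earlier, might look roughly like this. LoggerLike and MetaLike are simplified stand-ins for Logger and PropagationMeta (their real shapes are not shown in this reference), createContextSketch and isPlainObject are hypothetical names, and the error messages are invented for illustration.

```typescript
// Simplified stand-ins; the real Logger and PropagationMeta types are assumed.
interface LoggerLike {
  with(fields: Record<string, unknown>): LoggerLike;
}
type MetaLike = { correlationId?: string } & Record<string, unknown>;

function isPlainObject(value: unknown): value is Record<string, unknown> {
  if (typeof value !== 'object' || value === null) return false;
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

function createContextSketch<TData>(
  flowId: string,
  data: TData,
  results: Record<string, unknown>,
  log: LoggerLike,
  meta: MetaLike,
  options: { workflowName?: string; stepName?: string; providerId?: string } = {}
) {
  // Fail fast on bad input instead of surfacing errors inside a step.
  if (typeof flowId !== 'string' || flowId.length === 0) {
    throw new Error('flowId must be a non-empty string');
  }
  if (!log) throw new Error('log is required');
  if (!isPlainObject(results)) throw new Error('results must be a plain object');
  if (!isPlainObject(meta)) throw new Error('meta must be a plain object');

  // Enrich the logger so every line carries the flow and step identity.
  const enrichedLog = log.with({
    flowId,
    workflow: options.workflowName,
    step: options.stepName,
    providerId: options.providerId,
  });

  // Object.freeze is shallow: the context's own properties cannot be
  // reassigned, but nested objects such as `results` are not frozen by it.
  return Object.freeze({
    flowId,
    data,
    results,
    log: enrichedLog,
    meta,
    correlationId: typeof meta.correlationId === 'string' ? meta.correlationId : flowId,
    providerId: options.providerId,
  });
}
```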
WorkflowContextOptions
interface WorkflowContextOptions {
  workflowName?: string; // For logging context
  stepName?: string; // For logging context
  providerId?: string; // For distributed tracing
}
4. Workflow Types
Source: workflow.types.ts
WorkflowDefinitionLike
Structural type compatible with WorkflowDefinition from @orijs/core without requiring the import:
interface WorkflowDefinitionLike<TData = unknown, TResult = unknown> {
  readonly name: string;
  readonly stepGroups: readonly StepGroup[];
  readonly _data: TData; // Phantom type for TData inference
  readonly _result: TResult; // Phantom type for TResult inference
}
StepGroup
interface StepGroup {
  readonly type: 'sequential' | 'parallel';
  readonly definitions: readonly StepDefinitionBase[];
}
StepDefinitionBase
interface StepDefinitionBase {
  readonly name: string; // Unique within workflow
}
StepHandler and RollbackHandler
type StepHandler<TData = unknown, TResult = unknown> = (
  ctx: WorkflowContext<TData, Record<string, unknown>>
) => Promise<TResult> | TResult;
type RollbackHandler<TData = unknown> = (
  ctx: WorkflowContext<TData, Record<string, unknown>>
) => Promise<void> | void;
Rollback handlers must be idempotent. In distributed systems with retries, a rollback may be called multiple times for the same workflow execution.
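One common way to make a rollback idempotent is to key the compensating action and skip it when that key has already been processed. The sketch below illustrates the idea only: the refund scenario, RollbackCtx, and refundPaymentRollback are hypothetical, and the in-memory Set stands in for an external system. In a distributed deployment the "already refunded" record would need to live in shared storage, not in process memory.

```typescript
// Stand-in for an external payment system (assumption for this sketch).
const refundedKeys = new Set<string>();
let refundCalls = 0;

// Simplified context; real rollback handlers receive a WorkflowContext.
type RollbackCtx = { flowId: string; data: { paymentId: string } };

async function refundPaymentRollback(ctx: RollbackCtx): Promise<void> {
  // The key ties the refund to one workflow execution and one payment.
  const key = `${ctx.flowId}:${ctx.data.paymentId}`;

  // Second and later invocations for the same key are no-ops.
  if (refundedKeys.has(key)) return;

  refundCalls++; // stands in for the actual refund request
  refundedKeys.add(key);
}
```

Calling the handler twice with the same context issues a single refund, which is the property a retrying provider relies on.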
StepOptions
interface StepOptions<TData = unknown, TResult = unknown> {
  execute: StepHandler<TData, TResult>;
  rollback?: RollbackHandler<TData>;
}
WorkflowStepError
class WorkflowStepError extends Error {
  public readonly stepName: string;
  public override readonly cause: Error;
  constructor(stepName: string, originalError: Error)
}
The stack property is augmented with \n\nCaused by: ${originalError.stack} for debugging.
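A sketch of how the constructor might populate the fields and augment the stack. The message format is an assumption, since the reference only specifies the fields and the "Caused by" suffix. The override modifier from the documented signature is left out here so the sketch also compiles against lib targets where Error does not declare cause.

```typescript
class WorkflowStepError extends Error {
  readonly stepName: string;
  readonly cause: Error;

  constructor(stepName: string, originalError: Error) {
    // Assumed message format, chosen for illustration.
    super(`Step "${stepName}" failed: ${originalError.message}`);
    this.name = 'WorkflowStepError';
    this.stepName = stepName;
    this.cause = originalError;

    // Append the original stack so both traces appear in one log entry.
    if (originalError.stack) {
      this.stack = `${this.stack ?? this.message}\n\nCaused by: ${originalError.stack}`;
    }
  }
}
```

Keeping the original error on cause lets callers inspect it programmatically, while the augmented stack serves log readers who only see the string.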
StepExecutionContext (internal)
interface StepExecutionContext<TData = unknown> {
  readonly flowId: string;
  readonly workflowName: string;
  readonly data: TData;
  readonly results: Record<string, unknown>;
  readonly meta: PropagationMeta;
  readonly log: Logger;
}
Consolidates parameters needed by step execution methods, reducing parameter count.
StepExecutionState (internal)
interface StepExecutionState {
  results: Record<string, unknown>;
  completedStepsWithRollback: Array<{ name: string; rollback: RollbackHandler }>;
}
Separated from StepExecutionContext because this state mutates as steps complete.
5. Distributed Design Constraints
These constraints apply to all workflow provider implementations and must be honored by user code:
- Workflow data (TData) must be JSON-serializable: No functions, no circular references, no class instances with methods. The data is serialized into job payloads for transport.
- Step handlers must be stateless: No this references to instance state. In distributed deployments, different instances may execute different steps of the same workflow.
- Step results must be JSON-serializable: Step results are stored in BullMQ job return values and retrieved via job.getChildrenValues(). Functions, symbols, and circular references will be lost.
- PropagationMeta is serialized into job data: The meta object containing correlation IDs, trace IDs, etc. is included in the job payload for context propagation across process boundaries.
- Workflows are registered by name: The WorkflowDefinitionLike.name is used as the key for distributed lookup. Worker instances use this name to find the correct handler when processing jobs from the queue.
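To see what the JSON-serializability constraints mean in practice, the sketch below pushes a value through JSON.stringify and JSON.parse, which approximates what happens to workflow data and step results when they travel in a job payload. The roundTrip helper, the Money class, and the sample object are hypothetical; a real provider's serialization path may differ in detail.

```typescript
// Approximates transport through a job payload.
function roundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

// Hypothetical class with a method, used to show what is lost.
class Money {
  amount: number;
  constructor(amount: number) {
    this.amount = amount;
  }
  format(): string {
    return `$${this.amount.toFixed(2)}`;
  }
}

const sent = {
  orderId: 'o-1', // plain string: survives unchanged
  total: new Money(5), // class instance: fields survive, methods do not
  createdAt: new Date(0), // Date: arrives as an ISO string
  onDone: () => 'done', // function: dropped entirely
};

const received = roundTrip(sent) as Record<string, unknown>;
// received.total is now a plain { amount: 5 } object without format().
// received.createdAt is the string '1970-01-01T00:00:00.000Z'.
// received has no onDone property at all.

// Circular references are worse than lossy: JSON.stringify throws on them.
const circular: Record<string, unknown> = {};
circular['self'] = circular;
```

Passing identifiers and primitive fields (an order ID rather than an order object with methods) sidesteps all three kinds of loss.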