## Project Overview
An intelligent, DAG-based workflow engine for building flexible, fault-tolerant, AI-driven automation systems. agent-workflow simplifies complex task orchestration by combining static DAG scheduling with dynamic strategies, real-time streaming, and seamless AI SDK compatibility.
### Why It Exists
- Manage interdependent tasks as a directed acyclic graph (DAG)
- Inject or adapt tasks at runtime based on custom conditions
- Provide real-time feedback and integrate with front-end streaming UIs
- Orchestrate AI agents and SDKs within workflows
- Ensure resilience through retries, fallbacks, and error-recovery strategies
### When to Use
- ETL pipelines requiring conditional branching and dynamic sub-flows
- Real-time data processing with streaming results
- AI-driven automation (e.g., multi-step agent workflows, planning)
- Complex deployment or migration tasks needing on-the-fly adjustments
- Any scenario demanding high fault tolerance and observable execution
### Core Capabilities
- DAG Task Scheduling: Define dependencies and parallelism with `addTask()`/`addEdge()`.
- Dynamic Strategies: Use `addDynamicStrategy()`, `whenCondition()`, and `onTaskComplete()` to inject tasks.
- Streaming Execution: Subscribe to intermediate results for live dashboards or UIs.
- AI SDK Compatibility: Plug in OpenAI, custom AI SDKs, or Agent-style APIs without boilerplate.
- Fault Tolerance: Built-in retry policies, error handlers, and once-only strategies for recovery.
- Builder & Agent API: Fluent `WorkflowBuilder` and simple agent interfaces for rapid integration.
### Supported Scenarios
- Data ingestion pipelines with dynamic validation and correction steps
- Long-running ML training workflows with checkpointing and adaptive tuning
- Serverless or microservice orchestration with conditional fallbacks
- Interactive chatbots or AI agents requiring multi-step planning
- Monitoring and alerting flows with on-the-fly escalation tasks
### Differentiators
- Intelligent Planning: Integrate AI planners that generate or optimize tasks at runtime.
- Dynamic, Priority-Driven Strategies: Fine-grained control over when and how new tasks enter a workflow.
- Streaming-First Design: Native support for incremental result delivery and progress tracking.
- SDK-Agnostic AI Integration: Swap AI providers or leverage multiple SDKs within one workflow.
- Lightweight Agent API: Build command-and-control logic with minimal glue code.
### Quick Start Example
```ts
import { WorkflowBuilder } from 'agent-workflow';
import { HttpRequestTask, JsonParseTask } from 'agent-workflow/tasks';

const workflow = WorkflowBuilder
  .create()
  .addTask(new HttpRequestTask('https://api.example.com/data'))
  .addTask(new JsonParseTask(), ['HttpRequestTask'])
  .build();

// Execute with a streaming callback
await workflow.execute(
  { timeout: 10000 },
  {
    onData: (taskId, output) => {
      console.log(`Task ${taskId} produced:`, output);
    },
    onError: (err) => {
      console.error('Workflow error:', err);
    }
  }
);
```
## Getting Started
Get up and running in three steps: install, run a minimal workflow, and explore built-in examples.
### 1. Install
Use your package manager to add the library:
```bash
npm install agent-workflow
# or
yarn add agent-workflow
```
### 2. Minimal Workflow Example
Create a file `minimal.ts`:
```ts
import { WorkflowBuilder, DAGTask } from 'agent-workflow';

// Task 1: produce a timestamp
class TimeTask extends DAGTask {
  name = 'time';
  async execute() {
    return { now: Date.now() };
  }
}

// Task 2: format the timestamp
class GreetTask extends DAGTask {
  name = 'greet';
  constructor(deps = [new TimeTask()]) {
    super(deps);
  }
  async execute({ now }: { now: number }) {
    const date = new Date(now).toISOString();
    return { message: `Hello! Current time is ${date}` };
  }
}

async function main() {
  // Build and run the workflow
  const result = await WorkflowBuilder
    .create()
    .addTasks([new TimeTask(), new GreetTask()])
    .build()
    .execute({});

  if (!result.success) {
    console.error('Workflow failed:', result.error);
    process.exit(1);
  }

  console.log(result.data.message);
  console.log(`Total time: ${result.executionTime}ms`);
}

main();
```
Run it with `ts-node` (or compile and run with `tsc` + `node`):
```bash
npx ts-node-esm minimal.ts
```
You should see:
```
Hello! Current time is 2025-06-26T15:04:05.000Z
Total time: 12ms
```
### 3. Run the Built-In Examples
The repository includes two ready-to-go workflows in `examples/`. Use the npm scripts defined in `package.json`:
```bash
# Basic data-processing pipeline
npm run example:basic

# Improved, type-safe DAG workflow
npm run example:improved
```
Each script compiles and executes its TypeScript file, then logs task-by-task outputs, metrics, and final results:
- Basic example: fetch → validate → analyze → report
- Improved example: demonstrates custom `DAGTask` subclasses, automatic dependency ordering, and structured data flow
### CLI Summary
- `npm run build` — produce ES & UMD bundles plus `.d.ts` files
- `npm test` — run unit tests with Jest
- `npm run example:basic` — execute `examples/basic-workflow.ts`
- `npm run example:improved` — execute `examples/improved-workflow-example.ts`
Now that you’ve run a workflow end-to-end, explore the `examples/` directory for deeper patterns and extend `DAGTask` to suit your automation needs.
## Core Concepts
This section covers the foundational building blocks of the agent-workflow library: how you define validated tasks, register and look them up, share data across steps, and assemble workflows for execution.
### Task Interface and Schema Validation
Every workflow unit implements the `Task` interface, declaring optional Zod schemas for inputs and outputs and an `execute` method that returns validated results.
Key members:
- `name?: string` – human-readable identifier
- `inputSchema?: ZodSchema` – runtime validator for incoming data
- `outputSchema?: ZodSchema` – runtime validator for results
- `execute(input: TaskInput): Promise<TaskOutput>` – core logic
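Taken together, a minimal sketch of the interface these members describe (the real declaration lives in `./workflow/Task`; the type aliases here are assumptions):
```ts
import { ZodSchema } from "zod";

// Assumed aliases; the library's actual definitions may differ.
export type TaskInput = Record<string, any>;
export type TaskOutput = Record<string, any>;

export interface Task {
  name?: string;            // human-readable identifier
  inputSchema?: ZodSchema;  // runtime validator for incoming data
  outputSchema?: ZodSchema; // runtime validator for results
  execute(input: TaskInput): Promise<TaskOutput>; // core logic
}
```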
Example: defining a `SendEmailTask`:
```ts
import { z } from "zod";
import { Task, TaskInput, TaskOutput } from "./workflow/Task";

// Assumed to be provided elsewhere; declared here so the example compiles.
declare const emailService: {
  send(opts: { to: string; subject: string; body?: string }): Promise<string>;
};

export class SendEmailTask implements Task {
  name = "sendEmail";

  // Validate that `to` and `subject` are present; `body` is optional
  inputSchema = z.object({
    to: z.string().email(),
    subject: z.string().min(1),
    body: z.string().optional(),
  });

  // Ensure the result exposes a messageId
  outputSchema = z.object({
    messageId: z.string(),
  });

  async execute(input: TaskInput): Promise<TaskOutput> {
    // Parse and validate inputs
    const { to, subject, body } = this.inputSchema.parse(input);
    // ...call external email service...
    const messageId = await emailService.send({ to, subject, body });
    // Validate outputs before returning
    return this.outputSchema.parse({ messageId });
  }
}
```
Practical tips:
- Always call `.parse()` on inputs/outputs to catch schema violations early.
- Catch `ZodError` in orchestration to translate into user-friendly errors or retry logic (see the sketch below).
- Assign a stable `name` to each task for registry lookup and metrics.
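As a sketch of the second tip, here is a hypothetical `runTask` wrapper (not part of the library) that turns a `ZodError` into a readable failure:
```ts
import { ZodError } from "zod";
import type { Task, TaskInput, TaskOutput } from "./workflow/Task";

// Hypothetical orchestration helper: surface schema violations clearly.
async function runTask(task: Task, input: TaskInput): Promise<TaskOutput> {
  try {
    return await task.execute(input);
  } catch (err) {
    if (err instanceof ZodError) {
      const details = err.issues
        .map(i => `${i.path.join(".")}: ${i.message}`)
        .join("; ");
      throw new Error(`Task "${task.name}" failed validation: ${details}`);
    }
    throw err; // non-schema errors propagate for retry/fallback logic
  }
}
```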
### Task Registry: Registering and Retrieving Tasks
`TaskRegistry` is a singleton that stores all available tasks by name or capability. Use it to decouple registration from workflow construction.
API highlights:
- `register(task: Task): void`
- `get(name: string): Task | undefined`
- `getAll(): Task[]`
- `clear(): void`
Example: registering and fetching tasks:
```ts
import { WorkflowBuilder } from "./workflow/WorkflowBuilder";
import { TaskRegistry } from "./workflow/TaskRegistry";
import { SendEmailTask } from "./tasks/SendEmailTask";
import { GenerateReportTask } from "./tasks/GenerateReportTask";

// Register tasks at startup
const registry = TaskRegistry.instance;
registry.register(new SendEmailTask());
registry.register(new GenerateReportTask());

// Later, look up by name
const emailTask = registry.get("sendEmail");
if (!emailTask) throw new Error("sendEmail task not found");

// Instantiate a workflow dynamically
const workflow = WorkflowBuilder
  .create()
  .addTask(emailTask)
  .build();
```
Best practices:
- Register all custom tasks before any workflow build or execution.
- Use `clear()` in tests to reset the registry.
- Optionally extend `Task` with a `capabilities: string[]` field and query via a custom `getByCapability()` helper (sketched below).
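A sketch of that optional extension; `capabilities` and `getByCapability()` are not part of the shipped API and are shown here as assumptions built on the documented `getAll()`:
```ts
import type { Task } from "./workflow/Task";
import { TaskRegistry } from "./workflow/TaskRegistry";

// Hypothetical: tag tasks with capability strings.
interface CapableTask extends Task {
  capabilities?: string[];
}

// Hypothetical helper layered on the documented registry API.
function getByCapability(capability: string): Task[] {
  return TaskRegistry.instance
    .getAll()
    .filter(t => (t as CapableTask).capabilities?.includes(capability));
}

// e.g. const notifiers = getByCapability("notify");
```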
### ContextManager: Sharing Data Across Workflow Tasks
`ContextManager` provides an in-memory key/value store for passing data between steps without tight coupling.
Core methods:
- `set(key: string, value: any): void`
- `get(key: string): any`
- `getAll(): Record<string, any>`
- `clear(): void`
Example: populating and consuming context:
```ts
import { ContextManager } from "./workflow/ContextManager";
import type { Task, TaskInput, TaskOutput } from "./workflow/Task";

const ctx = new ContextManager();

// Seed context before the workflow runs
ctx.set("userId", 42);
ctx.set("featureFlags", { beta: true });

// Pass the full context into tasks (fetchUserTask and processDataTask
// are assumed to be Task instances defined elsewhere)
await fetchUserTask.execute(ctx.getAll());
await processDataTask.execute(ctx.getAll());

// Within a Task implementation
export class LogUserTask implements Task {
  async execute(input: TaskInput, context: ContextManager): Promise<TaskOutput> {
    const userId = context.get("userId") as number;
    console.log(`Processing user ${userId}`);
    context.set("lastProcessed", userId);
    return { userId };
  }
}
```
Guidance:
- Instantiate one `ContextManager` per workflow run.
- Use clear, namespaced keys to avoid collisions (e.g. `"auth.token"`).
- Call `clear()` when reusing a `ContextManager` across multiple runs (see below).
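For example, a small sketch of namespaced keys and resetting between runs, using only the methods listed above:
```ts
import { ContextManager } from "./workflow/ContextManager";

const ctx = new ContextManager();

// Namespaced keys keep unrelated concerns from colliding.
ctx.set("auth.token", "abc123");
ctx.set("report.format", "pdf");

// ...first workflow run consumes ctx.getAll()...

// Reset before reusing the same instance for a second run.
ctx.clear();
```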
### WorkflowBuilder: Constructing and Executing Workflows
`WorkflowBuilder` orchestrates task sequences (DAGs), handles dependencies and context, and supports both batch and streaming execution.
Basic flow:
- `create()` – start a new builder
- `addTask(task: Task)` – append a step
- `withConfig({...})` – optional tuning (e.g., `maxDynamicSteps`)
- `build()` – return a `Workflow` instance
- `execute(input: TaskInput)` or `executeStream(input: TaskInput)` – run it
Example: simple two-step workflow:
```ts
import { WorkflowBuilder } from "./workflow/WorkflowBuilder";
import { FetchUserTask } from "./tasks/FetchUserTask";
import { SendEmailTask } from "./tasks/SendEmailTask";

const workflow = WorkflowBuilder
  .create()
  .addTask(new FetchUserTask()) // loads user data into context
  .addTask(new SendEmailTask()) // uses context to send the email
  .withConfig({ maxDynamicSteps: 5 })
  .build();

const result = await workflow.execute({ userId: 42 });
console.log("Workflow completed:", result);
```
Example: streaming execution for real-time updates:
```ts
const stream = workflow.executeStream({ userId: 42 });
for await (const update of stream) {
  console.log("Progress:", update);
}
```
Key points:
- The builder injects a fresh `ContextManager` into each run.
- Downstream tasks see all previous context via `context.getAll()`.
- Streaming mode yields partial results as each task completes.
- Advanced patterns (conditional tasks, dynamic strategies) layer on this core builder; see Dynamic Task Generation Strategies.
By mastering these core concepts—validated tasks, centralized registry, shared context, and the workflow builder—you can author, manage, and scale complex asynchronous workflows with confidence.
## Usage Guides
This section walks through real-world workflows combining dynamic strategies, AI streaming, LLM-driven task generation, and robust error handling. Each guide references its example file under `examples/`.
### Dynamic Strategies in WorkflowBuilder (`examples/dynamic-strategies.ts`)
Use dynamic strategies to inject tasks at runtime based on context, task completion, or custom conditions.
- Install and import:

```bash
npm install agent-workflow
```

```ts
import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  ProjectScanTask,
  TypeScriptCheckTask,
  SecurityAuditTask,
  PerformanceOptimizationTask,
  RefactorSuggestionTask
} from "./examples/tasks";
```
- Build the workflow with strategies:

```ts
const workflow = WorkflowBuilder.create()
  .addTask(new ProjectScanTask())
  // Conditional: add a TS type check when scanResult.fileTypes includes "typescript"
  .whenCondition(
    ctx => (ctx.get("scanResult") as any).fileTypes.includes("typescript"),
    async () => [new TypeScriptCheckTask()]
  )
  // On context change: inject security or perf tasks
  .onContextChange("scanResult", async scanData => {
    const tasks: DAGTask[] = [];
    if ((scanData as any).framework === "react") {
      tasks.push(new SecurityAuditTask());
    }
    if ((scanData as any).dependencies > 40) {
      tasks.push(new PerformanceOptimizationTask());
    }
    return tasks;
  })
  // Custom dynamic strategy: add refactor suggestions for high complexity
  .addDynamicStrategy({
    name: "complexity_analysis",
    condition: ctx => (ctx.get("codeQuality") as any)?.complexity > 0.7,
    generator: async () => [new RefactorSuggestionTask()],
    priority: 5,
    once: true
  })
  .withConfig({ maxDynamicSteps: 10 })
  .build();

const result = await workflow.execute({ projectPath: "./my-app" });
console.log("Tasks run:", result.taskResults.map(r => r.taskName));
```
### AI SDK–Compatible Streaming Workflow (`examples/ai-sdk-streaming-workflow.ts`)
Create an Express route that streams AI task outputs via SSE, full data streams, or plain text, matching popular AI SDK interfaces.
- Define an AI-streaming task:

```ts
import { DAGTask, AISDKStreamingTask, TaskInput } from "agent-workflow";
import { MockStreamTextResult } from "./examples/mocks";

class AICodeAnalysisTask extends DAGTask implements AISDKStreamingTask {
  name = "aiCodeAnalysis";
  isAISDKStreaming = true;

  async execute(input: TaskInput) {
    throw new Error("Use streaming API");
  }

  async executeStreamAI(input: TaskInput) {
    const prompt = `Analyze project at ${input.projectPath}`;
    const mock = new MockStreamTextResult(prompt);
    return {
      textStream: mock.textStream(),
      fullStream: mock.fullStream(),
      toDataStreamResponse: () => mock.toDataStreamResponse(),
      toReadableStream: () => mock.toReadableStream()
    };
  }
}
```
- Build an AI SDK–streaming workflow:

```ts
import express from "express";
import { Readable } from "node:stream";
import { WorkflowBuilder } from "agent-workflow";
import { AICodeAnalysisTask, AIDocumentationTask } from "./examples/tasks";

const app = express();
app.use(express.json());

app.post("/api/ai/analyze", async (req, res) => {
  const workflow = WorkflowBuilder.create()
    .addTask(new AICodeAnalysisTask())
    .addTask(new AIDocumentationTask())
    .buildAISDKStreaming();

  const streamResult = workflow.executeStreamAISDK(req.body);

  // Stream the SSE response. The Response body is a web ReadableStream,
  // so convert it to a Node stream before piping into Express's response.
  const response = streamResult.toDataStreamResponse();
  response.headers.forEach((value, key) => res.setHeader(key, value));
  Readable.fromWeb(response.body).pipe(res);
});

app.listen(3000);
```
- Consume on the frontend:

```ts
// EventSource only supports GET, so POST with fetch and read the stream.
const res = await fetch("/api/ai/analyze", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ projectPath: "./src" })
});
const reader = res.body!.getReader();
const decoder = new TextDecoder();
for (let r = await reader.read(); !r.done; r = await reader.read()) {
  console.log("Chunk:", decoder.decode(r.value));
}
```
### Dynamic Task Generation in LLM-Driven Workflows (`examples/llm-integration.ts`)
Spawn new tasks based on AI outputs using `onTaskComplete` and `addDynamicStrategy`.
```ts
import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  AICodeAnalysisTask,
  AITestGeneratorTask,
  AIPerformanceOptimizerTask,
  AICodeReviewTask,
  AIStreamingDocumentationTask
} from "./examples/tasks";

(async () => {
  const workflow = WorkflowBuilder.create()
    .addTask(new AICodeAnalysisTask())
    // Branch on analysis issues
    .onTaskComplete("aiCodeAnalysis", async (result, ctx) => {
      const issues = (result.aiAnalysis as any).issues as string[];
      const tasks: DAGTask[] = [];
      if (issues.includes("test-coverage")) {
        tasks.push(new AITestGeneratorTask());
      }
      if (issues.includes("performance")) {
        tasks.push(new AIPerformanceOptimizerTask());
      }
      tasks.push(new AICodeReviewTask());
      return tasks;
    })
    // Inject streaming docs if the AI recommends them
    .addDynamicStrategy({
      name: "ai_documentation",
      condition: ctx => {
        const recs = (ctx.get("aiAnalysis") as any).recommendations as string[];
        return recs.some(r => r.includes("documentation"));
      },
      generator: async () => [new AIStreamingDocumentationTask()],
      priority: 2
    })
    .withConfig({ maxDynamicSteps: 15 })
    .build();

  const result = await workflow.execute({ projectPath: "./src" });
  console.log("Dynamic tasks generated:", result.dynamicTasksGenerated);
})();
```
### Error Handling and Recovery Example (`examples/error-handling.ts`)
Compose a resilient workflow with retries, timeouts, fallbacks, and error logging.
```ts
import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  NetworkRequestTask,
  FallbackTask,
  ErrorRecoveryTask,
  ErrorLoggingTask
} from "./examples/tasks";

(async () => {
  const workflow = WorkflowBuilder.create()
    .addTask(new NetworkRequestTask({ failureRate: 0.8 }))
    // Fall back once on a specific network error
    .addDynamicStrategy({
      name: "network_fallback",
      condition: ctx => {
        const err = ctx.get("lastError") as string;
        return err?.includes("Network request failed");
      },
      generator: async () => [new FallbackTask()],
      priority: 10,
      once: true
    })
    // Global error recovery and logging
    .addDynamicStrategy({
      name: "error_recovery",
      condition: ctx => ctx.get("hasError") === true,
      generator: async () => [new ErrorRecoveryTask(), new ErrorLoggingTask()],
      priority: 5
    })
    .withConfig({ retryAttempts: 2, timeoutMs: 5000 })
    .build();

  const result = await workflow.execute({});
  console.log(`Success: ${result.success}`);
  if (!result.success) console.error(`Error: ${result.error?.message}`);
})();
```
Practical tips:
- Define idempotent tasks for safe retries (see the sketch below).
- Set `timeoutMs` to bound long operations.
- Use `once: true` to avoid repeating fallback injections.
- Inspect `result.taskResults`, `result.dynamicTasksGenerated`, and per-task statuses for monitoring.
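To illustrate the first tip, a hedged sketch of an idempotent task: if a prior attempt already recorded a result, a retry returns it instead of repeating the side effect (`ChargeCustomerTask` and the stub gateway are hypothetical):
```ts
import { DAGTask, TaskInput } from "agent-workflow";

// Stub standing in for a real payment API (assumption for this sketch).
const paymentGateway = {
  charge: async (amount: number) => `ch_${amount}_${Date.now()}`,
};

class ChargeCustomerTask extends DAGTask {
  name = "chargeCustomer";
  async execute(input: TaskInput) {
    // Retry-safe: if a previous attempt succeeded, reuse its result.
    if (input.chargeId) return { chargeId: input.chargeId };
    return { chargeId: await paymentGateway.charge(input.amount as number) };
  }
}
```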
would you like documented?
• Core fluent API (withConfig, addTask, build)
• Dynamic strategy helpers (whenCondition, onTaskComplete, onContextChange)
• Streaming builder methods (buildStreaming, buildAISDKStreaming)
• Internal execution levels in StaticWorkflow
Let me know your choice and I’ll draft a focused API reference section.
## Development & Contribution
Quickstart for building, testing, linting, and contributing to the agent-workflow library.
### Prerequisites
- Node.js ≥ 16
- npm, Yarn, or pnpm
Clone and install:
```bash
git clone https://github.com/your-org/agent-workflow.git
cd agent-workflow
npm install
```
### Project Scripts
Inspect `package.json` for up-to-date commands:
```json
{
  "scripts": {
    "build": "vite build",
    "test": "jest --passWithNoTests",
    "test:watch": "jest --watch",
    "lint": "biome check",
    "format": "biome fmt"
  }
}
```
### Building the Library
Run the Vite build to emit ESM and UMD bundles plus `.d.ts` files:
```bash
npm run build
# outputs:
# - dist/agent-workflow.es.js
# - dist/agent-workflow.umd.cjs
# - dist/agent-workflow.d.ts
```
Configuration lives in `vite.config.ts` (a sketch follows the list):
- `lib.entry`: `src/index.ts`
- `formats`: `['es', 'umd']`
- `external`: peer/runtime deps (e.g. `zod`, `ai`, Node built-ins)
- the `dts` plugin generates and injects the types entry
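A sketch of what that file might contain (plugin choice, UMD global name, and externals are assumptions; defer to the repo's actual `vite.config.ts`):
```ts
import { defineConfig } from "vite";
import dts from "vite-plugin-dts"; // assumed .d.ts plugin

export default defineConfig({
  plugins: [dts({ rollupTypes: true })], // emit and bundle .d.ts files
  build: {
    lib: {
      entry: "src/index.ts",
      name: "AgentWorkflow", // UMD global name (assumption)
      formats: ["es", "umd"],
    },
    rollupOptions: {
      // keep peer/runtime deps out of the bundle
      external: ["zod", "ai", /^node:/],
    },
  },
});
```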
### Linting & Formatting
Use Biome for import sorting, lint rules, and formatting. Configuration lives in `biome.json` (one possible shape is sketched below).
```bash
# check style errors
npm run lint

# auto-format files
npm run format
```
Ignored paths: `dist/`, `node_modules/`, `.git/`, `biome.json`.
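One plausible shape for that file under Biome 1.x (field names are an assumption; defer to the repo's `biome.json`):
```json
{
  "files": {
    "ignore": ["dist", "node_modules", ".git", "biome.json"]
  },
  "organizeImports": { "enabled": true },
  "linter": { "enabled": true },
  "formatter": { "enabled": true }
}
```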
### Running Tests
Jest runs TypeScript tests as ESM via `ts-jest` and `jest.config.cjs`.
```bash
# single run
npm test

# watch mode
npm run test:watch
```
Key config in `jest.config.cjs` (sketched below):
- `transform` uses `ts-jest` with `useESM: true`
- `extensionsToTreatAsEsm: ['.ts']`
- `tsconfig` points at `tsconfig.test.json`
Example test skeleton (`.ts` imports require the `.js` extension), using the fluent API documented above:
```ts
// src/workflow/__tests__/MyFeature.test.ts
import { WorkflowBuilder } from '../WorkflowBuilder.js'
import { DAGTask } from '../Task.js' // adjust to wherever DAGTask is exported

class StepTask extends DAGTask {
  name = 'step'
  async execute() {
    return { step: 'ok' }
  }
}

describe('MyFeature', () => {
  it('executes a static task', async () => {
    const result = await WorkflowBuilder.create()
      .addTask(new StepTask())
      .build()
      .execute({})
    expect(result.data.step).toBe('ok')
  })
})
```
### TypeScript Configuration
`tsconfig.json` – production build:
```json
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "ESNext",
    "declaration": true,
    "outDir": "dist",
    "strict": true,
    "esModuleInterop": true
  },
  "include": ["src", "examples"],
  "exclude": ["src/**/*.test.ts", "src/**/__tests__"]
}
```
`tsconfig.test.json` – tests:
```json
{
  "extends": "./tsconfig.json",
  "compilerOptions": {
    "module": "ESNext",
    "allowJs": true
  },
  "include": ["**/*.ts", "**/*.js"],
  "exclude": ["node_modules"]
}
```
### .npmignore
Ensure only runtime artifacts are published to npm. Key excludes:
```
src/
examples/
**/*.test.ts
**/__tests__/
tsconfig*.json
vite.config.ts
jest.config.cjs
biome.json
```
Update this file when adding new dev-only files or folders.
### Contributing Workflow
1. Fork the repo and create a feature branch `feature/your-feature`.
2. Install dependencies and implement your changes.
3. Add or update tests under `src/**/__tests__`.
4. Run and confirm all checks pass:
   ```bash
   npm run lint
   npm test
   npm run build
   ```
5. Commit with a descriptive message and open a PR.
All contributions should adhere to existing code style and include relevant tests. Continuous Integration runs Biome, Jest, and the Vite build on every pull request.