Project Overview

An intelligent, DAG-based workflow engine for building flexible, fault-tolerant, AI-driven automation systems. agent-workflow simplifies complex task orchestration by combining static DAG scheduling with dynamic strategies, real-time streaming, and seamless AI SDK compatibility.

Why It Exists

  • Manage interdependent tasks as a directed acyclic graph (DAG)
  • Inject or adapt tasks at runtime based on custom conditions
  • Provide real-time feedback and integrate with front-end streaming UIs
  • Orchestrate AI agents and SDKs within workflows
  • Ensure resilience through retries, fallbacks, and error-recovery strategies

When to Use

  • ETL pipelines requiring conditional branching and dynamic sub-flows
  • Real-time data processing with streaming results
  • AI-driven automation (e.g., multi-step agent workflows, planning)
  • Complex deployment or migration tasks needing on-the-fly adjustments
  • Any scenario demanding high fault tolerance and observable execution

Core Capabilities

  • DAG Task Scheduling: Define dependencies and parallelism with addTask()/addEdge().
  • Dynamic Strategies: Use addDynamicStrategy(), whenCondition(), onTaskComplete() to inject tasks.
  • Streaming Execution: Subscribe to intermediate results for live dashboards or UIs.
  • AI SDK Compatibility: Plug in OpenAI, custom AI SDKs, or Agent-style APIs without boilerplate.
  • Fault Tolerance: Built-in retry policies, error handlers, and once-only strategies for recovery.
  • Builder & Agent API: Fluent WorkflowBuilder and simple agent interfaces for rapid integration.
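To make the scheduling idea concrete, here is a minimal sketch of how a DAG can be split into execution levels, where every task in a level can run in parallel. It is a generic illustration of topological ordering (Kahn's algorithm), not agent-workflow's actual scheduler; the `TaskNode` shape and the `computeLevels` name are assumptions made for this example.

```typescript
// Hypothetical node shape for illustration; not the library's own type.
interface TaskNode {
  name: string;
  dependsOn: string[];
}

// Group tasks into levels: level 0 has no dependencies, level 1 depends
// only on earlier levels, and so on. Throws on unknown dependencies or cycles.
function computeLevels(tasks: TaskNode[]): string[][] {
  const pending = new Map<string, Set<string>>();
  tasks.forEach((task) => pending.set(task.name, new Set(task.dependsOn)));
  tasks.forEach((task) =>
    task.dependsOn.forEach((dep) => {
      if (!pending.has(dep)) {
        throw new Error(`Unknown dependency "${dep}" for task "${task.name}"`);
      }
    })
  );

  const levels: string[][] = [];
  while (pending.size > 0) {
    // Every task whose dependencies are all satisfied is ready to run.
    const ready: string[] = [];
    pending.forEach((deps, name) => {
      if (deps.size === 0) ready.push(name);
    });
    if (ready.length === 0) {
      throw new Error("Cycle detected: no runnable task remains");
    }
    ready.forEach((name) => pending.delete(name));
    pending.forEach((deps) => ready.forEach((name) => deps.delete(name)));
    levels.push(ready);
  }
  return levels;
}

const levels = computeLevels([
  { name: "fetch", dependsOn: [] },
  { name: "validate", dependsOn: ["fetch"] },
  { name: "analyze", dependsOn: ["fetch"] },
  { name: "report", dependsOn: ["validate", "analyze"] },
]);
console.log(JSON.stringify(levels));
// [["fetch"],["validate","analyze"],["report"]]
```

Tasks in the same inner array have no dependency on each other, so a scheduler could run them concurrently (for example with `Promise.all`) before moving to the next level.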

Supported Scenarios

  • Data ingestion pipelines with dynamic validation and correction steps
  • Long-running ML training workflows with checkpointing and adaptive tuning
  • Serverless or microservice orchestration with conditional fallbacks
  • Interactive chatbots or AI agents requiring multi-step planning
  • Monitoring and alerting flows with on-the-fly escalation tasks

Differentiators

  • Intelligent Planning: Integrate AI planners that generate or optimize tasks at runtime.
  • Dynamic, Priority-Driven Strategies: Fine-grained control over when and how new tasks enter a workflow.
  • Streaming-First Design: Native support for incremental result delivery and progress tracking.
  • SDK-Agnostic AI Integration: Swap AI providers or leverage multiple SDKs within one workflow.
  • Lightweight Agent API: Build command-and-control logic with minimal glue code.

Quick Start Example

import { WorkflowBuilder } from 'agent-workflow';
import { HttpRequestTask, JsonParseTask } from 'agent-workflow/tasks';

const workflow = WorkflowBuilder
  .create()
  .addTask(new HttpRequestTask('https://api.example.com/data'))
  .addTask(new JsonParseTask(), ['HttpRequestTask'])
  .build();

// Execute with streaming callback
await workflow.execute(
  { timeout: 10000 },
  {
    onData: (taskId, output) => {
      console.log(`Task ${taskId} produced:`, output);
    },
    onError: (err) => {
      console.error('Workflow error:', err);
    }
  }
);

## Getting Started

Get up and running in three steps: install, run a minimal workflow, and explore built-in examples.

### 1. Install

Use your package manager to add the library:

```bash
npm install agent-workflow
# or
yarn add agent-workflow
```

### 2. Minimal Workflow Example

Create a file minimal.ts:

import { WorkflowBuilder, DAGTask } from 'agent-workflow';

// Task 1: produce a timestamp
class TimeTask extends DAGTask {
  name = 'time';
  async execute() {
    return { now: Date.now() };
  }
}

// Task 2: format the timestamp
class GreetTask extends DAGTask {
  name = 'greet';
  constructor(deps = [new TimeTask()]) {
    super(deps);
  }
  async execute({ now }: { now: number }) {
    const date = new Date(now).toISOString();
    return { message: `Hello! Current time is ${date}` };
  }
}

async function main() {
  // Build and run the workflow
  const result = await WorkflowBuilder
    .create()
    .addTasks([new TimeTask(), new GreetTask()])
    .build()
    .execute({});

  if (!result.success) {
    console.error('Workflow failed:', result.error);
    process.exit(1);
  }

  console.log(result.data.message);
  console.log(`Total time: ${result.executionTime}ms`);
}

main();

Run it with ts-node (or compile and run with tsc + node):

npx ts-node-esm minimal.ts

You should see:

Hello! Current time is 2025-06-26T15:04:05.000Z
Total time: 12ms

### 3. Run the Built-In Examples

The repository includes two ready-to-go workflows in examples/. Use the npm scripts defined in package.json:

# Basic data‐processing pipeline
npm run example:basic

# Improved, type‐safe DAG workflow
npm run example:improved

Each script compiles and executes its TypeScript file, then logs task‐by‐task outputs, metrics, and final results:

  • Basic example: fetch → validate → analyze → report
  • Improved example: demonstrates custom DAGTask subclasses, automatic dependency ordering, and structured data flow

CLI Summary

  • npm run build – produce ES & UMD bundles plus .d.ts files
  • npm test – run unit tests with Jest
  • npm run example:basic – execute examples/basic-workflow.ts
  • npm run example:improved – execute examples/improved-workflow-example.ts

Now that you’ve run a workflow end-to-end, explore the examples/ directory for deeper patterns and extend DAGTask to suit your automation needs.

Core Concepts

This section covers the foundational building blocks of the agent-workflow library: how you define validated tasks, register and look them up, share data across steps, and assemble workflows for execution.

Task Interface and Schema Validation

Every workflow unit implements the Task interface, declaring optional Zod schemas for inputs and outputs and an execute method that returns validated results.

Key members:

  • name?: string – human-readable identifier
  • inputSchema?: ZodSchema – runtime validator for incoming data
  • outputSchema?: ZodSchema – runtime validator for results
  • execute(input: TaskInput): Promise<TaskOutput> – core logic

Example: defining a SendEmailTask

import { z } from "zod";
import { Task, TaskInput, TaskOutput } from "./workflow/Task";

export class SendEmailTask implements Task {
  name = "sendEmail";

  // Validate that `to`, `subject` are present, `body` optional
  inputSchema = z.object({
    to: z.string().email(),
    subject: z.string().min(1),
    body: z.string().optional(),
  });

  // Ensure result exposes a messageId
  outputSchema = z.object({
    messageId: z.string(),
  });

  async execute(input: TaskInput): Promise<TaskOutput> {
    // Parse and validate inputs
    const { to, subject, body } = this.inputSchema.parse(input);

    // ...call external email service...
    // (emailService is a placeholder for whatever mail client your project uses)
    const messageId = await emailService.send({ to, subject, body });

    // Validate outputs before returning
    return this.outputSchema.parse({ messageId });
  }
}

Practical tips:

  1. Always call .parse() on inputs/outputs to catch schema violations early.
  2. Catch ZodError in orchestration to translate into user-friendly errors or retry logic.
  3. Assign a stable name to each task for registry lookup and metrics.
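The tips above can be sketched as a small wrapper that validates input, runs the task, validates output, and turns schema violations into a readable failure. To stay dependency-free, this sketch uses a structural `Schema` type whose `parse()` method matches the shape of a Zod schema's `parse()`; `ValidationError`, `ValidatedTask`, and `runValidated` are hypothetical names for this example, not part of agent-workflow. With real Zod schemas you would check for `ZodError` instead.

```typescript
// Structural stand-in for a schema; a Zod schema's parse() fits this shape.
interface Schema<T> {
  parse(data: unknown): T;
}

// Hypothetical error type for this sketch (with Zod, check for ZodError).
class ValidationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ValidationError";
  }
}

interface ValidatedTask<I, O> {
  name: string;
  inputSchema: Schema<I>;
  outputSchema: Schema<O>;
  run(input: I): Promise<O>;
}

type RunResult<O> =
  | { success: true; data: O }
  | { success: false; error: string };

// Validate input, run the task, validate output, and report schema
// violations as a failed result instead of an uncaught exception.
async function runValidated<I, O>(
  task: ValidatedTask<I, O>,
  rawInput: unknown
): Promise<RunResult<O>> {
  try {
    const input = task.inputSchema.parse(rawInput);
    const output = await task.run(input);
    return { success: true, data: task.outputSchema.parse(output) };
  } catch (err) {
    if (err instanceof Error && err.name === "ValidationError") {
      return { success: false, error: `${task.name}: ${err.message}` };
    }
    throw err; // Non-validation errors stay visible to retry logic.
  }
}

// Hand-written schemas, for the example only.
const emailInput: Schema<{ to: string }> = {
  parse(data) {
    const to = (data as { to?: unknown } | null)?.to;
    if (typeof to !== "string" || to.indexOf("@") === -1) {
      throw new ValidationError("`to` must be an email address");
    }
    return { to };
  },
};
const emailOutput: Schema<{ messageId: string }> = {
  parse(data) {
    const id = (data as { messageId?: unknown } | null)?.messageId;
    if (typeof id !== "string") throw new ValidationError("missing messageId");
    return { messageId: id };
  },
};

const sendEmail: ValidatedTask<{ to: string }, { messageId: string }> = {
  name: "sendEmail",
  inputSchema: emailInput,
  outputSchema: emailOutput,
  run: async ({ to }) => ({ messageId: `msg-for-${to}` }),
};

runValidated(sendEmail, { to: "not-an-email" }).then((result) =>
  console.log(JSON.stringify(result))
);
// {"success":false,"error":"sendEmail: `to` must be an email address"}
```

Prefixing the message with the task's stable name is what makes the failure traceable in logs and metrics.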

Task Registry: Registering and Retrieving Tasks

TaskRegistry is a singleton that stores all available tasks by name. Use it to decouple task registration from workflow construction.

API highlights:

  • register(task: Task): void
  • get(name: string): Task | undefined
  • getAll(): Task[]
  • clear(): void

Example: registering and fetching tasks

import { TaskRegistry } from "./workflow/TaskRegistry";
import { WorkflowBuilder } from "./workflow/WorkflowBuilder";
import { SendEmailTask } from "./tasks/SendEmailTask";
import { GenerateReportTask } from "./tasks/GenerateReportTask";

// Register tasks at startup
const registry = TaskRegistry.instance;
registry.register(new SendEmailTask());
registry.register(new GenerateReportTask());

// Later, lookup by name
const emailTask = registry.get("sendEmail");
if (!emailTask) throw new Error("sendEmail task not found");

// Instantiate workflow dynamically
const workflow = WorkflowBuilder
  .create()
  .addTask(emailTask)
  .build();

Best practices:

  • Register all custom tasks before any workflow build or execution.
  • Use clear() in tests to reset the registry.
  • Optionally extend Task with a capabilities: string[] field and query via a custom getByCapability() helper.
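The capability lookup mentioned in the last bullet could look roughly like the following. This is a self-contained sketch, not the library's TaskRegistry: `CapableTask`, `CapabilityRegistry`, and `getByCapability()` are hypothetical names, and rejecting duplicate names is a design choice made here rather than documented behavior.

```typescript
// Hypothetical task shape extended with a `capabilities` field.
interface CapableTask {
  name: string;
  capabilities: string[];
  execute(input: Record<string, unknown>): Promise<Record<string, unknown>>;
}

// Minimal registry sketch; the real TaskRegistry may differ.
class CapabilityRegistry {
  private tasks = new Map<string, CapableTask>();

  register(task: CapableTask): void {
    if (this.tasks.has(task.name)) {
      throw new Error(`Task "${task.name}" is already registered`);
    }
    this.tasks.set(task.name, task);
  }

  get(name: string): CapableTask | undefined {
    return this.tasks.get(name);
  }

  // Return every task that advertises the requested capability.
  getByCapability(capability: string): CapableTask[] {
    const matches: CapableTask[] = [];
    this.tasks.forEach((task) => {
      if (task.capabilities.indexOf(capability) !== -1) matches.push(task);
    });
    return matches;
  }

  clear(): void {
    this.tasks.clear();
  }
}

const registry = new CapabilityRegistry();
registry.register({
  name: "sendEmail",
  capabilities: ["notify"],
  execute: async () => ({ messageId: "demo" }),
});
registry.register({
  name: "sendSms",
  capabilities: ["notify", "urgent"],
  execute: async () => ({ delivered: true }),
});

const notifiers = registry.getByCapability("notify").map((task) => task.name);
console.log(JSON.stringify(notifiers));
// ["sendEmail","sendSms"]
```

Calling `clear()` between tests keeps registrations from one test from leaking into the next.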

ContextManager: Sharing Data Across Workflow Tasks

ContextManager provides an in-memory key/value store for passing data between steps without tight coupling.

Core methods:

  • set(key: string, value: any): void
  • get(key: string): any
  • getAll(): Record<string, any>
  • clear(): void

Example: populating and consuming context

import { ContextManager } from "./workflow/ContextManager";
import type { Task, TaskInput } from "./workflow/Task";

const ctx = new ContextManager();

// Seed context before workflow
ctx.set("userId", 42);
ctx.set("featureFlags", { beta: true });

// Pass full context into tasks
await fetchUserTask.execute(ctx.getAll());
await processDataTask.execute(ctx.getAll());

// Within a Task implementation
export class LogUserTask implements Task {
  async execute(input: TaskInput, context: ContextManager) {
    const userId = context.get("userId") as number;
    console.log(`Processing user ${userId}`);
    context.set("lastProcessed", userId);
  }
}

Guidance:

  • Instantiate one ContextManager per workflow run.
  • Use clear, namespaced keys to avoid collisions (e.g. "auth.token").
  • Call clear() when reusing a ContextManager across multiple runs.
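As an illustration of the namespacing advice, the sketch below shows a tiny in-memory store with dotted keys. It is a stand-in written for this example, not the library's ContextManager; the `RunContext` class and its `getNamespace()` helper are assumptions.

```typescript
// Minimal stand-in for a per-run context store; not the library's ContextManager.
class RunContext {
  private store = new Map<string, unknown>();

  set(key: string, value: unknown): void {
    this.store.set(key, value);
  }

  get<T>(key: string): T | undefined {
    return this.store.get(key) as T | undefined;
  }

  // Collect every entry under a namespace, e.g. "auth" matches "auth.token".
  getNamespace(namespace: string): Record<string, unknown> {
    const result: Record<string, unknown> = {};
    this.store.forEach((value, key) => {
      if (key.indexOf(namespace + ".") === 0) result[key] = value;
    });
    return result;
  }

  clear(): void {
    this.store.clear();
  }
}

const ctx = new RunContext();
ctx.set("auth.token", "abc123");
ctx.set("auth.userId", 42);
ctx.set("scan.fileCount", 10);

console.log(JSON.stringify(ctx.getNamespace("auth")));
// {"auth.token":"abc123","auth.userId":42}

// Reset before reusing the same instance for another run.
ctx.clear();
console.log(ctx.get("auth.token"));
// undefined
```

Dotted prefixes keep keys written by unrelated tasks from colliding and make it cheap to inspect everything one subsystem has stored.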

WorkflowBuilder: Constructing and Executing Workflows

WorkflowBuilder assembles tasks into a DAG, handles dependencies and context, and supports both batch and streaming execution.

Basic flow:

  1. create() – start a new builder
  2. addTask(task: Task) – append a step
  3. withConfig({...}) – optional tuning (e.g., maxDynamicSteps)
  4. build() – return a Workflow instance
  5. execute(input: TaskInput) or executeStream(input: TaskInput)

Example: simple two-step workflow

import { WorkflowBuilder } from "./workflow/WorkflowBuilder";
import { FetchUserTask } from "./tasks/FetchUserTask";
import { SendEmailTask } from "./tasks/SendEmailTask";

const workflow = WorkflowBuilder
  .create()
  .addTask(new FetchUserTask())    // loads user data into context
  .addTask(new SendEmailTask())    // uses context to send email
  .withConfig({ maxDynamicSteps: 5 })
  .build();

const result = await workflow.execute({ userId: 42 });
console.log("Workflow completed:", result);

Example: streaming execution for real-time updates

const stream = workflow.executeStream({ userId: 42 });
for await (const update of stream) {
  console.log("Progress:", update);
}

Key points:

  • The builder injects a fresh ContextManager into each run.
  • Downstream tasks see all previous context via context.getAll().
  • Streaming mode yields partial results as each task completes.
  • Advanced patterns (conditional tasks, dynamic strategies) layer on this core builder—see Dynamic Task Generation Strategies.
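To show how streaming mode can yield partial results as each task completes, here is a generic async-generator sketch. It does not use agent-workflow; the `Step` and `StepUpdate` shapes and the `runStreaming` function are assumptions for illustration, and the updates emitted by the library's executeStream() may look different.

```typescript
// Hypothetical update shape; the library's stream events may differ.
interface StepUpdate {
  task: string;
  output: unknown;
  completed: number;
  total: number;
}

type Step = { name: string; run: (input: unknown) => Promise<unknown> };

// Run steps in order and yield a progress update as soon as each one
// finishes, passing each step's output to the next step.
async function* runStreaming(
  steps: Step[],
  input: unknown
): AsyncGenerator<StepUpdate> {
  let current = input;
  for (let i = 0; i < steps.length; i++) {
    current = await steps[i].run(current);
    yield {
      task: steps[i].name,
      output: current,
      completed: i + 1,
      total: steps.length,
    };
  }
}

async function demo(): Promise<string[]> {
  const seen: string[] = [];
  const steps: Step[] = [
    { name: "double", run: async (n) => (n as number) * 2 },
    { name: "increment", run: async (n) => (n as number) + 1 },
  ];
  // The consumer sees each update before the next step starts.
  for await (const update of runStreaming(steps, 20)) {
    seen.push(
      `${update.task}=${update.output} (${update.completed}/${update.total})`
    );
  }
  return seen;
}

demo().then((lines) => console.log(JSON.stringify(lines)));
// ["double=40 (1/2)","increment=41 (2/2)"]
```

Because the generator is pulled by the consumer, a slow UI naturally applies backpressure: the next step does not start until the previous update has been handled.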

By mastering these core concepts—validated tasks, centralized registry, shared context, and the workflow builder—you can author, manage, and scale complex asynchronous workflows with confidence.

Usage Guides

This section walks through real-world workflows combining dynamic strategies, AI streaming, LLM-driven task generation, and robust error handling. Each guide references its example file under examples/.

Dynamic Strategies in WorkflowBuilder (examples/dynamic-strategies.ts)

Use dynamic strategies to inject tasks at runtime based on context, task completion, or custom conditions.

  1. Install and import
npm install agent-workflow
import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  ProjectScanTask,
  TypeScriptCheckTask,
  SecurityAuditTask,
  PerformanceOptimizationTask,
  RefactorSuggestionTask
} from "./examples/tasks";
  2. Build workflow with strategies
const workflow = WorkflowBuilder.create()
  .addTask(new ProjectScanTask())
  // Conditional: add TS type check when scanResult.fileTypes includes "typescript"
  // (optional chaining guards against scanResult not being set yet)
  .whenCondition(
    ctx => (ctx.get("scanResult") as any)?.fileTypes?.includes("typescript") ?? false,
    async () => [ new TypeScriptCheckTask() ]
  )
  // On context change: inject security or perf tasks
  .onContextChange("scanResult", async scanData => {
    const tasks: DAGTask[] = [];
    if ((scanData as any).framework === "react") {
      tasks.push(new SecurityAuditTask());
    }
    if ((scanData as any).dependencies > 40) {
      tasks.push(new PerformanceOptimizationTask());
    }
    return tasks;
  })
  // Custom dynamic strategy: add refactor suggestions for high complexity
  .addDynamicStrategy({
    name: "complexity_analysis",
    condition: ctx => (ctx.get("codeQuality") as any)?.complexity > 0.7,
    generator: async () => [ new RefactorSuggestionTask() ],
    priority: 5,
    once: true
  })
  .withConfig({ maxDynamicSteps: 10 })
  .build();

const result = await workflow.execute({ projectPath: "./my-app" });
console.log("Tasks run:", result.taskResults.map(r => r.taskName));
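The interplay of `priority` and `once` in the strategies above can be sketched without the library. The code below is one plausible evaluation loop, not agent-workflow's implementation: the `Strategy` and `Context` types and the `evaluateStrategies` function are hypothetical, and it assumes that higher priority numbers are evaluated first, which the source does not state.

```typescript
// Hypothetical shapes for illustration; not the library's own types.
type Context = Map<string, unknown>;

interface Strategy {
  name: string;
  priority: number;
  once?: boolean;
  condition: (ctx: Context) => boolean;
  generator: () => string[]; // returns task names to keep the sketch small
}

// Evaluate strategies from highest to lowest priority (an assumption) and
// collect the tasks they generate. Strategies marked `once` fire at most once.
function evaluateStrategies(
  strategies: Strategy[],
  ctx: Context,
  alreadyFired: Set<string>
): string[] {
  const ordered = strategies.slice().sort((a, b) => b.priority - a.priority);
  const generated: string[] = [];
  for (const strategy of ordered) {
    if (strategy.once && alreadyFired.has(strategy.name)) continue;
    if (!strategy.condition(ctx)) continue;
    generated.push(...strategy.generator());
    alreadyFired.add(strategy.name);
  }
  return generated;
}

const strategies: Strategy[] = [
  {
    name: "perf",
    priority: 1,
    condition: (c) => (c.get("dependencies") as number) > 40,
    generator: () => ["PerformanceOptimizationTask"],
  },
  {
    name: "complexity_analysis",
    priority: 5,
    once: true,
    condition: (c) => (c.get("complexity") as number) > 0.7,
    generator: () => ["RefactorSuggestionTask"],
  },
];

const ctx: Context = new Map<string, unknown>([
  ["dependencies", 55],
  ["complexity", 0.9],
]);
const fired = new Set<string>();

console.log(JSON.stringify(evaluateStrategies(strategies, ctx, fired)));
// ["RefactorSuggestionTask","PerformanceOptimizationTask"]
console.log(JSON.stringify(evaluateStrategies(strategies, ctx, fired)));
// ["PerformanceOptimizationTask"]
```

On the second pass the `once` strategy is skipped even though its condition still holds, which is why a cap such as maxDynamicSteps matters for strategies that are not marked `once`.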

AI SDK–Compatible Streaming Workflow (examples/ai-sdk-streaming-workflow.ts)

Create an Express route that streams AI task outputs via SSE, full data streams, or plain text, matching popular AI SDK interfaces.

  1. Define an AI-streaming task
import { DAGTask, AISDKStreamingTask, TaskInput } from "agent-workflow";
import { MockStreamTextResult } from "./examples/mocks";

class AICodeAnalysisTask extends DAGTask implements AISDKStreamingTask {
  name = "aiCodeAnalysis";
  isAISDKStreaming = true;

  async execute(input: TaskInput) {
    throw new Error("Use streaming API");
  }

  async executeStreamAI(input: TaskInput) {
    const prompt = `Analyze project at ${input.projectPath}`;
    const mock = new MockStreamTextResult(prompt);
    return {
      textStream: mock.textStream(),
      fullStream: mock.fullStream(),
      toDataStreamResponse: () => mock.toDataStreamResponse(),
      toReadableStream: () => mock.toReadableStream()
    };
  }
}
  2. Build an AI SDK–streaming workflow
import express from "express";
import { WorkflowBuilder } from "agent-workflow";
import { AICodeAnalysisTask, AIDocumentationTask } from "./examples/tasks";

const app = express();
app.use(express.json());

app.post("/api/ai/analyze", async (req, res) => {
  const workflow = WorkflowBuilder.create()
    .addTask(new AICodeAnalysisTask())
    .addTask(new AIDocumentationTask())
    .buildAISDKStreaming();

  const streamResult = workflow.executeStreamAISDK(req.body);
  // Stream SSE response
  const response = streamResult.toDataStreamResponse();
  response.headers.forEach((value, key) => res.setHeader(key, value));
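  // A web ReadableStream has no .pipe(), so forward chunks manually
  // (assumes toDataStreamResponse() returns a web-standard Response)
  const reader = response.body.getReader();
  for (let chunk = await reader.read(); !chunk.done; chunk = await reader.read()) {
    res.write(chunk.value);
  }
  res.end();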
});

app.listen(3000);
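  3. Consume on the frontend
// Client-side: EventSource only supports GET requests, so POST with fetch
// and read the streamed response body instead (run inside a module or async function)
const response = await fetch("/api/ai/analyze", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ projectPath: "./src" })
});
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
for (let chunk = await reader.read(); !chunk.done; chunk = await reader.read()) {
  console.log("Chunk:", chunk.value);
}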
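When the stream is read manually rather than through EventSource, network chunks do not line up with event boundaries, so the client has to buffer text and split it into events itself. The sketch below assumes the endpoint emits standard SSE framing (events separated by a blank line, payload on `data:` lines, `\n` line endings); whether agent-workflow's data stream uses exactly this framing is not confirmed by the source, and `parseSseChunk` is a hypothetical helper.

```typescript
// Split buffered Server-Sent Events text into complete `data:` payloads.
// Returns the payloads plus any trailing partial event to carry over.
function parseSseChunk(buffer: string): { events: string[]; rest: string } {
  const parts = buffer.split("\n\n");
  const rest = parts.pop() ?? ""; // the last piece may be an incomplete event
  const events = parts
    .map((block) =>
      block
        .split("\n")
        .filter((line) => line.indexOf("data:") === 0)
        .map((line) => line.slice(5).replace(/^ /, ""))
        .join("\n")
    )
    .filter((data) => data.length > 0);
  return { events, rest };
}

// Simulate two network chunks that split one event in the middle.
const chunks = ['data: {"text":"Hel', 'lo"}\n\ndata: {"text":"!"}\n\n'];
let buffer = "";
const received: string[] = [];
for (const chunk of chunks) {
  const { events, rest } = parseSseChunk(buffer + chunk);
  buffer = rest;
  received.push(...events);
}

console.log(received.map((event) => JSON.parse(event).text).join(""));
// Hello!
```

Carrying `rest` over to the next chunk is the important part: parsing each chunk in isolation would fail on the first event above because its JSON is split across two reads.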

Dynamic Task Generation in LLM-Driven Workflows (examples/llm-integration.ts)

Spawn new tasks based on AI outputs using onTaskComplete and addDynamicStrategy.

import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  AICodeAnalysisTask,
  AITestGeneratorTask,
  AIPerformanceOptimizerTask,
  AICodeReviewTask,
  AIStreamingDocumentationTask
} from "./examples/tasks";

(async () => {
  const workflow = WorkflowBuilder.create()
    .addTask(new AICodeAnalysisTask())
    // Branch on analysis issues
    .onTaskComplete("aiCodeAnalysis", async (result, ctx) => {
      const issues = (result.aiAnalysis as any).issues as string[];
      const tasks: DAGTask[] = [];
      if (issues.includes("test-coverage")) {
        tasks.push(new AITestGeneratorTask());
      }
      if (issues.includes("performance")) {
        tasks.push(new AIPerformanceOptimizerTask());
      }
      tasks.push(new AICodeReviewTask());
      return tasks;
    })
    // Inject streaming docs if AI recommends
    .addDynamicStrategy({
      name: "ai_documentation",
      condition: ctx => {
        // Default to an empty list so the check is safe before the analysis has run
        const recs = ((ctx.get("aiAnalysis") as any)?.recommendations ?? []) as string[];
        return recs.some(r => r.includes("documentation"));
      },
      generator: async () => [ new AIStreamingDocumentationTask() ],
      priority: 2
    })
    .withConfig({ maxDynamicSteps: 15 })
    .build();

  const result = await workflow.execute({ projectPath: "./src" });
  console.log("Dynamic tasks generated:", result.dynamicTasksGenerated);
})();

Error Handling and Recovery Example (examples/error-handling.ts)

Compose a resilient workflow with retries, timeouts, fallbacks, and error logging.

import { WorkflowBuilder, DAGTask } from "agent-workflow";
import {
  NetworkRequestTask,
  FallbackTask,
  ErrorRecoveryTask,
  ErrorLoggingTask
} from "./examples/tasks";

(async () => {
  const workflow = WorkflowBuilder.create()
    .addTask(new NetworkRequestTask({ failureRate: 0.8 }))
    // Fallback on specific network error once
    .addDynamicStrategy({
      name: "network_fallback",
      condition: ctx => {
        const err = ctx.get("lastError") as string | undefined;
        // Return a strict boolean even when no error has been recorded
        return err?.includes("Network request failed") ?? false;
      },
      generator: async () => [ new FallbackTask() ],
      priority: 10,
      once: true
    })
    // Global error recovery and logging
    .addDynamicStrategy({
      name: "error_recovery",
      condition: ctx => ctx.get("hasError") === true,
      generator: async () => [ new ErrorRecoveryTask(), new ErrorLoggingTask() ],
      priority: 5
    })
    .withConfig({ retryAttempts: 2, timeoutMs: 5000 })
    .build();

  const result = await workflow.execute();
  console.log(`Success: ${result.success}`);
  if (!result.success) console.error(`Error: ${result.error?.message}`);
})();

Practical tips:

  • Define idempotent tasks for safe retries.
  • Set timeoutMs to bound long operations.
  • Use once: true to avoid repeating fallback injections.
  • Inspect result.taskResults, result.dynamicTasksGenerated, and per-task statuses for monitoring.
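The retry, timeout, and fallback ideas from the tips above can be combined in a few lines of plain TypeScript. This is a generic sketch, not how agent-workflow implements its retry policy: `withTimeout` and `runWithRecovery` are hypothetical helpers, and the option names merely mirror the `retryAttempts` and `timeoutMs` settings used in the example.

```typescript
// Hypothetical options mirroring the retryAttempts/timeoutMs names above.
interface RetryOptions {
  retryAttempts: number; // extra attempts after the first failure
  timeoutMs: number; // per-attempt time limit
}

// Reject if the operation does not settle within `ms` milliseconds.
function withTimeout<T>(operation: () => Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${ms}ms`)),
      ms
    );
    operation().then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

// Try the operation, retry on failure, then fall back if every attempt fails.
async function runWithRecovery<T>(
  operation: () => Promise<T>,
  fallback: () => Promise<T>,
  options: RetryOptions
): Promise<{ value: T; attempts: number; usedFallback: boolean }> {
  let attempts = 0;
  while (attempts <= options.retryAttempts) {
    attempts++;
    try {
      const value = await withTimeout(operation, options.timeoutMs);
      return { value, attempts, usedFallback: false };
    } catch {
      // Swallow the error and retry; a real workflow would log it.
    }
  }
  return { value: await fallback(), attempts, usedFallback: true };
}

// Example: an operation that fails twice, then succeeds on the third call.
let calls = 0;
const flaky = async (): Promise<string> => {
  calls++;
  if (calls < 3) throw new Error("Network request failed");
  return "live data";
};

runWithRecovery<string>(flaky, async () => "cached data", {
  retryAttempts: 2,
  timeoutMs: 1000,
}).then((result) => console.log(JSON.stringify(result)));
// {"value":"live data","attempts":3,"usedFallback":false}
```

Note that the operation may run several times, which is why the first tip asks for idempotent tasks.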

Development & Contribution

Quickstart for building, testing, linting, and contributing to the agent-workflow library.

Prerequisites

• Node.js ≥16
• npm, Yarn or pnpm

Clone and install:

git clone https://github.com/your-org/agent-workflow.git
cd agent-workflow
npm install

Project Scripts

Inspect package.json for up-to-date commands:

{
  "scripts": {
    "build": "vite build",
    "test": "jest --passWithNoTests",
    "test:watch": "jest --watch",
    "lint": "biome check",
    "format": "biome fmt"
  }
}

Building the Library

Run the Vite build to emit ESM and UMD bundles plus .d.ts files:

npm run build
# outputs:
#  - dist/agent-workflow.es.js
#  - dist/agent-workflow.umd.cjs
#  - dist/agent-workflow.d.ts

• Configuration: vite.config.ts
  • lib.entry: src/index.ts
  • formats: ['es','umd']
  • external: peer/runtime deps (e.g. zod, ai, Node built-ins)
  • dts plugin generates and injects the types entry

Linting & Formatting

Use Biome for import-sorting, lint rules, and formatting. Configuration lives in biome.json.

# check style errors
npm run lint

# auto-format files
npm run format

Ignored paths: dist/, node_modules/, .git/, biome.json.

Running Tests

Jest runs TypeScript tests as ESM via ts-jest and jest.config.cjs.

# single run
npm test

# watch mode
npm run test:watch

Key config in jest.config.cjs:

  • transform uses ts-jest with useESM: true
  • extensionsToTreatAsEsm: ['.ts']
  • tsconfig points at tsconfig.test.json

Example test skeleton (.ts imports require .js extension):

// src/workflow/__tests__/MyFeature.test.ts
import { WorkflowBuilder } from '../WorkflowBuilder.js'

describe('MyFeature', () => {
  it('executes a static task', async () => {
    // Inline task object following the Task interface (name + execute)
    const step = { name: 'step', execute: async () => ({ value: 'ok' }) }
    const result = await WorkflowBuilder.create().addTask(step).build().execute({})
    expect(result.success).toBe(true)
  })
})

TypeScript Configuration

tsconfig.json – production build

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "ESNext",
    "declaration": true,
    "outDir": "dist",
    "strict": true,
    "esModuleInterop": true
  },
  "include": ["src", "examples"],
  "exclude": ["src/**/*.test.ts", "src/**/__tests__"]
}

tsconfig.test.json – tests

{
  "extends": "./tsconfig.json",
  "compilerOptions": {
    "module": "ESNext",
    "allowJs": true
  },
  "include": ["**/*.ts", "**/*.js"],
  "exclude": ["node_modules"]
}

.npmignore

Ensure only runtime artifacts publish to npm. Key excludes:

src/
examples/
**/*.test.ts
**/__tests__/
tsconfig*.json
vite.config.ts
jest.config.cjs
biome.json

Update this file when adding new dev-only files or folders.

Contributing Workflow

  1. Fork the repo and create a feature branch feature/your-feature.
  2. Install dependencies and implement your changes.
  3. Add or update tests under src/**/__tests__.
  4. Run and confirm all checks pass:
    npm run lint
    npm test
    npm run build
    
  5. Commit with a descriptive message and open a PR.

All contributions should adhere to existing code style and include relevant tests. Continuous Integration runs Biome, Jest, and the Vite build on every pull request.