Modern Agent Engineering

How to Stream LangChain and LangGraph into AI SDK

A source-audited, practical guide to building streaming APIs with LangChain and LangGraph, then consuming them cleanly with AI SDK from simple chat to durable agents.

Many teams think they have to choose between LangChain/LangGraph and AI SDK.

You usually do not.

A very practical production setup is:

  • LangChain or LangGraph owns orchestration, tools, memory, checkpoints, and agent logic.
  • AI SDK owns the frontend contract, streaming protocol, and chat UI ergonomics.

That split is especially useful when you want to move from “simple streaming chat” to “tool-using, stateful, resumable agent” without rewriting your UI every time.

I audited the relevant official docs on March 30, 2026 before writing this guide. The examples below are based on the current AI SDK v6 docs and the current LangChain/LangGraph TypeScript docs.

TL;DR

  • If your backend is TypeScript, the fastest path is the official @ai-sdk/langchain adapter.
  • For simple chat, the pattern is: useChat -> /api/chat -> model.stream(...) -> toUIMessageStream(...).
  • For tool-using agents, createAgent(...).stream(...) is the clean default. Use streamEvents() when you want more granular semantic events for debugging or observability.
  • For LangGraph, the most important concept is stream mode:
    • messages for token streaming
    • updates or values for graph state
    • custom for progress events
  • For long-running or resumable work, LangGraph becomes much more valuable when you add:
    • a checkpointer
    • a thread_id
    • tasks around side effects
    • interrupts for approval or human review
  • AI SDK can consume:
    • a plain text stream
    • or the richer UIMessage data stream over SSE
  • If your backend is Python, AI SDK can still consume it. The stream contract is HTTP plus SSE, not “must be TypeScript.”

What You Will Learn Here

  • How LangChain, LangGraph, and AI SDK fit together in one architecture.
  • The difference between plain text streaming and UIMessage/data streaming.
  • A minimal LangChain streaming route.
  • A tool-using agent route.
  • A stateful LangGraph route with thread persistence.
  • How to stream custom progress updates into the UI.
  • Common design patterns for AI agents.
  • How to think about background processing and long-running tasks without painting yourself into a corner.

The Research Audit: What the Official Docs Confirm

Here is the cleanest reading of the official docs as of March 30, 2026.

1. AI SDK supports two important frontend streaming contracts

The current AI SDK docs say useChat and related UI helpers support both:

  • text streams
  • data streams using the AI SDK UIMessage protocol over SSE

The docs are explicit that:

  • text streams are fine for plain assistant text
  • the data/UIMessage stream is the right fit when you need richer parts like tool calls, tool results, reasoning parts, custom data, step boundaries, and structured message rendering
  • custom backends can be implemented in other languages such as Python
  • custom UIMessage backends need the x-vercel-ai-ui-message-stream: v1 header

That is an important architectural point: AI SDK is a frontend protocol and UI layer, not just a model SDK.
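One concrete consequence: a custom backend only has to honor the wire contract. A minimal sketch of the response headers such a backend sends, where the protocol marker header is the one named in the docs and the rest are standard SSE headers:

```typescript
// Headers a custom UIMessage SSE backend should send.
// "x-vercel-ai-ui-message-stream: v1" is the protocol marker from the docs;
// the others are standard server-sent-events response headers.
function uiMessageStreamHeaders(): Record<string, string> {
  return {
    "content-type": "text/event-stream",
    "cache-control": "no-cache",
    connection: "keep-alive",
    "x-vercel-ai-ui-message-stream": "v1",
  };
}
```

Any server that emits these headers plus the UIMessage event shapes can sit behind useChat, regardless of language.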

2. The official @ai-sdk/langchain adapter is the bridge most teams want

The adapter docs now explicitly provide:

  • toBaseMessages(...) to convert AI SDK UIMessage[] into LangChain messages
  • toUIMessageStream(...) to convert LangChain or LangGraph streams back into AI SDK UIMessage parts
  • support for streamEvents()
  • support for custom typed data events
  • LangSmithDeploymentTransport for connecting useChat directly to a LangGraph deployment

This means the “glue code” problem is much smaller than it used to be.

3. LangChain createAgent runs on LangGraph runtime underneath

The LangChain runtime docs are clear: createAgent runs on LangGraph’s runtime under the hood.

That matters because it explains why the stack feels so composable:

  • start with LangChain when you want the fastest path
  • drop to LangGraph when you need more control over state, branching, interrupts, or durable execution

This is not two unrelated ecosystems. It is more like a high-level and low-level layer of the same execution model.

4. LangGraph streaming is more than just tokens

The LangGraph streaming docs list the current stream modes:

  • values: full state after each step
  • updates: only state deltas after each step
  • messages: token chunks plus metadata from LLM calls
  • custom: your own streamed data from nodes/tools via writer
  • tools: tool lifecycle events
  • debug: everything

That is a big deal for product teams.

If you only stream tokens, users see words.

If you stream state, tool lifecycle, and progress, users see what the system is doing.
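When you request several stream modes at once in LangGraph JS, each chunk arrives tagged with its mode, so the consumer can route tokens and state to different places. A small illustrative sketch, assuming the [mode, payload] tuple shape that multi-mode streaming emits:

```typescript
// Route multi-mode LangGraph stream chunks by their mode tag.
// Assumed chunk shape: [mode, payload] tuples, as emitted when
// streamMode is passed as an array.
type StreamChunk = [string, unknown];

function routeChunks(chunks: StreamChunk[]): Record<string, unknown[]> {
  const byMode: Record<string, unknown[]> = {};
  for (const [mode, payload] of chunks) {
    (byMode[mode] ??= []).push(payload);
  }
  return byMode;
}

// Tokens ("messages") can feed the chat UI while "updates" feed a state panel.
const routed = routeChunks([
  ["messages", { content: "Hel" }],
  ["messages", { content: "lo" }],
  ["updates", { agent: { messages: [] } }],
]);
```

In a real route handler you would do this routing inside a `for await` loop over the stream rather than on a collected array.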

5. Durable execution has three non-negotiable ideas

The LangGraph durable execution docs are very explicit about the minimum requirements:

  1. enable persistence with a checkpointer
  2. run with a thread identifier
  3. wrap side effects and non-deterministic work in tasks

The docs also explain an easy-to-miss point: on resume, execution does not continue from the exact same line of code. It replays from an appropriate restart point.

That means “just resume later” is only safe if replay will not duplicate side effects.
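The replay rule is easier to see in miniature. This toy sketch is not the LangGraph API; runTask and the in-memory map are stand-ins for tasks plus a checkpointer, but it shows why a recorded task result makes replay safe:

```typescript
// Toy model of why side effects need task boundaries: a completed task's
// result is recorded, so a replay returns the saved value instead of
// re-running the effect. Names are stand-ins, not the LangGraph API.
const completedTasks = new Map<string, unknown>();

async function runTask<T>(id: string, effect: () => Promise<T>): Promise<T> {
  const saved = completedTasks.get(id);
  if (saved !== undefined) return saved as T; // replay: skip the side effect
  const result = await effect();
  completedTasks.set(id, result); // "checkpoint" the result
  return result;
}

// First run sends the email; a replay of the same task id does not:
// await runTask("send-email:order-42", () => emailClient.send(...));
```

LangGraph's real tasks do this with durable storage instead of process memory, which is exactly why un-wrapped side effects are dangerous on resume.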

6. Interrupts are the right primitive for approval gates

The LangGraph interrupt docs describe interrupts as the mechanism for:

  • approval workflows
  • review and edit flows
  • tool-call review before execution
  • external human input before continuing

This is one of the cleanest patterns for real business agents, especially when actions affect money, tickets, records, or customers.

The Mental Model

The cleanest architecture usually looks like this:

Browser
  |
  v
useChat() in AI SDK
  |
  v
/api/chat
  |
  +--> convert UIMessage[] -> LangChain messages
  |
  +--> LangChain model / agent / LangGraph graph
  |
  +--> stream tokens, state updates, tool events, or custom progress
  |
  v
toUIMessageStream(...)
  |
  v
AI SDK UIMessage SSE stream
  |
  v
useChat renders message parts

And when you add durability:

Browser
  |
  v
AI SDK useChat
  |
  v
API route / transport
  |
  +--> LangGraph thread_id
  +--> checkpointer
  +--> optional Redis / stream persistence
  |
  v
Durable graph execution
  |
  +--> messages
  +--> updates
  +--> custom progress
  +--> interrupts
  |
  v
Resume / reconnect / continue later

Example 1: The Simplest Working Setup

Start here if you only need:

  • streaming assistant text
  • a clean chat UI
  • minimal moving parts

This is the official adapter pattern, slightly simplified for clarity.

Backend: LangChain model -> AI SDK UIMessage stream

// app/api/chat/route.ts
import { ChatOpenAI } from "@langchain/openai";
import { createUIMessageStreamResponse, type UIMessage } from "ai";
import { toBaseMessages, toUIMessageStream } from "@ai-sdk/langchain";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
});

export async function POST(req: Request) {
  const { messages }: { messages: UIMessage[] } = await req.json();

  const langchainMessages = await toBaseMessages(messages);
  const stream = await model.stream(langchainMessages);

  return createUIMessageStreamResponse({
    stream: toUIMessageStream(stream),
  });
}

Frontend: useChat

"use client";

import { useChat } from "@ai-sdk/react";

export default function ChatPage() {
  const { messages, sendMessage, status } = useChat();

  return (
    <form
      onSubmit={(event) => {
        event.preventDefault();

        const input = event.currentTarget.elements.namedItem(
          "message",
        ) as HTMLInputElement;

        sendMessage({ text: input.value });
        input.value = "";
      }}
    >
      {messages.map((message) => (
        <div key={message.id}>
          <strong>{message.role === "user" ? "User" : "AI"}:</strong>
          {message.parts.map((part, index) =>
            part.type === "text" ? <span key={index}> {part.text}</span> : null,
          )}
        </div>
      ))}

      <input name="message" placeholder="Ask something..." />
      <button type="submit" disabled={status === "streaming"}>
        Send
      </button>
    </form>
  );
}

Why this is a good first step

  • It keeps the frontend stable.
  • It lets you swap the backend later.
  • It already uses the richer UIMessage stream protocol, so you are not locked into “text only.”

Example 2: Add Tools with a LangChain Agent

Once you want tool calling, the clean next move is a LangChain agent.

The official docs confirm that agents created with createAgent(...) support the LangGraph-style execution methods such as stream(...) and invoke(...).

Backend: createAgent(...).stream(...)

// app/api/agent/route.ts
import { createUIMessageStreamResponse, type UIMessage } from "ai";
import { createAgent, tool } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";
import { toBaseMessages, toUIMessageStream } from "@ai-sdk/langchain";

const lookupOrder = tool(
  async ({ orderId }) => {
    return {
      orderId,
      status: "shipped",
      eta: "2026-03-31",
    };
  },
  {
    name: "lookup_order",
    description: "Look up order status by order id.",
    schema: z.object({
      orderId: z.string(),
    }),
  },
);

const agent = createAgent({
  model: new ChatOpenAI({
    model: "gpt-4o",
    temperature: 0,
  }),
  tools: [lookupOrder],
  systemPrompt: "You are a support operations copilot.",
});

export async function POST(req: Request) {
  const { messages }: { messages: UIMessage[] } = await req.json();

  const langchainMessages = await toBaseMessages(messages);

  const stream = await agent.stream(
    { messages: langchainMessages },
    { streamMode: ["values", "messages"] },
  );

  return createUIMessageStreamResponse({
    stream: toUIMessageStream(stream),
  });
}

What changed?

  • We now stream agent execution, not just model output.
  • messages gives token-level flow.
  • values gives the state after each step, which becomes more useful as the agent gets more complex.

Product-level insight

This is the stage where PMs usually start asking questions like:

  • “Can we show when the assistant is checking an order?”
  • “Can we surface progress instead of a spinner?”
  • “Can we keep the UI stable even if the backend changes?”

The answer becomes “yes” once you stop thinking in only token streams.

Example 3: Use streamEvents() for More Debuggable Streams

The AI SDK adapter docs explicitly support LangChain streamEvents() output.

This is especially helpful when:

  • you want semantic events such as on_tool_start and on_tool_end
  • you are debugging or tracing
  • you are migrating existing LCEL-style code that already uses event streams

Backend: streamEvents()

// app/api/stream-events/route.ts
import { ChatOpenAI } from "@langchain/openai";
import { createUIMessageStreamResponse, type UIMessage } from "ai";
import { toBaseMessages, toUIMessageStream } from "@ai-sdk/langchain";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
});

export async function POST(req: Request) {
  const { messages }: { messages: UIMessage[] } = await req.json();

  const langchainMessages = await toBaseMessages(messages);

  const streamEvents = model.streamEvents(langchainMessages, {
    version: "v2",
  });

  return createUIMessageStreamResponse({
    stream: toUIMessageStream(streamEvents),
  });
}

When to use streamEvents() vs stream(...)

  • Use stream(...) when you mainly care about response rendering and graph state.
  • Use streamEvents() when you care more about semantic lifecycle events, debugging, or observability.

That distinction is small at first, but it matters a lot in production.

Example 4: Drop to LangGraph When You Need Stateful Threads

This is the moment where LangGraph becomes clearly better than “just an agent loop.”

Use LangGraph when you need:

  • a durable thread
  • checkpoints
  • multi-step orchestration
  • controlled branching
  • resumability

Backend: LangGraph with a checkpointer and thread_id

// app/api/langgraph/route.ts
import { ChatOpenAI } from "@langchain/openai";
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
import {
  END,
  START,
  MessagesAnnotation,
  StateGraph,
} from "@langchain/langgraph";
import { createUIMessageStreamResponse, type UIMessage } from "ai";
import { toBaseMessages, toUIMessageStream } from "@ai-sdk/langchain";

const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL!);

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
});

async function callModel(state: typeof MessagesAnnotation.State) {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
}

const graph = new StateGraph(MessagesAnnotation)
  .addNode("agent", callModel)
  .addEdge(START, "agent")
  .addEdge("agent", END)
  .compile({ checkpointer });

export async function POST(req: Request) {
  const { id, messages }: { id: string; messages: UIMessage[] } = await req.json();

  const langchainMessages = await toBaseMessages(messages);

  const stream = await graph.stream(
    { messages: langchainMessages },
    {
      configurable: {
        thread_id: id,
      },
      streamMode: ["messages", "updates"],
    },
  );

  return createUIMessageStreamResponse({
    stream: toUIMessageStream(stream),
  });
}

In a real app, call await checkpointer.setup() once, before the first run, so the Postgres-backed checkpointer can create the tables it needs.

Why this matters

Now the same id can become:

  • your chat id in AI SDK
  • your thread_id in LangGraph
  • your durable execution key in the checkpointer

That is the kind of alignment that makes systems simpler.

updates vs values

Use updates when you want lighter, cheaper state signals.

Use values when you want the whole graph state after each step, such as:

  • rich debugging UIs
  • inspection panels
  • product flows where each step updates a visible plan or checklist
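The difference is easiest to see with plain data. These literals are illustrative only, not the exact LangGraph wire format:

```typescript
// Illustrative payload shapes only, not the exact LangGraph wire format.

// values: the full graph state after a step.
const valuesChunk = {
  messages: [
    { role: "user", content: "hi" },
    { role: "assistant", content: "hello" },
  ],
};

// updates: only what the step that just ran wrote, keyed by node name.
const updatesChunk = {
  agent: { messages: [{ role: "assistant", content: "hello" }] },
};
```

As the state object grows, the gap between shipping the whole snapshot and shipping only the delta becomes the practical reason to prefer updates.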

Example 5: Stream Custom Progress Events into the UI

This is one of the most useful patterns in the whole stack.

The LangChain runtime docs say the runtime includes a stream writer, and the LangGraph streaming docs say custom mode emits user-defined data through writer.

The AI SDK adapter then maps these events into typed data-{type} parts.

Backend: emit progress from a tool

// app/api/custom-data/route.ts
import { createUIMessageStreamResponse, type UIMessage } from "ai";
import { createAgent, tool } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";
import { toBaseMessages, toUIMessageStream } from "@ai-sdk/langchain";

const analyzeDataset = tool(
  async ({ dataset }, config) => {
    const steps = ["loading", "cleaning", "summarizing"];

    for (const [index, step] of steps.entries()) {
      config.writer?.({
        type: "progress",
        id: `dataset-${dataset}`,
        step,
        message: `${step} ${dataset}`,
        progress: Math.round(((index + 1) / steps.length) * 100),
      });

      await new Promise((resolve) => setTimeout(resolve, 500));
    }

    return {
      dataset,
      summary: "No major issues found",
    };
  },
  {
    name: "analyze_dataset",
    description: "Analyze a dataset and report progress.",
    schema: z.object({
      dataset: z.string(),
    }),
  },
);

const agent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 }),
  tools: [analyzeDataset],
});

export async function POST(req: Request) {
  const { messages }: { messages: UIMessage[] } = await req.json();

  const langchainMessages = await toBaseMessages(messages);

  const stream = await agent.stream(
    { messages: langchainMessages },
    { streamMode: ["values", "messages", "custom"] },
  );

  return createUIMessageStreamResponse({
    stream: toUIMessageStream(stream),
  });
}

Frontend: render data-progress

"use client";

import { useChat } from "@ai-sdk/react";

export default function AnalysisPage() {
  const { messages, sendMessage } = useChat({
    onData: (dataPart) => {
      if (dataPart.type === "data-progress") {
        console.log("progress update", dataPart.data);
      }
    },
  });

  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          {message.parts.map((part, index) => {
            if (part.type === "text") {
              return <div key={index}>{part.text}</div>;
            }

            if (part.type === "data-progress") {
              return (
                <div key={index}>
                  {part.data.progress}% - {part.data.message}
                </div>
              );
            }

            return null;
          })}
        </div>
      ))}

      <button
        onClick={() =>
          sendMessage({
            text: "Analyze the sales dataset",
          })
        }
      >
        Start
      </button>
    </div>
  );
}

Why this pattern is so good

It replaces vague loading states:

"Thinking..."

with honest product feedback:

loading sales
cleaning sales
summarizing sales

That is better for:

  • UX
  • stakeholder demos
  • trust
  • debugging

Example 6: Connect useChat Directly to a Deployed LangGraph Runtime

This is the cleanest advanced option if you already have a LangGraph deployment.

The AI SDK adapter now includes LangSmithDeploymentTransport, which means the frontend can talk directly to a LangGraph deployment without a custom backend route in between.

"use client";

import { useMemo } from "react";
import { useChat } from "@ai-sdk/react";
import { LangSmithDeploymentTransport } from "@ai-sdk/langchain";

export default function LangGraphChat() {
  const transport = useMemo(
    () =>
      new LangSmithDeploymentTransport({
        url: "https://your-deployment.us.langgraph.app",
        // NEXT_PUBLIC_ env vars are exposed to the browser; for secret keys,
        // proxy through a server route instead of shipping them client-side.
        apiKey: process.env.NEXT_PUBLIC_LANGGRAPH_API_KEY,
      }),
    [],
  );

  const { messages, sendMessage } = useChat({ transport });

  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          {message.parts.map((part, index) =>
            part.type === "text" ? <div key={index}>{part.text}</div> : null,
          )}
        </div>
      ))}

      <button onClick={() => sendMessage({ text: "Research AI agent architecture" })}>
        Ask
      </button>
    </div>
  );
}

This is a very attractive pattern when:

  • your LangGraph runtime is already deployed
  • you want to keep frontend code simple
  • you do not want to hand-roll transport logic

If Your Backend Is Python

This part matters because many real LangGraph teams are Python-first.

The AI SDK stream protocol docs explicitly say custom compatible endpoints can be implemented in other languages such as Python.

Rule of thumb

  • If you only need plain assistant text, emit a text stream.
  • If you need tool calls, progress events, or richer rendering, emit the UIMessage SSE protocol.

Minimal Python-compatible UIMessage SSE shape

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()


@app.post("/api/chat")
async def chat():
    async def event_stream():
        yield 'data: {"type":"start","messageId":"msg-1"}\n\n'
        yield 'data: {"type":"text-start","id":"text-1"}\n\n'
        yield f'data: {json.dumps({"type":"text-delta","id":"text-1","delta":"Hello from Python"})}\n\n'
        yield 'data: {"type":"text-end","id":"text-1"}\n\n'
        yield 'data: {"type":"finish"}\n\n'
        yield 'data: [DONE]\n\n'

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "x-vercel-ai-ui-message-stream": "v1",
        },
    )

On the frontend, the same useChat() call still works.

That is the key insight:

AI SDK cares about the stream contract, not about whether your backend speaks TypeScript or Python.

Common Design Patterns for AI Agents

The LangGraph workflows guide is especially good here. It distinguishes between workflows and agents, then documents several recurring patterns.

Here is the short version I would use in real projects.

  • Prompt chain: deterministic multi-step tasks. Easy to test and reason about.
  • Router: one request, several specialized paths. Good latency and cost control.
  • Parallel fan-out / fan-in: independent sub-tasks. Better throughput and faster answers.
  • Orchestrator-worker: dynamic decomposition. Flexible when the number of sub-tasks is not known upfront.
  • Evaluator-optimizer: high-quality output loops. Lets one step critique or improve another.
  • Agent with approval gates: risky actions. Human review before side effects.

A practical heuristic

Use the simplest pattern that fits the job:

  • If the path is mostly known, use a workflow.
  • If the path must adapt at runtime, use an agent.
  • If actions are risky, insert interrupt-based approval gates.

That advice sounds boring, but it prevents a lot of over-engineering.

Background Processing for Long-Running Tasks

This is where teams often mix up three separate problems:

  1. streaming
  2. durability
  3. resume/reconnect

They are related, but they are not the same thing.

The pattern I trust most

User sends message
    |
    v
API / transport receives chat id
    |
    +--> LangGraph runs with thread_id
    +--> checkpointer saves progress
    +--> worker emits messages / updates / custom events
    |
    v
AI SDK renders partial progress
    |
    +--> refresh / disconnect / later revisit
    |
    v
resume same thread / stream / approval state

What to do when a task might run for a while

1. Keep the web tier stateless

Do not store active thread state in process memory on a single Node instance.

Persist:

  • the chat id
  • the LangGraph thread_id
  • message history or UI messages
  • any active stream identifier
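A minimal sketch of that persisted record, with hypothetical field names you would adapt to your own schema:

```typescript
// Hypothetical persisted chat record; store it in a database,
// never in Node process memory.
interface ChatRecord {
  chatId: string;          // AI SDK chat id
  threadId: string;        // LangGraph thread_id (often the same value)
  uiMessages: unknown[];   // persisted UIMessage history
  activeStreamId?: string; // only needed for AI SDK resume streams
}

function newChatRecord(chatId: string): ChatRecord {
  // Reusing the chat id as the thread id keeps the two systems aligned.
  return { chatId, threadId: chatId, uiMessages: [] };
}
```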

2. Use LangGraph checkpoints for execution state

If the work can pause, retry, or resume, checkpoints are usually more important than fancy UI.

Without them, “resume later” often just means “start again and hope nothing breaks.”

3. Treat side effects as replay-sensitive

This is one of the most important durable-execution rules in the LangGraph docs.

If resume can replay execution, then:

  • emails
  • payments
  • ticket writes
  • CRM updates
  • external API mutations

must be isolated carefully, typically through tasks or clearly separated nodes.

4. Use interrupts for approval, not ad-hoc booleans

If a person needs to approve a tool call or business action, model that pause explicitly.

That is cleaner than:

  • sleeping inside a request
  • polling random flags
  • keeping pending approval state inside a browser tab

AI SDK-specific advice for long-running work

The current AI SDK docs say useChat supports resume streams for long-running generations, but also say:

  • resume requires persisted messages and active streams
  • you need storage plus Redis plus POST/GET endpoints
  • resume: true is not compatible with abort

That leads to a very useful practical split:

  • If you need simple streaming chat, use normal useChat.
  • If you need long-running reconnectable UX, add AI SDK resume streams.
  • If you need durable orchestration and resumable state, pair that with LangGraph checkpoints and thread ids.

In other words:

AI SDK resume handles reconnecting the stream. LangGraph durable execution handles resuming the work.

Those are complementary, not competing concerns.

A Small Interrupt Example

Here is the basic idea of a human approval gate in LangGraph:

import { interrupt, Command } from "@langchain/langgraph";

async function approveRefund(state: {
  orderId: string;
  amount: number;
}) {
  const decision = interrupt({
    kind: "approval",
    action: "refund_order",
    orderId: state.orderId,
    amount: state.amount,
  });

  if (!decision.approved) {
    return {
      status: "cancelled",
    };
  }

  return {
    status: "approved",
  };
}

// later, resume the same thread:
await graph.invoke(
  new Command({
    resume: {
      approved: true,
    },
  }),
  {
    configurable: {
      thread_id: "chat_123",
    },
  },
);

The exact UI around this can vary.

The architectural point is more important than the specific button click:

interrupts give you a first-class pause/resume boundary that fits real business processes.

Common Mistakes to Avoid

1. Using text streams when you really need data streams

If you need:

  • tool rendering
  • progress events
  • reasoning parts
  • multi-step stitched messages

use the UIMessage/data stream, not plain text.

2. Forgetting thread_id

LangGraph persistence is thread-based.

If you do not pass a thread identifier, you do not really have resumable conversation state.

3. Mixing volatile UI state with durable execution state

The chat window being open is not your execution state.

The durable state should live in:

  • a checkpointer
  • a database
  • or a runtime designed for resumable threads

4. Hiding all progress behind a spinner

Streaming only the final text is often technically correct and product-wise weak.

Progress events are often cheap to add and make a huge difference in trust.

5. Replaying unsafe side effects

If a resumed graph can accidentally send the same refund, email, or mutation twice, you do not have a streaming problem.

You have a durability design problem.

Final Take

The cleanest way to think about this stack is:

  • LangChain is your fast path into agent behavior.
  • LangGraph is your control layer for threads, checkpoints, interrupts, and durable orchestration.
  • AI SDK is your frontend streaming and interaction layer.

That combination scales nicely from:

  • “just stream a response”

to:

  • “stream a stateful, tool-using, resumable agent with progress updates and approval gates”

If I were designing from scratch today, I would usually start with this progression:

  1. LangChain + AI SDK adapter for fast delivery
  2. LangGraph stream modes when product needs state and progress
  3. checkpointer + thread_id + interrupts when the work becomes durable or business-critical

That path keeps the UI stable while the backend grows up.
