```python
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
```
```js
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// create a streaming run
const streamResponse = client.runs.stream(
  threadID,
  assistantID,
  { input, streamMode: "updates" }
);
for await (const chunk of streamResponse) {
  console.log(chunk.data);
}
```
Create a thread:
```bash
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
```
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
Python
JavaScript
cURL
```python
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
```
1. The `client.runs.stream()` method returns an iterator that yields streamed outputs.
2. Set `stream_mode="updates"` to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.
```js
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// create a streaming run
const streamResponse = client.runs.stream(  // (1)!
  threadID,
  assistantID,
  { input: { topic: "ice cream" }, streamMode: "updates" }  // (2)!
);
for await (const chunk of streamResponse) {
  console.log(chunk.data);
}
```
1. The `client.runs.stream()` method returns an iterator that yields streamed outputs.
2. Set `streamMode: "updates"` to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.
Create a thread:
```bash
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
```
```
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
```
The `updates` mode streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately.
You can pass a list as the `stream_mode` parameter to stream multiple modes at once. The streamed outputs will be tuples of `(mode, chunk)`, where `mode` is the name of the stream mode and `chunk` is the data streamed by that mode.
Python
JavaScript
cURL
```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)
```
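When streaming several modes at once, you usually need to tell the parts apart. The SDK surfaces an event name on each chunk (the `messages-tuple` example later on this page checks `chunk.event` the same way); here is a minimal sketch, assuming the event names match the mode names:

```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"],
):
    # Branch on which stream mode produced this chunk.
    if chunk.event == "updates":
        print("state update:", chunk.data)
    elif chunk.event == "custom":
        print("custom data:", chunk.data)
```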
Use the stream modes `updates` and `values` to stream the state of the graph as it executes:

- `updates` streams the updates to the state after each step of the graph.
- `values` streams the full value of the state after each step of the graph.

A `values`-mode sketch follows the example graph below.
Example graph
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
```
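For comparison with `updates`, here is a minimal `values`-mode sketch against this graph, assuming it is deployed under the name "agent" and that `client`, `thread_id`, and `assistant_id` are set up as in the earlier examples:

```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values",  # full state after each step, not just the latest update
):
    print(chunk.data)
```

Each chunk contains the complete state (both `topic` and `joke` once it is set), rather than only the keys the most recent node returned.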
Stateful runs
The examples below assume that you want to persist the outputs of a streaming run in the checkpointer DB and have already created a thread. To create a thread:
Python
JavaScript
cURL
```python
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
```
```js
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
```
```bash
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
```
If you don't need to persist the outputs of a run, you can pass `None` instead of `thread_id` when streaming.
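A minimal sketch of such a stateless streaming run, reusing the `client` and `assistant_id` from above:

```python
async for chunk in client.runs.stream(
    None,  # no thread: the run's outputs are not persisted
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk.data)
```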
Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
Python
JavaScript
cURL
```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)
```
To include outputs from subgraphs in the streamed outputs, set `stream_subgraphs=True` when creating the streaming run. This will stream outputs from both the parent graph and any subgraphs.
```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True,  # (1)!
    stream_mode="updates",
):
    print(chunk)
```
1. Set `stream_subgraphs=True` to stream outputs from subgraphs.
Extended example: streaming from subgraphs
This is an example graph you can run in the Agent Server.
See the LangSmith quickstart for more details.
```python
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict


# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str


def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}


def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}


subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()


# Define parent graph
class ParentState(TypedDict):
    foo: str


def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}


builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
```
Once you have a running Agent Server, you can interact with it using the LangGraph SDK:
Python
JavaScript
cURL
```python
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True,  # (1)!
    stream_mode="updates",
):
    print(chunk)
```
1. Set `stream_subgraphs=True` to stream outputs from subgraphs.
```js
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// Using the graph deployed with the name "agent"
const assistantID = "agent";

// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// create a streaming run
const streamResponse = client.runs.stream(
  threadID,
  assistantID,
  {
    input: { foo: "foo" },
    streamSubgraphs: true,  // (1)!
    streamMode: "updates"
  }
);
for await (const chunk of streamResponse) {
  console.log(chunk);
}
```
1. Set `streamSubgraphs: true` to stream outputs from subgraphs.
Create a thread:
```bash
curl --request POST \
  --url <DEPLOYMENT_URL>/threads \
  --header 'Content-Type: application/json' \
  --data '{}'
```
Use the `debug` stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
Python
JavaScript
cURL
```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)
```
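The exact shape of `debug` payloads is version-dependent; the following is a hedged sketch, assuming each payload carries a `type` field such as `"checkpoint"`, `"task"`, or `"task_result"` (the keys used by the open-source library's debug mode):

```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug",
):
    # Keep only task results. The "type" key is an assumption about the
    # debug payload shape and may differ across versions.
    if chunk.data.get("type") == "task_result":
        print(chunk.data)
```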
Use the `messages-tuple` stream mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks. The streamed output from `messages-tuple` mode is a tuple `(message_chunk, metadata)` where:

- `message_chunk`: the token or message segment from the LLM.
- `metadata`: a dictionary containing details about the graph node and LLM invocation.
Example graph
```python
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


model = init_chat_model(model="gpt-4o-mini")


def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    model_response = model.invoke(  # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}


graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
```
1. Note that the message events are emitted even when the LLM is run using `invoke` rather than `stream`.
Python
JavaScript
cURL
```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
```
1. The `messages-tuple` stream mode returns an iterator of tuples `(message_chunk, metadata)`, where `message_chunk` is the token streamed by the LLM and `metadata` is a dictionary with information about the graph node where the LLM was called, along with other details.
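If multiple nodes in your graph call LLMs, you can use the metadata to keep only the tokens from one of them. A minimal sketch: the `langgraph_node` key is the one used by the open-source library, so treat it as an assumption here, and `"call_model"` is the node from the example graph above:

```python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data
    # "langgraph_node" (assumed key) names the node that made the LLM call.
    if metadata.get("langgraph_node") == "call_model" and message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
```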
LangSmith allows you to join an active background run and stream outputs from it. To do so, you can use the LangGraph SDK's `client.runs.join_stream` method:
Python
JavaScript
cURL
```python
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
```
1. This is the `run_id` of an existing run you want to join.
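For context, a hedged sketch of where that `run_id` might come from: creating a background run with `client.runs.create` and then attaching to its stream (the input shape is illustrative):

```python
# Start a background run, then join its output stream.
run = await client.runs.create(thread_id, assistant_id, input={"topic": "ice cream"})

async for chunk in client.runs.join_stream(thread_id, run["run_id"]):
    print(chunk)
```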
```js
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });

const streamResponse = client.runs.joinStream(
  threadID,
  runId  // (1)!
);
for await (const chunk of streamResponse) {
  console.log(chunk);
}
```
1. This is the `run_id` of an existing run you want to join.