import { setupObservability } from "langwatch/observability/node";
import { LangWatchCallbackHandler } from "langwatch/observability/instrumentation/langchain";
import { getLangWatchTracer } from "langwatch";
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage, SystemMessage } from "@langchain/core/messages";
import { StateGraph, END, START } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph";
import { z } from "zod";
// Wire up LangWatch tracing before any LangChain objects are created.
setupObservability();
const tracer = getLangWatchTracer("langgraph-example");

// Shape of the workflow state, validated with Zod. Every field except
// `question` carries a default so graph nodes can return partial updates.
const graphStateShape = {
  question: z.string(),
  current_step: z.string().default("start"),
  needs_search: z.boolean().default(false),
  search_results: z.string().default(""),
  analysis: z.string().default(""),
  final_answer: z.string().default(""),
  iterations: z.number().default(0),
};
const GraphState = z.object(graphStateShape);
type GraphStateType = z.infer<typeof GraphState>;
/**
 * Runs a multi-step research workflow as a LangGraph StateGraph, traced
 * end-to-end through LangWatch:
 *
 *   analyze_question -> (search?) -> analyze -> generate_answer -> END
 *
 * @param userQuestion - The question the workflow should answer.
 * @returns The final generated answer text.
 */
async function handleWorkflowWithCallback(userQuestion: string): Promise<string> {
return await tracer.withActiveSpan("LangGraph - Research Workflow", {
attributes: {
"langwatch.thread_id": "langgraph-user",
"langwatch.tags": ["langgraph", "research-agent", "multi-step"],
},
}, async (span) => {
span.setType("workflow");
const langWatchCallback = new LangWatchCallbackHandler();
const chatModel = new ChatOpenAI({
modelName: "gpt-4o-mini",
temperature: 0.3,
callbacks: [langWatchCallback],
});
// LangChain message content may be a plain string or an array of content
// parts; normalize to a string before doing string operations on it.
const asText = (content: unknown): string =>
typeof content === "string" ? content : JSON.stringify(content);
// Node 1: Decide whether the question needs fresh (searched) information.
const analyzeQuestion = async (state: GraphStateType) => {
const prompt = `
Analyze this question and determine if it requires current/recent information that would need web search.
Question: ${state.question}
Respond with just "YES" if web search is needed, "NO" if general knowledge is sufficient.
`;
const result = await chatModel.invoke([
new SystemMessage("You are a question analyzer. Respond with only YES or NO."),
new HumanMessage(prompt),
]);
const needsSearch = asText(result.content).toUpperCase().includes("YES");
return {
current_step: "question_analyzed",
needs_search: needsSearch,
};
};
// Mock search tool for demo purposes — no real network call is made.
const performWebSearch = async (query: string): Promise<string> => {
// Simulate search latency.
await new Promise((resolve) => setTimeout(resolve, 1000));
return `Mock search results for "${query}":
- Recent developments and current information
- Latest news and analysis from reliable sources
- Expert opinions and academic research
- Current market trends and data points`;
};
// Node 2: Run the (mock) web search and record its results in state.
const performSearch = async (state: GraphStateType) => {
const searchResults = await performWebSearch(state.question);
return {
current_step: "search_completed",
search_results: searchResults,
};
};
// Node 3: Analyze the question, using search results when available.
const analyzeInformation = async (state: GraphStateType) => {
const context = state.search_results
? `Search Results:\n${state.search_results}\n\n`
: "Using general knowledge (no search performed).\n\n";
const prompt = `
${context}Question: ${state.question}
Provide a thorough analysis of this question, considering multiple perspectives and available information.
`;
const result = await chatModel.invoke([
new SystemMessage("You are an expert analyst. Provide comprehensive analysis."),
new HumanMessage(prompt),
]);
return {
current_step: "analysis_completed",
analysis: asText(result.content),
};
};
// Node 4: Produce the final answer from the analysis (and search results).
const generateAnswer = async (state: GraphStateType) => {
const prompt = `
Question: ${state.question}
Analysis: ${state.analysis}
${state.search_results ? `Search Results: ${state.search_results}` : ""}
Based on the analysis and available information, provide a comprehensive, well-structured answer.
`;
const result = await chatModel.invoke([
new SystemMessage("You are a helpful assistant. Provide clear, comprehensive answers."),
new HumanMessage(prompt),
]);
return {
current_step: "answer_generated",
final_answer: asText(result.content),
};
};
// Router: maps the step just completed to the next node to run.
// The default branch is only a safety net for an unrecognized step.
const router = (state: GraphStateType): string => {
switch (state.current_step) {
case "question_analyzed":
return state.needs_search ? "search" : "analyze";
case "search_completed":
return "analyze";
case "analysis_completed":
return "generate_answer";
case "answer_generated":
return END;
default:
return "analyze_question";
}
};
// Build the StateGraph; conditional edges restrict each node's outgoing
// routes to the router results that are actually reachable from it.
const workflow = new StateGraph(GraphState)
.addNode("analyze_question", analyzeQuestion)
.addNode("search", performSearch)
.addNode("analyze", analyzeInformation)
.addNode("generate_answer", generateAnswer)
.addEdge(START, "analyze_question")
.addConditionalEdges("analyze_question", router, {
search: "search",
analyze: "analyze",
})
.addConditionalEdges("search", router, {
analyze: "analyze",
})
.addConditionalEdges("analyze", router, {
generate_answer: "generate_answer",
})
.addConditionalEdges("generate_answer", router, {
[END]: END,
});
// Compile the graph with an in-memory checkpointer and LangWatch callbacks.
const memory = new MemorySaver();
const app = workflow
.compile({ checkpointer: memory })
.withConfig({ callbacks: [langWatchCallback] });
// Initial state: all defaults spelled out explicitly for clarity.
const initialState: GraphStateType = {
question: userQuestion,
current_step: "start",
needs_search: false,
search_results: "",
analysis: "",
final_answer: "",
iterations: 0,
};
// thread_id ties checkpointer state to a single conversation thread.
const config = {
configurable: { thread_id: "langgraph-user" },
};
let finalState: GraphStateType = initialState;
// Each streamed step is keyed by the node that just ran; merge every
// node's partial state update into our running copy of the state.
for await (const step of await app.stream(initialState, config)) {
for (const [, nodeOutput] of Object.entries(step as Record<string, unknown>)) {
if (nodeOutput && typeof nodeOutput === "object") {
finalState = { ...finalState, ...(nodeOutput as Partial<GraphStateType>) };
}
}
}
return finalState.final_answer;
});
}
/**
 * Entry point: runs the demo workflow once, or skips gracefully when no
 * OpenAI credentials are configured in the environment.
 */
async function mainCallback(): Promise<void> {
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
console.log("OPENAI_API_KEY not set. Skipping LangGraph callback example.");
return;
}
const answer = await handleWorkflowWithCallback("What is LangGraph? Explain briefly.");
console.log(`AI (LangGraph): ${answer}`);
}
// Fire-and-forget with an explicit rejection handler.
mainCallback().catch(console.error);