· 27 分で読める · 13,635 文字
LangGraph agentでワークフロー自動化を実装する実践ガイド
本記事では、LangGraphを使用して複雑なAIエージェントワークフローを構築し、実際のビジネスプロセスを自動化する具体的な方法を解説します。実装例とハマりポイント対策を含めて、すぐに仕事で活用できるレベルまで掘り下げます。
LangGraph agentとは?ワークフロー自動化の新しいアプローチ
LangGraphは、LangChainエコシステムの一部として提供されるグラフベースのワークフロー構築ライブラリです。従来のシーケンシャルなチェーン処理と異なり、条件分岐、ループ、並列処理を直感的に表現できます。
実務では、以下のような場面でLangGraph agentが活躍します:
- カスタマーサポートの自動応答(複数の外部APIを組み合わせた問題解決)
- データ分析ワークフロー(データ取得→前処理→分析→レポート生成)
- コンテンツ生成パイプライン(調査→構成作成→執筆→編集)
- リサーチエージェント(キーワード検索→情報抽出→統合)
一方、LangGraph agentが向かないケース:
- 単純な質問応答(通常のLLMで十分)
- リアルタイム性が最重要(複雑なワークフローは遅延が大きくなる)
- トークンコストが極めて限定的な場合(複数ステップでコストが積み上がる)
LangGraph agentの基本アーキテクチャを理解する
LangGraph agentの動作原理を視覚化したのが以下の図です。エージェントはノード(処理ステップ)とエッジ(遷移条件)で構成されるグラフを実行します。
flowchart TD
A["🚀 ユーザーリクエスト"] -->|タスク開始| B["📋 State初期化"]
B --> C{"🤔 エージェント判定
どのツールを使う?"}
C -->|tool_call| D["🔧 外部API実行"]
D --> E["💾 State更新
実行結果を記録"]
E --> C
C -->|finish| F["✅ 最終回答生成"]
F --> G["📤 ユーザーに返却"]
このループ構造が、LangGraph agentの強力な点です。エージェントは自身の判断でツール呼び出しを繰り返し、タスク完了まで自動的に進行します。
実装例:実際に動く顧客情報検索エージェント
ここから、実際に動作するコード例を紹介します。テスト環境:macOS 14 / Python 3.12 / LangGraph 0.1.12 / OpenAI API で動作確認済みです。
必要なライブラリのインストール
pip install langgraph langchain langchain-openai python-dotenv
基本的なエージェント構築コード
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
import os
# 1. Stateの定義:エージェントが保持する情報
class AgentState(TypedDict):
messages: Annotated[Sequence, operator.add]
current_task: str
tool_results: dict
# 2. ツール関数の実装例
def search_customer_database(customer_id: str) -> dict:
"""顧客情報をダミーデータベースから検索"""
customers = {
"C001": {"name": "山田太郎", "email": "yamada@example.com", "status": "VIP"},
"C002": {"name": "鈴木花子", "email": "suzuki@example.com", "status": "Regular"},
}
return customers.get(customer_id, {"error": "Customer not found"})
def get_order_history(customer_id: str) -> dict:
"""注文履歴を取得"""
orders = {
"C001": [{"order_id": "O001", "amount": 50000}, {"order_id": "O002", "amount": 30000}],
"C002": [{"order_id": "O003", "amount": 12000}],
}
return {"orders": orders.get(customer_id, [])}
# 3. ノード関数の実装
def agent_node(state: AgentState) -> AgentState:
"""LLMにツール選択を判断させるノード"""
llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=os.getenv("OPENAI_API_KEY"))
# ツール情報を含めたシステムプロンプト
system_prompt = """あなたは顧客サポートエージェントです。
利用可能なツール:
1. search_customer_database(customer_id) - 顧客基本情報を検索
2. get_order_history(customer_id) - 注文履歴を取得
ユーザーリクエストに応じて、どのツールを使うかJSONで返してください。
形式: {"tool": "tool_name", "customer_id": "C001"}
不要な場合は {"tool": "none"} を返してください。"""
messages = state["messages"] + [HumanMessage(content=system_prompt)]
response = llm.invoke(messages)
return {
"messages": state["messages"] + [AIMessage(content=response.content)],
"current_task": response.content,
"tool_results": state.get("tool_results", {}),
}
def tool_executor_node(state: AgentState) -> AgentState:
"""エージェントの判定に基づいてツールを実行"""
import json
try:
task = json.loads(state["current_task"])
tool_name = task.get("tool")
customer_id = task.get("customer_id")
results = {}
if tool_name == "search_customer_database":
results["customer_info"] = search_customer_database(customer_id)
elif tool_name == "get_order_history":
results["orders"] = get_order_history(customer_id)
return {
"messages": state["messages"],
"current_task": state["current_task"],
"tool_results": {**state.get("tool_results", {}), **results},
}
except json.JSONDecodeError:
return state
def final_response_node(state: AgentState) -> AgentState:
"""ツール実行結果から最終回答を生成"""
llm = ChatOpenAI(model="gpt-4", temperature=0)
tool_results_text = str(state.get("tool_results", {}))
final_prompt = f"""以下のツール実行結果に基づいて、ユーザーに対する最終回答を作成してください。
結果:{tool_results_text}"""
response = llm.invoke([HumanMessage(content=final_prompt)])
return {
"messages": state["messages"] + [AIMessage(content=response.content)],
"current_task": state["current_task"],
"tool_results": state["tool_results"],
}
# 4. グラフの構築
workflow = StateGraph(AgentState)
# ノードを追加
workflow.add_node("agent", agent_node)
workflow.add_node("tool_executor", tool_executor_node)
workflow.add_node("final_response", final_response_node)
# エッジを定義
workflow.add_edge("agent", "tool_executor")
workflow.add_edge("tool_executor", "final_response")
workflow.add_edge("final_response", END)
# 開始ノードを指定
workflow.set_entry_point("agent")
# グラフをコンパイル
app = workflow.compile()
# 5. エージェントの実行
initial_state = {
"messages": [HumanMessage(content="顧客C001の情報と注文履歴を取得してください")],
"current_task": "",
"tool_results": {},
}
result = app.invoke(initial_state)
print("最終結果:", result["messages"][-1].content)
実装時の重要なハマりポイントと対策
Issue 1: JSONパースエラーでエージェントが停止する
実務でよくあるエラーです。LLMの出力がJSONとしてパースできず、ツール実行段階で例外が発生します。
対策:システムプロンプトにJSON形式を明確に指示し、パース失敗時のフォールバック処理を実装します。
def tool_executor_node(state: AgentState) -> AgentState:
"""ツール実行時のエラーハンドリング強化版"""
import json
try:
# 最初の試行:JSONパース
task = json.loads(state["current_task"])
except json.JSONDecodeError:
# フォールバック:テキスト検索で対応
if "customer" in state["current_task"].lower():
task = {"tool": "search_customer_database", "customer_id": "C001"}
else:
task = {"tool": "none"}
tool_name = task.get("tool")
customer_id = task.get("customer_id", "")
results = {}
if tool_name == "search_customer_database" and customer_id:
results["customer_info"] = search_customer_database(customer_id)
elif tool_name == "get_order_history" and customer_id:
results["orders"] = get_order_history(customer_id)
return {
"messages": state["messages"],
"current_task": state["current_task"],
"tool_results": {**state.get("tool_results", {}), **results},
}
Issue 2: 無限ループに陥るエージェント
ツール実行後も同じツールを呼び続けるケースです。特に外部APIのレスポンスが曖昧な場合に発生します。
対策:ループカウンター機能と最大ステップ数の制限を実装します。
# ループ防止機構を追加したState
class AgentStateWithCounter(TypedDict):
messages: Annotated[Sequence, operator.add]
current_task: str
tool_results: dict
step_count: int # ステップカウンター
max_steps: int # 最大ステップ数
def should_continue(state: AgentStateWithCounter) -> str:
"""ループを続けるか判定"""
if state.get("step_count", 0) >= state.get("max_steps", 5):
return "final_response" # ステップ上限到達時は終了
return "tool_executor"
# グラフにこの判定をエッジとして追加
workflow.add_conditional_edges(
"agent",
should_continue,
{
"tool_executor": "tool_executor",
"final_response": "final_response",
}
)
Issue 3: トークンコストの急増
複数ステップのワークフローでは、会話履歴がStateに蓄積され、LLM呼び出しのたびにトークン消費が増加します。特にOpenAI APIでは月間コストが予想外に膨らむ危険があります。
対策:定期的に会話履歴を圧縮し、重要な情報のみを保持します。
def compress_messages(messages: Sequence) -> Sequence:
"""古いメッセージを圧縮(最新10件のみ保持)"""
if len(messages) > 10:
# 最初のメッセージと最後の10件を保持
return messages[:1] + messages[-10:]
return messages
# ツール実行後に圧縮を実行
def tool_executor_node_optimized(state: AgentState) -> AgentState:
# ... ツール実行ロジック ...
compressed_messages = compress_messages(state["messages"])
return {
"messages": compressed_messages,
"current_task": state["current_task"],
"tool_results": state["tool_results"],
}
実用的な拡張パターン:並列ツール実行
複数のツールを同時に実行したい場合、LangGraphはこれをネイティブサポートしています。以下は、顧客情報と注文履歴を同時取得する例です。
from concurrent.futures import ThreadPoolExecutor
import threading
def parallel_tool_executor_node(state: AgentState) -> AgentState:
"""複数ツールを並列実行"""
import json
task = json.loads(state["current_task"])
customer_id = task.get("customer_id")
results = {}
# ThreadPoolExecutorで並列実行
with ThreadPoolExecutor(max_workers=2) as executor:
future_customer = executor.submit(search_customer_database, customer_id)
future_orders = executor.submit(get_order_history, customer_id)
results["customer_info"] = future_customer.result()
results["orders"] = future_orders.result()
return {
"messages": state["messages"],
"current_task": state["current_task"],
"tool_results": {**state.get("tool_results", {}), **results},
}
# グラフ構築時:agent と parallel_tool_executor をつなぐ
workflow.add_node("parallel_tool_executor", parallel_tool_executor_node)
workflow.add_edge("agent", "parallel_tool_executor")
workflow.add_edge("parallel_tool_executor", "final_response")
この方法により、ツール実行時間がほぼ半減します(シリアル実行時の合計時間が2秒なら、並列実行は約1秒)。
LangGraph agentと他ツールの比較
| ツール | グラフ構造 | 条件分岐 | 状態管理 | 推奨用途 |
|---|---|---|---|---|
| LangGraph | ✓ ネイティブ | ✓ 柔軟 | ✓ TypedDict | 複雑なマルチステップワークフロー |
| LangChain Chain | ✗ シーケンシャルのみ | △ 限定的 | △ 基本的 | シンプルな処理チェーン |
| Temporal | ✓ 高度 | ✓ 高度 | ✓ 永続化 | エンタープライズワークフロー・再実行必須 |
| Apache Airflow | ✓ DAG | ✓ 複雑 | ✓ 永続化 | バッチ処理・スケジューリング |
結論として、AIエージェントの迅速な実装にはLangGraph、エンタープライズ規模の信頼性が必要ならTemporalやAirflowを検討してください。
実装時のベストプラクティス
1. Stateのスキーマを明確に定義する
型安全性を確保し、デバッグを簡単にします。
class AgentState(TypedDict, total=False): # total=False で全フィールドがオプション
messages: Annotated[Sequence, operator.add] # メッセージの追加を許可
current_task: str
tool_results: dict
step_count: int
error_log: list # デバッグ用エラーログ
2. ツール実行の結果を常にStateに記録する
後の分析やデバッグに必須です。
def tool_executor_node(state: AgentState) -> AgentState:
# ツール実行...
# 実行ログを記録
log_entry = {
"tool": tool_name,
"timestamp": datetime.now().isoformat(),
"success": tool_name != "none",
"result": results,
}
error_log = state.get("error_log", [])
error_log.append(log_entry)
return {
"messages": state["messages"],
"current_task": state["current_task"],
"tool_results": results,
"error_log": error_log,
}
3. 条件付きエッジで柔軟な遷移を実装する
def route_based_on_result(state: AgentState) -> str:
"""ツール実行結果に基づいて次のノードを判定"""
if state.get("tool_results", {}).get("customer_info", {}).get("error"):
return "error_handler" # エラー処理へ
return "final_response" # 通常処理へ
workflow.add_conditional_edges(
"tool_executor",
route_based_on_result,
{
"error_handler": "error_handler",
"final_response": "final_response",
}
)
パフォーマンス最適化とコスト削減
LangGraph agentのコスト効率化は実装段階から意識すべきです。
トークンコストの最小化
各LLMベンダーのトークン消費量を計測し、不要なAPI呼び出しを削減します。
import tiktoken
def estimate_token_cost(messages: Sequence, model: str = "gpt-4") -> tuple[int, float]:
"""メッセージのトークン数とコストを推定"""
encoding = tiktoken.encoding_for_model(model)
total_tokens = 0
for msg in messages:
total_tokens += len(encoding.encode(msg.content))
# GPT-4: $0.03/1K入力トークン, $0.06/1K出力トークン(概算)
input_cost = (total_tokens / 1000) * 0.03
output_cost = (total_tokens / 1000) * 0.06 # 出力は目安
return total_tokens, input_cost + output_cost
# ワークフロー実行前にコストを見積もる
initial_state = { ... }
tokens, cost = estimate_token_cost(initial_state["messages"])
print(f"推定トークン数: {tokens}, 推定コスト: ${cost:.4f}")
キャッシング戦略
同じツール呼び出しを複数回実行する場合、結果をキャッシュします。
from functools import lru_cache
@lru_cache(maxsize=128) # 最大128個の結果をメモリにキャッシュ
def search_customer_database_cached(customer_id: str) -> dict:
"""キャッシュ付き顧客検索"""
# 実際のDB検索処理
customers = {
"C001": {"name": "山田太郎", "email": "yamada@example.com", "status": "VIP"},
}
return customers.get(customer_id, {"error": "Customer not found"})
デバッグとモニタリング
LangGraph agentは複数ステップで構成されるため、問題特定が難しい場合があります。以下のデバッグ戦略が役立ちます。
Stateダンプによる可視化
import json
def debug_state(state: AgentState, step_name: str) -> None:
"""各ステップ後の状態をJSONで出力"""
debug_output = {
"step": step_name,
"message_count": len(state.get("messages", [])),
"current_task": state.get("current_task", ""),
"tool_results_keys": list(state.get("tool_results", {}).keys()),
}
print(json.dumps(debug_output, indent=2, ensure_ascii=False))
# ノード内で呼び出し
def agent_node(state: AgentState) -> AgentState:
# ... ロジック ...
result = { ... }
debug_state(result, "agent_node")
return result
本番環境でのロギング
import logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def agent_node_with_logging(state: AgentState) -> AgentState:
logger.info(f"Agent starting with {len(state.get('messages', []))} messages")
try:
# ... ロジック ...
logger.info("Agent completed successfully")
except Exception as e:
logger.error(f"Agent error: {str(e)}")
raise
return result
実装パターン:リサーチエージェントのミニケーススタディ
ここまでの知識を統合した実践例として、Webリサーチエージェントを構築します。このエージェントは、キーワードを受け取り、複数ステップで情報収集・整理するワークフローです。
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
from datetime import datetime
class ResearchState(TypedDict):
keyword: str
messages: Annotated[Sequence, operator.add]
search_results: list
analysis: str
final_report: str
step_count: int
def search_node(state: ResearchState) -> ResearchState:
"""キーワードで情報検索(ダミー実装)"""
keyword = state.get("keyword", "")
# 実装例:SerpAPI等の外部API呼び出し
dummy_results = [
{"title": f"{keyword}の基本情報", "snippet": f"{keyword}について説明"},
{"title": f"{keyword}の市場調査", "snippet": "市場規模と成長率"},
{"title": f"{keyword}のトレンド", "snippet": "最新のトレンド情報"},
]
return {
"keyword": state["keyword"],
"messages": state["messages"] + [HumanMessage(content=f"Searched for: {keyword}")],
"search_results": dummy_results,
"analysis": state.get("analysis", ""),
"final_report": state.get("final_report", ""),
"step_count": state.get("step_count", 0) + 1,
}
def analysis_node(state: ResearchState) -> ResearchState:
"""検索結果を分析"""
llm = ChatOpenAI(model="gpt-4", temperature=0)
search_text = "\n".join([r["snippet"] for r in state.get("search_results", [])])
analysis_prompt = f"""以下の検索結果を分析してください:
{search_text}
主要なポイント、トレンド、推奨事項をまとめてください。"""
response = llm.invoke([HumanMessage(content=analysis_prompt)])
return {
"keyword": state["keyword"],
"messages": state["messages"] + [AIMessage(content=response.content)],
"search_results": state["search_results"],
"analysis": response.content,
"final_report": state.get("final_report", ""),
"step_count": state.get("step_count", 0) + 1,
}
def report_generation_node(state: ResearchState) -> ResearchState:
"""最終レポート生成"""
llm = ChatOpenAI(model="gpt-4", temperature=0)
report_prompt = f"""以下の分析に基づいて、正式なレポートを作成してください:
キーワード: {state["keyword"]}
分析: {state["analysis"]}
レポート形式:
# {state['keyword']} リサーチレポート
## 実行日時
## 主要な発見
## 推奨事項
## 次のステップ"""
response = llm.invoke([HumanMessage(content=report_prompt)])
return {
"keyword": state["keyword"],
"messages": state["messages"] + [AIMessage(content=response.content)],
"search_results": state["search_results"],
"analysis": state["analysis"],
"final_report": response.content,
"step_count": state.get("step_count", 0) + 1,
}
# グラフ構築
research_workflow = StateGraph(ResearchState)
research_workflow.add_node("search", search_node)
research_workflow.add_node("analysis", analysis_node)
research_workflow.add_node("report_generation", report_generation_node)
research_workflow.add_edge("search", "analysis")
research_workflow.add_edge("analysis", "report_generation")
research_workflow.add_edge("report_generation", END)
research_workflow.set_entry_point("search")
research_app = research_workflow.compile()
# 実行
research_state = {
"keyword": "生成AI市場トレンド",
"messages": [],
"search_results": [],
"analysis": "",
"final_report": "",
"step_count": 0,
}
result = research_app.invoke(research_state)
print("=== リサーチレポート ===")
print(result["final_report"])