LangGraph agentでワークフロー自動化を実装する実践ガイド

本記事では、LangGraphを使用して複雑なAIエージェントワークフローを構築し、実際のビジネスプロセスを自動化する具体的な方法を解説します。実装例とハマりポイント対策を含めて、すぐに仕事で活用できるレベルまで掘り下げます。

LangGraph agentとは?ワークフロー自動化の新しいアプローチ

LangGraphは、LangChainエコシステムの一部として提供されるグラフベースのワークフロー構築ライブラリです。従来のシーケンシャルなチェーン処理と異なり、条件分岐、ループ、並列処理を直感的に表現できます。

実務では、以下のような場面でLangGraph agentが活躍します:

  • カスタマーサポートの自動応答(複数の外部APIを組み合わせた問題解決)
  • データ分析ワークフロー(データ取得→前処理→分析→レポート生成)
  • コンテンツ生成パイプライン(調査→構成作成→執筆→編集)
  • リサーチエージェント(キーワード検索→情報抽出→統合)

一方、LangGraph agentが向かないケース:

  • 単純な質問応答(通常のLLMで十分)
  • リアルタイム性が最重要(複雑なワークフローは遅延が大きくなる)
  • トークンコストが極めて限定的な場合(複数ステップでコストが積み上がる)

LangGraph agentの基本アーキテクチャを理解する

LangGraph agentの動作原理を視覚化したのが以下の図です。エージェントはノード(処理ステップ)とエッジ(遷移条件)で構成されるグラフを実行します。


flowchart TD
    A["🚀 ユーザーリクエスト"] -->|タスク開始| B["📋 State初期化"]
    B --> C{"🤔 エージェント判定
どのツールを使う?"} C -->|tool_call| D["🔧 外部API実行"] D --> E["💾 State更新
実行結果を記録"] E --> C C -->|finish| F["✅ 最終回答生成"] F --> G["📤 ユーザーに返却"]

このループ構造が、LangGraph agentの強力な点です。エージェントは自身の判断でツール呼び出しを繰り返し、タスク完了まで自動的に進行します。

実装例:実際に動く顧客情報検索エージェント

ここから、実際に動作するコード例を紹介します。テスト環境:macOS 14 / Python 3.12 / LangGraph 0.1.12 / OpenAI API で動作確認済みです。

必要なライブラリのインストール

pip install langgraph langchain langchain-openai python-dotenv

基本的なエージェント構築コード

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
import os

# 1. Stateの定義:エージェントが保持する情報
class AgentState(TypedDict):
    messages: Annotated[Sequence, operator.add]
    current_task: str
    tool_results: dict

# 2. ツール関数の実装例
def search_customer_database(customer_id: str) -> dict:
    """顧客情報をダミーデータベースから検索"""
    customers = {
        "C001": {"name": "山田太郎", "email": "yamada@example.com", "status": "VIP"},
        "C002": {"name": "鈴木花子", "email": "suzuki@example.com", "status": "Regular"},
    }
    return customers.get(customer_id, {"error": "Customer not found"})

def get_order_history(customer_id: str) -> dict:
    """注文履歴を取得"""
    orders = {
        "C001": [{"order_id": "O001", "amount": 50000}, {"order_id": "O002", "amount": 30000}],
        "C002": [{"order_id": "O003", "amount": 12000}],
    }
    return {"orders": orders.get(customer_id, [])}

# 3. ノード関数の実装
def agent_node(state: AgentState) -> AgentState:
    """LLMにツール選択を判断させるノード"""
    llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=os.getenv("OPENAI_API_KEY"))
    
    # ツール情報を含めたシステムプロンプト
    system_prompt = """あなたは顧客サポートエージェントです。
利用可能なツール:
1. search_customer_database(customer_id) - 顧客基本情報を検索
2. get_order_history(customer_id) - 注文履歴を取得

ユーザーリクエストに応じて、どのツールを使うかJSONで返してください。
形式: {"tool": "tool_name", "customer_id": "C001"}
不要な場合は {"tool": "none"} を返してください。"""
    
    messages = state["messages"] + [HumanMessage(content=system_prompt)]
    response = llm.invoke(messages)
    
    return {
        "messages": state["messages"] + [AIMessage(content=response.content)],
        "current_task": response.content,
        "tool_results": state.get("tool_results", {}),
    }

def tool_executor_node(state: AgentState) -> AgentState:
    """エージェントの判定に基づいてツールを実行"""
    import json
    
    try:
        task = json.loads(state["current_task"])
        tool_name = task.get("tool")
        customer_id = task.get("customer_id")
        
        results = {}
        if tool_name == "search_customer_database":
            results["customer_info"] = search_customer_database(customer_id)
        elif tool_name == "get_order_history":
            results["orders"] = get_order_history(customer_id)
        
        return {
            "messages": state["messages"],
            "current_task": state["current_task"],
            "tool_results": {**state.get("tool_results", {}), **results},
        }
    except json.JSONDecodeError:
        return state

def final_response_node(state: AgentState) -> AgentState:
    """ツール実行結果から最終回答を生成"""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    tool_results_text = str(state.get("tool_results", {}))
    final_prompt = f"""以下のツール実行結果に基づいて、ユーザーに対する最終回答を作成してください。
結果:{tool_results_text}"""
    
    response = llm.invoke([HumanMessage(content=final_prompt)])
    
    return {
        "messages": state["messages"] + [AIMessage(content=response.content)],
        "current_task": state["current_task"],
        "tool_results": state["tool_results"],
    }

# 4. グラフの構築
workflow = StateGraph(AgentState)

# ノードを追加
workflow.add_node("agent", agent_node)
workflow.add_node("tool_executor", tool_executor_node)
workflow.add_node("final_response", final_response_node)

# エッジを定義
workflow.add_edge("agent", "tool_executor")
workflow.add_edge("tool_executor", "final_response")
workflow.add_edge("final_response", END)

# 開始ノードを指定
workflow.set_entry_point("agent")

# グラフをコンパイル
app = workflow.compile()

# 5. エージェントの実行
initial_state = {
    "messages": [HumanMessage(content="顧客C001の情報と注文履歴を取得してください")],
    "current_task": "",
    "tool_results": {},
}

result = app.invoke(initial_state)
print("最終結果:", result["messages"][-1].content)

実装時の重要なハマりポイントと対策

Issue 1: JSONパースエラーでエージェントが停止する

実務でよくあるエラーです。LLMの出力がJSONとしてパースできず、ツール実行段階で例外が発生します。

対策:システムプロンプトにJSON形式を明確に指示し、パース失敗時のフォールバック処理を実装します。

def tool_executor_node(state: AgentState) -> AgentState:
    """ツール実行時のエラーハンドリング強化版"""
    import json
    
    try:
        # 最初の試行:JSONパース
        task = json.loads(state["current_task"])
    except json.JSONDecodeError:
        # フォールバック:テキスト検索で対応
        if "customer" in state["current_task"].lower():
            task = {"tool": "search_customer_database", "customer_id": "C001"}
        else:
            task = {"tool": "none"}
    
    tool_name = task.get("tool")
    customer_id = task.get("customer_id", "")
    
    results = {}
    if tool_name == "search_customer_database" and customer_id:
        results["customer_info"] = search_customer_database(customer_id)
    elif tool_name == "get_order_history" and customer_id:
        results["orders"] = get_order_history(customer_id)
    
    return {
        "messages": state["messages"],
        "current_task": state["current_task"],
        "tool_results": {**state.get("tool_results", {}), **results},
    }

Issue 2: 無限ループに陥るエージェント

ツール実行後も同じツールを呼び続けるケースです。特に外部APIのレスポンスが曖昧な場合に発生します。

対策:ループカウンター機能と最大ステップ数の制限を実装します。

# ループ防止機構を追加したState
class AgentStateWithCounter(TypedDict):
    messages: Annotated[Sequence, operator.add]
    current_task: str
    tool_results: dict
    step_count: int  # ステップカウンター
    max_steps: int  # 最大ステップ数

def should_continue(state: AgentStateWithCounter) -> str:
    """ループを続けるか判定"""
    if state.get("step_count", 0) >= state.get("max_steps", 5):
        return "final_response"  # ステップ上限到達時は終了
    return "tool_executor"

# グラフにこの判定をエッジとして追加
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tool_executor": "tool_executor",
        "final_response": "final_response",
    }
)

Issue 3: トークンコストの急増

複数ステップのワークフローでは、会話履歴がStateに蓄積され、LLM呼び出しのたびにトークン消費が増加します。特にOpenAI APIでは月間コストが予想外に膨らむ危険があります。

対策:定期的に会話履歴を圧縮し、重要な情報のみを保持します。

def compress_messages(messages: Sequence) -> Sequence:
    """古いメッセージを圧縮(最新10件のみ保持)"""
    if len(messages) > 10:
        # 最初のメッセージと最後の10件を保持
        return messages[:1] + messages[-10:]
    return messages

# ツール実行後に圧縮を実行
def tool_executor_node_optimized(state: AgentState) -> AgentState:
    # ... ツール実行ロジック ...
    
    compressed_messages = compress_messages(state["messages"])
    return {
        "messages": compressed_messages,
        "current_task": state["current_task"],
        "tool_results": state["tool_results"],
    }

実用的な拡張パターン:並列ツール実行

複数のツールを同時に実行したい場合、LangGraphはこれをネイティブサポートしています。以下は、顧客情報と注文履歴を同時取得する例です。

from concurrent.futures import ThreadPoolExecutor
import threading

def parallel_tool_executor_node(state: AgentState) -> AgentState:
    """複数ツールを並列実行"""
    import json
    
    task = json.loads(state["current_task"])
    customer_id = task.get("customer_id")
    
    results = {}
    
    # ThreadPoolExecutorで並列実行
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_customer = executor.submit(search_customer_database, customer_id)
        future_orders = executor.submit(get_order_history, customer_id)
        
        results["customer_info"] = future_customer.result()
        results["orders"] = future_orders.result()
    
    return {
        "messages": state["messages"],
        "current_task": state["current_task"],
        "tool_results": {**state.get("tool_results", {}), **results},
    }

# グラフ構築時:agent と parallel_tool_executor をつなぐ
workflow.add_node("parallel_tool_executor", parallel_tool_executor_node)
workflow.add_edge("agent", "parallel_tool_executor")
workflow.add_edge("parallel_tool_executor", "final_response")

この方法により、ツール実行時間がほぼ半減します(シリアル実行時の合計時間が2秒なら、並列実行は約1秒)。

LangGraph agentと他ツールの比較

ツール グラフ構造 条件分岐 状態管理 推奨用途
LangGraph ✓ ネイティブ ✓ 柔軟 ✓ TypedDict 複雑なマルチステップワークフロー
LangChain Chain ✗ シーケンシャルのみ △ 限定的 △ 基本的 シンプルな処理チェーン
Temporal ✓ 高度 ✓ 高度 ✓ 永続化 エンタープライズワークフロー・再実行必須
Apache Airflow ✓ DAG ✓ 複雑 ✓ 永続化 バッチ処理・スケジューリング

結論として、AIエージェントの迅速な実装にはLangGraph、エンタープライズ規模の信頼性が必要ならTemporalやAirflowを検討してください。

実装時のベストプラクティス

1. Stateのスキーマを明確に定義する

型安全性を確保し、デバッグを簡単にします。

class AgentState(TypedDict, total=False):  # total=False で全フィールドがオプション
    messages: Annotated[Sequence, operator.add]  # メッセージの追加を許可
    current_task: str
    tool_results: dict
    step_count: int
    error_log: list  # デバッグ用エラーログ

2. ツール実行の結果を常にStateに記録する

後の分析やデバッグに必須です。

def tool_executor_node(state: AgentState) -> AgentState:
    # ツール実行...
    
    # 実行ログを記録
    log_entry = {
        "tool": tool_name,
        "timestamp": datetime.now().isoformat(),
        "success": tool_name != "none",
        "result": results,
    }
    
    error_log = state.get("error_log", [])
    error_log.append(log_entry)
    
    return {
        "messages": state["messages"],
        "current_task": state["current_task"],
        "tool_results": results,
        "error_log": error_log,
    }

3. 条件付きエッジで柔軟な遷移を実装する

def route_based_on_result(state: AgentState) -> str:
    """ツール実行結果に基づいて次のノードを判定"""
    if state.get("tool_results", {}).get("customer_info", {}).get("error"):
        return "error_handler"  # エラー処理へ
    return "final_response"  # 通常処理へ

workflow.add_conditional_edges(
    "tool_executor",
    route_based_on_result,
    {
        "error_handler": "error_handler",
        "final_response": "final_response",
    }
)
💡 実務のヒント: 本番環境ではLangGraph agentの実行ログを外部ストレージ(CloudWatch、DatadogなどのAPM)に保存することをお勧めします。特に、tool_results とエラーログは後のトラブルシューティングに極めて重要です。

パフォーマンス最適化とコスト削減

LangGraph agentのコスト効率化は実装段階から意識すべきです。

トークンコストの最小化

各LLMベンダーのトークン消費量を計測し、不要なAPI呼び出しを削減します。

import tiktoken

def estimate_token_cost(messages: Sequence, model: str = "gpt-4") -> tuple[int, float]:
    """メッセージのトークン数とコストを推定"""
    encoding = tiktoken.encoding_for_model(model)
    
    total_tokens = 0
    for msg in messages:
        total_tokens += len(encoding.encode(msg.content))
    
    # GPT-4: $0.03/1K入力トークン, $0.06/1K出力トークン(概算)
    input_cost = (total_tokens / 1000) * 0.03
    output_cost = (total_tokens / 1000) * 0.06  # 出力は目安
    
    return total_tokens, input_cost + output_cost

# ワークフロー実行前にコストを見積もる
initial_state = { ... }
tokens, cost = estimate_token_cost(initial_state["messages"])
print(f"推定トークン数: {tokens}, 推定コスト: ${cost:.4f}")

キャッシング戦略

同じツール呼び出しを複数回実行する場合、結果をキャッシュします。

from functools import lru_cache

@lru_cache(maxsize=128)  # 最大128個の結果をメモリにキャッシュ
def search_customer_database_cached(customer_id: str) -> dict:
    """キャッシュ付き顧客検索"""
    # 実際のDB検索処理
    customers = {
        "C001": {"name": "山田太郎", "email": "yamada@example.com", "status": "VIP"},
    }
    return customers.get(customer_id, {"error": "Customer not found"})

デバッグとモニタリング

LangGraph agentは複数ステップで構成されるため、問題特定が難しい場合があります。以下のデバッグ戦略が役立ちます。

Stateダンプによる可視化

import json

def debug_state(state: AgentState, step_name: str) -> None:
    """各ステップ後の状態をJSONで出力"""
    debug_output = {
        "step": step_name,
        "message_count": len(state.get("messages", [])),
        "current_task": state.get("current_task", ""),
        "tool_results_keys": list(state.get("tool_results", {}).keys()),
    }
    print(json.dumps(debug_output, indent=2, ensure_ascii=False))

# ノード内で呼び出し
def agent_node(state: AgentState) -> AgentState:
    # ... ロジック ...
    result = { ... }
    debug_state(result, "agent_node")
    return result

本番環境でのロギング

import logging

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def agent_node_with_logging(state: AgentState) -> AgentState:
    logger.info(f"Agent starting with {len(state.get('messages', []))} messages")
    
    try:
        # ... ロジック ...
        logger.info("Agent completed successfully")
    except Exception as e:
        logger.error(f"Agent error: {str(e)}")
        raise
    
    return result

実装パターン:リサーチエージェントのミニケーススタディ

ここまでの知識を統合した実践例として、Webリサーチエージェントを構築します。このエージェントは、キーワードを受け取り、複数ステップで情報収集・整理するワークフローです。

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
from datetime import datetime

class ResearchState(TypedDict):
    keyword: str
    messages: Annotated[Sequence, operator.add]
    search_results: list
    analysis: str
    final_report: str
    step_count: int

def search_node(state: ResearchState) -> ResearchState:
    """キーワードで情報検索(ダミー実装)"""
    keyword = state.get("keyword", "")
    
    # 実装例:SerpAPI等の外部API呼び出し
    dummy_results = [
        {"title": f"{keyword}の基本情報", "snippet": f"{keyword}について説明"},
        {"title": f"{keyword}の市場調査", "snippet": "市場規模と成長率"},
        {"title": f"{keyword}のトレンド", "snippet": "最新のトレンド情報"},
    ]
    
    return {
        "keyword": state["keyword"],
        "messages": state["messages"] + [HumanMessage(content=f"Searched for: {keyword}")],
        "search_results": dummy_results,
        "analysis": state.get("analysis", ""),
        "final_report": state.get("final_report", ""),
        "step_count": state.get("step_count", 0) + 1,
    }

def analysis_node(state: ResearchState) -> ResearchState:
    """検索結果を分析"""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    search_text = "\n".join([r["snippet"] for r in state.get("search_results", [])])
    analysis_prompt = f"""以下の検索結果を分析してください:
{search_text}

主要なポイント、トレンド、推奨事項をまとめてください。"""
    
    response = llm.invoke([HumanMessage(content=analysis_prompt)])
    
    return {
        "keyword": state["keyword"],
        "messages": state["messages"] + [AIMessage(content=response.content)],
        "search_results": state["search_results"],
        "analysis": response.content,
        "final_report": state.get("final_report", ""),
        "step_count": state.get("step_count", 0) + 1,
    }

def report_generation_node(state: ResearchState) -> ResearchState:
    """最終レポート生成"""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    report_prompt = f"""以下の分析に基づいて、正式なレポートを作成してください:
キーワード: {state["keyword"]}
分析: {state["analysis"]}

レポート形式:
# {state['keyword']} リサーチレポート
## 実行日時
## 主要な発見
## 推奨事項
## 次のステップ"""
    
    response = llm.invoke([HumanMessage(content=report_prompt)])
    
    return {
        "keyword": state["keyword"],
        "messages": state["messages"] + [AIMessage(content=response.content)],
        "search_results": state["search_results"],
        "analysis": state["analysis"],
        "final_report": response.content,
        "step_count": state.get("step_count", 0) + 1,
    }

# グラフ構築
research_workflow = StateGraph(ResearchState)
research_workflow.add_node("search", search_node)
research_workflow.add_node("analysis", analysis_node)
research_workflow.add_node("report_generation", report_generation_node)

research_workflow.add_edge("search", "analysis")
research_workflow.add_edge("analysis", "report_generation")
research_workflow.add_edge("report_generation", END)
research_workflow.set_entry_point("search")

research_app = research_workflow.compile()

# 実行
research_state = {
    "keyword": "生成AI市場トレンド",
    "messages": [],
    "search_results": [],
    "analysis": "",
    "final_report": "",
    "step_count": 0,
}

result = research_app.invoke(research_state)
print("=== リサーチレポート ===")
print(result["final_report"])

公式リソースと参考資料