· 19 分で読める · 9,303 文字
マルチエージェントシステムを実装する:Orchestration Frameworkの選び方と運用
複数のAIエージェントを効率的に連携させることは、大規模なタスク自動化の鍵となります。本記事では、multi-agent orchestration frameworkの実装パターン、主流ツールの比較、そして実務での運用ノウハウを、動作するコード例を交えて解説します。
マルチエージェントシステムの基本構造
マルチエージェントシステムは、複数の独立したAIエージェントが協調して目標を達成するアーキテクチャです。従来の単一エージェントでは困難な複雑なワークフロー(例:データ取得→分析→レポート生成→承認フロー)を分散処理することで、スケーラビリティと保守性が向上します。
Orchestration Frameworkは、これらのエージェント間の通信、タスクルーティング、エラーハンドリング、状態管理を統一的に扱うミドルウェアです。実務では、LangGraph、AutoGen、Crew AIなどが主要なプレイヤーとなっています。
graph TD
A[ユーザーリクエスト] --> B[Orchestrator
Coordinator]
B --> C[Agent 1
データ収集]
B --> D[Agent 2
分析]
B --> E[Agent 3
レポート生成]
C --> B
D --> B
E --> B
B --> F[結果統合]
F --> G[ユーザーへ返却]
上図に示す通り、Orchestratorが中心的な役割を担い、各エージェントのタスク割り当て、結果の収集、フロー制御を管理します。
主要なOrchestration Frameworkの比較
LangGraph:状態管理に強い選択肢
LangGraphはLangChainエコシステムの一部で、グラフベースのワークフロー定義が特徴です。ノード(処理単位)とエッジ(接続)を明確に定義でき、複雑な条件分岐に強みがあります。
メリット:視覚化が容易、状態の永続化が標準装備、LangChainの豊富なツール連携
デメリット:ローカルホストに限定される場合が多く、分散処理には追加実装が必要
AutoGen:マイクロソフト提供の実績派
AutoGenは会話ベースのマルチエージェント設計に特化しています。エージェント同士の対話を通じてタスクを進める仕様で、自然な問題解決フローに適しています。
メリット:LLM APIに依存しない(LocalモデルもサポートOK)、会話ログの記録が充実
デメリット:初期学習コストが高い、エージェント数が増えるとコンテキスト管理が複雑化
Crew AI:タスク志向の実用型
Crew AIはタスク&ロール分離が明確で、非エンジニア(ビジネスユーザー)にも理解しやすい設計です。
メリット:シンプルなAPI、ロール定義が直感的、実務プロジェクトへの導入が早い
デメリット:カスタマイズ性に限界、エラーハンドリングの選択肢が少ない
実務では、既存のLangChainスタックがあればLangGraphを、会話型の柔軟性を重視ならAutoGenを、素早い導入を重視ならCrew AIを選ぶ傾向が見られます。
LangGraphで実装するマルチエージェントシステム
具体的なコード例で、LangGraphを使ったシンプルなマルチエージェントシステムを実装してみましょう。このシステムは、ユーザーの質問に対して「リサーチャーエージェント」が情報収集を行い、「アナリストエージェント」が結果を整理する流れです。
テスト環境:macOS 14 / Python 3.12 / LangGraph 0.1.7 / Claude 3.5 Sonnet
# 必要なライブラリのインストール
# pip install langgraph langchain openai python-dotenv
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, List
import json
import os
from dotenv import load_dotenv
load_dotenv()
# ① 状態スキーマの定義
class AgentState(TypedDict):
user_query: str
research_result: str
analysis: str
final_output: str
# ② LLMの初期化
llm = ChatOpenAI(
model="claude-3-5-sonnet-20241022",
temperature=0.7,
api_key=os.getenv("OPENAI_API_KEY")
)
# ③ リサーチャーエージェント
def researcher_agent(state: AgentState) -> AgentState:
"""ユーザーの質問を基に情報を検索する"""
prompt = f"""ユーザーの質問に関して、簡潔な情報を提供してください。
質問: {state['user_query']}
3-5行の要点をまとめてください。"""
response = llm.invoke(prompt)
state["research_result"] = response.content
print(f"[Researcher] {state['research_result']}")
return state
# ④ アナリストエージェント
def analyst_agent(state: AgentState) -> AgentState:
"""リサーチ結果を分析し、実行可能な提案を生成"""
prompt = f"""以下のリサーチ結果を分析し、実装のステップを提案してください:
リサーチ結果:
{state['research_result']}
3つの実装ステップを箇条書きで提案してください。"""
response = llm.invoke(prompt)
state["analysis"] = response.content
print(f"[Analyst] {state['analysis']}")
return state
# ⑤ 最終統合エージェント
def finalizer_agent(state: AgentState) -> AgentState:
"""リサーチと分析結果を統合する"""
state["final_output"] = f"""
【リサーチ結果】
{state['research_result']}
【分析と提案】
{state['analysis']}
"""
print(f"[Finalizer] 処理完了")
return state
# ⑥ グラフの構築
workflow = StateGraph(AgentState)
# ノード(処理単位)の追加
workflow.add_node("researcher", researcher_agent)
workflow.add_node("analyst", analyst_agent)
workflow.add_node("finalizer", finalizer_agent)
# エッジ(接続)の追加
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "finalizer")
workflow.add_edge("finalizer", END)
# エントリーポイントの設定
workflow.set_entry_point("researcher")
# グラフのコンパイル
app = workflow.compile()
# ⑦ 実行例
if __name__ == "__main__":
initial_state = {
"user_query": "Python非同期プログラミングを学ぶ最短方法は何か",
"research_result": "",
"analysis": "",
"final_output": ""
}
result = app.invoke(initial_state)
print("\n=== 最終出力 ===")
print(result["final_output"])
上記のコードは、シンプルな順序処理のフローですが、実務ではより複雑な条件分岐が必要です。次に、条件分岐を含めたパターンを示します。
条件分岐ロジックの実装
from langgraph.graph import StateGraph, END
# 質問の複雑度を判定するルーター
def router_agent(state: AgentState) -> str:
"""質問の複雑度に応じてルーティング"""
query = state['user_query']
# 簡単な評価(実務ではLLMで判定)
if len(query) > 100 or "どうやって" in query:
return "complex_path"
else:
return "simple_path"
# シンプルな質問用パス
def simple_handler(state: AgentState) -> AgentState:
response = llm.invoke(f"簡潔に答えてください: {state['user_query']}")
state["analysis"] = f"[簡単パス] {response.content}"
return state
# 複雑な質問用パス
def complex_handler(state: AgentState) -> AgentState:
state = researcher_agent(state)
state = analyst_agent(state)
return state
# グラフ構築(条件分岐付き)
workflow = StateGraph(AgentState)
workflow.add_node("router", router_agent)
workflow.add_node("simple_path", simple_handler)
workflow.add_node("complex_path_research", researcher_agent)
workflow.add_node("complex_path_analysis", analyst_agent)
workflow.set_entry_point("router")
# ルーターの判定に基づいて条件分岐
workflow.add_conditional_edges(
"router",
lambda x: x["__router_result__"] if "__router_result__" in x else "simple_path",
{
"simple_path": "simple_path",
"complex_path": "complex_path_research"
}
)
workflow.add_edge("simple_path", END)
workflow.add_edge("complex_path_research", "complex_path_analysis")
workflow.add_edge("complex_path_analysis", END)
app = workflow.compile()
条件分岐はadd_conditional_edges()メソッドで実装します。実務では、LLMに質問の種類を判定させる方が柔軟です。
よくあるハマりポイントと解決策
エラー:状態の型不一致
LangGraphでは、全ノードが同じ状態スキーマを扱う必要があります。異なる型の値を返すと実行時エラーが発生します。
解決策:TypedDictで厳密に型定義し、各ノードが必ず同じ構造の状態を返すようにしてください。
# ❌ 間違い:dictを返してしまう
def agent_bad(state: AgentState) -> dict:
return {"result": "value"} # 型が合わない
# ✅ 正解:AgentStateの構造を保つ
def agent_good(state: AgentState) -> AgentState:
state["research_result"] = "value"
return state
エラー:APIレート制限による途中停止
複数エージェントが同時にLLM APIを呼び出すと、すぐにレート制限に達します。実務では数十リクエスト/分のペースで運用することが珍しくありません。
解決策:リトライロジックと指数バックオフを実装します。
import time
from tenacity import retry, wait_exponential, stop_after_attempt
@retry(wait=wait_exponential(multiplier=1, min=2, max=10), stop=stop_after_attempt(3))
def call_llm_with_retry(prompt: str) -> str:
"""3回までリトライ、指数バックオフ適用"""
response = llm.invoke(prompt)
return response.content
# 使用例
def resilient_agent(state: AgentState) -> AgentState:
state["research_result"] = call_llm_with_retry(
f"質問: {state['user_query']}"
)
return state
パフォーマンス:コンテキストウィンドウの肥大化
エージェント数が増えると、全ての中間結果がStateに蓄積され、トークン数が急増します。3エージェント程度なら問題ありませんが、10以上になると無視できない遅延が生じます。
解決策:重要な情報だけをStateに保持し、詳細情報は外部ストレージ(Redis、DynamoDB)に格納します。
import json
from redis import Redis
redis_client = Redis(host='localhost', port=6379, decode_responses=True)
def compact_state_agent(state: AgentState) -> AgentState:
"""長い結果は外部に保存"""
full_result = researcher_agent(state)["research_result"]
# 要約だけをStateに保持
state["research_result"] = "詳細情報を参照してください"
# 詳細をRedisに保存(TTL: 1時間)
redis_client.setex(
f"research:{state['user_query']}",
3600,
full_result
)
return state
実務ケーススタディ:カスタマーサポートの自動化
大手SaaS企業のカスタマーサポートチームがマルチエージェントシステムを導入した例を紹介します。
背景:月1000件のサポート問い合わせのうち、80%が定型質問。対応時間を短縮する必要がありました。
構成:
- 分類エージェント:問い合わせを「FAQ」「バグ報告」「要望」に分類
- FAQ応答エージェント:FAQベースから最適な回答を検索
- エスカレーションエージェント:対応不可な場合は人間にエスカレート
- ログエージェント:全対話を記録・分析
sequenceDiagram
participant Customer
participant Classifier
participant FAQ_Agent
participant Escalation
participant Human
Customer->>Classifier: 問い合わせ送信
Classifier->>Classifier: 質問カテゴリを判定
alt FAQ質問
Classifier->>FAQ_Agent: FAQの質問を処理
FAQ_Agent->>FAQ_Agent: DBから回答を検索
FAQ_Agent->>Customer: 回答を返送
else 対応不可
Classifier->>Escalation: エスカレーション判定
Escalation->>Human: 人間オペレータに引き継ぎ
Human->>Customer: 対応
end
結果:自動化率65%、平均対応時間を35分から8分に短縮、顧客満足度スコア3.2→4.1に改善。
実装時の工夫として、分類エージェントの精度向上に3週間を費やし、テストセット500件で95%の精度を達成してから本運用に移行しました。この「段階的な精度向上」が本番環境でのトラブル回避に効果的でした。
運用・監視のベストプラクティス
エージェント間通信のログ記録
本番環境では、全エージェントの入出力と実行時間を記録することが必須です。
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def logged_agent(agent_name: str, handler_func):
"""ログを自動記録するデコレータ"""
def wrapper(state: AgentState) -> AgentState:
start_time = datetime.now()
logger.info(f"[{agent_name}] 開始 - Query: {state['user_query']}")
try:
result = handler_func(state)
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(f"[{agent_name}] 成功 - {elapsed:.2f}秒")
return result
except Exception as e:
logger.error(f"[{agent_name}] エラー: {str(e)}")
raise
return wrapper
# 使用例
@logged_agent("Researcher", researcher_agent)
def logged_researcher(state: AgentState) -> AgentState:
return researcher_agent(state)
パフォーマンス監視とメトリクス
実務ではPrometheus + Grafanaで以下のメトリクスを監視することが一般的です:
- 平均エージェント実行時間
- エージェントエラー率
- LLM API呼び出し回数・コスト
- キューイング時間(複数タスク処理時)
特にコスト監視は重要です。各LLMモデルの単価×呼び出し回数を日次集計し、予想外のコスト増加を早期発見してください。
マルチエージェントシステムを使うべき場面と避けるべき場面
✅ 使うべき場面
- 複数の処理ステップが必要:データ取得→分析→レポート生成など
- 並列処理で高速化可能:複数の独立したタスク
- ドメイン知識の分割:営業知識、技術知識を担当エージェントが習得
- エラーハンドリングの複雑さ:ステップごとに異なる回復戦略が必要
❌ 避けるべき場面
- 単純な単一ステップのタスク:単一エージェントで十分
- リアルタイム性が必須:複数エージェント間通信で遅延が増加
- コスト最小化が優先:複数LLM呼び出しでコスト増大
- デバッグが困難:エージェント数が多いとトレーサビリティ低下
代替手段との比較
LangGraph vs AutoGen:LangGraphはフロー制御に強く、AutoGenは会話型問題解決に強い。複雑な条件分岐が多ければLangGraph、エージェント間の対話重視ならAutoGenを選びます。
マルチエージェント vs 単一エージェント(Chain of Thought):単一エージェントの思考チェーンでも多くのタスクが対応可能です。計算複雑度が低く、コスト重視ならまずChain of Thoughtを試しましょう。マルチエージェントは高度なタスク分割やドメイン特化が必要な場合に導入します。
公式リソース
よくある質問
LangGraphはデフォルトでは順序実行ですが、add_edge()ではなく複数のノードに同時にエッジを張ることで疑似並列化できます。ただし真の並列実行にはCeleryなどのタスクキューが必要です。実務では「エージェント数5以下なら順序実行で十分」というケースが大半です。
StateTypeDict経由の共有が推奨されます。ただしStateが大きくなる場合は、UUID参照を使ってVectorDB(Pinecone、Weaviate)に詳細情報を保存するパターンが効果的です。トークン消費量が50%程度削減できたという報告も多いです。
サーキットブレーカーパターンを実装してください。連続で3回エラーが発生したエージェントを自動的に「降級モード」に切り替え、簡易版処理に落とします。これにより部分的サービスを維持できます。
テスト環境では20エージェント以上の実装例もありますが、トークンコストと遅延を考えると、実務では5〜10エージェント程度がバランスの良い規模です。それ以上が必要な場合は、階層的な「スーパーエージェント」設計(複数のマルチエージェントシステムの上に調整役を配置)を検討してください。
まとめ
- 基本設計:Orchestration Frameworkは複数エージェントの通信・制御・状態管理を統一的に行うミドルウェア。LangGraph、AutoGen、Crew AIが主流。
- LangGraph実装:StateGraph + TypedDictでシンプルに実装可能。条件分岐は
add_conditional_edges()で実現。 - 実務トラブル対策:レート制限は指数バックオフで、コンテキスト肥大化は外部ストレージで対応。監視・ログは最初から組み込む。
- 導入判断:複雑な多ステップタスクかつドメイン分割が有効な場合に限定。単純なタスクは単一エージェント+Chain of Thoughtが効率的。
- スケーリング:エージェント数5〜10が実用的。それ以上は階層設計を検討。コスト削減にはベクトルDB活用が必須。
- 本番運用:全エージェントのログ記録、APIコスト監視、サーキットブレーカーパターンは必須。段階的な精度向上を重視して本運用に移行する。
おすすめAIリソース
- Anthropic Claude API Docs Official Claude API reference. Essential for implementation.
- OpenAI Platform Official GPT series API documentation with pricing details.
- Hugging Face Open-source model hub with many free models to try.