· 25 分で読める · 12,710 文字
LLM本番運用で見落とされるObservability:監視体制の構築と実装パターン
LLMを本番環境で運用する際、出力品質の低下やコスト超過に気づくのが遅れるのは、監視体制が不十分だからです。本記事では、LLMの動作状態をリアルタイムで可視化し、問題を早期発見するObservability戦略と実装コードを紹介します。
LLM運用における監視の課題
LLMアプリケーションは従来のWebアプリケーションとは異なる監視が必要です。実務では、以下のような問題が頻繁に発生します:
- 出力品質の劣化を検知できない:APIの応答は返ってくるが、実務で使えないレベルの回答が増えてくる
- トークンコストの爆増に気づくのが遅い:月末の請求書で初めて異常を発見
- プロンプトインジェクション攻撃を検知できない:ユーザー入力の異常なパターンが見逃される
- レイテンシの増加原因が不明:APIレスポンス時間が遅いのか、その後の処理が遅いのか特定できない
- ハルシネーション(幻覚)の頻度が追跡できない:ユーザーからの報告で初めて判明する
これらは単なるログやメトリクスでは検知できません。LLM固有の動作パターンを理解した上で、多層的な監視体制が必要です。
LLM Observabilityの3本柱
Observabilityは一般的に「Metrics(メトリクス)」「Logs(ログ)」「Traces(トレース)」の3本柱で構成されますが、LLMの場合は追加の監視レイヤーが重要です:
graph TD
A[LLM本番運用] --> B[Metrics]
A --> C[Logs]
A --> D[Traces]
A --> E[LLM固有の監視]
B --> B1[レスポンス時間]
B --> B2[トークン使用量]
B --> B3[エラー率]
C --> C1[プロンプト内容]
C --> C2[出力内容]
C --> C3[モデル選択]
D --> D1[API呼び出し順序]
D --> D2[処理フロー全体]
E --> E1[出力品質スコア]
E --> E2[ハルシネーション検知]
E --> E3[コスト効率]
E --> E4[プロンプト注入検知]
1. メトリクス:数値で把握する運用状態
LLMアプリケーションで監視すべき主要メトリクスは以下の通りです:
- Token Usage Metrics:入力トークン数、出力トークン数、合計使用量。日時別の推移
- Latency Metrics:時間から秒単位での応答時間分布(p50、p95、p99)
- Error Rate:API呼び出しの失敗率、レート制限エラー、認証エラーの内訳
- Cost Metrics:1リクエスト当たりのコスト、ユーザー別コスト、モデル別コスト
- Output Quality Score:出力の満足度スコア、ユーザー評価の集計
2. ログ:詳細なデバッグ情報
構造化ログは問題の根本原因特定に不可欠です。プロンプト内容、モデル選択、出力内容など、後から検索可能な形式で記録する必要があります。
3. トレース:システム全体の処理フロー
LLMアプリケーションは複数のAPI呼び出しや外部システムとの連携を含むため、処理全体を追跡できるトレース機能が重要です。
実装例1:Python + Langsmith + OpenAIでの監視
LangsmithはLangChainエコシステムの監視・デバッグプラットフォームで、LLM特化の監視が可能です。以下は実務で使えるサンプルコードです。
基本的なセットアップ
import os
from langsmith import Client
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.tools import tool
import json
from datetime import datetime
# Langsmith初期化
os.environ["LANGSMITH_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_PROJECT"] = "production-monitoring"
client = Client()
# OpenAI初期化
llm = ChatOpenAI(
model="gpt-4",
temperature=0.7,
api_key=os.environ.get("OPENAI_API_KEY")
)
# カスタムツール定義
@tool
def search_database(query: str) -> str:
"""データベースから情報を検索"""
return f"検索結果: {query}"
@tool
def validate_output(text: str) -> dict:
"""出力の有効性を検証"""
return {
"is_valid": len(text) > 0,
"length": len(text),
"timestamp": datetime.now().isoformat()
}
tools = [search_database, validate_output]
# エージェント作成
prompt = hub.pull("hwchase17/openai-tools-agent-prompt")
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor.from_agent_and_tools(
agent=agent,
tools=tools,
verbose=True,
max_iterations=3
)
トークン使用量と出力品質を記録するラッパー
import logging
from functools import wraps
from typing import Any, Dict
import time
# 構造化ログの設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)
class LLMObservabilityWrapper:
"""LLM監視用ラッパークラス"""
def __init__(self, llm_client, metrics_exporter=None):
self.llm = llm_client
self.metrics_exporter = metrics_exporter
self.call_count = 0
self.total_input_tokens = 0
self.total_output_tokens = 0
self.total_cost = 0.0
def estimate_cost(self, input_tokens: int, output_tokens: int,
model: str = "gpt-4") -> float:
"""トークン数からコストを推定(GPT-4の例)"""
# 2024年現在のGPT-4 pricing
input_cost_per_1k = 0.03
output_cost_per_1k = 0.06
cost = (input_tokens / 1000) * input_cost_per_1k + \
(output_tokens / 1000) * output_cost_per_1k
return round(cost, 6)
def track_llm_call(self, func):
"""LLM呼び出しを監視するデコレータ"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
call_id = f"call_{self.call_count}_{int(start_time)}"
try:
# LLM呼び出し実行
result = func(*args, **kwargs)
latency = time.time() - start_time
# トークン数を取得(OpenAIの場合)
input_tokens = result.get("usage", {}).get("prompt_tokens", 0)
output_tokens = result.get("usage", {}).get("completion_tokens", 0)
# コスト計算
cost = self.estimate_cost(input_tokens, output_tokens)
self.total_cost += cost
self.total_input_tokens += input_tokens
self.total_output_tokens += output_tokens
self.call_count += 1
# 構造化ログ出力
log_entry = {
"call_id": call_id,
"timestamp": datetime.now().isoformat(),
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
"latency_seconds": round(latency, 3),
"estimated_cost": cost,
"cumulative_cost": round(self.total_cost, 6),
"status": "success"
}
logger.info(f"LLM_CALL: {json.dumps(log_entry)}")
# メトリクス送信
if self.metrics_exporter:
self.metrics_exporter.export({
"metric": "llm.tokens.used",
"value": input_tokens + output_tokens,
"labels": {"call_id": call_id}
})
self.metrics_exporter.export({
"metric": "llm.latency",
"value": latency,
"labels": {"call_id": call_id}
})
return result
except Exception as e:
latency = time.time() - start_time
error_log = {
"call_id": call_id,
"timestamp": datetime.now().isoformat(),
"error": str(e),
"latency_seconds": round(latency, 3),
"status": "error"
}
logger.error(f"LLM_CALL_ERROR: {json.dumps(error_log)}")
raise
return wrapper
# 使用例
wrapper = LLMObservabilityWrapper(llm)
@wrapper.track_llm_call
def call_llm_with_monitoring(prompt: str) -> Dict[str, Any]:
"""LLM呼び出しをモニタリング"""
response = llm.invoke(prompt)
# 簡略版の使用量情報(実際はOpenAIの詳細情報を取得)
return {
"response": response.content,
"usage": {
"prompt_tokens": len(prompt.split()),
"completion_tokens": len(response.content.split())
}
}
# 実行例
result = call_llm_with_monitoring("日本の首都は?")
print(f"累積コスト: ${wrapper.total_cost}")
print(f"総トークン使用量: {wrapper.total_input_tokens + wrapper.total_output_tokens}")
実装例2:Datadogを使った本番環視システム
大規模運用ではDatadogのような専門的な監視プラットフォームが有効です。以下はDatadog APMでLLMアプリケーションを監視するコード例です。
Datadogトレーサー統合
from ddtrace import tracer, patch_all
from ddtrace.contrib.flask import patch_flask
import logging
# Datadog パッチ適用
patch_all()
patch_flask()
# ロギングハンドラ設定
from ddtrace.ext import SpanTypes
class DatadogLLMMonitor:
"""Datadogを使用したLLM監視"""
def __init__(self, service_name: str):
self.service_name = service_name
self.tracer = tracer
def monitor_llm_request(self, user_id: str, request_type: str):
"""LLMリクエストをDatadogで監視するデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
with self.tracer.trace(
"llm.request",
service=self.service_name,
span_type=SpanTypes.WEB
) as span:
# スパンにカスタムタグ追加
span.set_tag("user_id", user_id)
span.set_tag("request_type", request_type)
span.set_tag("version", "1.0")
start_time = time.time()
try:
result = func(*args, **kwargs)
# メトリクス記録
latency = time.time() - start_time
span.set_tag("latency_ms", int(latency * 1000))
span.set_tag("status", "success")
# トークン情報をメトリクスとして送信
if "usage" in result:
span.set_metric(
"tokens.input",
result["usage"].get("input_tokens", 0)
)
span.set_metric(
"tokens.output",
result["usage"].get("output_tokens", 0)
)
return result
except Exception as e:
span.set_tag("error", True)
span.set_tag("error_type", type(e).__name__)
span.log_kv({"event": "error", "message": str(e)})
raise
return wrapper
return decorator
# 使用例
monitor = DatadogLLMMonitor(service_name="my-llm-app")
@monitor.monitor_llm_request(user_id="user_123", request_type="question_answering")
def process_user_query(query: str) -> Dict[str, Any]:
"""ユーザークエリを処理"""
response = llm.invoke(query)
return {
"response": response.content,
"usage": {
"input_tokens": 50,
"output_tokens": 150
}
}
実装例3:ハルシネーション検知と出力品質スコアリング
LLM固有の監視として、幻覚(ハルシネーション)の検知と出力品質スコアリングが重要です。
ハルシネーション検知ロジック
from typing import Tuple
import re
class HallucinationDetector:
"""ハルシネーション検知システム"""
def __init__(self, trusted_sources: Dict[str, list]):
# 既知の正しい情報源
self.trusted_sources = trusted_sources
self.detection_score = {}
def detect_contradictions(self, llm_output: str, source_text: str) -> Tuple[bool, float]:
"""
LLM出力がソースと矛盾しているか検知
戻り値: (は矛盾しているか, 矛盾スコア 0-1)
"""
# 簡易的な矛盾検知: キーワード抽出と比較
llm_keywords = set(re.findall(r'\b\w+\b', llm_output.lower()))
source_keywords = set(re.findall(r'\b\w+\b', source_text.lower()))
# 重要キーワードの不一致率を計算
if len(source_keywords) == 0:
return False, 0.0
mismatch_ratio = len(llm_keywords - source_keywords) / len(llm_keywords)
contradiction_threshold = 0.3
return mismatch_ratio > contradiction_threshold, mismatch_ratio
def check_factual_consistency(self, claims: list,
knowledge_base: Dict[str, bool]) -> Tuple[bool, float]:
"""
クレーム(主張)がナレッジベースと一致しているか確認
戻り値: (すべて一致したか, 一致率)
"""
consistent_claims = sum(
1 for claim in claims
if knowledge_base.get(claim, False)
)
consistency_ratio = consistent_claims / len(claims) if claims else 1.0
return consistency_ratio == 1.0, consistency_ratio
def score_output_quality(self,
llm_output: str,
reference_source: str = None,
expected_claims: list = None) -> Dict[str, float]:
"""
LLM出力の品質を多角的にスコアリング
戻り値: 各スコア要素を含む辞書
"""
scores = {
"overall": 0.0,
"hallucination_risk": 0.0,
"consistency": 0.0,
"completeness": 0.0,
"clarity": 0.0
}
# ハルシネーションリスク評価
if reference_source:
contradicts, mismatch = self.detect_contradictions(
llm_output, reference_source
)
scores["hallucination_risk"] = 1.0 - mismatch if not contradicts else 0.5
else:
scores["hallucination_risk"] = 0.7 # デフォルト値
# 一貫性評価
if expected_claims:
consistent, consistency_ratio = self.check_factual_consistency(
expected_claims,
{} # 実務では知識ベースDBを使用
)
scores["consistency"] = consistency_ratio
else:
scores["consistency"] = 0.8
# 完全性評価:出力の長さが適切か
if len(llm_output.split()) > 10:
scores["completeness"] = min(1.0, len(llm_output.split()) / 200)
else:
scores["completeness"] = 0.3
# 明確性評価:簡潔性と可読性
avg_word_length = sum(len(w) for w in llm_output.split()) / len(llm_output.split())
if 4 < avg_word_length < 8:
scores["clarity"] = 0.9
else:
scores["clarity"] = 0.6
# 総合スコア(重み付け平均)
weights = {
"hallucination_risk": 0.4,
"consistency": 0.3,
"completeness": 0.15,
"clarity": 0.15
}
scores["overall"] = sum(
scores[key] * weights[key]
for key in weights.keys()
)
return scores
# 使用例
detector = HallucinationDetector(trusted_sources={})
# LLM出力の品質評価
llm_response = "東京は日本の首都で、人口は約1370万人です。"
reference = "東京都は日本の政治経済の中心地で、人口約1400万人の大都市です。"
quality_scores = detector.score_output_quality(
llm_output=llm_response,
reference_source=reference
)
print(json.dumps(quality_scores, indent=2))
# ログに記録
logger.info(f"OUTPUT_QUALITY: {json.dumps(quality_scores)}")
# アラート条件
if quality_scores["overall"] < 0.6:
logger.warning(f"Low quality output detected: {quality_scores['overall']}")
よくある問題とトラブルシューティング
問題1:トークン数が予想より大幅に超過している
原因:システムプロンプトが過度に長い、またはコンテキストウィンドウを効率的に使用していない。
解決策:
def analyze_token_inefficiency(prompts_log: list) -> Dict[str, float]:
"""トークン効率の問題を分析"""
results = {
"avg_input_token_ratio": 0.0,
"avg_output_token_ratio": 0.0,
"outlier_count": 0
}
if not prompts_log:
return results
input_tokens = [p["input_tokens"] for p in prompts_log]
output_tokens = [p["output_tokens"] for p in prompts_log]
avg_input = sum(input_tokens) / len(input_tokens)
avg_output = sum(output_tokens) / len(output_tokens)
# 外れ値検出(IQR法)
q1_input = sorted(input_tokens)[len(input_tokens)//4]
q3_input = sorted(input_tokens)[len(input_tokens)*3//4]
iqr = q3_input - q1_input
outliers = sum(
1 for t in input_tokens
if t > q3_input + 1.5 * iqr
)
results["avg_input_token_ratio"] = avg_input / (avg_input + avg_output)
results["avg_output_token_ratio"] = avg_output / (avg_input + avg_output)
results["outlier_count"] = outliers
return results
問題2:特定のユーザーのみレスポンス時間が遅い
原因:特定ユーザーのリクエストに含まれる長いコンテキストやファイル参照、またはレート制限。
解決策:ユーザー別メトリクスの分析
def analyze_latency_by_user(logs: list) -> Dict[str, Dict[str, float]]:
"""ユーザー別レイテンシ分析"""
user_metrics = {}
for log in logs:
user_id = log.get("user_id")
latency = log.get("latency_seconds", 0)
if user_id not in user_metrics:
user_metrics[user_id] = {
"count": 0,
"total_latency": 0,
"max_latency": 0,
"min_latency": float('inf')
}
user_metrics[user_id]["count"] += 1
user_metrics[user_id]["total_latency"] += latency
user_metrics[user_id]["max_latency"] = max(
user_metrics[user_id]["max_latency"], latency
)
user_metrics[user_id]["min_latency"] = min(
user_metrics[user_id]["min_latency"], latency
)
# 平均値計算と異常検出
result = {}
for user_id, metrics in user_metrics.items():
avg = metrics["total_latency"] / metrics["count"]
result[user_id] = {
"avg_latency": round(avg, 3),
"max_latency": metrics["max_latency"],
"min_latency": metrics["min_latency"],
"request_count": metrics["count"],
"is_outlier": avg > 5.0 # 5秒以上は異常
}
return result
問題3:コスト急増の原因が特定できない
原因:高価なモデルへの自動フォールバック、リトライロジックの過剰実行、またはプロンプトの無意識な最適化不足。
解決策:モデル別・処理タイプ別のコスト分解
def analyze_cost_breakdown(logs: list) -> Dict[str, Any]:
"""コスト内訳を詳細に分析"""
cost_by_model = {}
cost_by_request_type = {}
cost_by_hour = {}
for log in logs:
model = log.get("model", "unknown")
request_type = log.get("request_type", "unknown")
timestamp = log.get("timestamp", "")
cost = log.get("estimated_cost", 0)
# モデル別
if model not in cost_by_model:
cost_by_model[model] = 0
cost_by_model[model] += cost
# リクエストタイプ別
if request_type not in cost_by_request_type:
cost_by_request_type[request_type] = 0
cost_by_request_type[request_type] += cost
# 時間別
hour = timestamp[:13] if timestamp else "unknown"
if hour not in cost_by_hour:
cost_by_hour[hour] = 0
cost_by_hour[hour] += cost
# 最もコスト効率の悪いモデルを特定
worst_model = max(cost_by_model, key=cost_by_model.get)
worst_percentage = (cost_by_model[worst_model] / sum(cost_by_model.values())) * 100
return {
"total_cost": sum(cost_by_model.values()),
"cost_by_model": cost_by_model,
"cost_by_request_type": cost_by_request_type,
"cost_by_hour": cost_by_hour,
"worst_model": worst_model,
"worst_model_percentage": round(worst_percentage, 1)
}
本番運用での監視ベストプラクティス
段階的デプロイメント時の監視戦略
新しいモデルやプロンプトをデプロイする際は、段階的に展開しながら監視する必要があります:
- カナリアデプロイメント(5%):全トラフィックの5%のみ新バージョンに割り当て
- 品質メトリクス監視:出力スコア、ハルシネーション率、ユーザー評価
- コスト監視:1リクエスト当たりの平均コストが予算範囲か確認
- レイテンシ監視:p95レイテンシが許容値内か確認
- 自動ロールバック:品質スコアが一定値以下なら自動的に前バージョンに戻す
sequenceDiagram
participant User
participant LoadBalancer
participant CanaryVersion as Canary Version (5%)
participant StableVersion as Stable Version (95%)
participant Monitor
participant AlertSystem
User->>LoadBalancer: Request
LoadBalancer->>CanaryVersion: 5% of traffic
LoadBalancer->>StableVersion: 95% of traffic
CanaryVersion->>Monitor: Send metrics
StableVersion->>Monitor: Send metrics
Monitor->>Monitor: Compare quality scores
alt Quality degradation detected
Monitor->>AlertSystem: Alert!
AlertSystem->>LoadBalancer: Rollback to stable
else All metrics OK
Monitor->>AlertSystem: Proceed to 25%
end
アラート設定の実例
class LLMAlertingSystem:
"""LLMアプリケーション用アラートシステム"""
def __init__(self):
self.alert_rules = [
{
"name": "high_error_rate",
"metric": "error_rate",
"threshold": 0.05, # 5%以上
"window_minutes": 5,
"severity": "critical",
"action": "page_on_call"
},
{
"name": "high_latency",
"metric": "p95_latency",
"threshold": 10.0, # 10秒以上
"window_minutes": 10,
"severity": "warning",
"action": "create_incident"
},
{
"name": "cost_spike",
"metric": "hourly_cost",
"threshold": 500, # $500以上
"window_minutes": 60,
"severity": "warning",
"action": "notify_team"
},
{
"name": "quality_degradation",
"metric":
おすすめAIリソース
- Anthropic Claude API Docs Official Claude API reference. Essential for implementation.
- OpenAI Platform Official GPT series API documentation with pricing details.
- Hugging Face Open-source model hub with many free models to try.