LLM本番運用で見落とされるObservability:監視体制の構築と実装パターン

LLMを本番環境で運用する際、出力品質の低下やコスト超過に気づくのが遅れるのは、監視体制が不十分だからです。本記事では、LLMの動作状態をリアルタイムで可視化し、問題を早期発見するObservability戦略と実装コードを紹介します。

LLM運用における監視の課題

LLMアプリケーションは従来のWebアプリケーションとは異なる監視が必要です。実務では、以下のような問題が頻繁に発生します:

  • 出力品質の劣化を検知できない:APIの応答は返ってくるが、実務で使えないレベルの回答が増えてくる
  • トークンコストの爆増に気づくのが遅い:月末の請求書で初めて異常を発見
  • プロンプトインジェクション攻撃を検知できない:ユーザー入力の異常なパターンが見逃される
  • レイテンシの増加原因が不明:APIレスポンス時間が遅いのか、その後の処理が遅いのか特定できない
  • ハルシネーション(幻覚)の頻度が追跡できない:ユーザーからの報告で初めて判明する

これらは単なるログやメトリクスでは検知できません。LLM固有の動作パターンを理解した上で、多層的な監視体制が必要です。

LLM Observabilityの3本柱

Observabilityは一般的に「Metrics(メトリクス)」「Logs(ログ)」「Traces(トレース)」の3本柱で構成されますが、LLMの場合は追加の監視レイヤーが重要です:


graph TD
    A[LLM本番運用] --> B[Metrics]
    A --> C[Logs]
    A --> D[Traces]
    A --> E[LLM固有の監視]
    
    B --> B1[レスポンス時間]
    B --> B2[トークン使用量]
    B --> B3[エラー率]
    
    C --> C1[プロンプト内容]
    C --> C2[出力内容]
    C --> C3[モデル選択]
    
    D --> D1[API呼び出し順序]
    D --> D2[処理フロー全体]
    
    E --> E1[出力品質スコア]
    E --> E2[ハルシネーション検知]
    E --> E3[コスト効率]
    E --> E4[プロンプト注入検知]

1. メトリクス:数値で把握する運用状態

LLMアプリケーションで監視すべき主要メトリクスは以下の通りです:

  • Token Usage Metrics:入力トークン数、出力トークン数、合計使用量。日時別の推移
  • Latency Metrics:時間から秒単位での応答時間分布(p50、p95、p99)
  • Error Rate:API呼び出しの失敗率、レート制限エラー、認証エラーの内訳
  • Cost Metrics:1リクエスト当たりのコスト、ユーザー別コスト、モデル別コスト
  • Output Quality Score:出力の満足度スコア、ユーザー評価の集計

2. ログ:詳細なデバッグ情報

構造化ログは問題の根本原因特定に不可欠です。プロンプト内容、モデル選択、出力内容など、後から検索可能な形式で記録する必要があります。

3. トレース:システム全体の処理フロー

LLMアプリケーションは複数のAPI呼び出しや外部システムとの連携を含むため、処理全体を追跡できるトレース機能が重要です。

実装例1:Python + Langsmith + OpenAIでの監視

LangsmithはLangChainエコシステムの監視・デバッグプラットフォームで、LLM特化の監視が可能です。以下は実務で使えるサンプルコードです。

基本的なセットアップ

import os
from langsmith import Client
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.tools import tool
import json
from datetime import datetime

# Langsmith初期化
os.environ["LANGSMITH_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_PROJECT"] = "production-monitoring"
client = Client()

# OpenAI初期化
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    api_key=os.environ.get("OPENAI_API_KEY")
)

# カスタムツール定義
@tool
def search_database(query: str) -> str:
    """データベースから情報を検索"""
    return f"検索結果: {query}"

@tool
def validate_output(text: str) -> dict:
    """出力の有効性を検証"""
    return {
        "is_valid": len(text) > 0,
        "length": len(text),
        "timestamp": datetime.now().isoformat()
    }

tools = [search_database, validate_output]

# エージェント作成
prompt = hub.pull("hwchase17/openai-tools-agent-prompt")
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=3
)

トークン使用量と出力品質を記録するラッパー

import logging
from functools import wraps
from typing import Any, Dict
import time

# 構造化ログの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)

class LLMObservabilityWrapper:
    """LLM監視用ラッパークラス"""
    
    def __init__(self, llm_client, metrics_exporter=None):
        self.llm = llm_client
        self.metrics_exporter = metrics_exporter
        self.call_count = 0
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0
    
    def estimate_cost(self, input_tokens: int, output_tokens: int, 
                      model: str = "gpt-4") -> float:
        """トークン数からコストを推定(GPT-4の例)"""
        # 2024年現在のGPT-4 pricing
        input_cost_per_1k = 0.03
        output_cost_per_1k = 0.06
        
        cost = (input_tokens / 1000) * input_cost_per_1k + \
               (output_tokens / 1000) * output_cost_per_1k
        return round(cost, 6)
    
    def track_llm_call(self, func):
        """LLM呼び出しを監視するデコレータ"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            call_id = f"call_{self.call_count}_{int(start_time)}"
            
            try:
                # LLM呼び出し実行
                result = func(*args, **kwargs)
                latency = time.time() - start_time
                
                # トークン数を取得(OpenAIの場合)
                input_tokens = result.get("usage", {}).get("prompt_tokens", 0)
                output_tokens = result.get("usage", {}).get("completion_tokens", 0)
                
                # コスト計算
                cost = self.estimate_cost(input_tokens, output_tokens)
                self.total_cost += cost
                self.total_input_tokens += input_tokens
                self.total_output_tokens += output_tokens
                self.call_count += 1
                
                # 構造化ログ出力
                log_entry = {
                    "call_id": call_id,
                    "timestamp": datetime.now().isoformat(),
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "total_tokens": input_tokens + output_tokens,
                    "latency_seconds": round(latency, 3),
                    "estimated_cost": cost,
                    "cumulative_cost": round(self.total_cost, 6),
                    "status": "success"
                }
                
                logger.info(f"LLM_CALL: {json.dumps(log_entry)}")
                
                # メトリクス送信
                if self.metrics_exporter:
                    self.metrics_exporter.export({
                        "metric": "llm.tokens.used",
                        "value": input_tokens + output_tokens,
                        "labels": {"call_id": call_id}
                    })
                    self.metrics_exporter.export({
                        "metric": "llm.latency",
                        "value": latency,
                        "labels": {"call_id": call_id}
                    })
                
                return result
                
            except Exception as e:
                latency = time.time() - start_time
                error_log = {
                    "call_id": call_id,
                    "timestamp": datetime.now().isoformat(),
                    "error": str(e),
                    "latency_seconds": round(latency, 3),
                    "status": "error"
                }
                logger.error(f"LLM_CALL_ERROR: {json.dumps(error_log)}")
                raise
        
        return wrapper

# 使用例
wrapper = LLMObservabilityWrapper(llm)

@wrapper.track_llm_call
def call_llm_with_monitoring(prompt: str) -> Dict[str, Any]:
    """LLM呼び出しをモニタリング"""
    response = llm.invoke(prompt)
    
    # 簡略版の使用量情報(実際はOpenAIの詳細情報を取得)
    return {
        "response": response.content,
        "usage": {
            "prompt_tokens": len(prompt.split()),
            "completion_tokens": len(response.content.split())
        }
    }

# 実行例
result = call_llm_with_monitoring("日本の首都は?")
print(f"累積コスト: ${wrapper.total_cost}")
print(f"総トークン使用量: {wrapper.total_input_tokens + wrapper.total_output_tokens}")

実装例2:Datadogを使った本番環視システム

大規模運用ではDatadogのような専門的な監視プラットフォームが有効です。以下はDatadog APMでLLMアプリケーションを監視するコード例です。

Datadogトレーサー統合

from ddtrace import tracer, patch_all
from ddtrace.contrib.flask import patch_flask
import logging

# Datadog パッチ適用
patch_all()
patch_flask()

# ロギングハンドラ設定
from ddtrace.ext import SpanTypes

class DatadogLLMMonitor:
    """Datadogを使用したLLM監視"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.tracer = tracer
    
    def monitor_llm_request(self, user_id: str, request_type: str):
        """LLMリクエストをDatadogで監視するデコレータ"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                with self.tracer.trace(
                    "llm.request",
                    service=self.service_name,
                    span_type=SpanTypes.WEB
                ) as span:
                    # スパンにカスタムタグ追加
                    span.set_tag("user_id", user_id)
                    span.set_tag("request_type", request_type)
                    span.set_tag("version", "1.0")
                    
                    start_time = time.time()
                    
                    try:
                        result = func(*args, **kwargs)
                        
                        # メトリクス記録
                        latency = time.time() - start_time
                        span.set_tag("latency_ms", int(latency * 1000))
                        span.set_tag("status", "success")
                        
                        # トークン情報をメトリクスとして送信
                        if "usage" in result:
                            span.set_metric(
                                "tokens.input",
                                result["usage"].get("input_tokens", 0)
                            )
                            span.set_metric(
                                "tokens.output",
                                result["usage"].get("output_tokens", 0)
                            )
                        
                        return result
                        
                    except Exception as e:
                        span.set_tag("error", True)
                        span.set_tag("error_type", type(e).__name__)
                        span.log_kv({"event": "error", "message": str(e)})
                        raise
            
            return wrapper
        return decorator

# 使用例
monitor = DatadogLLMMonitor(service_name="my-llm-app")

@monitor.monitor_llm_request(user_id="user_123", request_type="question_answering")
def process_user_query(query: str) -> Dict[str, Any]:
    """ユーザークエリを処理"""
    response = llm.invoke(query)
    return {
        "response": response.content,
        "usage": {
            "input_tokens": 50,
            "output_tokens": 150
        }
    }

実装例3:ハルシネーション検知と出力品質スコアリング

LLM固有の監視として、幻覚(ハルシネーション)の検知と出力品質スコアリングが重要です。

ハルシネーション検知ロジック

from typing import Tuple
import re

class HallucinationDetector:
    """ハルシネーション検知システム"""
    
    def __init__(self, trusted_sources: Dict[str, list]):
        # 既知の正しい情報源
        self.trusted_sources = trusted_sources
        self.detection_score = {}
    
    def detect_contradictions(self, llm_output: str, source_text: str) -> Tuple[bool, float]:
        """
        LLM出力がソースと矛盾しているか検知
        
        戻り値: (は矛盾しているか, 矛盾スコア 0-1)
        """
        # 簡易的な矛盾検知: キーワード抽出と比較
        llm_keywords = set(re.findall(r'\b\w+\b', llm_output.lower()))
        source_keywords = set(re.findall(r'\b\w+\b', source_text.lower()))
        
        # 重要キーワードの不一致率を計算
        if len(source_keywords) == 0:
            return False, 0.0
        
        mismatch_ratio = len(llm_keywords - source_keywords) / len(llm_keywords)
        contradiction_threshold = 0.3
        
        return mismatch_ratio > contradiction_threshold, mismatch_ratio
    
    def check_factual_consistency(self, claims: list, 
                                  knowledge_base: Dict[str, bool]) -> Tuple[bool, float]:
        """
        クレーム(主張)がナレッジベースと一致しているか確認
        
        戻り値: (すべて一致したか, 一致率)
        """
        consistent_claims = sum(
            1 for claim in claims 
            if knowledge_base.get(claim, False)
        )
        consistency_ratio = consistent_claims / len(claims) if claims else 1.0
        
        return consistency_ratio == 1.0, consistency_ratio
    
    def score_output_quality(self, 
                           llm_output: str,
                           reference_source: str = None,
                           expected_claims: list = None) -> Dict[str, float]:
        """
        LLM出力の品質を多角的にスコアリング
        
        戻り値: 各スコア要素を含む辞書
        """
        scores = {
            "overall": 0.0,
            "hallucination_risk": 0.0,
            "consistency": 0.0,
            "completeness": 0.0,
            "clarity": 0.0
        }
        
        # ハルシネーションリスク評価
        if reference_source:
            contradicts, mismatch = self.detect_contradictions(
                llm_output, reference_source
            )
            scores["hallucination_risk"] = 1.0 - mismatch if not contradicts else 0.5
        else:
            scores["hallucination_risk"] = 0.7  # デフォルト値
        
        # 一貫性評価
        if expected_claims:
            consistent, consistency_ratio = self.check_factual_consistency(
                expected_claims,
                {}  # 実務では知識ベースDBを使用
            )
            scores["consistency"] = consistency_ratio
        else:
            scores["consistency"] = 0.8
        
        # 完全性評価:出力の長さが適切か
        if len(llm_output.split()) > 10:
            scores["completeness"] = min(1.0, len(llm_output.split()) / 200)
        else:
            scores["completeness"] = 0.3
        
        # 明確性評価:簡潔性と可読性
        avg_word_length = sum(len(w) for w in llm_output.split()) / len(llm_output.split())
        if 4 < avg_word_length < 8:
            scores["clarity"] = 0.9
        else:
            scores["clarity"] = 0.6
        
        # 総合スコア(重み付け平均)
        weights = {
            "hallucination_risk": 0.4,
            "consistency": 0.3,
            "completeness": 0.15,
            "clarity": 0.15
        }
        
        scores["overall"] = sum(
            scores[key] * weights[key] 
            for key in weights.keys()
        )
        
        return scores

# 使用例
detector = HallucinationDetector(trusted_sources={})

# LLM出力の品質評価
llm_response = "東京は日本の首都で、人口は約1370万人です。"
reference = "東京都は日本の政治経済の中心地で、人口約1400万人の大都市です。"

quality_scores = detector.score_output_quality(
    llm_output=llm_response,
    reference_source=reference
)

print(json.dumps(quality_scores, indent=2))

# ログに記録
logger.info(f"OUTPUT_QUALITY: {json.dumps(quality_scores)}")

# アラート条件
if quality_scores["overall"] < 0.6:
    logger.warning(f"Low quality output detected: {quality_scores['overall']}")

よくある問題とトラブルシューティング

問題1:トークン数が予想より大幅に超過している

原因:システムプロンプトが過度に長い、またはコンテキストウィンドウを効率的に使用していない。

解決策

def analyze_token_inefficiency(prompts_log: list) -> Dict[str, float]:
    """トークン効率の問題を分析"""
    
    results = {
        "avg_input_token_ratio": 0.0,
        "avg_output_token_ratio": 0.0,
        "outlier_count": 0
    }
    
    if not prompts_log:
        return results
    
    input_tokens = [p["input_tokens"] for p in prompts_log]
    output_tokens = [p["output_tokens"] for p in prompts_log]
    
    avg_input = sum(input_tokens) / len(input_tokens)
    avg_output = sum(output_tokens) / len(output_tokens)
    
    # 外れ値検出(IQR法)
    q1_input = sorted(input_tokens)[len(input_tokens)//4]
    q3_input = sorted(input_tokens)[len(input_tokens)*3//4]
    iqr = q3_input - q1_input
    
    outliers = sum(
        1 for t in input_tokens 
        if t > q3_input + 1.5 * iqr
    )
    
    results["avg_input_token_ratio"] = avg_input / (avg_input + avg_output)
    results["avg_output_token_ratio"] = avg_output / (avg_input + avg_output)
    results["outlier_count"] = outliers
    
    return results

問題2:特定のユーザーのみレスポンス時間が遅い

原因:特定ユーザーのリクエストに含まれる長いコンテキストやファイル参照、またはレート制限。

解決策:ユーザー別メトリクスの分析

def analyze_latency_by_user(logs: list) -> Dict[str, Dict[str, float]]:
    """ユーザー別レイテンシ分析"""
    
    user_metrics = {}
    
    for log in logs:
        user_id = log.get("user_id")
        latency = log.get("latency_seconds", 0)
        
        if user_id not in user_metrics:
            user_metrics[user_id] = {
                "count": 0,
                "total_latency": 0,
                "max_latency": 0,
                "min_latency": float('inf')
            }
        
        user_metrics[user_id]["count"] += 1
        user_metrics[user_id]["total_latency"] += latency
        user_metrics[user_id]["max_latency"] = max(
            user_metrics[user_id]["max_latency"], latency
        )
        user_metrics[user_id]["min_latency"] = min(
            user_metrics[user_id]["min_latency"], latency
        )
    
    # 平均値計算と異常検出
    result = {}
    for user_id, metrics in user_metrics.items():
        avg = metrics["total_latency"] / metrics["count"]
        result[user_id] = {
            "avg_latency": round(avg, 3),
            "max_latency": metrics["max_latency"],
            "min_latency": metrics["min_latency"],
            "request_count": metrics["count"],
            "is_outlier": avg > 5.0  # 5秒以上は異常
        }
    
    return result

問題3:コスト急増の原因が特定できない

原因:高価なモデルへの自動フォールバック、リトライロジックの過剰実行、またはプロンプトの無意識な最適化不足。

解決策:モデル別・処理タイプ別のコスト分解

def analyze_cost_breakdown(logs: list) -> Dict[str, Any]:
    """コスト内訳を詳細に分析"""
    
    cost_by_model = {}
    cost_by_request_type = {}
    cost_by_hour = {}
    
    for log in logs:
        model = log.get("model", "unknown")
        request_type = log.get("request_type", "unknown")
        timestamp = log.get("timestamp", "")
        cost = log.get("estimated_cost", 0)
        
        # モデル別
        if model not in cost_by_model:
            cost_by_model[model] = 0
        cost_by_model[model] += cost
        
        # リクエストタイプ別
        if request_type not in cost_by_request_type:
            cost_by_request_type[request_type] = 0
        cost_by_request_type[request_type] += cost
        
        # 時間別
        hour = timestamp[:13] if timestamp else "unknown"
        if hour not in cost_by_hour:
            cost_by_hour[hour] = 0
        cost_by_hour[hour] += cost
    
    # 最もコスト効率の悪いモデルを特定
    worst_model = max(cost_by_model, key=cost_by_model.get)
    worst_percentage = (cost_by_model[worst_model] / sum(cost_by_model.values())) * 100
    
    return {
        "total_cost": sum(cost_by_model.values()),
        "cost_by_model": cost_by_model,
        "cost_by_request_type": cost_by_request_type,
        "cost_by_hour": cost_by_hour,
        "worst_model": worst_model,
        "worst_model_percentage": round(worst_percentage, 1)
    }

本番運用での監視ベストプラクティス

段階的デプロイメント時の監視戦略

新しいモデルやプロンプトをデプロイする際は、段階的に展開しながら監視する必要があります:

  • カナリアデプロイメント(5%):全トラフィックの5%のみ新バージョンに割り当て
  • 品質メトリクス監視:出力スコア、ハルシネーション率、ユーザー評価
  • コスト監視:1リクエスト当たりの平均コストが予算範囲か確認
  • レイテンシ監視:p95レイテンシが許容値内か確認
  • 自動ロールバック:品質スコアが一定値以下なら自動的に前バージョンに戻す

sequenceDiagram
    participant User
    participant LoadBalancer
    participant CanaryVersion as Canary Version (5%)
    participant StableVersion as Stable Version (95%)
    participant Monitor
    participant AlertSystem
    
    User->>LoadBalancer: Request
    LoadBalancer->>CanaryVersion: 5% of traffic
    LoadBalancer->>StableVersion: 95% of traffic
    
    CanaryVersion->>Monitor: Send metrics
    StableVersion->>Monitor: Send metrics
    
    Monitor->>Monitor: Compare quality scores
    alt Quality degradation detected
        Monitor->>AlertSystem: Alert!
        AlertSystem->>LoadBalancer: Rollback to stable
    else All metrics OK
        Monitor->>AlertSystem: Proceed to 25%
    end

アラート設定の実例

class LLMAlertingSystem:
    """LLMアプリケーション用アラートシステム"""
    
    def __init__(self):
        self.alert_rules = [
            {
                "name": "high_error_rate",
                "metric": "error_rate",
                "threshold": 0.05,  # 5%以上
                "window_minutes": 5,
                "severity": "critical",
                "action": "page_on_call"
            },
            {
                "name": "high_latency",
                "metric": "p95_latency",
                "threshold": 10.0,  # 10秒以上
                "window_minutes": 10,
                "severity": "warning",
                "action": "create_incident"
            },
            {
                "name": "cost_spike",
                "metric": "hourly_cost",
                "threshold": 500,  # $500以上
                "window_minutes": 60,
                "severity": "warning",
                "action": "notify_team"
            },
            {
                "name": "quality_degradation",
                "metric":
    
K
AWS・Python・生成AIを専門とするソフトウェアエンジニア。AI・クラウド・開発ワークフローの実践ガイドを執筆しています。詳しく見る →