Claude APIのレート制限を超えずに運用する実装パターン

Claude APIのレート制限(RPM・TPM)に引っかかると、アプリケーションは一時的に使用不可になります。この記事では、レート制限の仕組みを理解し、バックオフ戦略・リクエスト調整・監視方法を使った実践的な対処法を4つのパターンで紹介します。すぐに本番環境で活用できるコード例も含めました。

Claude APIのレート制限とは

Claude APIには2つのレート制限があります。

  • RPM(Requests Per Minute):1分あたりのリクエスト数
  • TPM(Tokens Per Minute):1分あたりの入出力トークン数

制限を超えると、HTTPステータス429(Too Many Requests)が返され、Retry-Afterヘッダーに次のリクエスト可能な時間が記載されます。無視してリクエストを続けると、一時的にAPIアクセスがブロックされます。

テスト環境:macOS 14 / Python 3.12 / Claude API 2025-01で動作確認済み

対処法1:指数バックオフで自動再試行

最も一般的な対処法は、429エラー時に待機時間を段階的に増やしながら再試行することです。

実装コード

import anthropic
import time
import random

def call_claude_with_backoff(prompt: str, max_retries: int = 5) -> str:
    """
    指数バックオフを使用してClaudeを呼び出す関数
    """
    client = anthropic.Anthropic()
    
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            return response.content[0].text
        
        except anthropic.RateLimitError as e:
            # レート制限エラーを検出
            retry_after = int(e.response.headers.get("retry-after", 60))
            
            # ジッターを追加してサーバー負荷を分散
            wait_time = retry_after + random.uniform(0, 2)
            
            print(f"レート制限に達しました。{wait_time:.1f}秒待機します(試行 {attempt + 1}/{max_retries})")
            time.sleep(wait_time)
            
            if attempt == max_retries - 1:
                raise Exception(f"最大再試行回数に達しました。{max_retries}回の試行後も失敗しました。")
        
        except anthropic.APIError as e:
            # その他のAPIエラー
            print(f"APIエラー: {e}")
            raise
    
    return ""

# 使用例
result = call_claude_with_backoff("日本の首都は?")
print(result)

重要なポイント

  • Retry-Afterヘッダーを使う:サーバーが指定した時間を尊重する
  • ジッター(乱数)を追加:複数クライアントが同時に再試行するのを防ぐ
  • RateLimitErrorを明示的にキャッチ:他のエラーと区別できる

対処法2:リクエスト数を事前に制限する

根本的な対策は、レート制限に到達しないよう、送信するリクエスト数そのものを減らすことです。キューイングシステムを使用します。

実装コード

import anthropic
import asyncio
from datetime import datetime, timedelta
from collections import deque

class RateLimitedClaudeClient:
    """
    レート制限を考慮したClaudeクライアント
    """
    def __init__(self, rpm_limit: int = 60, tpm_limit: int = 90000):
        self.client = anthropic.Anthropic()
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        
        # リクエスト履歴(タイムスタンプとトークン数を記録)
        self.request_history = deque()
        self.token_history = deque()
    
    def _clean_history(self, now: datetime, window_minutes: int = 1):
        """
        指定時間より古い履歴を削除
        """
        cutoff_time = now - timedelta(minutes=window_minutes)
        
        while self.request_history and self.request_history[0] < cutoff_time:
            self.request_history.popleft()
        
        while self.token_history and self.token_history[0][0] < cutoff_time:
            self.token_history.popleft()
    
    def should_wait(self, estimated_tokens: int = 1000) -> tuple[bool, float]:
        """
        待機が必要かチェック。必要な場合は待機時間を返す
        """
        now = datetime.now()
        self._clean_history(now)
        
        requests_in_minute = len(self.request_history)
        tokens_in_minute = sum(tokens for _, tokens in self.token_history)
        
        wait_seconds = 0
        
        # RPM制限チェック
        if requests_in_minute >= self.rpm_limit:
            oldest_request = self.request_history[0]
            wait_seconds = max(wait_seconds, 
                             (oldest_request - now + timedelta(minutes=1)).total_seconds())
        
        # TPM制限チェック
        if tokens_in_minute + estimated_tokens > self.tpm_limit:
            oldest_token = self.token_history[0][0]
            wait_seconds = max(wait_seconds,
                             (oldest_token - now + timedelta(minutes=1)).total_seconds())
        
        return wait_seconds > 0, max(0, wait_seconds)
    
    def call_claude(self, prompt: str) -> str:
        """
        レート制限を考慮したClaude呼び出し
        """
        estimated_tokens = len(prompt) // 4 + 500  # 概算計算
        
        should_wait, wait_time = self.should_wait(estimated_tokens)
        if should_wait:
            print(f"レート制限回避のため {wait_time:.1f}秒待機します")
            asyncio.run(asyncio.sleep(wait_time))
        
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        
        # 履歴に記録
        now = datetime.now()
        self.request_history.append(now)
        
        actual_tokens = response.usage.input_tokens + response.usage.output_tokens
        self.token_history.append((now, actual_tokens))
        
        return response.content[0].text

# 使用例
client = RateLimitedClaudeClient(rpm_limit=60)
result = client.call_claude("Pythonの非同期処理について説明してください")
print(result)

メリットと注意点

  • メリット:429エラーが発生する前に処理を制御できる
  • 注意点:トークン数は正確に計算できないため、概算を使う。実際の使用量に応じて調整が必要

対処法3:バッチ処理で効率的に処理

複数のテキストを処理する場合、個別リクエストではなくバッチ処理を活用することで、TPMを効率的に使用できます。

実装コード

import anthropic

def process_batch_efficiently(texts: list[str], batch_size: int = 5) -> list[str]:
    """
    複数のテキストをバッチ処理する
    """
    client = anthropic.Anthropic()
    results = []
    
    # テキストをバッチに分割
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        
        # 複数のテキストを1つのプロンプトで処理
        combined_prompt = "以下のテキストをそれぞれ要約してください。番号付きで結果を返してください:\n\n"
        for j, text in enumerate(batch, 1):
            combined_prompt += f"{j}. {text}\n\n"
        
        try:
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                messages=[
                    {"role": "user", "content": combined_prompt}
                ]
            )
            
            results.append(response.content[0].text)
            
            print(f"バッチ {i // batch_size + 1}を処理しました")
        
        except anthropic.RateLimitError:
            print("レート制限に達しました。処理を一時停止します。")
            # この場合、呼び出し側で対処法1のバックオフを適用
            raise
    
    return results

# 使用例
texts = [
    "Claude APIは強力な言語モデルです。",
    "レート制限は運用時の重要な考慮事項です。",
    "バッチ処理で効率を上げることができます。"
]

results = process_batch_efficiently(texts, batch_size=2)
for result in results:
    print(result)
    print("---")

バッチ処理の工夫

  • 1つのリクエストで複数データを処理することでRPMを削減
  • トークン数は増えるが、リクエスト数が大幅に減ればTPMも効率的に使える
  • Batch APIの利用(非同期バッチ)でさらに低コストな処理も可能

対処法4:監視とロギングで事前対応

レート制限に達する前に検知することで、グレースフルなエラーハンドリングができます。

実装コード

import anthropic
import logging
from datetime import datetime
from dataclasses import dataclass

@dataclass
class RateLimitMetrics:
    timestamp: datetime
    requests_per_minute: int
    tokens_per_minute: int
    rpm_limit: int
    tpm_limit: int
    rpm_usage_percent: float
    tpm_usage_percent: float

class ClaudeClientWithMonitoring:
    """
    使用状況を監視するClaudeクライアント
    """
    def __init__(self, rpm_limit: int = 60, tpm_limit: int = 90000):
        self.client = anthropic.Anthropic()
        self.rpm_limit = rpm_limit
        self.tpm_limit = tpm_limit
        
        # ログ設定
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        
        # ファイルハンドラ
        handler = logging.FileHandler("claude_api_usage.log")
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        
        self.current_rpm = 0
        self.current_tpm = 0
    
    def log_metrics(self):
        """
        現在の使用率をログ記録
        """
        rpm_percent = (self.current_rpm / self.rpm_limit) * 100
        tpm_percent = (self.current_tpm / self.tpm_limit) * 100
        
        metrics = RateLimitMetrics(
            timestamp=datetime.now(),
            requests_per_minute=self.current_rpm,
            tokens_per_minute=self.current_tpm,
            rpm_limit=self.rpm_limit,
            tpm_limit=self.tpm_limit,
            rpm_usage_percent=rpm_percent,
            tpm_usage_percent=tpm_percent
        )
        
        # 80%以上で警告
        if rpm_percent > 80:
            self.logger.warning(
                f"RPM使用率が高い: {rpm_percent:.1f}% ({self.current_rpm}/{self.rpm_limit})"
            )
        
        if tpm_percent > 80:
            self.logger.warning(
                f"TPM使用率が高い: {tpm_percent:.1f}% ({self.current_tpm}/{self.tpm_limit})"
            )
        
        self.logger.info(f"メトリクス: {metrics}")
        
        return metrics
    
    def call_claude_with_monitoring(self, prompt: str) -> str:
        """
        監視機能付きClaude呼び出し
        """
        try:
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            
            # 使用量を更新
            self.current_rpm += 1
            self.current_tpm += (response.usage.input_tokens + 
                                response.usage.output_tokens)
            
            # メトリクス記録
            self.log_metrics()
            
            return response.content[0].text
        
        except anthropic.RateLimitError as e:
            self.logger.error(f"レート制限エラー: {e}")
            raise

# 使用例
client = ClaudeClientWithMonitoring()
try:
    result =
    
K
AWS・Python・生成AIを専門とするソフトウェアエンジニア。AI・クラウド・開発ワークフローの実践ガイドを執筆しています。詳しく見る →