更新: 2026年03月 · 13 分で読める · 6,446 文字
Claude APIのレート制限を超えずに運用する実装パターン
Claude APIのレート制限(RPM・TPM)に引っかかると、アプリケーションは一時的に使用不可になります。この記事では、レート制限の仕組みを理解し、バックオフ戦略・リクエスト調整・監視方法を使った実践的な対処法を4つのパターンで紹介します。すぐに本番環境で活用できるコード例も含めました。
Claude APIのレート制限とは
Claude APIには2つのレート制限があります。
- RPM(Requests Per Minute):1分あたりのリクエスト数
- TPM(Tokens Per Minute):1分あたりの入出力トークン数
制限を超えると、HTTPステータス429(Too Many Requests)が返され、Retry-Afterヘッダーに次のリクエスト可能な時間が記載されます。無視してリクエストを続けると、一時的にAPIアクセスがブロックされます。
テスト環境:macOS 14 / Python 3.12 / Claude API 2025-01で動作確認済み
対処法1:指数バックオフで自動再試行
最も一般的な対処法は、429エラー時に待機時間を段階的に増やしながら再試行することです。
実装コード
import anthropic
import time
import random
def call_claude_with_backoff(prompt: str, max_retries: int = 5) -> str:
"""
指数バックオフを使用してClaudeを呼び出す関数
"""
client = anthropic.Anthropic()
for attempt in range(max_retries):
try:
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": prompt}
]
)
return response.content[0].text
except anthropic.RateLimitError as e:
# レート制限エラーを検出
retry_after = int(e.response.headers.get("retry-after", 60))
# ジッターを追加してサーバー負荷を分散
wait_time = retry_after + random.uniform(0, 2)
print(f"レート制限に達しました。{wait_time:.1f}秒待機します(試行 {attempt + 1}/{max_retries})")
time.sleep(wait_time)
if attempt == max_retries - 1:
raise Exception(f"最大再試行回数に達しました。{max_retries}回の試行後も失敗しました。")
except anthropic.APIError as e:
# その他のAPIエラー
print(f"APIエラー: {e}")
raise
return ""
# 使用例
result = call_claude_with_backoff("日本の首都は?")
print(result)
重要なポイント
- Retry-Afterヘッダーを使う:サーバーが指定した時間を尊重する
- ジッター(乱数)を追加:複数クライアントが同時に再試行するのを防ぐ
- RateLimitErrorを明示的にキャッチ:他のエラーと区別できる
対処法2:リクエスト数を事前に制限する
根本的な対策は、レート制限に到達しないよう、送信するリクエスト数そのものを減らすことです。キューイングシステムを使用します。
実装コード
import anthropic
import asyncio
from datetime import datetime, timedelta
from collections import deque
class RateLimitedClaudeClient:
"""
レート制限を考慮したClaudeクライアント
"""
def __init__(self, rpm_limit: int = 60, tpm_limit: int = 90000):
self.client = anthropic.Anthropic()
self.rpm_limit = rpm_limit
self.tpm_limit = tpm_limit
# リクエスト履歴(タイムスタンプとトークン数を記録)
self.request_history = deque()
self.token_history = deque()
def _clean_history(self, now: datetime, window_minutes: int = 1):
"""
指定時間より古い履歴を削除
"""
cutoff_time = now - timedelta(minutes=window_minutes)
while self.request_history and self.request_history[0] < cutoff_time:
self.request_history.popleft()
while self.token_history and self.token_history[0][0] < cutoff_time:
self.token_history.popleft()
def should_wait(self, estimated_tokens: int = 1000) -> tuple[bool, float]:
"""
待機が必要かチェック。必要な場合は待機時間を返す
"""
now = datetime.now()
self._clean_history(now)
requests_in_minute = len(self.request_history)
tokens_in_minute = sum(tokens for _, tokens in self.token_history)
wait_seconds = 0
# RPM制限チェック
if requests_in_minute >= self.rpm_limit:
oldest_request = self.request_history[0]
wait_seconds = max(wait_seconds,
(oldest_request - now + timedelta(minutes=1)).total_seconds())
# TPM制限チェック
if tokens_in_minute + estimated_tokens > self.tpm_limit:
oldest_token = self.token_history[0][0]
wait_seconds = max(wait_seconds,
(oldest_token - now + timedelta(minutes=1)).total_seconds())
return wait_seconds > 0, max(0, wait_seconds)
def call_claude(self, prompt: str) -> str:
"""
レート制限を考慮したClaude呼び出し
"""
estimated_tokens = len(prompt) // 4 + 500 # 概算計算
should_wait, wait_time = self.should_wait(estimated_tokens)
if should_wait:
print(f"レート制限回避のため {wait_time:.1f}秒待機します")
asyncio.run(asyncio.sleep(wait_time))
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": prompt}
]
)
# 履歴に記録
now = datetime.now()
self.request_history.append(now)
actual_tokens = response.usage.input_tokens + response.usage.output_tokens
self.token_history.append((now, actual_tokens))
return response.content[0].text
# 使用例
client = RateLimitedClaudeClient(rpm_limit=60)
result = client.call_claude("Pythonの非同期処理について説明してください")
print(result)
メリットと注意点
- メリット:429エラーが発生する前に処理を制御できる
- 注意点:トークン数は正確に計算できないため、概算を使う。実際の使用量に応じて調整が必要
対処法3:バッチ処理で効率的に処理
複数のテキストを処理する場合、個別リクエストではなくバッチ処理を活用することで、TPMを効率的に使用できます。
実装コード
import anthropic
def process_batch_efficiently(texts: list[str], batch_size: int = 5) -> list[str]:
"""
複数のテキストをバッチ処理する
"""
client = anthropic.Anthropic()
results = []
# テキストをバッチに分割
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# 複数のテキストを1つのプロンプトで処理
combined_prompt = "以下のテキストをそれぞれ要約してください。番号付きで結果を返してください:\n\n"
for j, text in enumerate(batch, 1):
combined_prompt += f"{j}. {text}\n\n"
try:
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
messages=[
{"role": "user", "content": combined_prompt}
]
)
results.append(response.content[0].text)
print(f"バッチ {i // batch_size + 1}を処理しました")
except anthropic.RateLimitError:
print("レート制限に達しました。処理を一時停止します。")
# この場合、呼び出し側で対処法1のバックオフを適用
raise
return results
# 使用例
texts = [
"Claude APIは強力な言語モデルです。",
"レート制限は運用時の重要な考慮事項です。",
"バッチ処理で効率を上げることができます。"
]
results = process_batch_efficiently(texts, batch_size=2)
for result in results:
print(result)
print("---")
バッチ処理の工夫
- 1つのリクエストで複数データを処理することでRPMを削減
- トークン数は増えるが、リクエスト数が大幅に減ればTPMも効率的に使える
- Batch APIの利用(非同期バッチ)でさらに低コストな処理も可能
対処法4:監視とロギングで事前対応
レート制限に達する前に検知することで、グレースフルなエラーハンドリングができます。
実装コード
import anthropic
import logging
from datetime import datetime
from dataclasses import dataclass
@dataclass
class RateLimitMetrics:
timestamp: datetime
requests_per_minute: int
tokens_per_minute: int
rpm_limit: int
tpm_limit: int
rpm_usage_percent: float
tpm_usage_percent: float
class ClaudeClientWithMonitoring:
"""
使用状況を監視するClaudeクライアント
"""
def __init__(self, rpm_limit: int = 60, tpm_limit: int = 90000):
self.client = anthropic.Anthropic()
self.rpm_limit = rpm_limit
self.tpm_limit = tpm_limit
# ログ設定
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# ファイルハンドラ
handler = logging.FileHandler("claude_api_usage.log")
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.current_rpm = 0
self.current_tpm = 0
def log_metrics(self):
"""
現在の使用率をログ記録
"""
rpm_percent = (self.current_rpm / self.rpm_limit) * 100
tpm_percent = (self.current_tpm / self.tpm_limit) * 100
metrics = RateLimitMetrics(
timestamp=datetime.now(),
requests_per_minute=self.current_rpm,
tokens_per_minute=self.current_tpm,
rpm_limit=self.rpm_limit,
tpm_limit=self.tpm_limit,
rpm_usage_percent=rpm_percent,
tpm_usage_percent=tpm_percent
)
# 80%以上で警告
if rpm_percent > 80:
self.logger.warning(
f"RPM使用率が高い: {rpm_percent:.1f}% ({self.current_rpm}/{self.rpm_limit})"
)
if tpm_percent > 80:
self.logger.warning(
f"TPM使用率が高い: {tpm_percent:.1f}% ({self.current_tpm}/{self.tpm_limit})"
)
self.logger.info(f"メトリクス: {metrics}")
return metrics
def call_claude_with_monitoring(self, prompt: str) -> str:
"""
監視機能付きClaude呼び出し
"""
try:
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": prompt}
]
)
# 使用量を更新
self.current_rpm += 1
self.current_tpm += (response.usage.input_tokens +
response.usage.output_tokens)
# メトリクス記録
self.log_metrics()
return response.content[0].text
except anthropic.RateLimitError as e:
self.logger.error(f"レート制限エラー: {e}")
raise
# 使用例
client = ClaudeClientWithMonitoring()
try:
result =
おすすめAIリソース
- Anthropic Claude API Docs Official Claude API reference. Essential for implementation.
- OpenAI Platform Official GPT series API documentation with pricing details.
- Hugging Face Open-source model hub with many free models to try.