更新: 2026年03月 · 11 分で読める · 5,392 文字
Python asyncioで同時実行処理を実装する実践ガイド
本記事では、Pythonのasyncioライブラリを使って、複数の処理を効率的に並行実行する方法を学べます。APIリクエストやデータベース処理など、I/O待機時間が多い処理を大幅に高速化できる実装パターンを、すぐに仕事で活用できるコード例とともに解説します。
asyncioが解決する問題
通常のPythonプログラムは同期的に実行されます。つまり、ある処理が完了するまで次の処理は待機状態になります。例えば、Webサーバーへのリクエストが完了するまで、プログラムは進まないということです。
データベースクエリやAPI呼び出しなど、外部リソースとの通信が発生する場合、ネットワーク応答を待っている間、CPUは遊んでいます。このようなI/O待機の時間を有効活用するのが、非同期処理(async/await)の役割です。
asyncioを使えば、1つの処理が完了するまで待たず、別の処理を同時進行させることで、全体の実行時間を大幅に削減できます。
asyncioの基本構文を理解する
async関数とawait式の書き方
async defで非同期関数を定義し、その中でawaitを使って他の非同期処理の完了を待ちます。
import asyncio
# 非同期関数の定義
async def fetch_data(url):
print(f"データ取得開始: {url}")
# 実際にはここでHTTP通信が行われる(ここでは3秒待機をシミュレート)
await asyncio.sleep(3)
print(f"データ取得完了: {url}")
return f"Result from {url}"
# 実行
async def main():
result = await fetch_data("https://example.com")
print(result)
# asyncio.run()を使ってメインのコルーチンを実行
asyncio.run(main())
ただし、このコード例では1つの処理しか実行していないため、非同期処理のメリットがありません。複数の処理を並行実行するにはasyncio.gather()やasyncio.create_task()を使います。
複数の処理を並行実行する実装パターン
asyncio.gather()で複数の非同期処理を実行
asyncio.gather()は最も一般的な方法です。複数の非同期処理を同時に実行し、すべての結果が揃うまで待ちます。
import asyncio
async def fetch_data(url, delay):
"""URLからデータを取得するシミュレーション"""
print(f"[開始] {url}")
await asyncio.sleep(delay) # ネットワーク待機をシミュレート
print(f"[完了] {url}")
return f"Data from {url}"
async def main():
# 3つの処理を並行実行
results = await asyncio.gather(
fetch_data("https://api1.example.com", 2),
fetch_data("https://api2.example.com", 3),
fetch_data("https://api3.example.com", 1)
)
for result in results:
print(result)
# 実行
asyncio.run(main())
# 出力例:
# [開始] https://api1.example.com
# [開始] https://api2.example.com
# [開始] https://api3.example.com
# [完了] https://api3.example.com
# [完了] https://api1.example.com
# [完了] https://api2.example.com
# Data from https://api1.example.com
# Data from https://api2.example.com
# Data from https://api3.example.com
このコードは約3秒で完了します(最長の処理の時間)。同期処理なら6秒かかります。
asyncio.create_task()で柔軟なタスク管理
より細かい制御が必要な場合はcreate_task()を使って、タスクを明示的に作成します。
import asyncio
async def download_file(filename, duration):
"""ファイルダウンロードをシミュレート"""
print(f"ダウンロード開始: {filename}")
try:
await asyncio.sleep(duration)
print(f"ダウンロード完了: {filename}")
return filename
except asyncio.CancelledError:
print(f"ダウンロードキャンセル: {filename}")
raise
async def main():
# タスクを作成
task1 = asyncio.create_task(download_file("file1.zip", 2))
task2 = asyncio.create_task(download_file("file2.zip", 3))
task3 = asyncio.create_task(download_file("file3.zip", 1))
# すべてのタスクの完了を待つ
results = await asyncio.gather(task1, task2, task3)
print(f"すべてのダウンロード完了: {results}")
asyncio.run(main())
タイムアウト設定でハングを防ぐ
長時間待機を避けるため、asyncio.wait_for()でタイムアウトを設定します。これは実務で必須の実装です。
import asyncio
async def slow_api_call():
"""遅いAPIをシミュレート"""
await asyncio.sleep(5)
return "API Response"
async def main():
try:
# 2秒以内に完了する必要がある
result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("エラー: APIがタイムアウトしました")
asyncio.run(main())
# 出力: エラー: APIがタイムアウトしました
実務で使える実装例:複数URLからのデータ取得
aiohttpライブラリを組み合わせた、実際のAPI呼び出しの例です。
import asyncio
import aiohttp
async def fetch_url(session, url):
"""URLから非同期でHTTPリクエストを送信"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
print(f"[{response.status}] {url}")
return await response.text()
except asyncio.TimeoutError:
print(f"[TIMEOUT] {url}")
return None
except Exception as e:
print(f"[ERROR] {url}: {e}")
return None
async def fetch_all_urls(urls):
"""複数のURLから並行してデータを取得"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3"
]
results = await fetch_all_urls(urls)
print(f"取得結果数: {len([r for r in results if r])}")
asyncio.run(main())
※ このコードを実行するにはpip install aiohttpでライブラリをインストールしてください。
よくあるハマりポイントと解決策
同期関数とasync関数の混在エラー
問題: 同期関数(通常のdef)をawaitできません。
# ❌ これはエラーになる
def sync_function():
return "result"
async def main():
result = await sync_function() # TypeErrorが発生
# ✅ 正しい書き方
async def async_function():
return "result"
async def main():
result = await async_function() # OK
asyncio.run()の多重呼び出し
問題: 同じスクリプト内で複数回asyncio.run()を呼び出すと、イベントループのエラーが発生します。
# ❌ エラー
asyncio.run(main1())
asyncio.run(main2())
# ✅ 正しい書き方
async def combined_main():
await main1()
await main2()
asyncio.run(combined_main())
gather()で例外が発生した場合の処理
デフォルトでは1つのタスクで例外が発生すると、gather()全体が中断します。これを防ぐにはreturn_exceptions=Trueを設定します。
async def task_that_fails():
raise ValueError("エラー発生")
async def task_that_succeeds():
await asyncio.sleep(1)
return "成功"
async def main():
# return_exceptions=Trueで例外を結果として返す
results = await asyncio.gather(
task_that_fails(),
task_that_succeeds(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"タスク{i}: 例外 - {result}")
else:
print(f"タスク{i}: {result}")
asyncio.run(main())
# 出力:
# タスク0: 例外 - エラー発生
# タスク1: 成功
asyncioを使うべき場面と避けるべき場面
使うべき場面
- I/O待機が多い処理: Web APIリクエスト、ファイル読み書き、データベースクエリ
- 複数の外部リソースへのアクセス: 複数のAPI呼び出しを並行実行
- Webスクレイピング: 大量のURLから効率よくデータ取得
- チャットボットやリアルタイムシステム: 複数の接続を同時処理
避けるべき場面
- CPU集約的な処理: 大量の計算やデータ処理には
multiprocessingを使う - ブロッキング関数の使用: asyncioと非互換な同期ライブラリを使うなら
concurrent.futuresを検討 - シンプルな処理: 処理が1つだけなら非同期処理のオーバーヘッドが無駄
代替手段との比較
threadingはスレッド単位で並行実行しますが、PythonのGIL(Global Interpreter Lock)の影響を受けます。I/O待機中心ならasyncioの方が効率的です。multiprocessingは真の並列実行が可能ですが、CPU集約的な処理向けで、セットアップのオーバーヘッドが大きくなります。
テスト環境と動作確認
本記事のコード例は以下の環境で動作確認済みです:
- OS: macOS 14 / Ubuntu 22.04 / Windows 11
- Python: 3.10 / 3.11 / 3.12
- 必須ライブラリ: asyncio(標準ライブラリ)、aiohttp(オプション)
よくある質問
gather()はすべてのタスクの完了を待ち、結果をリストで返します。wait()はより細かい制御が可能で、return_when=asyncio.FIRST_COMPLETEDで最初に完了したタスクから処理を開始できます。シンプルな用途ならgather()、複雑な制御が必要ならwait()を選択してください。
おすすめPythonリソース
- Python Official Tutorial The official Python tutorial. Perfect for building fundamentals.
- Real Python Practical Python tutorials for intermediate and advanced developers.
- PyPI Official Python package repository for library discovery.