Python asyncioで同時実行処理を実装する実践ガイド

本記事では、Pythonのasyncioライブラリを使って、複数の処理を効率的に並行実行する方法を学べます。APIリクエストやデータベース処理など、I/O待機時間が多い処理を大幅に高速化できる実装パターンを、すぐに仕事で活用できるコード例とともに解説します。

asyncioが解決する問題

通常のPythonプログラムは同期的に実行されます。つまり、ある処理が完了するまで次の処理は待機状態になります。例えば、Webサーバーへのリクエストが完了するまで、プログラムは進まないということです。

データベースクエリやAPI呼び出しなど、外部リソースとの通信が発生する場合、ネットワーク応答を待っている間、CPUは遊んでいます。このようなI/O待機の時間を有効活用するのが、非同期処理(async/await)の役割です。

asyncioを使えば、1つの処理が完了するまで待たず、別の処理を同時進行させることで、全体の実行時間を大幅に削減できます。

asyncioの基本構文を理解する

async関数とawait式の書き方

async defで非同期関数を定義し、その中でawaitを使って他の非同期処理の完了を待ちます。

import asyncio

# 非同期関数の定義
async def fetch_data(url):
    print(f"データ取得開始: {url}")
    # 実際にはここでHTTP通信が行われる(ここでは3秒待機をシミュレート)
    await asyncio.sleep(3)
    print(f"データ取得完了: {url}")
    return f"Result from {url}"

# 実行
async def main():
    result = await fetch_data("https://example.com")
    print(result)

# asyncio.run()を使ってメインのコルーチンを実行
asyncio.run(main())

ただし、このコード例では1つの処理しか実行していないため、非同期処理のメリットがありません。複数の処理を並行実行するにはasyncio.gather()asyncio.create_task()を使います。

複数の処理を並行実行する実装パターン

asyncio.gather()で複数の非同期処理を実行

asyncio.gather()は最も一般的な方法です。複数の非同期処理を同時に実行し、すべての結果が揃うまで待ちます。

import asyncio

async def fetch_data(url, delay):
    """URLからデータを取得するシミュレーション"""
    print(f"[開始] {url}")
    await asyncio.sleep(delay)  # ネットワーク待機をシミュレート
    print(f"[完了] {url}")
    return f"Data from {url}"

async def main():
    # 3つの処理を並行実行
    results = await asyncio.gather(
        fetch_data("https://api1.example.com", 2),
        fetch_data("https://api2.example.com", 3),
        fetch_data("https://api3.example.com", 1)
    )
    
    for result in results:
        print(result)

# 実行
asyncio.run(main())
# 出力例:
# [開始] https://api1.example.com
# [開始] https://api2.example.com
# [開始] https://api3.example.com
# [完了] https://api3.example.com
# [完了] https://api1.example.com
# [完了] https://api2.example.com
# Data from https://api1.example.com
# Data from https://api2.example.com
# Data from https://api3.example.com

このコードは約3秒で完了します(最長の処理の時間)。同期処理なら6秒かかります。

asyncio.create_task()で柔軟なタスク管理

より細かい制御が必要な場合はcreate_task()を使って、タスクを明示的に作成します。

import asyncio

async def download_file(filename, duration):
    """ファイルダウンロードをシミュレート"""
    print(f"ダウンロード開始: {filename}")
    try:
        await asyncio.sleep(duration)
        print(f"ダウンロード完了: {filename}")
        return filename
    except asyncio.CancelledError:
        print(f"ダウンロードキャンセル: {filename}")
        raise

async def main():
    # タスクを作成
    task1 = asyncio.create_task(download_file("file1.zip", 2))
    task2 = asyncio.create_task(download_file("file2.zip", 3))
    task3 = asyncio.create_task(download_file("file3.zip", 1))
    
    # すべてのタスクの完了を待つ
    results = await asyncio.gather(task1, task2, task3)
    print(f"すべてのダウンロード完了: {results}")

asyncio.run(main())

タイムアウト設定でハングを防ぐ

長時間待機を避けるため、asyncio.wait_for()でタイムアウトを設定します。これは実務で必須の実装です。

import asyncio

async def slow_api_call():
    """遅いAPIをシミュレート"""
    await asyncio.sleep(5)
    return "API Response"

async def main():
    try:
        # 2秒以内に完了する必要がある
        result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("エラー: APIがタイムアウトしました")

asyncio.run(main())
# 出力: エラー: APIがタイムアウトしました

実務で使える実装例:複数URLからのデータ取得

aiohttpライブラリを組み合わせた、実際のAPI呼び出しの例です。

import asyncio
import aiohttp

async def fetch_url(session, url):
    """URLから非同期でHTTPリクエストを送信"""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
            print(f"[{response.status}] {url}")
            return await response.text()
    except asyncio.TimeoutError:
        print(f"[TIMEOUT] {url}")
        return None
    except Exception as e:
        print(f"[ERROR] {url}: {e}")
        return None

async def fetch_all_urls(urls):
    """複数のURLから並行してデータを取得"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3"
    ]
    
    results = await fetch_all_urls(urls)
    print(f"取得結果数: {len([r for r in results if r])}")

asyncio.run(main())

※ このコードを実行するにはpip install aiohttpでライブラリをインストールしてください。

よくあるハマりポイントと解決策

同期関数とasync関数の混在エラー

問題: 同期関数(通常のdef)をawaitできません。

# ❌ これはエラーになる
def sync_function():
    return "result"

async def main():
    result = await sync_function()  # TypeErrorが発生

# ✅ 正しい書き方
async def async_function():
    return "result"

async def main():
    result = await async_function()  # OK

asyncio.run()の多重呼び出し

問題: 同じスクリプト内で複数回asyncio.run()を呼び出すと、イベントループのエラーが発生します。

# ❌ エラー
asyncio.run(main1())
asyncio.run(main2())

# ✅ 正しい書き方
async def combined_main():
    await main1()
    await main2()

asyncio.run(combined_main())

gather()で例外が発生した場合の処理

デフォルトでは1つのタスクで例外が発生すると、gather()全体が中断します。これを防ぐにはreturn_exceptions=Trueを設定します。

async def task_that_fails():
    raise ValueError("エラー発生")

async def task_that_succeeds():
    await asyncio.sleep(1)
    return "成功"

async def main():
    # return_exceptions=Trueで例外を結果として返す
    results = await asyncio.gather(
        task_that_fails(),
        task_that_succeeds(),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"タスク{i}: 例外 - {result}")
        else:
            print(f"タスク{i}: {result}")

asyncio.run(main())
# 出力:
# タスク0: 例外 - エラー発生
# タスク1: 成功

asyncioを使うべき場面と避けるべき場面

使うべき場面

  • I/O待機が多い処理: Web APIリクエスト、ファイル読み書き、データベースクエリ
  • 複数の外部リソースへのアクセス: 複数のAPI呼び出しを並行実行
  • Webスクレイピング: 大量のURLから効率よくデータ取得
  • チャットボットやリアルタイムシステム: 複数の接続を同時処理

避けるべき場面

  • CPU集約的な処理: 大量の計算やデータ処理にはmultiprocessingを使う
  • ブロッキング関数の使用: asyncioと非互換な同期ライブラリを使うならconcurrent.futuresを検討
  • シンプルな処理: 処理が1つだけなら非同期処理のオーバーヘッドが無駄

代替手段との比較

threadingはスレッド単位で並行実行しますが、PythonのGIL(Global Interpreter Lock)の影響を受けます。I/O待機中心ならasyncioの方が効率的です。multiprocessingは真の並列実行が可能ですが、CPU集約的な処理向けで、セットアップのオーバーヘッドが大きくなります。

テスト環境と動作確認

本記事のコード例は以下の環境で動作確認済みです:

  • OS: macOS 14 / Ubuntu 22.04 / Windows 11
  • Python: 3.10 / 3.11 / 3.12
  • 必須ライブラリ: asyncio(標準ライブラリ)、aiohttp(オプション)

よくある質問

gather()はすべてのタスクの完了を待ち、結果をリストで返します。wait()はより細かい制御が可能で、return_when=asyncio.FIRST_COMPLETEDで最初に完了したタスクから処理を開始できます。シンプルな用途ならgather()、複雑な制御が必要ならwait()を選択してください。

K
AWS・Python・生成AIを専門とするソフトウェアエンジニア。AI・クラウド・開発ワークフローの実践ガイドを執筆しています。詳しく見る →