FastAPI: async defと同期DBアクセスは混ぜるな危険

最近FastAPIを勉強していて、Udemy講座もいくつか受講しました。

講座の解説どおり、エンドポイントの関数はasync defで定義し、DBアクセス(SQLAlchemy使用)は同期(何も意識してないコード)で書いていました。

しかし、AIで調べ物をしていた時に、DB周りで見たことのないコードが提示されたので気になって調べてみたら、自分が書いていたコードがパフォーマンス最悪であることがわかったので、記録に残しておきます。

公式ドキュメントの確認

公式ドキュメントにも下記のような記載があります。

If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn’t have support for using await, (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def, like:

@app.get(‘/’)
def results():
  results = some_library()
  return results

If your application (somehow) doesn’t have to communicate with anything else and wait for it to respond, use async def, even if you don’t need to use await inside.

If you just don’t know, use normal def.

https://fastapi.tiangolo.com/async/#in-a-hurry

外部APIとかDBとか、待ちが発生するライブラリが await に対応してない場合はdefを使おう。通信とか一切ないなら、async defでよい。どっちかわからなかったら、defにしとこう。

という感じかと思います。
ちなみに、SQLAlchemyはバージョン1.4以降、非同期に対応しています。

実験

スタンダードなFastAPI + SQLAlchemyの組み合わせで、負荷テストをしてみました。
(Python初心者かつ実務経験無しのため、適切でない点等あるかもしれません。あくまで参考程度に)

Go言語で開発されたベンチマークツールのheyを使用して、同時接続50ユーザー、合計1000件リクエストで計測してみました。

def
DB同期
async def
DB非同期
async def
DB同期
Total2.9573 secs2.6203 secs120.0144 secs
Slowest0.2869 secs1.9719 secs20.0020 secs
Fastest0.0065 secs0.0036 secs0.0604 secs
Average0.1438 secs0.0925 secs0.9812 secs
Requests/sec338.1407381.63378.3323
Error00834

(一番右)async def + DB同期のパターンは明らかに遅い。
というか、834件 / 1000件がエラーという壊滅的な結果に。

ちなみにエラー内容は下記のとおり。

248: context deadline exceeded (Client.Timeout exceeded while awaiting headers)
586: dial tcp 127.0.0.1:8000: connectex: No connection could be made because the target machine actively refused it.

こうして結果を見ると明らかに問題があることがわかりますが、負荷をかけなければ普通に動いてしまうから、テストしないと気づけませんね・・・。

テストに使用したコード

from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

from crud import search_customers_async, search_customers_sync
from database import get_db, get_db_async
from schemas import CustomerResponse

app = FastAPI()


# --------------------
# def + sync DB
# --------------------
@app.get("/def-sync", response_model=list[CustomerResponse])
def read_def_sync(keyword: str, db: Session = Depends(get_db)):
    customers = search_customers_sync(db, keyword)
    return customers


# --------------------
# async def + async DB
# --------------------
@app.get("/async-async", response_model=list[CustomerResponse])
async def read_async_async(keyword: str, db: AsyncSession = Depends(get_db_async)):
    customers = await search_customers_async(db, keyword)
    return customers


# --------------------
# async def + sync DB
# --------------------
@app.get("/async-sync", response_model=list[CustomerResponse])
async def read_async_sync(keyword: str, db: Session = Depends(get_db)):
    customers = search_customers_sync(db, keyword)
    return customers
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

from models import Customer


# --- Sync CRUD ---
def search_customers_sync(db: Session, keyword: str):
    result = (
        db.query(Customer)
        .filter(Customer.customer_name.ilike(f"%{keyword}%"))
        .limit(50)
        .all()
    )
    return result


# --- Async CRUD ---
async def search_customers_async(db: AsyncSession, keyword: str):
    stmt = (
        select(Customer).where(Customer.customer_name.ilike(f"%{keyword}%")).limit(50)
    )
    result = await db.execute(stmt)
    return result.scalars().all()
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL_SYNC = "postgresql+psycopg2://testuser:testpass@localhost:5432/testdb"
DATABASE_URL_ASYNC = "postgresql+asyncpg://testuser:testpass@localhost:5432/testdb"

# --- Sync ---
engine_sync = create_engine(DATABASE_URL_SYNC, future=True)
SessionLocal = sessionmaker(bind=engine_sync, autoflush=False, autocommit=False)

# --- Async ---
engine_async = create_async_engine(DATABASE_URL_ASYNC, future=True)
AsyncSessionLocal = sessionmaker(
    bind=engine_async, class_=AsyncSession, autoflush=False, autocommit=False
)


# --- Dependency: Sync ---
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# --- Dependency: Async ---
async def get_db_async():
    async with AsyncSessionLocal() as session:
        yield session
from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, declarative_base, mapped_column

Base = declarative_base()


class Customer(Base):
    __tablename__ = "customers"

    customer_id: Mapped[str] = mapped_column(String, primary_key=True)
    customer_name: Mapped[str] = mapped_column(String)
from pydantic import BaseModel


class CustomerResponse(BaseModel):
    customer_id: str
    customer_name: str

    class Config:
        from_attributes = True

コメント