最近FastAPIを勉強していて、Udemy講座もいくつか受講しました。
講座の解説どおり、エンドポイントの関数はasync defで定義し、DBアクセス(SQLAlchemy使用)は同期(何も意識してないコード)で書いていました。
しかし、AIで調べ物をしていた時に、DB周りで見たことのないコードが提示されたので気になって調べてみたら、自分が書いていたコードがパフォーマンス最悪であることがわかったので、記録に残しておきます。
公式ドキュメントの確認
公式ドキュメントにも下記のような記載があります。
If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn’t have support for using
await, (this is currently the case for most database libraries), then declare your path operation functions as normally, with justdef, like:@app.get(‘/’)
def results():
results = some_library()
return resultsIf your application (somehow) doesn’t have to communicate with anything else and wait for it to respond, use
async def, even if you don’t need to useawaitinside.If you just don’t know, use normal
https://fastapi.tiangolo.com/async/#in-a-hurrydef.
外部APIとかDBとか、待ちが発生するライブラリが await に対応してない場合はdefを使おう。通信とか一切ないなら、async defでよい。どっちかわからなかったら、defにしとこう。
という感じかと思います。
ちなみに、SQLAlchemyはバージョン1.4以降、非同期に対応しています。
実験
スタンダードなFastAPI + SQLAlchemyの組み合わせで、負荷テストをしてみました。
(Python初心者かつ実務経験無しのため、適切でない点等あるかもしれません。あくまで参考程度に)
Go言語で開発されたベンチマークツールのheyを使用して、同時接続50ユーザー、合計1000件リクエストで計測してみました。
| def DB同期 | async def DB非同期 | async def DB同期 | |
|---|---|---|---|
| Total | 2.9573 secs | 2.6203 secs | 120.0144 secs |
| Slowest | 0.2869 secs | 1.9719 secs | 20.0020 secs |
| Fastest | 0.0065 secs | 0.0036 secs | 0.0604 secs |
| Average | 0.1438 secs | 0.0925 secs | 0.9812 secs |
| Requests/sec | 338.1407 | 381.6337 | 8.3323 |
| Error | 0 | 0 | 834 |
(一番右)async def + DB同期のパターンは明らかに遅い。
というか、834件 / 1000件がエラーという壊滅的な結果に。
ちなみにエラー内容は下記のとおり。
248: context deadline exceeded (Client.Timeout exceeded while awaiting headers)
586: dial tcp 127.0.0.1:8000: connectex: No connection could be made because the target machine actively refused it.
こうして結果を見ると明らかに問題があることがわかりますが、負荷をかけなければ普通に動いてしまうから、テストしないと気づけませんね・・・。
テストに使用したコード
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from crud import search_customers_async, search_customers_sync
from database import get_db, get_db_async
from schemas import CustomerResponse
app = FastAPI()
# --------------------
# def + sync DB
# --------------------
@app.get("/def-sync", response_model=list[CustomerResponse])
def read_def_sync(keyword: str, db: Session = Depends(get_db)):
customers = search_customers_sync(db, keyword)
return customers
# --------------------
# async def + async DB
# --------------------
@app.get("/async-async", response_model=list[CustomerResponse])
async def read_async_async(keyword: str, db: AsyncSession = Depends(get_db_async)):
customers = await search_customers_async(db, keyword)
return customers
# --------------------
# async def + sync DB
# --------------------
@app.get("/async-sync", response_model=list[CustomerResponse])
async def read_async_sync(keyword: str, db: Session = Depends(get_db)):
customers = search_customers_sync(db, keyword)
return customersfrom sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from models import Customer
# --- Sync CRUD ---
def search_customers_sync(db: Session, keyword: str):
result = (
db.query(Customer)
.filter(Customer.customer_name.ilike(f"%{keyword}%"))
.limit(50)
.all()
)
return result
# --- Async CRUD ---
async def search_customers_async(db: AsyncSession, keyword: str):
stmt = (
select(Customer).where(Customer.customer_name.ilike(f"%{keyword}%")).limit(50)
)
result = await db.execute(stmt)
return result.scalars().all()
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL_SYNC = "postgresql+psycopg2://testuser:testpass@localhost:5432/testdb"
DATABASE_URL_ASYNC = "postgresql+asyncpg://testuser:testpass@localhost:5432/testdb"
# --- Sync ---
engine_sync = create_engine(DATABASE_URL_SYNC, future=True)
SessionLocal = sessionmaker(bind=engine_sync, autoflush=False, autocommit=False)
# --- Async ---
engine_async = create_async_engine(DATABASE_URL_ASYNC, future=True)
AsyncSessionLocal = sessionmaker(
bind=engine_async, class_=AsyncSession, autoflush=False, autocommit=False
)
# --- Dependency: Sync ---
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# --- Dependency: Async ---
async def get_db_async():
async with AsyncSessionLocal() as session:
yield session
from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
Base = declarative_base()
class Customer(Base):
__tablename__ = "customers"
customer_id: Mapped[str] = mapped_column(String, primary_key=True)
customer_name: Mapped[str] = mapped_column(String)from pydantic import BaseModel
class CustomerResponse(BaseModel):
customer_id: str
customer_name: str
class Config:
from_attributes = True
コメント