Bigger Applications - Multiple Files - FastAPI
FastAPI framework, high performance, easy to learn, fast to code, ready for production
https://fastapi.tiangolo.com/tutorial/bigger-applications/
FastAPIも導入すれば、とりあえず動くところまでは簡単でした。
NestJSはCLIでひな型の自動生成をしてくれたので、それに乗っかることにしました。スキーマ単位の構造は、スキーマ駆動でマイクロサービスな感じで、これからはこういう構造になっていくのかなぁと思いました。
で、FastAPIです。公式では、どうやらレイヤ構造を推奨しているようです。
多くの記事で上記に近い構造が紹介されていましたが、ここはNestJSのようにスキーマ単位の構造を採用してみようと思います。ディレクトリがスキーマ名になり、機能名がファイル名になるだけなので、縦と横をひっくり返したようになるだけです。
$ tree src -I __pycache__
src
├── config.py
├── customers
│ ├── __init__.py
│ ├── models.py
│ ├── router.py
│ ├── schemas.py
│ └── service.py
├── database.py
├── __init__.py
├── main.py
├── orders
└── products
最初に実装するcustomersと同様にordersとproductsにも実装していきます。
折角なので、データベースアクセスも非同期にしてみます。
(venv) $ pip install sqlalchemy
(venv) $ pip install asyncpg
pydanticのBeseSettingsを使って.envの設定を取り込みます。
from pydantic import BaseSettings
class Settings(BaseSettings):
DB_HOST: str
DB_USER: str
DB_PASS: str
class Config:
env_file = ".env.dev"
上記を使用して、SQLAlchemyの共通設定をします。SQLAlchemy1.4相当の実装例が多かったので、本家のドキュメントを読んで、できるだけ2.0相当で、かつ、非同期を使うように実装しました。
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from . import config
settings = config.Settings()
DB_URL = "postgresql+asyncpg://" \
f"{settings.DB_USER}:{settings.DB_PASS}@{settings.DB_HOST}:5432/postgres"
engine = create_async_engine(DB_URL, echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_session():
async with async_session() as session:
yield session
あとは、main.pyですが、これは個別部の後にします。
まず、データベースのモデルを定義します。
from sqlalchemy.orm import Mapped, mapped_column
from src.database import Base
class Customer(Base):
__tablename__ = "customer"
customerId: Mapped[int] = mapped_column(
name="customer_id", primary_key=True)
name: Mapped[str]
address: Mapped[str]
def __repr__(self) -> str:
return f"Customer(customerId={self.customerId!r}, " \
f"name={self.name!r}, address={self.address!r})"
次に、リソース表現としてのリクエスト、レスポンスのスキーマをpydanticのBaseModelで定義します。
from pydantic import BaseModel
class Customer(BaseModel):
customerId: int
name: str
address: str
class Config:
orm_mode = True
orm_mode = True については、以下の意味になります。
Pydantic's orm_mode will tell the Pydantic model to read the data even if it is not a dict, but an ORM model (or any other arbitrary object with attributes).
モデルとスキーマの準備ができたら、この両者の変換とSQLAlchemyへの命令を実装します。
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from . import models, schemas
async def get_customer(db: AsyncSession, customerId: int):
result = await (db.execute(select(models.Customer).filter(
models.Customer.customerId == customerId)))
customer = result.first()
if customer is None:
raise HTTPException(status_code=404, detail="Customer not found")
return customer[0]
async def create_customer(db: AsyncSession, customer: schemas.Customer):
db_customer = models.Customer(
customerId=customer.customerId,
name=customer.name,
address=customer.address)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
return db_customer
async def update_customer(
db: AsyncSession, customerId: int, customer: schemas.Customer
):
original = await get_customer(db, customerId)
original.name = customer.name
original.address = customer.address
db.add(original)
await db.commit()
await db.refresh(original)
return original
async def delete_customer(db: AsyncSession, customerId: int):
original = await get_customer(db, customerId)
await db.delete(original)
await db.commit()
そして、最後にルーターを定義します。
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.database import get_session
from . import schemas, service
router = APIRouter()
@router.post("/customers", response_model=schemas.Customer)
async def create_customer(
customer: schemas.Customer, db: AsyncSession = Depends(get_session)
):
return await service.create_customer(db, customer)
@router.get("/customers/{customerId}", response_model=schemas.Customer)
async def get_customer(
customerId: int, db: AsyncSession = Depends(get_session)
):
return await service.get_customer(db, customerId)
@router.put("/customers/{customerId}", response_model=schemas.Customer)
async def update_customer(
customerId: int,
customer: schemas.Customer,
db: AsyncSession = Depends(get_session)
):
return await service.update_customer(db, customerId, customer)
@router.delete("/customers/{customerId}", response_model=None)
async def delete_customer(
customerId: int, db: AsyncSession = Depends(get_session)
):
await service.delete_customer(db, customerId)
リソース毎にまとめたルーターをアプリケーションに組み込みます。
from fastapi import FastAPI
from src.customers.router import router as customers_router
app = FastAPI()
app.include_router(customers_router)
残りのリソースを実装していきます。