1
/
5

FastAPIでREST(その2)

FastAPIも導入すれば、とりあえず動くところまでは簡単でした。

設計と実装

プロジェクトの構造はどうする?

NestJSはCLIでひな型の自動生成をしてくれたので、それに乗っかることにしました。スキーマ単位の構造は、スキーマ駆動でマイクロサービスな感じで、これからはこういう構造になっていくのかなぁと思いました。

で、FastAPIです。公式では、どうやらレイヤ構造を推奨しているようです。


Bigger Applications - Multiple Files - FastAPI
FastAPI framework, high performance, easy to learn, fast to code, ready for production
https://fastapi.tiangolo.com/tutorial/bigger-applications/

多くの記事で上記に近い構造が紹介されていましたが、ここはNestJSのようにスキーマ単位の構造を採用してみようと思います。ディレクトリがスキーマ名になり、機能名がファイル名になるだけなので、縦と横をひっくり返したようになるだけです。

$ tree src -I __pycache__

src
├── config.py
├── customers
│ ├── __init__.py
│ ├── models.py
│ ├── router.py
│ ├── schemas.py
│ └── service.py
├── database.py
├── __init__.py
├── main.py
├── orders
└── products

最初に実装するcustomersと同様にordersとproductsにも実装していきます。

SQLAlchemy & Postgres

折角なので、データベースアクセスも非同期にしてみます。

(venv) $ pip install sqlalchemy

(venv) $ pip install asyncpg

共通部

pydanticのBeseSettingsを使って.envの設定を取り込みます。


from pydantic import BaseSettings



class Settings(BaseSettings):
DB_HOST: str
DB_USER: str
DB_PASS: str

class Config:
env_file = ".env.dev"

上記を使用して、SQLAlchemyの共通設定をします。SQLAlchemy1.4相当の実装例が多かったので、本家のドキュメントを読んで、できるだけ2.0相当で、かつ、非同期を使うように実装しました。


from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from sqlalchemy.orm import DeclarativeBase

from . import config


settings = config.Settings()
DB_URL = "postgresql+asyncpg://" \
f"{settings.DB_USER}:{settings.DB_PASS}@{settings.DB_HOST}:5432/postgres"

engine = create_async_engine(DB_URL, echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
pass


async def get_session():
async with async_session() as session:
yield session

あとは、main.pyですが、これは個別部の後にします。

customers

まず、データベースのモデルを定義します。


from sqlalchemy.orm import Mapped, mapped_column


from src.database import Base


class Customer(Base):
__tablename__ = "customer"

customerId: Mapped[int] = mapped_column(
name="customer_id", primary_key=True)
name: Mapped[str]
address: Mapped[str]

def __repr__(self) -> str:
return f"Customer(customerId={self.customerId!r}, " \
f"name={self.name!r}, address={self.address!r})"


次に、リソース表現としてのリクエスト、レスポンスのスキーマをpydanticのBaseModelで定義します。


from pydantic import BaseModel



class Customer(BaseModel):
customerId: int
name: str
address: str

class Config:
orm_mode = True

orm_mode = True については、以下の意味になります。

Pydantic's orm_mode will tell the Pydantic model to read the data even if it is not a dict, but an ORM model (or any other arbitrary object with attributes).


SQL (Relational) Databases - FastAPI
FastAPI framework, high performance, easy to learn, fast to code, ready for production
https://fastapi.tiangolo.com/ja/tutorial/sql-databases/

モデルとスキーマの準備ができたら、この両者の変換とSQLAlchemyへの命令を実装します。

from fastapi import HTTPException

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from . import models, schemas


async def get_customer(db: AsyncSession, customerId: int):
result = await (db.execute(select(models.Customer).filter(
models.Customer.customerId == customerId)))
customer = result.first()
if customer is None:
raise HTTPException(status_code=404, detail="Customer not found")
return customer[0]


async def create_customer(db: AsyncSession, customer: schemas.Customer):
db_customer = models.Customer(
customerId=customer.customerId,
name=customer.name,
address=customer.address)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
return db_customer


async def update_customer(
db: AsyncSession, customerId: int, customer: schemas.Customer
):
original = await get_customer(db, customerId)
original.name = customer.name
original.address = customer.address
db.add(original)
await db.commit()
await db.refresh(original)
return original


async def delete_customer(db: AsyncSession, customerId: int):
original = await get_customer(db, customerId)
await db.delete(original)
await db.commit()

そして、最後にルーターを定義します。

from fastapi import APIRouter, Depends

from sqlalchemy.ext.asyncio import AsyncSession

from src.database import get_session
from . import schemas, service


router = APIRouter()


@router.post("/customers", response_model=schemas.Customer)
async def create_customer(
customer: schemas.Customer, db: AsyncSession = Depends(get_session)
):
return await service.create_customer(db, customer)


@router.get("/customers/{customerId}", response_model=schemas.Customer)
async def get_customer(
customerId: int, db: AsyncSession = Depends(get_session)
):
return await service.get_customer(db, customerId)


@router.put("/customers/{customerId}", response_model=schemas.Customer)
async def update_customer(
customerId: int,
customer: schemas.Customer,
db: AsyncSession = Depends(get_session)
):
return await service.update_customer(db, customerId, customer)


@router.delete("/customers/{customerId}", response_model=None)
async def delete_customer(
customerId: int, db: AsyncSession = Depends(get_session)
):
await service.delete_customer(db, customerId)

もう一度、共通部

リソース毎にまとめたルーターをアプリケーションに組み込みます。

from fastapi import FastAPI


from src.customers.router import router as customers_router

app = FastAPI()
app.include_router(customers_router)

次回は

残りのリソースを実装していきます。

Invitation from 株式会社ROBON
If this story triggered your interest, have a chat with the team?
株式会社ROBON's job postings

Weekly ranking

Show other rankings
Like 荒木 岳夫's Story
Let 荒木 岳夫's company know you're interested in their content