Python + FastAPI — Windsurf Rules
Windsurf rules for FastAPI services with endpoint-to-database flow discipline.
Install path
Use this file for each supported tool in your project.
- Windsurf: Save as
.windsurfrulesin your project at.windsurfrules.
Configuration
.windsurfrules
1# Python + FastAPI — Windsurf Rules23FastAPI backend with layered architecture. Every feature flows endpoint → service → repository → database. Cascade: when implementing a feature, wire all layers — don't leave gaps between the API contract and the data layer.45## Quick Reference67| Area | Convention |8|---|---|9| Framework | FastAPI 0.110+ |10| Language | Python 3.12+, full type annotations |11| Validation | Pydantic v2 (BaseModel with `model_config`) |12| ORM | SQLAlchemy 2.0 (async, mapped_column style) |13| Database | PostgreSQL via `asyncpg` |14| Migrations | Alembic (async) |15| Testing | pytest + pytest-asyncio + httpx (AsyncClient) |16| Linting | Ruff (`ruff check . && ruff format .`) |17| Type checking | mypy --strict or pyright |18| Dependency injection | FastAPI `Depends()` |19| Package manager | Check for `uv.lock` → uv, `poetry.lock` → Poetry |2021## Project Structure2223```24├── src/25│ └── app/26│ ├── main.py # FastAPI app factory, lifespan, middleware27│ ├── config.py # Pydantic Settings (env parsing)28│ ├── database.py # Engine, session factory, Base29│ ├── dependencies.py # Shared Depends() providers30│ ├── exceptions.py # Custom exceptions + handlers31│ ├── models/ # SQLAlchemy ORM models32│ │ ├── __init__.py33│ │ ├── user.py34│ │ └── order.py35│ ├── schemas/ # Pydantic request/response schemas36│ │ ├── user.py37│ │ └── order.py38│ ├── repositories/ # Data access (SQL queries)39│ │ ├── base.py40│ │ ├── user.py41│ │ └── order.py42│ ├── services/ # Business logic43│ │ ├── user.py44│ │ └── order.py45│ └── api/46│ ├── router.py # Root router that includes all sub-routers47│ └── v1/48│ ├── users.py # User endpoints49│ └── orders.py # Order endpoints50├── alembic/51│ ├── env.py52│ └── versions/53├── tests/54│ ├── conftest.py # Fixtures: app, client, db session55│ ├── unit/56│ └── integration/57├── alembic.ini58├── pyproject.toml59└── Dockerfile60```6162**Dependency direction:** api → service → repository → model. Never skip layers. Endpoints never import SQLAlchemy. Repositories never raise HTTP exceptions.6364## Full-Stack Flow Pattern6566When Cascade implements a feature, trace from endpoint to database and back.6768### Example: Creating an Order6970**Step 1: ORM Model** (`models/order.py`)7172```python73from datetime import datetime74from decimal import Decimal75from sqlalchemy import ForeignKey, String, Numeric, Enum as SAEnum76from sqlalchemy.orm import Mapped, mapped_column, relationship77from app.database import Base78import enum7980class OrderStatus(str, enum.Enum):81 PENDING = "pending"82 CONFIRMED = "confirmed"83 SHIPPED = "shipped"84 DELIVERED = "delivered"85 CANCELLED = "cancelled"8687class Order(Base):88 __tablename__ = "orders"8990 id: Mapped[int] = mapped_column(primary_key=True)91 reference: Mapped[str] = mapped_column(String(50), unique=True, index=True)92 customer_id: Mapped[int] = mapped_column(ForeignKey("customers.id"))93 status: Mapped[OrderStatus] = mapped_column(94 SAEnum(OrderStatus, native_enum=False), default=OrderStatus.PENDING95 )96 total: Mapped[Decimal] = mapped_column(Numeric(10, 2))97 notes: Mapped[str | None] = mapped_column(String(500), nullable=True)98 created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)99 updated_at: Mapped[datetime] = mapped_column(100 default=datetime.utcnow, onupdate=datetime.utcnow101 )102103 customer: Mapped["Customer"] = relationship(back_populates="orders")104 items: Mapped[list["OrderItem"]] = relationship(105 back_populates="order", cascade="all, delete-orphan"106 )107```108109**Step 2: Pydantic Schemas** (`schemas/order.py`)110111```python112from datetime import datetime113from decimal import Decimal114from pydantic import BaseModel, Field, field_validator, model_validator115116class OrderItemCreate(BaseModel):117 product_id: int118 quantity: int = Field(gt=0)119 unit_price: Decimal = Field(gt=0, decimal_places=2)120121class OrderCreate(BaseModel):122 model_config = {"strict": True}123124 customer_id: int125 items: list[OrderItemCreate] = Field(min_length=1)126 notes: str | None = Field(None, max_length=500)127128 @field_validator("items")129 @classmethod130 def unique_products(cls, items: list[OrderItemCreate]) -> list[OrderItemCreate]:131 product_ids = [item.product_id for item in items]132 if len(product_ids) != len(set(product_ids)):133 raise ValueError("Duplicate product IDs not allowed")134 return items135136class OrderResponse(BaseModel):137 model_config = {"from_attributes": True}138139 id: int140 reference: str141 customer_id: int142 status: str143 total: Decimal144 notes: str | None145 created_at: datetime146 items: list["OrderItemResponse"]147148class OrderItemResponse(BaseModel):149 model_config = {"from_attributes": True}150151 id: int152 product_id: int153 quantity: int154 unit_price: Decimal155```156157**Step 3: Repository** (`repositories/order.py`)158159```python160from sqlalchemy import select161from sqlalchemy.ext.asyncio import AsyncSession162from sqlalchemy.orm import selectinload163164class OrderRepository:165 def __init__(self, session: AsyncSession) -> None:166 self.session = session167168 async def get_by_id(self, order_id: int) -> Order | None:169 stmt = select(Order).where(Order.id == order_id).options(selectinload(Order.items))170 result = await self.session.execute(stmt)171 return result.scalar_one_or_none()172173 async def create(self, order: Order) -> Order:174 self.session.add(order)175 await self.session.flush()176 await self.session.refresh(order, ["items"])177 return order178179 async def list_by_customer(180 self, customer_id: int, *, limit: int = 50, offset: int = 0181 ) -> list[Order]:182 stmt = (183 select(Order).where(Order.customer_id == customer_id)184 .options(selectinload(Order.items))185 .order_by(Order.created_at.desc()).limit(limit).offset(offset)186 )187 return list((await self.session.execute(stmt)).scalars().all())188```189190**Step 4: Service** (`services/order.py`)191192```python193from decimal import Decimal194from app.exceptions import NotFoundError, ConflictError195from app.models.order import Order, OrderItem, OrderStatus196from app.repositories.order import OrderRepository197from app.schemas.order import OrderCreate198import uuid199200class OrderService:201 def __init__(self, repo: OrderRepository) -> None:202 self.repo = repo203204 async def create_order(self, data: OrderCreate) -> Order:205 reference = f"ORD-{uuid.uuid4().hex[:8].upper()}"206207 items = [208 OrderItem(209 product_id=item.product_id,210 quantity=item.quantity,211 unit_price=item.unit_price,212 )213 for item in data.items214 ]215 total = sum(216 Decimal(str(item.quantity)) * item.unit_price for item in data.items217 )218219 order = Order(220 reference=reference,221 customer_id=data.customer_id,222 items=items,223 total=total,224 notes=data.notes,225 )226 return await self.repo.create(order)227228 async def get_order(self, order_id: int) -> Order:229 order = await self.repo.get_by_id(order_id)230 if order is None:231 raise NotFoundError("Order", str(order_id))232 return order233234 async def cancel_order(self, order_id: int) -> Order:235 order = await self.get_order(order_id)236 if order.status != OrderStatus.PENDING:237 raise ConflictError(238 f"Cannot cancel order in '{order.status.value}' status"239 )240 order.status = OrderStatus.CANCELLED241 return order242```243244**Step 5: Endpoint** (`api/v1/orders.py`)245246```python247from fastapi import APIRouter, Depends, status248from app.dependencies import get_order_service249from app.schemas.order import OrderCreate, OrderResponse250from app.services.order import OrderService251252router = APIRouter(prefix="/orders", tags=["orders"])253254@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)255async def create_order(256 data: OrderCreate,257 service: OrderService = Depends(get_order_service),258) -> OrderResponse:259 order = await service.create_order(data)260 return OrderResponse.model_validate(order)261262@router.get("/{order_id}", response_model=OrderResponse)263async def get_order(264 order_id: int,265 service: OrderService = Depends(get_order_service),266) -> OrderResponse:267 order = await service.get_order(order_id)268 return OrderResponse.model_validate(order)269270@router.post("/{order_id}/cancel", response_model=OrderResponse)271async def cancel_order(272 order_id: int,273 service: OrderService = Depends(get_order_service),274) -> OrderResponse:275 order = await service.cancel_order(order_id)276 return OrderResponse.model_validate(order)277```278279## Dependency Injection280281Chain `Depends()` calls to wire layers: `get_db_session` → `get_order_repo(session)` → `get_order_service(repo)`. Define providers in `dependencies.py`. The session provider uses `async with session_factory() as session` + `session.begin()` in a generator (`yield`).282283## Exception Handling284285```python286# exceptions.py287class AppError(Exception):288 def __init__(self, message: str, code: str = "INTERNAL_ERROR", status_code: int = 500) -> None:289 super().__init__(message)290 self.message = message291 self.code = code292 self.status_code = status_code293294class NotFoundError(AppError):295 def __init__(self, resource: str, resource_id: str) -> None:296 super().__init__(f"{resource} '{resource_id}' not found", "NOT_FOUND", 404)297298class ConflictError(AppError):299 def __init__(self, message: str) -> None:300 super().__init__(message, "CONFLICT", 409)301```302303Register `AppError` as an exception handler in `main.py` to map `status_code` and `code` to JSON error responses.304305## Testing306307### Test Client Setup308309```python310# tests/conftest.py311import pytest312from httpx import ASGITransport, AsyncClient313from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession314from app.main import create_app315from app.database import Base316from app.dependencies import get_db_session317318@pytest.fixture319async def db_session():320 engine = create_async_engine("sqlite+aiosqlite:///:memory:")321 async with engine.begin() as conn:322 await conn.run_sync(Base.metadata.create_all)323324 session_factory = async_sessionmaker(engine, class_=AsyncSession)325 async with session_factory() as session:326 async with session.begin():327 yield session328329 async with engine.begin() as conn:330 await conn.run_sync(Base.metadata.drop_all)331 await engine.dispose()332333@pytest.fixture334async def client(db_session: AsyncSession):335 app = create_app()336 app.dependency_overrides[get_db_session] = lambda: db_session337 async with AsyncClient(338 transport=ASGITransport(app=app), base_url="http://test"339 ) as client:340 yield client341```342343### Integration Tests344345```python346# tests/integration/test_orders.py347@pytest.mark.asyncio348async def test_create_order(client: AsyncClient) -> None:349 response = await client.post("/api/v1/orders/", json={350 "customer_id": 1,351 "items": [{"product_id": 1, "quantity": 2, "unit_price": "29.99"}],352 })353 assert response.status_code == 201354 assert response.json()["status"] == "pending"355 assert response.json()["total"] == "59.98"356357@pytest.mark.asyncio358async def test_create_order_empty_items(client: AsyncClient) -> None:359 response = await client.post("/api/v1/orders/", json={"customer_id": 1, "items": []})360 assert response.status_code == 422361```362363### Unit Tests for Services364365```python366# tests/unit/test_order_service.py367@pytest.mark.asyncio368async def test_cancel_pending_order() -> None:369 mock_repo = AsyncMock()370 mock_repo.get_by_id.return_value = Order(id=1, status=OrderStatus.PENDING)371 service = OrderService(mock_repo)372373 result = await service.cancel_order(1)374 assert result.status == OrderStatus.CANCELLED375376@pytest.mark.asyncio377async def test_cancel_shipped_order_raises() -> None:378 mock_repo = AsyncMock()379 mock_repo.get_by_id.return_value = Order(id=1, status=OrderStatus.SHIPPED)380 service = OrderService(mock_repo)381382 with pytest.raises(ConflictError, match="Cannot cancel"):383 await service.cancel_order(1)384```385386## Cascade Flow Guidance387388When implementing an endpoint end-to-end:3893901. **Model first.** Check if the SQLAlchemy model exists or needs columns. Add migration if changed.3912. **Schema second.** Define Pydantic request/response schemas. This is the API contract.3923. **Repository third.** Write the data access method. Keep SQL here.3934. **Service fourth.** Business logic, validation beyond schema, orchestration.3945. **Endpoint last.** Thin wrapper: parse request → call service → return response.3956. **Tests alongside.** Write at least one happy-path and one error-path test.396397After schema changes: `alembic revision --autogenerate -m "<description>"` then `alembic upgrade head`.398399## Commands400401```bash402uvicorn app.main:app --reload # Dev server403pytest -x -v # Run tests (stop on first failure)404pytest --cov=app --cov-report=term-missing # Coverage report405ruff check . && ruff format . # Lint + format406mypy src/ # Type check407alembic upgrade head # Apply migrations408alembic revision --autogenerate -m "msg" # Generate migration409alembic downgrade -1 # Rollback one migration410```411
Community feedback
0 found this helpful
Works with: