
Python + FastAPI — Windsurf Rules

Windsurf rules for FastAPI services with endpoint-to-database flow discipline.

By dotmd Team · CC0 · Published Feb 19, 2026

Install path

Save this file at the path each supported tool expects.

  • Windsurf: save as .windsurfrules in your project root.

Configuration

.windsurfrules

# Python + FastAPI — Windsurf Rules

FastAPI backend with layered architecture. Every feature flows endpoint → service → repository → database. Cascade: when implementing a feature, wire all layers; don't leave gaps between the API contract and the data layer.

## Quick Reference

| Area | Convention |
|---|---|
| Framework | FastAPI 0.110+ |
| Language | Python 3.12+, full type annotations |
| Validation | Pydantic v2 (BaseModel with `model_config`) |
| ORM | SQLAlchemy 2.0 (async, `mapped_column` style) |
| Database | PostgreSQL via `asyncpg` |
| Migrations | Alembic (async) |
| Testing | pytest + pytest-asyncio + httpx (`AsyncClient`) |
| Linting | Ruff (`ruff check . && ruff format .`) |
| Type checking | `mypy --strict` or pyright |
| Dependency injection | FastAPI `Depends()` |
| Package manager | Check for `uv.lock` → uv, `poetry.lock` → Poetry |

## Project Structure

```
├── src/
│   └── app/
│       ├── main.py              # FastAPI app factory, lifespan, middleware
│       ├── config.py            # Pydantic Settings (env parsing)
│       ├── database.py          # Engine, session factory, Base
│       ├── dependencies.py      # Shared Depends() providers
│       ├── exceptions.py        # Custom exceptions + handlers
│       ├── models/              # SQLAlchemy ORM models
│       │   ├── __init__.py
│       │   ├── user.py
│       │   └── order.py
│       ├── schemas/             # Pydantic request/response schemas
│       │   ├── user.py
│       │   └── order.py
│       ├── repositories/        # Data access (SQL queries)
│       │   ├── base.py
│       │   ├── user.py
│       │   └── order.py
│       ├── services/            # Business logic
│       │   ├── user.py
│       │   └── order.py
│       └── api/
│           ├── router.py        # Root router that includes all sub-routers
│           └── v1/
│               ├── users.py     # User endpoints
│               └── orders.py    # Order endpoints
├── alembic/
│   ├── env.py
│   └── versions/
├── tests/
│   ├── conftest.py              # Fixtures: app, client, db session
│   ├── unit/
│   └── integration/
├── alembic.ini
├── pyproject.toml
└── Dockerfile
```

**Dependency direction:** api → service → repository → model. Never skip layers. Endpoints never import SQLAlchemy. Repositories never raise HTTP exceptions.

## Full-Stack Flow Pattern

When Cascade implements a feature, trace from endpoint to database and back.

### Example: Creating an Order

**Step 1: ORM Model** (`models/order.py`)

```python
import enum
from datetime import UTC, datetime
from decimal import Decimal

from sqlalchemy import DateTime, Enum as SAEnum, ForeignKey, Numeric, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.database import Base


def utcnow() -> datetime:
    # Timezone-aware replacement for datetime.utcnow(), which is deprecated in Python 3.12.
    return datetime.now(UTC)


class OrderStatus(str, enum.Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    reference: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    customer_id: Mapped[int] = mapped_column(ForeignKey("customers.id"))
    status: Mapped[OrderStatus] = mapped_column(
        SAEnum(OrderStatus, native_enum=False), default=OrderStatus.PENDING
    )
    total: Mapped[Decimal] = mapped_column(Numeric(10, 2))
    notes: Mapped[str | None] = mapped_column(String(500), nullable=True)
    # timezone=True so the aware datetimes from utcnow() match the column type.
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=utcnow, onupdate=utcnow
    )

    # The Customer and OrderItem models are not shown in this example.
    customer: Mapped["Customer"] = relationship(back_populates="orders")
    items: Mapped[list["OrderItem"]] = relationship(
        back_populates="order", cascade="all, delete-orphan"
    )
```

**Step 2: Pydantic Schemas** (`schemas/order.py`)

```python
from datetime import datetime
from decimal import Decimal

from pydantic import BaseModel, Field, field_validator


class OrderItemCreate(BaseModel):
    product_id: int
    quantity: int = Field(gt=0)
    unit_price: Decimal = Field(gt=0, decimal_places=2)


class OrderCreate(BaseModel):
    model_config = {"strict": True}

    customer_id: int
    items: list[OrderItemCreate] = Field(min_length=1)
    notes: str | None = Field(None, max_length=500)

    @field_validator("items")
    @classmethod
    def unique_products(cls, items: list[OrderItemCreate]) -> list[OrderItemCreate]:
        # Reject orders that list the same product more than once.
        product_ids = [item.product_id for item in items]
        if len(product_ids) != len(set(product_ids)):
            raise ValueError("Duplicate product IDs not allowed")
        return items


# Defined before OrderResponse so the items field needs no forward reference.
class OrderItemResponse(BaseModel):
    model_config = {"from_attributes": True}

    id: int
    product_id: int
    quantity: int
    unit_price: Decimal


class OrderResponse(BaseModel):
    model_config = {"from_attributes": True}

    id: int
    reference: str
    customer_id: int
    status: str
    total: Decimal
    notes: str | None
    created_at: datetime
    items: list[OrderItemResponse]
```

**Step 3: Repository** (`repositories/order.py`)

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.order import Order


class OrderRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_by_id(self, order_id: int) -> Order | None:
        # Eager-load items: lazy loading is not available on an async session.
        stmt = select(Order).where(Order.id == order_id).options(selectinload(Order.items))
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def create(self, order: Order) -> Order:
        self.session.add(order)
        # flush() assigns primary keys without committing; the caller owns the transaction.
        await self.session.flush()
        await self.session.refresh(order, ["items"])
        return order

    async def list_by_customer(
        self, customer_id: int, *, limit: int = 50, offset: int = 0
    ) -> list[Order]:
        stmt = (
            select(Order).where(Order.customer_id == customer_id)
            .options(selectinload(Order.items))
            .order_by(Order.created_at.desc()).limit(limit).offset(offset)
        )
        return list((await self.session.execute(stmt)).scalars().all())
```

**Step 4: Service** (`services/order.py`)

```python
import uuid
from decimal import Decimal

from app.exceptions import ConflictError, NotFoundError
from app.models.order import Order, OrderItem, OrderStatus
from app.repositories.order import OrderRepository
from app.schemas.order import OrderCreate


class OrderService:
    def __init__(self, repo: OrderRepository) -> None:
        self.repo = repo

    async def create_order(self, data: OrderCreate) -> Order:
        reference = f"ORD-{uuid.uuid4().hex[:8].upper()}"

        items = [
            OrderItem(
                product_id=item.product_id,
                quantity=item.quantity,
                unit_price=item.unit_price,
            )
            for item in data.items
        ]
        # Start from Decimal("0") so the result is always a Decimal, never int 0.
        total = sum(
            (Decimal(str(item.quantity)) * item.unit_price for item in data.items),
            start=Decimal("0"),
        )

        order = Order(
            reference=reference,
            customer_id=data.customer_id,
            items=items,
            total=total,
            notes=data.notes,
        )
        return await self.repo.create(order)

    async def get_order(self, order_id: int) -> Order:
        order = await self.repo.get_by_id(order_id)
        if order is None:
            raise NotFoundError("Order", str(order_id))
        return order

    async def cancel_order(self, order_id: int) -> Order:
        order = await self.get_order(order_id)
        if order.status != OrderStatus.PENDING:
            raise ConflictError(
                f"Cannot cancel order in '{order.status.value}' status"
            )
        order.status = OrderStatus.CANCELLED
        return order
```

**Step 5: Endpoint** (`api/v1/orders.py`)

```python
from fastapi import APIRouter, Depends, status

from app.dependencies import get_order_service
from app.schemas.order import OrderCreate, OrderResponse
from app.services.order import OrderService

router = APIRouter(prefix="/orders", tags=["orders"])


@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
    data: OrderCreate,
    service: OrderService = Depends(get_order_service),
) -> OrderResponse:
    order = await service.create_order(data)
    return OrderResponse.model_validate(order)


@router.get("/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: int,
    service: OrderService = Depends(get_order_service),
) -> OrderResponse:
    order = await service.get_order(order_id)
    return OrderResponse.model_validate(order)


@router.post("/{order_id}/cancel", response_model=OrderResponse)
async def cancel_order(
    order_id: int,
    service: OrderService = Depends(get_order_service),
) -> OrderResponse:
    order = await service.cancel_order(order_id)
    return OrderResponse.model_validate(order)
```

## Dependency Injection

Chain `Depends()` calls to wire layers: `get_db_session` → `get_order_repo(session)` → `get_order_service(repo)`. Define providers in `dependencies.py`. The session provider is an async generator: it opens `async with session_factory() as session`, wraps the request in `async with session.begin()`, and `yield`s the session, so the transaction commits when the request succeeds and rolls back when it raises.

## Exception Handling

```python
# exceptions.py
class AppError(Exception):
    def __init__(self, message: str, code: str = "INTERNAL_ERROR", status_code: int = 500) -> None:
        super().__init__(message)
        self.message = message
        self.code = code
        self.status_code = status_code


class NotFoundError(AppError):
    def __init__(self, resource: str, resource_id: str) -> None:
        super().__init__(f"{resource} '{resource_id}' not found", "NOT_FOUND", 404)


class ConflictError(AppError):
    def __init__(self, message: str) -> None:
        super().__init__(message, "CONFLICT", 409)
```

Register a handler for `AppError` in `main.py` that maps `status_code` and `code` to a JSON error response.

## Testing

### Test Client Setup

```python
# tests/conftest.py
from collections.abc import AsyncIterator

import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.database import Base
from app.dependencies import get_db_session
from app.main import create_app


# In pytest-asyncio's default strict mode, async fixtures must use
# pytest_asyncio.fixture rather than pytest.fixture.
@pytest_asyncio.fixture
async def db_session() -> AsyncIterator[AsyncSession]:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    session_factory = async_sessionmaker(engine, class_=AsyncSession)
    async with session_factory() as session:
        async with session.begin():
            yield session

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest_asyncio.fixture
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    app = create_app()
    # Route every request through the test session instead of the real database.
    app.dependency_overrides[get_db_session] = lambda: db_session
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        yield client
```

### Integration Tests

```python
# tests/integration/test_orders.py
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_create_order(client: AsyncClient) -> None:
    response = await client.post("/api/v1/orders/", json={
        "customer_id": 1,
        "items": [{"product_id": 1, "quantity": 2, "unit_price": "29.99"}],
    })
    assert response.status_code == 201
    assert response.json()["status"] == "pending"
    assert response.json()["total"] == "59.98"


@pytest.mark.asyncio
async def test_create_order_empty_items(client: AsyncClient) -> None:
    response = await client.post("/api/v1/orders/", json={"customer_id": 1, "items": []})
    assert response.status_code == 422
```

### Unit Tests for Services

```python
# tests/unit/test_order_service.py
from unittest.mock import AsyncMock

import pytest

from app.exceptions import ConflictError
from app.models.order import Order, OrderStatus
from app.services.order import OrderService


@pytest.mark.asyncio
async def test_cancel_pending_order() -> None:
    mock_repo = AsyncMock()
    mock_repo.get_by_id.return_value = Order(id=1, status=OrderStatus.PENDING)
    service = OrderService(mock_repo)

    result = await service.cancel_order(1)
    assert result.status == OrderStatus.CANCELLED


@pytest.mark.asyncio
async def test_cancel_shipped_order_raises() -> None:
    mock_repo = AsyncMock()
    mock_repo.get_by_id.return_value = Order(id=1, status=OrderStatus.SHIPPED)
    service = OrderService(mock_repo)

    with pytest.raises(ConflictError, match="Cannot cancel"):
        await service.cancel_order(1)
```

## Cascade Flow Guidance

When implementing an endpoint end-to-end:

1. **Model first.** Check if the SQLAlchemy model exists or needs columns. Add migration if changed.
2. **Schema second.** Define Pydantic request/response schemas. This is the API contract.
3. **Repository third.** Write the data access method. Keep SQL here.
4. **Service fourth.** Business logic, validation beyond schema, orchestration.
5. **Endpoint last.** Thin wrapper: parse request → call service → return response.
6. **Tests alongside.** Write at least one happy-path and one error-path test.

After schema changes: `alembic revision --autogenerate -m "<description>"` then `alembic upgrade head`.

## Commands

```bash
uvicorn app.main:app --reload                  # Dev server
pytest -x -v                                   # Run tests (stop on first failure)
pytest --cov=app --cov-report=term-missing     # Coverage report
ruff check . && ruff format .                  # Lint + format
mypy src/                                      # Type check
alembic upgrade head                           # Apply migrations
alembic revision --autogenerate -m "msg"       # Generate migration
alembic downgrade -1                           # Rollback one migration
```
