# Send FastAPI Production Engineering to your agent
Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.
## Fast path
- Download the package from Yavira.
- Extract it into a folder your agent can access.
- Paste one of the prompts below and point your agent at the extracted folder.
## Suggested prompts
### New install

```text
I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.
```
### Upgrade existing

```text
I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.
```
## Machine-readable fields
```json
{
  "schemaVersion": "1.0",
  "item": {
    "slug": "afrexai-fastapi-production",
    "name": "FastAPI Production Engineering",
    "source": "tencent",
    "type": "skill",
    "category": "效率提升",
    "sourceUrl": "https://clawhub.ai/1kalin/afrexai-fastapi-production",
    "canonicalUrl": "https://clawhub.ai/1kalin/afrexai-fastapi-production",
    "targetPlatform": "OpenClaw"
  },
  "install": {
    "downloadUrl": "/downloads/afrexai-fastapi-production",
    "sourceDownloadUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=afrexai-fastapi-production",
    "sourcePlatform": "tencent",
    "targetPlatform": "OpenClaw",
    "packageFormat": "ZIP package",
    "primaryDoc": "SKILL.md",
    "includedAssets": [
      "README.md",
      "SKILL.md"
    ],
    "downloadMode": "redirect",
    "sourceHealth": {
      "source": "tencent",
      "slug": "afrexai-fastapi-production",
      "status": "healthy",
      "reason": "direct_download_ok",
      "recommendedAction": "download",
      "checkedAt": "2026-04-29T21:29:55.086Z",
      "expiresAt": "2026-05-06T21:29:55.086Z",
      "httpStatus": 200,
      "finalUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=afrexai-fastapi-production",
      "contentType": "application/zip",
      "probeMethod": "head",
      "details": {
        "probeUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=afrexai-fastapi-production",
        "contentDisposition": "attachment; filename=\"afrexai-fastapi-production-1.0.0.zip\"",
        "redirectLocation": null,
        "bodySnippet": null,
        "slug": "afrexai-fastapi-production"
      },
      "scope": "item",
      "summary": "Item download looks usable.",
      "detail": "Yavira can redirect you to the upstream package for this item.",
      "primaryActionLabel": "Download for OpenClaw",
      "primaryActionHref": "/downloads/afrexai-fastapi-production"
    },
    "validation": {
      "installChecklist": [
        "Use the Yavira download entry.",
        "Review SKILL.md after the package is downloaded.",
        "Confirm the extracted package contains the expected setup assets."
      ],
      "postInstallChecks": [
        "Confirm the extracted package includes the expected docs or setup files.",
        "Validate the skill or prompts are available in your target agent workspace.",
        "Capture any manual follow-up steps the agent could not complete."
      ]
    }
  },
  "links": {
    "detailUrl": "https://openagent3.xyz/skills/afrexai-fastapi-production",
    "downloadUrl": "https://openagent3.xyz/downloads/afrexai-fastapi-production",
    "agentUrl": "https://openagent3.xyz/skills/afrexai-fastapi-production/agent",
    "manifestUrl": "https://openagent3.xyz/skills/afrexai-fastapi-production/agent.json",
    "briefUrl": "https://openagent3.xyz/skills/afrexai-fastapi-production/agent.md"
  }
}
```
## Documentation

### FastAPI Production Engineering

Complete methodology for building, deploying, and scaling production FastAPI applications. Not a tutorial — a production operating system.

### Quick Health Check (/16)

Score 2 points each. Total < 8 = critical work needed.

SignalHealthyUnhealthyType safetyPydantic v2 models everywheredict returns, no validationError handlingStructured error hierarchyBare HTTPException stringsAuthJWT + dependency injectionManual token parsingTesting80%+ coverage, async testsNo tests or sync-onlyDatabaseAsync ORM, migrationsRaw SQL, no migrationsObservabilityStructured logging + tracingprint() debuggingDeploymentMulti-stage Docker, health checksuvicorn main:app on bare metalDocumentationAuto-generated, accurate OpenAPIDefault /docs untouched

### Recommended Structure

src/
├── app/
│   ├── __init__.py
│   ├── main.py              # App factory
│   ├── config.py             # Pydantic Settings
│   ├── dependencies.py       # Shared DI
│   ├── middleware.py          # Custom middleware
│   ├── features/
│   │   ├── users/
│   │   │   ├── __init__.py
│   │   │   ├── router.py     # Endpoints
│   │   │   ├── schemas.py    # Pydantic models
│   │   │   ├── service.py    # Business logic
│   │   │   ├── repository.py # Data access
│   │   │   ├── models.py     # SQLAlchemy/SQLModel
│   │   │   ├── dependencies.py
│   │   │   └── exceptions.py
│   │   ├── auth/
│   │   ├── orders/
│   │   └── ...
│   ├── core/
│   │   ├── database.py       # Engine, session factory
│   │   ├── security.py       # JWT, hashing
│   │   ├── errors.py         # Error hierarchy
│   │   └── logging.py        # Structlog config
│   └── shared/
│       ├── pagination.py
│       ├── filters.py
│       └── responses.py
├── migrations/               # Alembic
├── tests/
│   ├── conftest.py
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── pyproject.toml
├── Dockerfile
└── docker-compose.yml

### 7 Architecture Rules

Feature-based modules — group by domain, not by layer
Router → Service → Repository — strict layering, no skipping
Dependency injection everywhere — use Depends() for testability
Pydantic models at boundaries — validate all input AND output
No business logic in routers — routers are thin, services are thick
Config via environment — Pydantic Settings with .env support
Async by default — use async def for all I/O-bound operations

### Framework Selection Context

# When to choose FastAPI over alternatives
fastapi_is_best_when:
  - "You need auto-generated OpenAPI docs"
  - "Team knows Python type hints"
  - "API-first (no server-rendered HTML as primary)"
  - "High concurrency with async I/O"
  - "Microservice or API gateway"

consider_alternatives:
  django: "Full-featured web app with admin, ORM, auth batteries"
  flask: "Simple app, team prefers explicit over magic"
  litestar: "Need WebSocket-heavy or more opinionated framework"
  hono_or_express: "Team prefers TypeScript"

### Pydantic Settings Pattern

from pydantic_settings import BaseSettings
from pydantic import SecretStr, field_validator
from functools import lru_cache

class Settings(BaseSettings):
    # App
    app_name: str = "MyAPI"
    debug: bool = False
    environment: str = "production"  # development | staging | production
    
    # Server
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 4
    
    # Database
    database_url: SecretStr  # Required — no default
    db_pool_size: int = 20
    db_max_overflow: int = 10
    db_pool_timeout: int = 30
    
    # Auth
    jwt_secret: SecretStr  # Required
    jwt_algorithm: str = "HS256"
    jwt_expire_minutes: int = 30
    
    # Redis
    redis_url: str = "redis://localhost:6379/0"
    
    # CORS
    cors_origins: list[str] = ["http://localhost:3000"]
    
    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v
    
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

@lru_cache
def get_settings() -> Settings:
    return Settings()

### 5 Configuration Rules

Never hardcode secrets — use SecretStr for sensitive values
Fail fast — required fields have no defaults; app won't start without them
Validate at startup — use @field_validator for constraint checking
Cache settings — @lru_cache ensures single parse
Type everything — no str for structured values; use enums, Literal types

### Schema Design Patterns

from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime
from uuid import UUID

# Base with common config
class AppSchema(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,      # ORM mode
        str_strip_whitespace=True,  # Auto-strip
        validate_default=True,      # Validate defaults too
    )

# Input schemas (what the API accepts)
class UserCreate(AppSchema):
    email: str = Field(..., pattern=r"^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$")
    name: str = Field(..., min_length=1, max_length=100)
    password: str = Field(..., min_length=8, max_length=128)

class UserUpdate(AppSchema):
    name: str | None = Field(None, min_length=1, max_length=100)
    email: str | None = Field(None, pattern=r"^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$")

# Output schemas (what the API returns)
class UserResponse(AppSchema):
    id: UUID
    email: str
    name: str
    created_at: datetime
    # Note: password is NEVER in response schema

# List response with pagination
class PaginatedResponse[T](AppSchema):
    items: list[T]
    total: int
    page: int
    page_size: int
    has_next: bool

### 8 Pydantic Rules

Separate Create/Update/Response schemas — never reuse input as output
Never expose internal fields — no passwords, internal IDs, or debug info in responses
Use Field() for constraints — min/max length, regex patterns, gt/lt for numbers
Enable from_attributes=True — for ORM model → schema conversion
Use generics for wrappers — PaginatedResponse[T], ApiResponse[T]
Validate at boundaries — request body, query params, path params, headers
Use computed fields — @computed_field for derived values
Document with examples — model_config = {"json_schema_extra": {"examples": [...]}}

### Structured Error Hierarchy

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.status import (
    HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED,
    HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND,
    HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY,
    HTTP_429_TOO_MANY_REQUESTS, HTTP_500_INTERNAL_SERVER_ERROR,
)

class AppError(Exception):
    """Base application error."""
    def __init__(
        self,
        message: str,
        code: str,
        status_code: int = HTTP_500_INTERNAL_SERVER_ERROR,
        details: dict | None = None,
    ):
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}
        super().__init__(message)

class NotFoundError(AppError):
    def __init__(self, resource: str, identifier: str | int):
        super().__init__(
            message=f"{resource} not found: {identifier}",
            code="NOT_FOUND",
            status_code=HTTP_404_NOT_FOUND,
            details={"resource": resource, "identifier": str(identifier)},
        )

class ConflictError(AppError):
    def __init__(self, message: str, field: str | None = None):
        super().__init__(
            message=message, code="CONFLICT",
            status_code=HTTP_409_CONFLICT,
            details={"field": field} if field else {},
        )

class AuthenticationError(AppError):
    def __init__(self, message: str = "Invalid credentials"):
        super().__init__(message=message, code="UNAUTHORIZED", status_code=HTTP_401_UNAUTHORIZED)

class AuthorizationError(AppError):
    def __init__(self, message: str = "Insufficient permissions"):
        super().__init__(message=message, code="FORBIDDEN", status_code=HTTP_403_FORBIDDEN)

class ValidationError(AppError):
    def __init__(self, message: str, errors: list[dict] | None = None):
        super().__init__(
            message=message, code="VALIDATION_ERROR",
            status_code=HTTP_422_UNPROCESSABLE_ENTITY,
            details={"errors": errors or []},
        )

class RateLimitError(AppError):
    def __init__(self, retry_after: int = 60):
        super().__init__(
            message="Rate limit exceeded", code="RATE_LIMITED",
            status_code=HTTP_429_TOO_MANY_REQUESTS,
            details={"retry_after": retry_after},
        )

# Global error handler
async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "details": exc.details,
            }
        },
    )

# Register in app factory
# app.add_exception_handler(AppError, app_error_handler)

### 6 Error Handling Rules

Never return bare strings — always structured {"error": {"code", "message", "details"}}
Use domain-specific errors — NotFoundError("User", user_id) not HTTPException(404)
Global handler catches all — register AppError handler in app factory
Log server errors, don't expose — 5xx returns generic message, logs full traceback
Include actionable details — which field failed, what's allowed, retry-after for rate limits
Never leak internals — no stack traces, SQL queries, or file paths in responses

### JWT + Dependency Injection Pattern

from fastapi import Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone

security = HTTPBearer()

def create_access_token(user_id: str, roles: list[str], settings: Settings) -> str:
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expire_minutes)
    payload = {
        "sub": user_id,
        "roles": roles,
        "exp": expire,
        "iat": datetime.now(timezone.utc),
    }
    return jwt.encode(payload, settings.jwt_secret.get_secret_value(), algorithm=settings.jwt_algorithm)

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    settings: Settings = Depends(get_settings),
    db: AsyncSession = Depends(get_db),
) -> User:
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret.get_secret_value(),
            algorithms=[settings.jwt_algorithm],
        )
        user_id = payload.get("sub")
        if not user_id:
            raise AuthenticationError("Invalid token payload")
    except JWTError:
        raise AuthenticationError("Invalid or expired token")
    
    user = await db.get(User, user_id)
    if not user:
        raise AuthenticationError("User not found")
    return user

# Role-based authorization
def require_role(*roles: str):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if not any(r in user.roles for r in roles):
            raise AuthorizationError(f"Requires one of: {', '.join(roles)}")
        return user
    return checker

# Usage in router
@router.get("/admin/users")
async def list_users(
    admin: User = Depends(require_role("admin", "superadmin")),
    service: UserService = Depends(get_user_service),
):
    return await service.list_all()

### 10-Point Security Checklist

#CheckPriority1JWT secret ≥ 256 bits, from envP02Token expiry ≤ 30 min for access, ≤ 7 days refreshP03Password hashed with bcrypt/argon2P04CORS configured per environmentP05Rate limiting on auth endpointsP06HTTPS enforced (redirect HTTP)P07Security headers (HSTS, CSP, X-Frame)P18Input validation on ALL endpointsP19SQL injection prevented (parameterized queries)P010Dependency scanning (safety/pip-audit)P1

### Async SQLAlchemy + Repository Pattern

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select, func
from uuid import uuid4, UUID
from datetime import datetime, timezone

# Engine setup
engine = create_async_engine(
    settings.database_url.get_secret_value(),
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_pre_ping=True,  # Check connection health
    echo=settings.debug,
)

SessionFactory = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with SessionFactory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Base model with common fields
class Base(DeclarativeBase):
    pass

class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
    updated_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

# Repository pattern
class BaseRepository[T]:
    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model
    
    async def get_by_id(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)
    
    async def get_or_raise(self, id: UUID) -> T:
        entity = await self.get_by_id(id)
        if not entity:
            raise NotFoundError(self.model.__name__, str(id))
        return entity
    
    async def list(
        self, *, offset: int = 0, limit: int = 20, **filters
    ) -> tuple[list[T], int]:
        query = select(self.model)
        count_query = select(func.count()).select_from(self.model)
        
        for field, value in filters.items():
            if value is not None:
                query = query.where(getattr(self.model, field) == value)
                count_query = count_query.where(getattr(self.model, field) == value)
        
        total = await self.session.scalar(count_query) or 0
        result = await self.session.execute(
            query.offset(offset).limit(limit).order_by(self.model.created_at.desc())
        )
        return list(result.scalars().all()), total
    
    async def create(self, entity: T) -> T:
        self.session.add(entity)
        await self.session.flush()
        return entity
    
    async def delete(self, entity: T) -> None:
        await self.session.delete(entity)

### ORM Selection Guide

ORMBest ForAsyncType SafetyLearning CurveSQLAlchemy 2.0Complex queries, enterprise✅✅ Mapped[]MediumSQLModelSimple CRUD, Pydantic sync✅✅LowTortoiseDjango-like feel✅PartialLowPiccoloModern, migrations built-in✅✅Low

Recommendation: SQLAlchemy 2.0 for production. SQLModel for prototypes.

### Migration Strategy (Alembic)

# Setup
alembic init migrations
# Edit alembic.ini: sqlalchemy.url = from env

# Generate migration
alembic revision --autogenerate -m "add users table"

# Apply
alembic upgrade head

# Rollback
alembic downgrade -1

Migration Rules:

Always review autogenerated migrations before applying
Never edit applied migrations — create new ones
Test migrations in staging before production
Include downgrade() for every upgrade()
Use batch_alter_table for SQLite compatibility

### Test Pyramid

LevelCoverage TargetToolsFocusUnit80%+pytest, unittest.mockService logic, validatorsIntegrationKey pathspytest-asyncio, testcontainersDB queries, external APIsE2ECritical flowshttpx.AsyncClientFull request→responseContractAPI boundariesschemathesisOpenAPI compliance

### Test Patterns

import pytest
from httpx import AsyncClient, ASGITransport
from app.main import create_app

@pytest.fixture
async def app():
    app = create_app()
    yield app

@pytest.fixture
async def client(app):
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.fixture
async def auth_client(client, test_user):
    token = create_access_token(test_user.id, test_user.roles)
    client.headers["Authorization"] = f"Bearer {token}"
    return client

# E2E test
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    response = await client.post("/api/users", json={
        "email": "test@example.com",
        "name": "Test User",
        "password": "securepass123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "password" not in data  # Never expose

# Unit test (service layer)
@pytest.mark.asyncio
async def test_user_service_duplicate_email(user_service, mock_repo):
    mock_repo.get_by_email.return_value = existing_user
    with pytest.raises(ConflictError, match="Email already registered"):
        await user_service.create(UserCreate(email="taken@example.com", ...))

# Parametrized validation
@pytest.mark.parametrize("email,expected", [
    ("valid@example.com", True),
    ("invalid", False),
    ("", False),
    ("a@b.c", True),
])
def test_email_validation(email, expected):
    if expected:
        UserCreate(email=email, name="Test", password="12345678")
    else:
        with pytest.raises(ValidationError):
            UserCreate(email=email, name="Test", password="12345678")

### 7 Testing Rules

Test services, not routers — business logic lives in services
Use fixtures for DI override — swap real DB with test DB via app.dependency_overrides
One assertion per test — clear what broke when it fails
Test error paths — 40% of tests should be sad-path
Use factories for test data — UserFactory.create() not manual dict construction
Async tests need @pytest.mark.asyncio — or set asyncio_mode = "auto" in config
Run tests in CI — block merge if tests fail

### Structlog Setup

import structlog
from uuid import uuid4
from starlette.middleware.base import BaseHTTPMiddleware

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Request ID middleware
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid4()))
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )
        
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        
        logger.info(
            "request_completed",
            status_code=response.status_code,
        )
        return response

### Health Check Endpoints

@router.get("/health")
async def health():
    """Liveness probe — is the process running?"""
    return {"status": "ok"}

@router.get("/ready")
async def ready(db: AsyncSession = Depends(get_db)):
    """Readiness probe — can we serve traffic?"""
    checks = {}
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception:
        checks["database"] = "error"
    
    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ok" if all_ok else "degraded", "checks": checks},
    )

### Priority Stack

#TechniqueImpactEffort1Async database queriesHighLow2Connection pooling (tuned)HighLow3Response caching (Redis)HighMedium4Background tasks for heavy workHighLow5Pagination on all list endpointsMediumLow6Select only needed columnsMediumLow7Eager loading (joinedload)MediumMedium8Rate limitingMediumLow

### Background Tasks

from fastapi import BackgroundTasks

@router.post("/users", status_code=201)
async def create_user(
    user_in: UserCreate,
    background_tasks: BackgroundTasks,
    service: UserService = Depends(get_user_service),
):
    user = await service.create(user_in)
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return user

### Caching Pattern

from redis.asyncio import Redis
import json

class CacheService:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def get_or_set(self, key: str, factory, ttl: int = 300):
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        result = await factory()
        await self.redis.setex(key, ttl, json.dumps(result, default=str))
        return result
    
    async def invalidate(self, pattern: str):
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)

### Multi-Stage Dockerfile

# Build stage
FROM python:3.12-slim AS builder
WORKDIR /app

RUN pip install --no-cache-dir uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-editable

# Production stage
FROM python:3.12-slim
WORKDIR /app

RUN adduser --disabled-password --no-create-home appuser

COPY --from=builder /app/.venv /app/.venv
COPY src/ ./src/
COPY migrations/ ./migrations/
COPY alembic.ini ./

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

USER appuser
EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=5s --retries=3 \\
    CMD ["python", "-c", "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"]

CMD ["uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

### App Factory Pattern

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("starting_up", environment=settings.environment)
    await init_db()
    yield
    # Shutdown
    logger.info("shutting_down")
    await engine.dispose()

def create_app() -> FastAPI:
    settings = get_settings()
    
    app = FastAPI(
        title=settings.app_name,
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url=None,
    )
    
    # Middleware (order matters — last added = first executed)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(RequestIDMiddleware)
    
    # Error handlers
    app.add_exception_handler(AppError, app_error_handler)
    
    # Routers
    app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
    app.include_router(users_router, prefix="/api/users", tags=["users"])
    app.include_router(health_router, tags=["health"])
    
    return app

app = create_app()

### GitHub Actions CI

name: CI
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_PASSWORD: test
          POSTGRES_DB: testdb
        ports: ["5432:5432"]
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install uv && uv sync
      - run: uv run ruff check .
      - run: uv run mypy src/
      - run: uv run pytest --cov=src --cov-report=xml -x
        env:
          DATABASE_URL: postgresql+asyncpg://postgres:test@localhost:5432/testdb
          JWT_SECRET: test-secret-key-at-least-32-chars

### Production Checklist

P0 — Mandatory:

All secrets from environment variables (SecretStr)
 HTTPS enforced
 CORS configured per environment
 Rate limiting on auth endpoints
 Input validation on all endpoints
 Structured error responses (no stack traces)
 Health + readiness endpoints
 Database connection pooling
 Migrations run before deploy
 Structured logging (JSON)
 Tests passing in CI

P1 — Recommended:

OpenTelemetry tracing
 Prometheus metrics endpoint
 Background task queue (Celery/ARQ)
 Redis caching layer
 API versioning strategy
 Request/response logging
 Dependency security scanning
 Performance benchmarks established

### Middleware Stack Order

# Applied bottom-to-top (last added = first executed)
app.add_middleware(GZipMiddleware, minimum_size=1000)    # 5. Compress
app.add_middleware(CORSMiddleware, ...)                  # 4. CORS
app.add_middleware(RequestIDMiddleware)                   # 3. Request ID
app.add_middleware(RateLimitMiddleware)                   # 2. Rate limit
app.add_middleware(TrustedHostMiddleware, allowed=["*"])  # 1. Host check

### Pagination with Cursor-Based Option

from fastapi import Query

class PaginationParams:
    def __init__(
        self,
        page: int = Query(1, ge=1, description="Page number"),
        page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    ):
        self.offset = (page - 1) * page_size
        self.limit = page_size
        self.page = page
        self.page_size = page_size

@router.get("/users", response_model=PaginatedResponse[UserResponse])
async def list_users(
    pagination: PaginationParams = Depends(),
    service: UserService = Depends(get_user_service),
):
    items, total = await service.list(
        offset=pagination.offset, limit=pagination.limit
    )
    return PaginatedResponse(
        items=items, total=total,
        page=pagination.page, page_size=pagination.page_size,
        has_next=(pagination.offset + pagination.limit) < total,
    )

### WebSocket Pattern

from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
    
    async def connect(self, user_id: str, ws: WebSocket):
        await ws.accept()
        self.connections[user_id] = ws
    
    def disconnect(self, user_id: str):
        self.connections.pop(user_id, None)
    
    async def send(self, user_id: str, message: dict):
        if ws := self.connections.get(user_id):
            await ws.send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # Process message
    except WebSocketDisconnect:
        manager.disconnect(user_id)

### File Upload Pattern

from fastapi import UploadFile, File

@router.post("/upload")
async def upload_file(
    file: UploadFile = File(..., description="File to upload"),
    user: User = Depends(get_current_user),
):
    # Validate
    if file.size and file.size > 10 * 1024 * 1024:  # 10MB
        raise ValidationError("File too large (max 10MB)")
    
    allowed_types = {"image/jpeg", "image/png", "application/pdf"}
    if file.content_type not in allowed_types:
        raise ValidationError(f"File type not allowed: {file.content_type}")
    
    # Save
    contents = await file.read()
    path = f"uploads/{user.id}/{file.filename}"
    # Save to S3/local storage...
    
    return {"filename": file.filename, "size": len(contents)}

### Phase 12: Common Mistakes

#MistakeFix1Sync database calls in async appUse async SQLAlchemy/databases2Business logic in route handlersMove to service layer3No input validationPydantic models on every endpoint4Returning ORM models directlyUse response schemas (from_attributes)5Hardcoded config valuesPydantic Settings + env vars6No error handling strategyCustom exception hierarchy + global handler7Missing health checks/health + /ready endpoints8print() for loggingstructlog with JSON output9No pagination on list endpointsDefault limit, max cap (100)10Testing against production DBTest fixtures with separate DB

### Quality Scoring (0–100)

DimensionWeight0–255075100Type Safety15%No typesPartial PydanticFull schemasStrict mypy passError Handling15%Bare HTTPExceptionCustom errorsFull hierarchy+ monitoringTesting15%NoneHappy path80%+ coverage+ contract testsSecurity15%No authBasic JWT+ RBAC + rate limit+ scanning + auditPerformance10%Sync everythingAsync DB+ caching+ profilingObservability10%print()Structured logs+ tracing+ metrics + alertsDatabase10%Raw SQLORM + migrations+ repository pattern+ connection tuningDeployment10%ManualDockerfile+ CI/CD+ health + rollback

Scoring: Your Score = Σ (dimension score × weight). < 40 = critical, 40–60 = needs work, 60–80 = solid, 80+ = production-grade.

### 10 Commandments of FastAPI Production

Pydantic models at every boundary — request, response, config
Async all the way down — one sync call blocks the event loop
Services own business logic — routers are thin wrappers
Dependency injection for testability — Depends() is your best friend
Structured errors, structured logs — JSON everything
Health checks are non-negotiable — liveness + readiness
Test the sad paths — 40% of tests should be error cases
Migrations before deployment — never modify schema manually
Secrets in environment, never in code — SecretStr enforces this
Profile before optimizing — measure, don't guess

### Natural Language Commands

audit my FastAPI project → Run health check, identify gaps
set up a new FastAPI project → Generate project structure + config
add authentication to my API → JWT + RBAC dependency pattern
create a CRUD feature for [resource] → Full router/service/repo/schemas
optimize my database queries → Connection pooling + async + N+1 prevention
add structured logging → Structlog + request ID middleware
write tests for [feature] → Async test patterns + fixtures
prepare for production deployment → Dockerfile + CI + checklist
add caching to my API → Redis caching pattern
set up error handling → Custom exception hierarchy + global handler
add WebSocket support → Connection manager pattern
review my API security → 10-point security checklist audit

⚡ Level up your FastAPI APIs → Get the AfrexAI SaaS Context Pack ($47) for complete SaaS architecture, pricing strategies, and go-to-market playbooks.

🔗 More free skills by AfrexAI:

afrexai-python-production — Python production engineering
afrexai-api-architecture — API design methodology
afrexai-database-engineering — Database patterns
afrexai-test-automation-engineering — Testing strategy
afrexai-cicd-engineering — CI/CD pipeline design

🛒 Browse all packs → AfrexAI Storefront
## Trust
- Source: tencent
- Verification: Indexed source record
- Publisher: 1kalin
- Version: 1.0.0
## Source health
- Status: healthy
- Item download looks usable.
- Yavira can redirect you to the upstream package for this item.
- Health scope: item
- Reason: direct_download_ok
- Checked at: 2026-04-29T21:29:55.086Z
- Expires at: 2026-05-06T21:29:55.086Z
- Recommended action: Download for OpenClaw
## Links
- [Detail page](https://openagent3.xyz/skills/afrexai-fastapi-production)
- [Send to Agent page](https://openagent3.xyz/skills/afrexai-fastapi-production/agent)
- [JSON manifest](https://openagent3.xyz/skills/afrexai-fastapi-production/agent.json)
- [Markdown brief](https://openagent3.xyz/skills/afrexai-fastapi-production/agent.md)
- [Download page](https://openagent3.xyz/downloads/afrexai-fastapi-production)