Controller Pattern¶
Controllers provide a unified pattern for handling requests from any source: HTTP, Celery, CLI, etc.
The Core Abstraction¶
All controllers inherit from the base Controller class:
# src/infrastructure/delivery/controllers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass(kw_only=True)
class Controller(ABC):
def __post_init__(self) -> None:
self._wrap_methods()
@abstractmethod
def register(self, registry: Any) -> None:
"""Register this controller with the appropriate registry."""
...
def handle_exception(self, exception: Exception) -> Any:
"""Handle exceptions raised by controller methods."""
raise exception
Key Features¶
1. The register() Method¶
Every controller implements register() to connect to its delivery mechanism:
# HTTP Controller
def register(self, registry: APIRouter) -> None:
registry.add_api_route("/v1/users", self.list_users, methods=["GET"])
# Celery Task Controller
def register(self, registry: Celery) -> None:
registry.task(name=TaskName.PING)(self.ping)
2. Automatic Exception Handling¶
The __post_init__ method wraps all public methods with exception handling:
def __post_init__(self) -> None:
self._wrap_methods()
def _wrap_methods(self) -> None:
for attr_name in dir(self):
attr = getattr(self, attr_name)
if (
callable(attr)
and not hasattr(Controller, attr_name)
and not attr_name.startswith("_")
and attr_name not in dir(Controller)
):
setattr(self, attr_name, self._wrap_route(attr))
def _wrap_route(self, method: Callable[..., Any]) -> Callable[..., Any]:
return self._add_exception_handler(method)
This means every public method automatically goes through handle_exception() if it raises.
3. Custom Exception Handling¶
Override handle_exception() to map domain exceptions to responses:
def handle_exception(self, exception: Exception) -> Any:
if isinstance(exception, TodoNotFoundError):
raise HTTPException(status_code=404, detail=str(exception))
if isinstance(exception, TodoAccessDeniedError):
raise HTTPException(status_code=403, detail=str(exception))
return super().handle_exception(exception)
TransactionController¶
For database operations, use TransactionController:
# src/infrastructure/delivery/controllers.py
from infrastructure.frameworks.logfire.transaction import traced_atomic
@dataclass(kw_only=True)
class TransactionController(Controller, ABC):
def _wrap_route(self, method: Callable[..., Any]) -> Callable[..., Any]:
method = self._add_transaction(method)
return super()._wrap_route(method)
def _add_transaction(self, method: Callable[..., Any]) -> Callable[..., Any]:
@wraps(method)
def wrapper(*args: Any, **kwargs: Any) -> Any:
with traced_atomic(
"controller transaction",
controller=type(self).__name__,
method=method.__name__,
):
return method(*args, **kwargs)
return wrapper
This wraps methods with:
traced_atomic- Combined database transaction and Logfire tracing- Controller and method names as span attributes
HTTP Controller Example¶
# src/delivery/http/controllers/user/controllers.py
from dataclasses import dataclass
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from core.user.services.user import UserService
from delivery.http.auth.jwt import AuthenticatedRequest, JWTAuthFactory
from infrastructure.delivery.controllers import TransactionController
@dataclass(kw_only=True)
class UserController(TransactionController):
"""HTTP controller for user operations."""
_jwt_auth_factory: JWTAuthFactory
_user_service: UserService
def __post_init__(self) -> None:
self._jwt_auth = self._jwt_auth_factory()
self._staff_jwt_auth = self._jwt_auth_factory(require_staff=True)
super().__post_init__()
def register(self, registry: APIRouter) -> None:
registry.add_api_route(
path="/v1/users/me",
endpoint=self.get_current_user,
methods=["GET"],
response_model=UserSchema,
dependencies=[Depends(self._jwt_auth)],
)
def get_current_user(self, request: AuthenticatedRequest) -> UserSchema:
return UserSchema.model_validate(request.state.user, from_attributes=True)
def handle_exception(self, exception: Exception) -> Any:
if isinstance(exception, UserNotFoundError):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exception),
) from exception
return super().handle_exception(exception)
Key Patterns¶
- Dataclass with
kw_only=True: Explicit named parameters - Dependencies as fields:
_user_service,_jwt_auth_factory - Computed values in
__post_init__: Create auth dependencies at initialization __post_init__: Initialize auth dependencies, then callsuper().__post_init__()
Celery Task Controller Example¶
# src/delivery/tasks/tasks/ping.py
from typing import Literal, TypedDict
from celery import Celery
from delivery.tasks.registry import TaskName
from infrastructure.delivery.controllers import Controller
class PingResult(TypedDict):
result: Literal["pong"]
class PingTaskController(Controller):
"""Simple task controller with no dependencies."""
def register(self, registry: Celery) -> None:
registry.task(name=TaskName.PING)(self.ping)
def ping(self) -> PingResult:
return PingResult(result="pong")
Dataclass decorator
Controllers without dependencies don't need the @dataclass decorator. The base Controller class already uses @dataclass(kw_only=True), so subclasses inherit that behavior. Only add @dataclass(kw_only=True) when you have dependency fields to inject.
Sync vs Async Handlers¶
Prefer Sync Handlers¶
FastAPI runs sync handlers in a thread pool automatically:
# ✅ Recommended - sync handler
def get_user(self, request: AuthenticatedRequest, user_id: int) -> UserSchema:
user = self._user_service.get_user_by_id(user_id)
return UserSchema.model_validate(user, from_attributes=True)
Async When Needed¶
For truly async operations (external APIs, etc.):
from asgiref.sync import sync_to_async
async def get_user_async(self, request: AuthenticatedRequest, user_id: int) -> UserSchema:
user = await sync_to_async(
self._user_service.get_user_by_id,
thread_sensitive=False, # Read-only = parallel OK
)(user_id)
return UserSchema.model_validate(user, from_attributes=True)
Thread sensitivity:
thread_sensitive |
Use Case |
|---|---|
False |
Read-only operations (SELECT) |
True |
Write operations (INSERT/UPDATE/DELETE) |
Controller Registration¶
Controllers are injected as fields into the factory and registered with tagged routers:
# src/delivery/http/factories.py
@dataclass(kw_only=True)
class FastAPIFactory:
# Controllers are injected as fields (auto-resolved by IoC)
_health_controller: HealthController
_user_token_controller: UserTokenController
_user_controller: UserController
def _register_controllers(self, app: FastAPI) -> None:
# Create routers with tags for OpenAPI grouping
health_router = APIRouter(tags=["health"])
self._health_controller.register(health_router)
app.include_router(health_router)
user_token_router = APIRouter(tags=["user", "token"])
self._user_token_controller.register(user_token_router)
app.include_router(user_token_router)
user_router = APIRouter(tags=["user"])
self._user_controller.register(user_router)
app.include_router(user_router)
Controller injection
Controllers are declared as dataclass fields and auto-resolved by the IoC container when FastAPIFactory is resolved. This ensures all controller dependencies are properly injected.
Benefits¶
1. Consistent Pattern¶
Same structure for HTTP and Celery:
2. Automatic Tracing¶
TransactionController adds Logfire spans automatically.
3. Exception Isolation¶
Exceptions are caught and handled uniformly.
4. Easy Testing¶
Mock dependencies, test business logic:
def test_get_user():
mock_service = MagicMock()
controller = UserController(_user_service=mock_service, ...)
# Test controller methods directly
Summary¶
The controller pattern:
- Unifies request handling across HTTP and Celery
- Enforces consistent structure via
register() - Wraps methods with exception handling
- Provides
TransactionControllerfor database operations - Enables easy testing through dependency injection