This guide explains how to add new API endpoints to SnackBase, following the established patterns and architecture.

Overview

SnackBase uses FastAPI for REST API endpoints. All endpoints are organized in the src/snackbase/infrastructure/api/routes/ directory.

Existing API Routers

| Router | Purpose | Path |
| --- | --- | --- |
| auth_router.py | Authentication | /api/v1/auth/* |
| accounts_router.py | Account management | /api/v1/accounts/* |
| users_router.py | User management | /api/v1/users/* |
| roles_router.py | Role management | /api/v1/roles/* |
| permissions_router.py | Permission management | /api/v1/permissions/* |
| collections_router.py | Collection CRUD | /api/v1/collections/* |
| records_router.py | Dynamic record CRUD | /api/v1/{collection}/* |
| groups_router.py | Group management | /api/v1/groups/* |
| invitations_router.py | User invitations | /api/v1/invitations/* |
| macros_router.py | SQL macros | /api/v1/macros/* |
| dashboard_router.py | Dashboard stats | /api/v1/dashboard/* |
| audit_log_router.py | Audit logs | /api/v1/audit-logs/* |
| migrations_router.py | DB migrations | /api/v1/migrations/* |

Architecture Review

Layer Structure

Request -> API Router -> Service -> Repository -> Database

| Layer | Responsibility | Location |
| --- | --- | --- |
| API Router | HTTP handling, validation, responses | infrastructure/api/routes/ |
| Service | Business logic, orchestration | domain/services/ or infrastructure/services/ |
| Repository | Data access, database queries | infrastructure/persistence/repositories/ |
| Database | Data storage | SQLAlchemy models |

Clean Architecture Principles

  • Routers handle HTTP concerns (status codes, headers, parsing)
  • Services contain business logic
  • Repositories abstract database access
  • Models (Pydantic) define request/response schemas
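
The separation of layers can be illustrated without any framework. The following is a minimal sketch, not SnackBase code: `InMemoryTagRepository`, `TagService`, `create_tag_handler`, and the duplicate-name rule are hypothetical and exist only to show which concern lives in which layer.

```python
import itertools
from dataclasses import dataclass


@dataclass
class Tag:
    id: str
    account_id: str
    name: str


class InMemoryTagRepository:
    """Repository layer: the only code that touches storage (a dict here)."""

    def __init__(self) -> None:
        self._rows: dict[str, Tag] = {}
        self._ids = itertools.count(1)

    def create(self, account_id: str, name: str) -> Tag:
        tag = Tag(id=f"tag_{next(self._ids)}", account_id=account_id, name=name)
        self._rows[tag.id] = tag
        return tag

    def find_all(self, account_id: str) -> list[Tag]:
        return [t for t in self._rows.values() if t.account_id == account_id]


class TagService:
    """Service layer: business rules only, no HTTP and no storage details."""

    def __init__(self, repo: InMemoryTagRepository) -> None:
        self._repo = repo

    def create_tag(self, account_id: str, name: str) -> Tag:
        cleaned = name.strip()
        if not cleaned:
            raise ValueError("name cannot be empty or whitespace")
        # Hypothetical rule: tag names are unique within an account.
        if any(t.name == cleaned for t in self._repo.find_all(account_id)):
            raise ValueError(f"tag {cleaned!r} already exists")
        return self._repo.create(account_id, cleaned)


def create_tag_handler(service: TagService, account_id: str, body: dict) -> tuple[int, dict]:
    """Router layer: translates service results and errors into HTTP-shaped output."""
    try:
        tag = service.create_tag(account_id, body.get("name", ""))
    except ValueError as exc:
        return 400, {"detail": str(exc)}
    return 201, {"id": tag.id, "name": tag.name, "account_id": tag.account_id}


service = TagService(InMemoryTagRepository())
print(create_tag_handler(service, "AB1001", {"name": " Important "}))
# (201, {'id': 'tag_1', 'name': 'Important', 'account_id': 'AB1001'})
```

Because each layer only talks to the one below it, the repository could be replaced by a database-backed one without changing the handler or the service.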

Where to Add Endpoints

Decision Tree

Does endpoint handle dynamic collections?
|
+-- Yes --> Modify records_router.py
|
+-- No --> Does it fit existing router?
           |
           +-- Yes --> Add to existing router
           |
           +-- No --> Create new router

When to Create a New Router

Create a new router when:
  • Adding a new major feature area
  • Existing routers don’t match the domain concept
  • The feature has 3+ related endpoints
Examples of when to create new routers:
  • /api/v1/webhooks/* - Webhook management
  • /api/v1/scheduled-tasks/* - Task scheduling
  • /api/v1/integrations/* - Third-party integrations

Step-by-Step Guide

Let’s add a new feature: Tags for organizing records.

Step 1: Define Pydantic Schemas

Create request/response models in infrastructure/api/schemas/:
# src/snackbase/infrastructure/api/schemas/tags.py
from pydantic import BaseModel, ConfigDict
from datetime import datetime

class TagBase(BaseModel):
    name: str
    color: str | None = None

class TagCreate(TagBase):
    pass

class TagUpdate(TagBase):
    name: str | None = None
    color: str | None = None

class TagResponse(TagBase):
    model_config = ConfigDict(from_attributes=True)

    id: str
    account_id: str
    created_at: datetime
    updated_at: datetime
    created_by: str | None = None

Step 2: Create Database Model

Add SQLAlchemy model in infrastructure/persistence/models/:
# src/snackbase/infrastructure/persistence/models/tag.py
from datetime import datetime

from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, mapped_column

from src.snackbase.infrastructure.persistence.database import Base

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[str] = mapped_column(String(50), primary_key=True)
    account_id: Mapped[str] = mapped_column(String(10), nullable=False, index=True)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    color: Mapped[str | None] = mapped_column(String(7), nullable=True)  # Hex color
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    created_by: Mapped[str | None] = mapped_column(String(50), nullable=True)

Step 3: Create Repository

Add repository in infrastructure/persistence/repositories/:
# src/snackbase/infrastructure/persistence/repositories/tag_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from src.snackbase.infrastructure.persistence.models.tag import Tag
from src.snackbase.core.context import Context

class TagRepository:
    def __init__(self, session: AsyncSession):
        self._session = session

    async def create(self, tag: Tag) -> Tag:
        self._session.add(tag)
        await self._session.flush()
        return tag

    async def get_by_id(self, tag_id: str, context: Context) -> Tag | None:
        result = await self._session.execute(
            select(Tag).where(Tag.id == tag_id, Tag.account_id == context.account_id)
        )
        return result.scalar_one_or_none()

    async def find_all(self, context: Context) -> list[Tag]:
        result = await self._session.execute(
            select(Tag).where(Tag.account_id == context.account_id)
        )
        return list(result.scalars().all())

    async def update(self, tag: Tag) -> Tag:
        await self._session.flush()
        return tag

    async def delete(self, tag: Tag) -> None:
        await self._session.delete(tag)
        await self._session.flush()

Step 4: Create API Router

Create router in infrastructure/api/routes/:
# src/snackbase/infrastructure/api/routes/tags_router.py
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from src.snackbase.infrastructure.api.dependencies import (
    get_db,
    get_context,
    require_permission,
)
from src.snackbase.infrastructure.api.schemas.tags import (
    TagCreate,
    TagUpdate,
    TagResponse,
)
from src.snackbase.core.context import Context
from src.snackbase.infrastructure.persistence.repositories.tag_repository import TagRepository
from src.snackbase.infrastructure.persistence.models.tag import Tag
from src.snackbase.core.id_generator import generate_id

router = APIRouter(prefix="/tags", tags=["tags"])

@router.post("/", response_model=TagResponse, status_code=status.HTTP_201_CREATED)
async def create_tag(
    tag_data: TagCreate,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """Create a new tag."""
    repo = TagRepository(db)

    # Use one timezone-aware UTC timestamp for both fields
    now = datetime.now(timezone.utc)
    tag = Tag(
        id=generate_id("tag"),
        account_id=context.account_id,
        name=tag_data.name,
        color=tag_data.color,
        created_at=now,
        updated_at=now,
        created_by=context.user_id,
    )

    created = await repo.create(tag)
    return TagResponse.model_validate(created)

@router.get("/", response_model=list[TagResponse])
async def list_tags(
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """List all tags for the current account."""
    repo = TagRepository(db)
    tags = await repo.find_all(context)
    return [TagResponse.model_validate(tag) for tag in tags]

@router.get("/{tag_id}", response_model=TagResponse)
async def get_tag(
    tag_id: str,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """Get a specific tag by ID."""
    repo = TagRepository(db)
    tag = await repo.get_by_id(tag_id, context)

    if not tag:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tag not found"
        )

    return TagResponse.model_validate(tag)

@router.put("/{tag_id}", response_model=TagResponse)
async def update_tag(
    tag_id: str,
    tag_data: TagUpdate,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """Update a tag."""
    repo = TagRepository(db)
    tag = await repo.get_by_id(tag_id, context)

    if not tag:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tag not found"
        )

    # Update fields
    if tag_data.name is not None:
        tag.name = tag_data.name
    if tag_data.color is not None:
        tag.color = tag_data.color

    tag.updated_at = datetime.now(timezone.utc)

    updated = await repo.update(tag)
    return TagResponse.model_validate(updated)

@router.delete("/{tag_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_tag(
    tag_id: str,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """Delete a tag."""
    repo = TagRepository(db)
    tag = await repo.get_by_id(tag_id, context)

    if not tag:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tag not found"
        )

    await repo.delete(tag)

Step 5: Register Router

Add the router to app.py:
# src/snackbase/infrastructure/api/app.py
from src.snackbase.infrastructure.api.routes.tags_router import router as tags_router

# ... existing imports

app = FastAPI(title="SnackBase")

# ... existing middleware

# Register routers
app.include_router(auth_router, prefix=API_PREFIX, tags=["auth"])
app.include_router(accounts_router, prefix=API_PREFIX, tags=["accounts"])
# ... existing routers

# NEW: Add tags router BEFORE records_router
app.include_router(tags_router, prefix=API_PREFIX, tags=["tags"])

# IMPORTANT: records_router must be LAST (catches /{collection})
app.include_router(records_router, prefix=API_PREFIX, tags=["records"])

The records_router must be registered last because its dynamic route (/{collection}) matches any first path segment and would otherwise capture requests meant for routers registered after it.
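
The effect of registration order can be seen in a simplified model of first-match routing. This is a sketch under stated assumptions, not FastAPI's actual matcher: `compile_path` and `resolve` are hypothetical helpers, and the record path `/api/v1/{collection}/{record_id}` is assumed for illustration.

```python
import re
from typing import Optional


def compile_path(path: str) -> "re.Pattern[str]":
    """Turn '/tags/{tag_id}' into a regex where each {param} matches one path segment."""
    pattern = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", path)
    return re.compile(f"^{pattern}$")


def resolve(routes: list[tuple[str, str]], url_path: str) -> Optional[str]:
    """Return the handler name of the first route whose pattern matches."""
    for path, handler in routes:
        if compile_path(path).match(url_path):
            return handler
    return None


correct_order = [
    ("/api/v1/tags/{tag_id}", "tags_router.get_tag"),
    ("/api/v1/{collection}/{record_id}", "records_router.get_record"),
]
wrong_order = list(reversed(correct_order))

print(resolve(correct_order, "/api/v1/tags/tag_abc123"))  # tags_router.get_tag
print(resolve(wrong_order, "/api/v1/tags/tag_abc123"))    # records_router.get_record
```

In the wrong order the dynamic route treats "tags" as a collection name, so the tags endpoint is never reached.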

Step 6: Create Migration

Generate and apply database migration:
# Generate migration
uv run alembic revision --autogenerate -m "Add tags table"

# Apply migration
uv run alembic upgrade head

Step 7: Add Permissions (Optional)

If your feature needs authorization, add permissions:
# In roles management UI or via API
{
  "role": "admin",
  "collection": "tags",
  "create": true,
  "read": true,
  "update": true,
  "delete": true
}

Request/Response Patterns

Request Body Validation

Use Pydantic for automatic validation:
from pydantic import BaseModel, Field, field_validator

class TagCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=255)
    color: str | None = Field(None, pattern=r'^#[0-9A-Fa-f]{6}$')

    @field_validator('name')
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError('name cannot be empty or whitespace')
        return v.strip()
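
To see which values these rules accept, the same checks can be reproduced with the standard library. This is only an illustrative sketch: `is_valid_color` and `clean_name` are hypothetical helpers that mirror the field pattern and the validator above, not part of Pydantic or SnackBase.

```python
import re

# Same pattern as the `color` field above.
HEX_COLOR = re.compile(r"^#[0-9A-Fa-f]{6}$")


def is_valid_color(value: str) -> bool:
    """True only for a '#' followed by exactly six hex digits."""
    return HEX_COLOR.fullmatch(value) is not None


def clean_name(value: str) -> str:
    """Mirror the validator: reject blank names, otherwise trim whitespace."""
    if not value.strip():
        raise ValueError("name cannot be empty or whitespace")
    return value.strip()


print(is_valid_color("#ff0000"))  # True
print(is_valid_color("#fff"))     # False: shorthand hex is rejected
print(is_valid_color("ff0000"))   # False: the leading '#' is required
print(clean_name("  Important "))  # Important
```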

Response Formatting

Use consistent response formats:
# Success response
{
  "id": "tag_abc123",
  "name": "Important",
  "color": "#ff0000",
  "account_id": "AB1001",
  "created_at": "2025-01-01T00:00:00Z",
  "updated_at": "2025-01-01T00:00:00Z"
}

# Error response
{
  "detail": "Tag not found"
}

Pagination

For list endpoints, support pagination:
from fastapi import Query

@router.get("/", response_model=list[TagResponse])
async def list_tags(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    repo = TagRepository(db)
    tags = await repo.find_all(context, skip=skip, limit=limit)
    return [TagResponse.model_validate(tag) for tag in tags]
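
The call above assumes that the repository's find_all from Step 3 is extended to accept skip and limit. With SQLAlchemy that would typically mean chaining `.order_by(...)`, `.offset(skip)`, and `.limit(limit)` onto the select. The sketch below uses the standard library's sqlite3 module instead, purely to show the behavior of the resulting query; the table layout and sample rows are made up for the example.

```python
import sqlite3


def find_all(conn: sqlite3.Connection, account_id: str, skip: int = 0, limit: int = 50):
    """Return one page of tags for a single account, in a stable order."""
    return conn.execute(
        "SELECT id, name FROM tags WHERE account_id = ? "
        "ORDER BY id LIMIT ? OFFSET ?",
        (account_id, limit, skip),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tags (id TEXT PRIMARY KEY, account_id TEXT NOT NULL, name TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO tags VALUES (?, ?, ?)",
    [(f"tag_{i:02d}", "AB1001", f"Tag {i}") for i in range(1, 8)]
    + [("tag_99", "ZZ9999", "Other account")],
)

print(find_all(conn, "AB1001", skip=0, limit=3))
# [('tag_01', 'Tag 1'), ('tag_02', 'Tag 2'), ('tag_03', 'Tag 3')]
print(find_all(conn, "AB1001", skip=6, limit=3))
# [('tag_07', 'Tag 7')]
```

An explicit ORDER BY matters here: without it the database is free to return rows in any order, so consecutive pages could overlap or skip rows.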

Authentication & Authorization

Require Authentication

Endpoints require authentication by declaring the get_context dependency, which rejects requests without a valid token:
from src.snackbase.infrastructure.api.dependencies import get_context
from src.snackbase.core.context import Context

@router.get("/")  # the router already has prefix="/tags"
async def list_tags(
    context: Context = Depends(get_context),  # Ensures valid token
    db: AsyncSession = Depends(get_db),
):
    # context.user_id, context.account_id available
    pass

Require Permissions

Use require_permission() for authorization:
from src.snackbase.infrastructure.api.dependencies import require_permission

@router.delete("/{tag_id}")  # the router already has prefix="/tags"
async def delete_tag(
    tag_id: str,
    authorized: bool = Depends(require_permission("tags", "delete")),
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    # Only executes if user has "tags:delete" permission
    pass
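
A dependency like this is usually a factory: calling it with a resource and an action returns the function that performs the check. The sketch below shows that shape with the standard library only. It is not SnackBase's implementation: the `permissions` field on `Context`, the `Forbidden` exception, and the "resource:action" string format are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Context:
    user_id: str
    account_id: str
    # Hypothetical: permissions held by the user, as "resource:action" strings.
    permissions: set[str] = field(default_factory=set)


class Forbidden(Exception):
    """Stand-in for an HTTP 403 response."""


def require_permission(resource: str, action: str) -> Callable[[Context], bool]:
    """Build a checker that only passes if the context holds resource:action."""
    needed = f"{resource}:{action}"

    def checker(context: Context) -> bool:
        if needed not in context.permissions:
            raise Forbidden(f"missing permission {needed}")
        return True

    return checker


can_delete_tags = require_permission("tags", "delete")
admin = Context("usr_1", "AB1001", {"tags:read", "tags:delete"})
viewer = Context("usr_2", "AB1001", {"tags:read"})

print(can_delete_tags(admin))  # True
try:
    can_delete_tags(viewer)
except Forbidden as exc:
    print(exc)  # missing permission tags:delete
```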

Superadmin-Only Endpoints

For superadmin-only endpoints:
from src.snackbase.infrastructure.api.dependencies import require_superadmin

@router.post("/accounts")
async def create_account(
    account_data: AccountCreate,
    is_superadmin: bool = Depends(require_superadmin),
    db: AsyncSession = Depends(get_db),
):
    # Only superadmins can access
    pass

Testing Endpoints

Unit Tests

Test router logic:
# tests/unit/test_tags_router.py
import pytest
from fastapi.testclient import TestClient

from src.snackbase.infrastructure.api.app import app

client = TestClient(app)

def test_create_tag_requires_auth():
    response = client.post("/api/v1/tags/", json={"name": "Test"})
    assert response.status_code == 401

def test_create_tag_success(superadmin_token):
    response = client.post(
        "/api/v1/tags/",
        headers={"Authorization": f"Bearer {superadmin_token}"},
        json={"name": "Test", "color": "#ff0000"}
    )
    assert response.status_code == 201
    assert response.json()["name"] == "Test"

Integration Tests

Test full flow with database:
# tests/integration/test_tags_integration.py
import pytest
from httpx import AsyncClient, ASGITransport

from src.snackbase.infrastructure.api.app import app

@pytest.mark.asyncio
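async def test_create_and_retrieve_tag(db_session, superadmin_token):
    # Tag endpoints require authentication (see the unit tests above), so the
    # client sends the token on every request.
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
        headers={"Authorization": f"Bearer {superadmin_token}"},
    ) as client: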
        # Create tag
        create_response = await client.post(
            "/api/v1/tags/",
            json={"name": "Test Tag"}
        )
        assert create_response.status_code == 201

        tag_id = create_response.json()["id"]

        # Retrieve tag
        get_response = await client.get(f"/api/v1/tags/{tag_id}")
        assert get_response.status_code == 200
        assert get_response.json()["name"] == "Test Tag"

Manual Testing with Swagger UI

Visit http://localhost:8000/docs to test endpoints interactively.

Best Practices

1. Use Appropriate Status Codes

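| Code | Usage | Example |
| --- | --- | --- |
| 200 | Success (GET, PUT, PATCH) | Tag retrieved successfully |
| 201 | Created (POST) | Tag created successfully |
| 204 | No Content (DELETE) | Tag deleted successfully |
| 400 | Bad Request | Malformed request or failed business rule |
| 401 | Unauthorized | Missing or invalid token |
| 403 | Forbidden | Insufficient permissions |
| 404 | Not Found | Tag doesn’t exist |
| 422 | Unprocessable Entity | Request data fails schema validation (FastAPI's default for validation errors) |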

2. Account Isolation

Always filter by account_id in repositories:
# ❌ BAD: No account filtering
async def get_by_id(self, tag_id: str) -> Tag | None:
    result = await self._session.execute(
        select(Tag).where(Tag.id == tag_id)
    )
    return result.scalar_one_or_none()

# ✅ GOOD: Account filtering
async def get_by_id(self, tag_id: str, context: Context) -> Tag | None:
    result = await self._session.execute(
        select(Tag).where(Tag.id == tag_id, Tag.account_id == context.account_id)
    )
    return result.scalar_one_or_none()

3. Use Dependency Injection

Inject dependencies via FastAPI’s Depends():
# ❌ BAD: Manual dependency handling
@router.get("/{tag_id}")
async def get_tag(tag_id: str, token: str):
    context = decode_token(token)  # Manual
    db = get_db()  # Manual
    ...

# ✅ GOOD: Dependency injection
@router.get("/{tag_id}")
async def get_tag(
    tag_id: str,
    context: Context = Depends(get_context),  # Automatic
    db: AsyncSession = Depends(get_db),  # Automatic
):
    ...

4. Handle Errors Gracefully

@router.get("/{tag_id}")
async def get_tag(
    tag_id: str,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    repo = TagRepository(db)
    tag = await repo.get_by_id(tag_id, context)

    if not tag:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Tag {tag_id} not found"
        )

    return TagResponse.model_validate(tag)
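
When several endpoints raise the same kinds of errors, the mapping from error to status code can live in one place. The following is a minimal, framework-free sketch of that idea: `TagNotFound`, `PermissionDenied`, `ERROR_STATUS`, and `to_error_response` are hypothetical names, and the response body follows the {"detail": ...} format shown earlier.

```python
from http import HTTPStatus


class TagNotFound(Exception):
    """Raised when a tag does not exist in the caller's account."""


class PermissionDenied(Exception):
    """Raised when the caller lacks the required permission."""


# Hypothetical mapping from domain errors to HTTP status codes.
ERROR_STATUS = {
    TagNotFound: HTTPStatus.NOT_FOUND,
    PermissionDenied: HTTPStatus.FORBIDDEN,
    ValueError: HTTPStatus.BAD_REQUEST,
}


def to_error_response(exc: Exception) -> tuple[int, dict]:
    """Translate an exception into (status_code, body)."""
    for exc_type, status in ERROR_STATUS.items():
        if isinstance(exc, exc_type):
            return int(status), {"detail": str(exc)}
    # Unknown errors: do not leak internal details to the client.
    return int(HTTPStatus.INTERNAL_SERVER_ERROR), {"detail": "Internal server error"}


print(to_error_response(TagNotFound("Tag tag_abc123 not found")))
# (404, {'detail': 'Tag tag_abc123 not found'})
print(to_error_response(RuntimeError("connection pool exhausted")))
# (500, {'detail': 'Internal server error'})
```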

5. Document Endpoints

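Use the route decorator's summary, description, and responses parameters, together with the function docstring, to enrich the generated OpenAPI documentation: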
@router.post(
    "/",
    response_model=TagResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create a new tag",
    description="Creates a new tag for the current account. Tags can be used to organize and categorize records.",
    responses={
        201: {"description": "Tag created successfully"},
        400: {"description": "Invalid request data"},
        401: {"description": "Unauthorized"},
    }
)
async def create_tag(
    tag_data: TagCreate,
    context: Context = Depends(get_context),
    db: AsyncSession = Depends(get_db),
):
    """Create a new tag."""
    ...

Summary

| Step | Action | Location |
| --- | --- | --- |
| 1 | Define Pydantic schemas | infrastructure/api/schemas/ |
| 2 | Create SQLAlchemy model | infrastructure/persistence/models/ |
| 3 | Create repository | infrastructure/persistence/repositories/ |
| 4 | Create API router | infrastructure/api/routes/ |
| 5 | Register router | infrastructure/api/app.py |
| 6 | Create migration | Alembic |
| 7 | Add permissions | Via UI or API |
| 8 | Write tests | tests/unit/, tests/integration/ |