This guide explains how to create custom hooks in SnackBase to extend functionality and automate workflows.

Overview

Hooks allow you to execute custom code in response to events within SnackBase. They’re the primary extension mechanism for adding custom business logic.

What Can Hooks Do?

| Capability | Example |
| --- | --- |
| Send Notifications | Email when record is created |
| Transform Data | Auto-generate slugs from titles |
| Integrate External Services | Call webhook on record update |
| Enforce Business Rules | Prevent deletion of published posts |
| Audit Changes | Log all modifications to sensitive fields |
| Sync Data | Replicate changes to external systems |

Hook System Stability

The hook registration mechanism is a STABLE API contract (v1.0). This means:
  • Hook registration syntax won’t change in breaking ways
  • Existing hooks will continue to work in future versions
  • New hook types will be additive, not breaking changes
The hook system API is stable and guaranteed to maintain backward compatibility.

Hook System Review

Hook Registration Pattern

Hooks are registered using the app.hook decorator:
from src.snackbase.core.context import Context
from src.snackbase.infrastructure.api.app import app

@app.hook.on_record_after_create("posts", priority=10)
async def send_post_notification(record: dict, context: Context):
    """Send notification when post is created."""
    await notification_service.send(
        user_id=record["created_by"],
        message="Your post has been published!"
    )

Built-in Hooks

SnackBase includes built-in hooks that cannot be disabled:
| Hook | Purpose | Event |
| --- | --- | --- |
| timestamp_hook | Auto-set created_at/updated_at | All record operations |
| account_isolation_hook | Enforce account_id filtering | All record queries |
| created_by_hook | Set created_by user ID | Record creation |

Hook Categories

Hooks are organized into 8 categories:

1. App Lifecycle Hooks

| Event | Description | When |
| --- | --- | --- |
| on_app_startup | App starts up | Server initialization |
| on_app_shutdown | App shuts down | Server shutdown |

2. Model Operation Hooks

| Event | Description | When |
| --- | --- | --- |
| on_model_before_create | Before model insert | Before database insert |
| on_model_after_create | After model insert | After database insert |
| on_model_before_update | Before model update | Before database update |
| on_model_after_update | After model update | After database update |
| on_model_before_delete | Before model delete | Before database delete |
| on_model_after_delete | After model delete | After database delete |

3. Record Operation Hooks

| Event | Description | When |
| --- | --- | --- |
| on_record_before_create | Before record creation | Before inserting dynamic record |
| on_record_after_create | After record creation | After inserting dynamic record |
| on_record_before_update | Before record update | Before updating dynamic record |
| on_record_after_update | After record update | After updating dynamic record |
| on_record_before_delete | Before record deletion | Before deleting dynamic record |
| on_record_after_delete | After record deletion | After deleting dynamic record |
| on_record_before_query | Before querying records | Before executing query |
| on_record_after_query | After querying records | After executing query |

4. Collection Operation Hooks

| Event | Description | When |
| --- | --- | --- |
| on_collection_before_create | Before collection creation | Before creating collection |
| on_collection_after_create | After collection creation | After creating collection |
| on_collection_before_update | Before collection update | Before updating collection |
| on_collection_after_update | After collection update | After updating collection |
| on_collection_before_delete | Before collection deletion | Before deleting collection |
| on_collection_after_delete | After collection deletion | After deleting collection |

5. Auth Operation Hooks

| Event | Description | When |
| --- | --- | --- |
| on_auth_before_login | Before login | Before validating credentials |
| on_auth_after_login | After login | After successful login |
| on_auth_before_logout | Before logout | Before logout processing |
| on_auth_after_logout | After logout | After logout processing |
| on_auth_before_register | Before registration | Before creating user |
| on_auth_after_register | After registration | After creating user |

6. User Operation Hooks

| Event | Description | When |
| --- | --- | --- |
| on_user_before_create | Before user creation | Before creating user |
| on_user_after_create | After user creation | After creating user |
| on_user_before_update | Before user update | Before updating user |
| on_user_after_update | After user update | After updating user |
| on_user_before_delete | Before user deletion | Before deleting user |
| on_user_after_delete | After user deletion | After deleting user |

7. Request Processing Hooks

| Event | Description | When |
| --- | --- | --- |
| on_request_before | Before request processing | At start of request |
| on_request_after | After request processing | At end of request |
| on_request_error | On request error | When exception occurs |

8. Custom Event Hooks

| Event | Description | When |
| --- | --- | --- |
| on_custom_event | Custom event | Triggered manually |

Step-by-Step Guide

Let’s create a custom hook that sends a Slack notification when a post is published.

Step 1: Create Hook File

Create hooks in a dedicated file:
# src/snackbase/infrastructure/hooks/custom_hooks.py
import httpx
from sqlalchemy.ext.asyncio import AsyncSession

from src.snackbase.core.context import Context
from src.snackbase.infrastructure.api.app import app
from src.snackbase.core.config import settings

SLACK_WEBHOOK_URL = settings.slack_webhook_url

@app.hook.on_record_after_update("posts", priority=20)
async def notify_slack_on_publish(
    record: dict,
    context: Context,
    old_record: dict | None = None,
    db: AsyncSession | None = None
):
    """
    Send Slack notification when post status changes to 'published'.

    Trigger: After updating a post record
    Condition: status changed to 'published'
    """
    # Check if status changed to 'published'
    if old_record and record.get("status") == "published":
        if old_record.get("status") != "published":
            await _send_slack_notification(record)

async def _send_slack_notification(post: dict):
    """Send notification to Slack."""
    message = {
        "text": f"New post published: {post.get('title')}",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*New Post Published*\n*Title:* {post.get('title')}\n*Author:* {post.get('created_by')}\n<https://example.com|View Post>"
                }
            }
        ]
    }

    async with httpx.AsyncClient() as client:
        await client.post(SLACK_WEBHOOK_URL, json=message)

Step 2: Import Hooks

Import your hooks file in app.py to register them:
# src/snackbase/infrastructure/api/app.py
# ... existing imports

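# Import custom hooks (importing the module registers them).
# Place this import after `app` is defined: custom_hooks.py itself imports
# `app` from this module, so importing it any earlier would be circular.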
from src.snackbase.infrastructure.hooks.custom_hooks import (
    notify_slack_on_publish,
)

# ... rest of app.py

Step 3: Configure Environment

Add required configuration to .env:
# Slack webhook URL
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL

Step 4: Test the Hook

# Update a post to published
curl -X PUT http://localhost:8000/api/v1/posts/post_abc123 \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "status": "published"
  }'

# Check Slack for notification

Hook Context

Hook Parameters

Hooks receive different parameters based on their type:
@app.hook.on_record_after_create("posts")
async def my_hook(
    record: dict,           # The record being created/updated/deleted
    context: Context,       # Request context (user_id, account_id)
    old_record: dict | None = None,  # Previous state (for updates)
    db: AsyncSession | None = None,  # Database session (optional)
):
    pass

Context Object

The context object provides request information:
| Field | Type | Description |
| --- | --- | --- |
| user_id | str | Current user ID |
| account_id | str | Current account ID |
| request_id | str | Correlation ID for tracing |
| is_superadmin | bool | Whether user is superadmin |
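
When unit-testing a hook, a lightweight stand-in for the context object can be enough. The sketch below assumes only the four fields listed above; `FakeContext` is a hypothetical name, and the real `Context` class may carry additional fields or different defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeContext:
    """Test stand-in mirroring the four documented Context fields (hypothetical)."""

    user_id: str
    account_id: str
    request_id: str
    is_superadmin: bool = False  # assumed default for ordinary users


ctx = FakeContext(user_id="user_1", account_id="AB1001", request_id="req_1")
print(ctx.account_id, ctx.is_superadmin)  # AB1001 False
```

Freezing the dataclass keeps a hook under test from mutating the context by accident.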

Advanced Features

Priority Control

Hooks execute in priority order (higher priority = earlier execution):
@app.hook.on_record_after_create("posts", priority=100)
async def high_priority_hook(record: dict, context: Context):
    """Executes first."""
    pass

@app.hook.on_record_after_create("posts", priority=10)
async def low_priority_hook(record: dict, context: Context):
    """Executes after high_priority_hook."""
    pass

@app.hook.on_record_after_create("posts", priority=0)
async def default_priority_hook(record: dict, context: Context):
    """Executes last."""
    pass
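
To make the ordering rule concrete, here is a toy registry that dispatches hooks from highest to lowest priority. It is an illustration only, not SnackBase's implementation: `ToyHookRegistry`, its `on` decorator, and `trigger` are hypothetical names, and the tie-breaking rule (registration order for equal priorities) is an assumption of this sketch.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Hook = Callable[[dict], Awaitable[None]]


class ToyHookRegistry:
    """Illustrative registry; NOT how SnackBase is implemented."""

    def __init__(self) -> None:
        self._hooks: dict[str, list[tuple[int, Hook]]] = defaultdict(list)

    def on(self, event: str, priority: int = 0) -> Callable[[Hook], Hook]:
        def decorator(func: Hook) -> Hook:
            self._hooks[event].append((priority, func))
            return func  # return the function unchanged so it stays callable
        return decorator

    async def trigger(self, event: str, record: dict) -> None:
        # sorted() is stable, so equal priorities keep registration order.
        ordered = sorted(self._hooks[event], key=lambda item: item[0], reverse=True)
        for _, func in ordered:
            await func(record)


registry = ToyHookRegistry()
calls: list[str] = []


@registry.on("record_after_create", priority=10)
async def low(record: dict) -> None:
    calls.append("low")


@registry.on("record_after_create", priority=100)
async def high(record: dict) -> None:
    calls.append("high")


@registry.on("record_after_create")
async def default(record: dict) -> None:
    calls.append("default")


asyncio.run(registry.trigger("record_after_create", {"id": "post_1"}))
print(calls)  # ['high', 'low', 'default']
```

The hooks are registered in the order 10, 100, 0 but run in the order 100, 10, 0, which is the behavior described above.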

Conditional Execution

Only execute hooks based on conditions:
@app.hook.on_record_after_update("posts")
async def conditional_hook(
    record: dict,
    context: Context,
    old_record: dict | None = None
):
    """Only execute for specific conditions."""

    # Only if specific field changed
    if old_record and record.get("status") != old_record.get("status"):
        # Status changed
        pass

    # Only for specific account
    if context.account_id == "AB1001":
        # Special handling for this account
        pass

    # Only if specific field value
    if record.get("category") == "urgent":
        # Handle urgent posts
        pass

Error Handling

Handle errors gracefully in hooks:
import logging

import httpx

logger = logging.getLogger(__name__)

@app.hook.on_record_after_create("posts")
async def safe_hook(record: dict, context: Context):
    """Hook with error handling."""
    try:
        await external_api_call(record)
    except httpx.HTTPError as e:
        # Log error but don't fail the request
        logger.error(f"Hook failed: {e}", exc_info=True)
        # Optionally: send alert
    except Exception as e:
        # Unexpected error
        logger.critical(f"Unexpected hook error: {e}", exc_info=True)
        raise  # Re-raise if critical
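
For hooks whose failure should never affect the request, the try/except can be moved into a reusable decorator. The following is a minimal sketch: `non_critical` is a hypothetical name, and it assumes the registration decorator simply registers whatever callable it is given, so `@app.hook...` would go above `@non_critical`.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


def non_critical(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Log and swallow any exception raised by the wrapped async hook."""

    @functools.wraps(func)  # preserves the name and signature metadata
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return await func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and includes the traceback.
            logger.exception("Hook %s failed; continuing", func.__name__)
            return None

    return wrapper


@non_critical
async def flaky_hook(record: dict) -> None:
    raise RuntimeError("external service unavailable")


# The error is logged, not propagated to the caller.
result = asyncio.run(flaky_hook({"id": "post_1"}))
print(result)  # None
```

Catching `Exception` rather than `BaseException` leaves task cancellation untouched, since `asyncio.CancelledError` does not derive from `Exception`.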

Aborting Operations

Some before_* hooks can abort operations:
from src.snackbase.core.exceptions import HookAbortException

@app.hook.on_record_before_delete("posts")
async def prevent_published_deletion(
    record: dict,
    context: Context
):
    """Prevent deletion of published posts."""
    if record.get("status") == "published":
        raise HookAbortException(
            message="Cannot delete published posts",
            status_code=400
        )
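
The effect of an abort can be pictured with a small stand-alone simulation. Everything here is illustrative: `AbortOperation` is a local stand-in for `HookAbortException`, and `delete_record` with its response dicts is a hypothetical dispatcher, not SnackBase's actual request handling.

```python
import asyncio


class AbortOperation(Exception):
    """Local stand-in for HookAbortException (hypothetical, for illustration)."""

    def __init__(self, message: str, status_code: int = 400) -> None:
        super().__init__(message)
        self.message = message
        self.status_code = status_code


async def prevent_published_deletion(record: dict) -> None:
    if record.get("status") == "published":
        raise AbortOperation("Cannot delete published posts", status_code=400)


async def delete_record(record: dict, store: dict, before_hooks: list) -> dict:
    """Run before-hooks, then delete; an abort leaves the store untouched."""
    try:
        for hook in before_hooks:
            await hook(record)
    except AbortOperation as exc:
        return {"status_code": exc.status_code, "detail": exc.message}
    del store[record["id"]]
    return {"status_code": 204, "detail": None}


store = {
    "p1": {"id": "p1", "status": "published"},
    "p2": {"id": "p2", "status": "draft"},
}
hooks = [prevent_published_deletion]
print(asyncio.run(delete_record(store["p1"], store, hooks)))  # aborted with 400
print(asyncio.run(delete_record(store["p2"], store, hooks)))  # deleted
print(sorted(store))  # ['p1']
```

The key point is that the exception is raised before the delete happens, so the published record is still present afterwards.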

Async Database Operations

Hooks can perform database operations:
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession

@app.hook.on_record_after_create("posts")
async def create_audit_log(
    record: dict,
    context: Context,
    db: AsyncSession
):
    """Create audit log entry."""
    from src.snackbase.infrastructure.persistence.models.audit_log import AuditLog

    log = AuditLog(
        id=generate_id("audit"),  # generate_id: the project's ID helper (import not shown)
        account_id=context.account_id,
        user_id=context.user_id,
        action="create",
        collection="posts",
        record_id=record["id"],
        changes=record,
        timestamp=datetime.now(timezone.utc)  # timezone-aware UTC timestamp
    )

    db.add(log)
    await db.commit()

Best Practices

1. Keep Hooks Focused

Each hook should do one thing well:
# ❌ BAD: Hook doing too much
@app.hook.on_record_after_create("posts")
async def mega_hook(record: dict, context: Context):
    await send_slack_notification(record)
    await send_email_notification(record)
    await update_search_index(record)
    await create_audit_log(record)
    await invalidate_cache(record)

# ✅ GOOD: Separate, focused hooks
@app.hook.on_record_after_create("posts", priority=50)
async def send_slack_notification(record: dict, context: Context):
    await slack_service.notify(record)

@app.hook.on_record_after_create("posts", priority=40)
async def send_email_notification(record: dict, context: Context):
    await email_service.notify(record)

@app.hook.on_record_after_create("posts", priority=30)
async def update_search_index(record: dict, context: Context):
    await search_service.index(record)

2. Use Priority Wisely

Set appropriate priorities for execution order:
| Priority | Use Case |
| --- | --- |
| 100+ | Critical validation that should run first |
| 50-99 | Core business logic |
| 10-49 | Notifications and integrations |
| 1-9 | Logging and analytics |
| 0 (default) | Cleanup and finalization |
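
One way to keep these bands consistent across a codebase is to name them. The sketch below is a suggestion rather than something SnackBase provides: `HookPriority` is a hypothetical enum whose values are the lower bound of each band above.

```python
from enum import IntEnum


class HookPriority(IntEnum):
    """Hypothetical named bands; each value is the lower bound of its band."""

    VALIDATION = 100
    BUSINESS_LOGIC = 50
    NOTIFICATION = 10
    LOGGING = 1
    CLEANUP = 0


# IntEnum members are ints, so they sort and compare like plain priorities.
ordered = sorted(HookPriority, reverse=True)
print([p.name for p in ordered])
# ['VALIDATION', 'BUSINESS_LOGIC', 'NOTIFICATION', 'LOGGING', 'CLEANUP']
print(HookPriority.NOTIFICATION + 5)  # 15
```

Since the members behave as integers, a value such as `HookPriority.NOTIFICATION` could be passed anywhere a numeric `priority` is accepted, and offsets like `HookPriority.NOTIFICATION + 5` allow ordering within a band.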

3. Handle Idempotency

Make hooks idempotent when possible:
# ❌ NOT idempotent
@app.hook.on_record_after_create("posts")
async def send_notification(record: dict, context: Context):
    await notification_service.send(record["created_by"], "Post created")
    # Will send duplicate if hook runs twice

# ✅ Idempotent
@app.hook.on_record_after_create("posts")
async def send_notification(record: dict, context: Context):
    await notification_service.send(
        user_id=record["created_by"],
        message=f"Post {record['id']} created",
        deduplication_key=f"post_created:{record['id']}"
    )
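
What a deduplication key buys you can be shown with an in-memory stand-in for the notification service. `InMemoryNotifier` is hypothetical and only mimics the `send(user_id, message, deduplication_key)` call used above; a real service would need shared, persistent storage for the keys it has seen.

```python
import asyncio


class InMemoryNotifier:
    """Toy notifier that drops repeats of the same deduplication key."""

    def __init__(self) -> None:
        self.sent: list[str] = []
        self._seen: set[str] = set()

    async def send(self, user_id: str, message: str, deduplication_key: str) -> bool:
        if deduplication_key in self._seen:
            return False  # already delivered; skip the duplicate
        self._seen.add(deduplication_key)
        self.sent.append(f"{user_id}: {message}")
        return True


async def main() -> list[bool]:
    notifier = InMemoryNotifier()
    record = {"id": "post_1", "created_by": "user_1"}
    results = []
    for _ in range(2):  # simulate the hook firing twice for the same record
        results.append(
            await notifier.send(
                user_id=record["created_by"],
                message=f"Post {record['id']} created",
                deduplication_key=f"post_created:{record['id']}",
            )
        )
    return results


print(asyncio.run(main()))  # [True, False]
```

The second call is dropped because the key is derived from the record ID, which stays the same no matter how many times the hook runs.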

4. Log Hook Execution

Add logging for debugging:
import logging

logger = logging.getLogger(__name__)

@app.hook.on_record_after_create("posts")
async def logged_hook(record: dict, context: Context):
    logger.info(
        "Hook executed",
        extra={
            "hook": "logged_hook",
            "record_id": record.get("id"),
            "user_id": context.user_id,
            "account_id": context.account_id
        }
    )
    # ... hook logic

5. Avoid Blocking Operations

Keep hooks fast and non-blocking:
# ❌ BAD: Blocking operation
@app.hook.on_record_after_create("posts")
async def slow_hook(record: dict, context: Context):
    result = await slow_external_api_call(timeout=30)  # Blocks response
    # User waits 30 seconds for response

# ✅ GOOD: Fire and forget
@app.hook.on_record_after_create("posts")
async def fast_hook(record: dict, context: Context):
    # Queue for background processing
    await background_queue.enqueue(
        "slow_operation",
        record_id=record["id"]
    )
    # Returns immediately
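
If no job queue is available, a rough in-process alternative is to schedule the slow work as an asyncio task. This is a sketch, not a replacement for a durable queue: `schedule` and `slow_operation` are hypothetical names, and tasks started this way are lost if the process exits before they finish.

```python
import asyncio

# Keep strong references so pending tasks are not garbage-collected mid-flight.
_background_tasks: set[asyncio.Task] = set()
completed: list[str] = []


async def slow_operation(record_id: str) -> None:
    await asyncio.sleep(0.05)  # stands in for a slow external call
    completed.append(record_id)


def schedule(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)  # drop reference when done
    return task


async def fast_hook(record: dict) -> None:
    schedule(slow_operation(record["id"]))  # returns without awaiting the work


async def main() -> bool:
    await fast_hook({"id": "post_1"})
    finished_immediately = "post_1" in completed  # hook returned before the work ran
    await asyncio.gather(*_background_tasks)  # demo only: let the task finish
    return finished_immediately


print(asyncio.run(main()), completed)  # False ['post_1']
```

The hook returns before `slow_operation` has done anything, which is the property that keeps the response fast.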

6. Test Hooks

Write tests for your hooks:
# tests/unit/test_custom_hooks.py
import pytest
from unittest.mock import AsyncMock, patch

from src.snackbase.infrastructure.hooks import custom_hooks

@pytest.mark.asyncio
async def test_slack_notification_on_publish(db_session, context):
    """Test Slack notification is sent when post is published."""

    old_record = {"status": "draft"}
    new_record = {"status": "published", "title": "Test Post"}

    # Replace the helper that posts to Slack so no HTTP request is made
    with patch.object(
        custom_hooks, "_send_slack_notification", new_callable=AsyncMock
    ) as mock_send:
        # Simulate hook execution
        await custom_hooks.notify_slack_on_publish(
            record=new_record,
            context=context,
            old_record=old_record,
            db=db_session
        )

    # Verify the Slack helper was awaited exactly once with the new record
    mock_send.assert_awaited_once_with(new_record)

Examples

Example 1: Auto-Generate Slugs

import re

@app.hook.on_record_before_create("posts")
async def generate_slug(
    record: dict,
    context: Context
):
    """Auto-generate slug from title if not provided."""
    if "title" in record and "slug" not in record:
        title = record["title"]
        # Convert to slug: drop punctuation, collapse spaces/hyphens, trim edges
        slug = re.sub(r"[^\w\s-]", "", title.lower())
        slug = re.sub(r"[-\s]+", "-", slug).strip("-")
        record["slug"] = slug
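
The slug logic can also live in a pure function, which makes its behavior easy to check in isolation. This is a sketch using the same two regular expressions plus trimming of leading and trailing hyphens; `slugify` is a hypothetical helper name.

```python
import re


def slugify(title: str) -> str:
    """Lowercase, drop punctuation, and join words with single hyphens."""
    slug = re.sub(r"[^\w\s-]", "", title.lower())  # keep word chars, spaces, hyphens
    slug = re.sub(r"[-\s]+", "-", slug)            # collapse runs into one hyphen
    return slug.strip("-")                         # no leading/trailing hyphen


print(slugify("Hello, World!"))              # hello-world
print(slugify("  Async -- Hooks in 2024 "))  # async-hooks-in-2024
```

In Python 3, `\w` matches Unicode letters, digits, and underscores, so accented characters and underscores survive in the slug. Uniqueness is not handled here; two posts with the same title would produce the same slug.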

Example 2: Enforce Validation

from src.snackbase.core.exceptions import HookAbortException

@app.hook.on_record_before_create("posts")
async def validate_post_content(
    record: dict,
    context: Context
):
    """Ensure posts have minimum content length."""
    content = record.get("content", "")
    if len(content) < 50:
        raise HookAbortException(
            message="Post content must be at least 50 characters",
            status_code=400
        )

Example 3: Sync to External System

import httpx

@app.hook.on_record_after_update("products")
async def sync_to_external_crm(
    record: dict,
    context: Context,
    old_record: dict | None = None
):
    """Sync product changes to external CRM."""
    if old_record:
        # Only sync if fields changed
        changed_fields = [
            k for k in record
            if k in old_record and record[k] != old_record[k]
        ]

        if changed_fields:
            async with httpx.AsyncClient() as client:
                await client.put(
                    f"https://crm.example.com/api/products/{record['id']}",
                    json=record,
                    headers={"Authorization": f"Bearer {settings.crm_api_key}"}
                )

Example 4: Track Field Changes

@app.hook.on_record_after_update("users")
async def track_email_changes(
    record: dict,
    context: Context,
    old_record: dict | None = None
):
    """Log when user email changes."""
    if old_record and record.get("email") != old_record.get("email"):
        await audit_service.log(
            user_id=context.user_id,
            action="email_changed",
            details={
                "old_email": old_record.get("email"),
                "new_email": record.get("email")
            }
        )

Summary

| Concept | Key Takeaway |
| --- | --- |
| Hook Categories | 8 categories: App Lifecycle, Model, Record, Collection, Auth, User, Request, Custom |
| Registration | Use @app.hook.on_event_name() decorator |
| Priority | Higher priority = earlier execution (0-100+) |
| Context | Hooks receive record, context, old_record, db parameters |
| Error Handling | Log errors, handle gracefully, use HookAbortException to abort |
| Best Practices | Keep focused, use priority wisely, ensure idempotency, log execution |