Events & Event Bus

Craft Easy exposes a lightweight in-process event bus for decoupling domain logic. When something interesting happens — a resource is created, a payment succeeds, a user registers — publish a DomainEvent, and any number of subscribers can react to it without coupling back to the publisher.

The bus is designed for use inside a single API process. It is not a cross-service message broker — for that, use the Webhooks system, a real message queue, or the BI export pipeline.

The DomainEvent base type

Every event is a Pydantic model inheriting from DomainEvent:

class DomainEvent(BaseModel):
    event_id: str                      # UUID, auto-generated
    event_type: str                    # dot-separated, e.g. "order.confirmed"
    timestamp: datetime                # UTC, auto
    tenant_id: str | None = None
    actor_id: str | None = None        # User or service that caused the event
    payload: dict[str, Any] = {}

Built-in event types

from craft_easy.core.events.types import (
    ResourceCreated,
    ResourceUpdated,
    ResourceDeleted,
    PaymentCompleted,
    UserRegistered,
)

Type             | Fields                       | Emitted when
ResourceCreated  | resource, item_id            | A document is inserted via CRUD
ResourceUpdated  | resource, item_id, changes   | A document is updated
ResourceDeleted  | resource, item_id            | A document is soft- or hard-deleted
PaymentCompleted | payment_id, amount, currency | Payment lifecycle hits completed
UserRegistered   | user_id, email               | A new user account is created

Define your own by subclassing DomainEvent and setting event_type as a class default:

class BookingCancelled(DomainEvent):
    event_type: str = "booking.cancelled"
    booking_id: str
    reason: str

Publishing events

from craft_easy.core.events import get_event_bus
from craft_easy.core.events.types import PaymentCompleted

bus = get_event_bus()

await bus.publish(
    PaymentCompleted(
        payment_id=str(payment.id),
        amount=float(payment.amount),
        currency=payment.currency,
        tenant_id=str(payment.tenant_id),
        actor_id=str(current_user.id),
    )
)

When you call publish():

  1. All synchronous subscribers for the matching event_type are invoked in order and awaited. If one raises, the error is logged but the remaining subscribers still run (handler isolation).
  2. All background subscribers are scheduled with asyncio.create_task() and are not awaited (fire and forget).
  3. If an EventStore has been configured, the event is persisted to the event_log collection.
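
The three steps above can be modelled in plain asyncio. The sketch below is a toy stand-in and not Craft Easy's implementation: ToyBus, subscribe, and the list-based store are hypothetical names, and events are plain dicts rather than DomainEvent instances. It only illustrates registration order, handler isolation, and persistence after dispatch.

```python
import asyncio
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable

logger = logging.getLogger("toy_bus")
Handler = Callable[[dict[str, Any]], Awaitable[None]]


class ToyBus:
    """Hypothetical stand-in that mirrors the three publish() steps."""

    def __init__(self) -> None:
        self.sync_handlers = defaultdict(list)        # event_type -> [Handler]
        self.background_handlers = defaultdict(list)  # event_type -> [Handler]
        self.tasks = set()   # strong references to in-flight background tasks
        self.store = None    # a plain list stands in for an EventStore

    def subscribe(self, event_type: str, handler: Handler, background: bool = False) -> None:
        target = self.background_handlers if background else self.sync_handlers
        target[event_type].append(handler)

    async def publish(self, event: dict[str, Any]) -> None:
        event_type = event["event_type"]
        # Step 1: await synchronous handlers in registration order, isolating failures.
        for handler in self.sync_handlers[event_type]:
            try:
                await handler(event)
            except Exception:
                logger.exception("handler %s failed", handler.__name__)
        # Step 2: schedule background handlers without awaiting them.
        for handler in self.background_handlers[event_type]:
            task = asyncio.create_task(handler(event))
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)
        # Step 3: persist the event if a store is configured.
        if self.store is not None:
            self.store.append(event)


async def demo() -> list[str]:
    calls: list[str] = []
    bus = ToyBus()
    bus.store = []

    async def first(event: dict[str, Any]) -> None:
        calls.append("first")
        raise RuntimeError("boom")  # must not prevent `second` from running

    async def second(event: dict[str, Any]) -> None:
        calls.append("second")

    bus.subscribe("order.confirmed", first)
    bus.subscribe("order.confirmed", second)
    await bus.publish({"event_type": "order.confirmed"})
    assert len(bus.store) == 1  # the event was persisted despite the failure
    return calls
```

Running asyncio.run(demo()) shows both handlers being called even though the first one raises, and publish() itself returning normally.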

Subscribing with @event_handler

The decorator is the idiomatic way to wire up handlers at module load time:

from craft_easy.core.events import event_handler
from craft_easy.core.events.types import DomainEvent

@event_handler("payment.completed")
async def on_payment_completed(event: DomainEvent) -> None:
    payment_id = event.payload.get("payment_id")
    await mark_invoice_paid(payment_id)

@event_handler("user.registered", background=True)
async def send_welcome_email(event: DomainEvent) -> None:
    email = event.payload.get("email")
    await notification_registry.send(
        channel="email",
        to=email,
        subject="Welcome",
        body="Thanks for signing up.",
    )

background=True lets the publisher return immediately and runs the handler as an asyncio.create_task. Use this for any handler that does I/O — notifications, external HTTP calls, BI exports. Reserve synchronous handlers for tight, fast logic that must complete before the publishing request returns.
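
To make the timing concrete, here is a small stdlib-only sketch of fire-and-forget scheduling. The schedule helper and the _tasks set are hypothetical and not part of Craft Easy; they follow the asyncio documentation's advice to keep a reference to each task so it is not garbage-collected mid-execution, and they log failures instead of propagating them.

```python
import asyncio
import logging

logger = logging.getLogger("background_demo")
_tasks: set = set()  # strong references keep tasks alive until they finish


def _on_done(task: asyncio.Task) -> None:
    """Drop the reference and log any exception the handler raised."""
    _tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.error("background handler failed", exc_info=task.exception())


def schedule(coro) -> asyncio.Task:
    """Hypothetical helper: run a handler coroutine fire-and-forget."""
    task = asyncio.create_task(coro)
    _tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def demo() -> list[str]:
    timeline: list[str] = []

    async def slow_handler() -> None:
        await asyncio.sleep(0.01)  # simulated I/O, e.g. sending an email
        timeline.append("handler finished")

    schedule(slow_handler())
    timeline.append("publisher returned")  # reached before the handler completes
    await asyncio.gather(*list(_tasks))    # only so the demo can observe the result
    return timeline
```

The publisher's line is recorded first because the scheduled task does not start running until the publisher next yields to the event loop.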

Wildcard subscriptions

Subscribe to "*" to see every event on the bus — useful for audit sinks, debug dashboards, and the EventStore:

@event_handler("*", background=True)
async def audit_sink(event: DomainEvent) -> None:
    logger.info("Event: %s by %s", event.event_type, event.actor_id)
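
One way to picture wildcard dispatch is a registry keyed by event type, with "*" as a catch-all key. The registry and handlers_for below are hypothetical names, and running exact-match handlers before wildcard handlers is an assumption; the source does not state how Craft Easy orders the two groups.

```python
from collections import defaultdict
from typing import Callable

# Hypothetical registry: event_type -> handlers, with "*" as the catch-all key.
registry: dict = defaultdict(list)


def handlers_for(event_type: str) -> list[Callable]:
    """Return exact-match handlers followed by wildcard handlers (assumed order)."""
    return list(registry.get(event_type, [])) + list(registry.get("*", []))


def on_payment(event): ...
def audit(event): ...


registry["payment.completed"].append(on_payment)
registry["*"].append(audit)
```

With this registry, handlers_for("payment.completed") yields both handlers, while an event type with no dedicated subscriber still reaches audit.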

The event store (optional persistence)

By default the bus is in-memory — events vanish after handlers run. To persist every event to MongoDB for replay and forensics, attach an EventStore:

from craft_easy.core.events import get_event_bus
from craft_easy.core.events.store import EventStore

bus = get_event_bus()
bus.configure_store(EventStore())

Now every publish() writes to the event_log collection. Query it via EventStore.replay():

store = EventStore()

# Replay all events for a specific tenant
events = await store.replay(tenant_id="tenant_123", limit=500)

# Replay just payment events
payments = await store.replay(event_type="payment.completed", limit=100)
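
The filtering that replay() performs can be approximated in memory, which is handy for unit tests that should not touch MongoDB. This is a sketch under assumptions: LoggedEvent and replay_in_memory are hypothetical, filters are combined with AND, and results are returned oldest first. The real EventStore.replay() may order or combine filters differently.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class LoggedEvent:
    """Hypothetical in-memory stand-in for a persisted event record."""
    event_type: str
    tenant_id: Optional[str]
    timestamp: datetime


def replay_in_memory(log, *, tenant_id=None, event_type=None, limit=100):
    """Filter by the given criteria (AND), oldest first, capped at `limit`."""
    matches = [
        e for e in log
        if (tenant_id is None or e.tenant_id == tenant_id)
        and (event_type is None or e.event_type == event_type)
    ]
    matches.sort(key=lambda e: e.timestamp)
    return matches[:limit]


log = [
    LoggedEvent("payment.completed", "tenant_123", datetime(2024, 1, 2, tzinfo=timezone.utc)),
    LoggedEvent("user.registered", "tenant_123", datetime(2024, 1, 1, tzinfo=timezone.utc)),
    LoggedEvent("payment.completed", "tenant_456", datetime(2024, 1, 3, tzinfo=timezone.utc)),
]
```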

The EventLog model

class EventLog(BaseDocument):
    event_id: str
    event_type: str
    tenant_id: str | None
    actor_id: str | None
    payload: dict
    timestamp: datetime

The collection is indexed on event_id, event_type, tenant_id, and timestamp. It is system-scoped (tenant_scoped = False) even though individual events carry a tenant_id, which gives operators a single audit trail across all tenants.

Pattern: decoupled side effects

The main use of the event bus is to keep domain services thin. Instead of this:

async def complete_payment(payment_id):
    payment = await Payment.get(payment_id)
    payment.status = "completed"
    await payment.save()

    # Side effects — now the service knows about every downstream concern
    await send_receipt_email(payment)
    await update_invoice_balance(payment)
    await credit_rewards_points(payment)
    await post_to_analytics(payment)

Do this:

async def complete_payment(payment_id):
    payment = await Payment.get(payment_id)
    payment.status = "completed"
    await payment.save()

    bus = get_event_bus()
    await bus.publish(PaymentCompleted(
        payment_id=str(payment.id),
        amount=float(payment.amount),
        currency=payment.currency,
    ))

# Wire side effects independently — each in its own module
@event_handler("payment.completed", background=True)
async def send_receipt(event): ...

@event_handler("payment.completed", background=True)
async def update_invoice(event): ...

@event_handler("payment.completed", background=True)
async def credit_rewards(event): ...

The payment service no longer needs to know what the receipt module or the rewards module is doing. New side effects are added by registering a new handler — no changes to the service.

Handler ordering and error isolation

  • Synchronous handlers run in registration order.
  • A failing synchronous handler is logged but does not stop subsequent handlers.
  • A failing synchronous handler does not fail the publish() call either — publishers should assume event delivery is best-effort.
  • Background handlers are fire-and-forget; their errors are logged but never propagate.

If you need guaranteed delivery across process restarts, attach an EventStore and write a background job that reads unprocessed events and re-dispatches them.
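
A re-dispatch job of that kind could look roughly like the sketch below. It is stdlib-only and makes several assumptions: the EventLog model shown earlier has no "processed" flag, so the set of processed event IDs is a hypothetical bookkeeping structure you would need to persist yourself, and redispatch_pending and dispatch are illustrative names. Because an event is marked only after dispatch succeeds, delivery is at-least-once and handlers should be idempotent.

```python
import asyncio
from typing import Any, Awaitable, Callable

Event = dict[str, Any]


async def redispatch_pending(
    events: list[Event],
    processed_ids: set[str],
    dispatch: Callable[[Event], Awaitable[None]],
) -> int:
    """Re-dispatch events whose event_id is not yet marked as processed.

    Returns the number of events delivered in this pass.
    """
    delivered = 0
    for event in events:
        if event["event_id"] in processed_ids:
            continue  # already handled in an earlier pass
        try:
            await dispatch(event)
        except Exception:
            continue  # leave unmarked so the next pass retries it
        processed_ids.add(event["event_id"])
        delivered += 1
    return delivered


async def demo() -> tuple[int, set[str]]:
    events = [{"event_id": "e1"}, {"event_id": "e2"}, {"event_id": "e3"}]
    processed = {"e1"}  # e1 was handled before the simulated restart

    async def flaky(event: Event) -> None:
        if event["event_id"] == "e2":
            raise ConnectionError("downstream unavailable")

    delivered = await redispatch_pending(events, processed, flaky)
    return delivered, processed
```

In the demo, e1 is skipped, e2 fails and stays pending for the next pass, and e3 is delivered and marked.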