# Events & Event Bus
Craft Easy exposes a lightweight in-process event bus for decoupling domain logic. When something interesting happens — a resource is created, a payment succeeds, a user registers — publish a DomainEvent, and any number of subscribers can react to it without coupling back to the publisher.
The bus is designed for use inside a single API process. It is not a cross-service message broker — for that, use the Webhooks system, a real message queue, or the BI export pipeline.
## The DomainEvent base type
Every event is a Pydantic model inheriting from DomainEvent:
```python
class DomainEvent(BaseModel):
    event_id: str                  # UUID, auto-generated
    event_type: str                # dot-separated, e.g. "order.confirmed"
    timestamp: datetime            # UTC, auto-generated
    tenant_id: str | None = None
    actor_id: str | None = None    # User or service that caused the event
    payload: dict[str, Any] = {}
```
## Built-in event types
```python
from craft_easy.core.events.types import (
    ResourceCreated,
    ResourceUpdated,
    ResourceDeleted,
    PaymentCompleted,
    UserRegistered,
)
```
| Type | Fields | Emitted when |
|---|---|---|
| `ResourceCreated` | `resource`, `item_id` | A document is inserted via CRUD |
| `ResourceUpdated` | `resource`, `item_id`, `changes` | A document is updated |
| `ResourceDeleted` | `resource`, `item_id` | A document is soft- or hard-deleted |
| `PaymentCompleted` | `payment_id`, `amount`, `currency` | The payment lifecycle reaches `completed` |
| `UserRegistered` | `user_id`, `email` | A new user account is created |
Define your own by subclassing DomainEvent and setting event_type as a class default:
```python
class BookingCancelled(DomainEvent):
    event_type: str = "booking.cancelled"
    booking_id: str
    reason: str
```
## Publishing events
```python
from craft_easy.core.events import get_event_bus
from craft_easy.core.events.types import PaymentCompleted

bus = get_event_bus()

await bus.publish(
    PaymentCompleted(
        payment_id=str(payment.id),
        amount=float(payment.amount),
        currency=payment.currency,
        tenant_id=str(payment.tenant_id),
        actor_id=str(current_user.id),
    )
)
```
When you call `publish()`:

- All synchronous subscribers for the matching `event_type` are invoked in order and awaited. If one raises, the error is logged but the remaining subscribers still run (handler isolation).
- All background subscribers are scheduled with `asyncio.create_task` — fire and forget.
- If an `EventStore` has been configured, the event is persisted to the `event_log` collection.
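These dispatch semantics can be approximated by a small in-process bus. The sketch below is illustrative only — it is not the library's implementation, and the class and method names are made up — but it shows the ordering, handler isolation, and fire-and-forget behavior described above:

```python
import asyncio
from collections import defaultdict

class MiniBus:
    """Toy event bus: awaited sync handlers with error isolation,
    plus fire-and-forget background handlers."""

    def __init__(self):
        self._sync = defaultdict(list)        # event_type -> handlers awaited in order
        self._background = defaultdict(list)  # event_type -> fire-and-forget handlers

    def subscribe(self, event_type, handler, background=False):
        target = self._background if background else self._sync
        target[event_type].append(handler)

    async def publish(self, event_type, payload):
        # Sync handlers run in registration order; a failure is logged
        # but neither stops later handlers nor fails publish()
        for handler in self._sync[event_type] + self._sync["*"]:
            try:
                await handler(payload)
            except Exception as exc:
                print(f"handler error (isolated): {exc!r}")
        # Background handlers are scheduled and never awaited by the publisher
        for handler in self._background[event_type] + self._background["*"]:
            asyncio.create_task(handler(payload))
```

Note that `"*"` subscribers are folded into both dispatch paths, which is one simple way to support the wildcard subscriptions described below.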
## Subscribing with `@event_handler`
The decorator is the idiomatic way to wire up handlers at module load time:
```python
from craft_easy.core.events import event_handler
from craft_easy.core.events.types import DomainEvent

@event_handler("payment.completed")
async def on_payment_completed(event: DomainEvent) -> None:
    payment_id = event.payload.get("payment_id")
    await mark_invoice_paid(payment_id)

@event_handler("user.registered", background=True)
async def send_welcome_email(event: DomainEvent) -> None:
    email = event.payload.get("email")
    await notification_registry.send(
        channel="email",
        to=email,
        subject="Welcome",
        body="Thanks for signing up.",
    )
```
`background=True` lets the publisher return immediately and runs the handler as an `asyncio.create_task`. Use this for any handler that does I/O — notifications, external HTTP calls, BI exports. Reserve synchronous handlers for tight, fast logic that must complete before the publishing request returns.
## Wildcard subscriptions
Subscribe to "*" to see every event on the bus — useful for audit sinks, debug dashboards, and the EventStore:
```python
@event_handler("*", background=True)
async def audit_sink(event: DomainEvent) -> None:
    logger.info("Event: %s by %s", event.event_type, event.actor_id)
```
## The event store (optional persistence)
By default the bus is in-memory — events vanish after handlers run. To persist every event to MongoDB for replay and forensics, attach an EventStore:
```python
from craft_easy.core.events import get_event_bus
from craft_easy.core.events.store import EventStore

bus = get_event_bus()
bus.configure_store(EventStore())
```
Now every `publish()` writes to the `event_log` collection. Query it via `EventStore.replay()`:
```python
store = EventStore()

# Replay all events for a specific tenant
events = await store.replay(tenant_id="tenant_123", limit=500)

# Replay just payment events
payments = await store.replay(event_type="payment.completed", limit=100)
```
## The EventLog model
```python
class EventLog(BaseDocument):
    event_id: str
    event_type: str
    tenant_id: str | None
    actor_id: str | None
    payload: dict
    timestamp: datetime
```
Indexes on event_id, event_type, tenant_id, and timestamp. The collection is system-scoped (tenant_scoped = False) even though individual events carry tenant_id — this gives operators a single audit trail across all tenants.
## Pattern: decoupled side effects
The main use of the event bus is to keep domain services thin. Instead of this:
```python
async def complete_payment(payment_id):
    payment = await Payment.get(payment_id)
    payment.status = "completed"
    await payment.save()

    # Side effects — now the service knows about every downstream concern
    await send_receipt_email(payment)
    await update_invoice_balance(payment)
    await credit_rewards_points(payment)
    await post_to_analytics(payment)
```
Do this:
```python
async def complete_payment(payment_id):
    payment = await Payment.get(payment_id)
    payment.status = "completed"
    await payment.save()

    await bus.publish(PaymentCompleted(
        payment_id=str(payment.id),
        amount=float(payment.amount),
        currency=payment.currency,
    ))
```

```python
# Wire side effects independently — each in its own module
@event_handler("payment.completed", background=True)
async def send_receipt(event): ...

@event_handler("payment.completed", background=True)
async def update_invoice(event): ...

@event_handler("payment.completed", background=True)
async def credit_rewards(event): ...
```
The payment service no longer needs to know what the receipt module or the rewards module is doing. New side effects are added by registering a new handler — no changes to the service.
## Handler ordering and error isolation
- Synchronous handlers run in registration order.
- A failing synchronous handler is logged but does not stop subsequent handlers.
- A failing synchronous handler does not fail the `publish()` call either — publishers should assume event delivery is best-effort.
- Background handlers are fire-and-forget; their errors are logged but never propagate.
If you need guaranteed delivery across process restarts, attach an EventStore and write a background job that reads unprocessed events and re-dispatches them.
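One possible shape for such a re-dispatch job, sketched with an in-memory stand-in for the store — the real job would query the `event_log` collection, and the `processed` marker is an assumption of this sketch, not part of the documented schema:

```python
import asyncio

class InMemoryStore:
    """Stand-in for the persistent event store. The real implementation
    would read and update documents in the event_log collection."""

    def __init__(self):
        self.events = []  # entries: {"event": ..., "processed": bool}

    def add(self, event):
        self.events.append({"event": event, "processed": False})

    def unprocessed(self):
        return [e for e in self.events if not e["processed"]]

async def redispatch_loop(store, publish, interval=5.0, once=False):
    """Periodically re-deliver events that never reached their handlers."""
    while True:
        for entry in store.unprocessed():
            await publish(entry["event"])
            # Mark only after delivery succeeds, so a crash here
            # means at-least-once rather than lost delivery
            entry["processed"] = True
        if once:  # test hook: run a single pass and return
            return
        await asyncio.sleep(interval)
```

Because events are marked processed only after re-publishing succeeds, a crash mid-loop re-delivers on the next pass — handlers should therefore be idempotent.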