
API Job Framework

Craft Easy's built-in job framework lives inside craft-easy-api. It persists every run to MongoDB, tracks progress in real time, supports cron scheduling with distributed locking, and exposes a full REST API for managing jobs and schedules.

For lightweight standalone CLI jobs (Cloud Run, GitHub Actions, ad-hoc scripts), see the craft-easy-jobs CLI — the two are complementary and can be used together.

Enable the API job framework in settings.py:

JOBS_ENABLED=true
JOBS_CLEANUP_DAYS=90                    # How long to keep JobRun history
JOBS_AUDIT_RETENTION_DAYS=365           # Used by cleanup_audit_log job

Registering jobs

Jobs are ordinary async functions registered with the module-level job_registry:

from craft_easy.core.jobs import job_registry

@job_registry.job(
    "settlement",
    description="Monthly revenue settlement",
    timeout=600,
    max_attempts=2,
)
async def settlement(ctx, **kwargs) -> dict:
    period = kwargs.get("period")
    await ctx.log(f"Starting settlement for {period}")
    # ... settlement logic
    return {"processed": 42, "period": period}

The decorator takes:

| Parameter | Default | Purpose |
| --- | --- | --- |
| name | required | Unique identifier — used by schedules and the REST API |
| description | "" | Human-readable description surfaced in the admin UI |
| timeout | 3600 | Max execution time in seconds before the run is killed |
| max_attempts | 3 | Total tries including the first — set to 1 to disable retries |

Job functions are async, take a ctx: JobContext, and accept arbitrary **kwargs as parameters. Return any JSON-serialisable value — it is stored on the JobRun.result field.
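Conceptually, the registry is a map from job name to job definition, and the decorator just records the function plus its metadata. A minimal sketch of that pattern follows — this is an illustration, not the craft_easy implementation, and all class names in it are hypothetical:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class JobDefinition:
    """Metadata recorded for each registered job (illustrative)."""
    name: str
    func: Callable
    description: str = ""
    timeout: int = 3600
    max_attempts: int = 3


class SketchJobRegistry:
    def __init__(self) -> None:
        self._jobs: dict[str, JobDefinition] = {}

    def job(self, name: str, *, description: str = "",
            timeout: int = 3600, max_attempts: int = 3):
        def decorator(func: Callable) -> Callable:
            # Record the definition under its unique name.
            self._jobs[name] = JobDefinition(name, func, description,
                                             timeout, max_attempts)
            return func  # the function stays directly callable
        return decorator

    def get(self, name: str) -> JobDefinition:
        return self._jobs[name]


registry = SketchJobRegistry()


@registry.job("settlement", description="Monthly revenue settlement",
              timeout=600, max_attempts=2)
async def settlement(ctx, **kwargs) -> dict:
    return {"period": kwargs.get("period")}
```

Because the decorator returns the original function, a registered job can still be called and unit-tested directly, without going through a runner.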

Running a job

from craft_easy.core.jobs import JobRunner

runner = JobRunner()
run = await runner.run(
    "settlement",
    parameters={"period": "2026-03"},
    triggered_by="user_664abc",
    job_type="manual",
)

print(run.status)          # "completed" or "failed"
print(run.duration_ms)
print(run.result)          # {"processed": 42, ...}

JobRunner.run() returns the persisted JobRun document. The full execution history is in Mongo — you can query it later by run ID, job name, or status.

Progress tracking

The JobContext passed to every job updates the JobRun document in MongoDB as the job runs. The admin UI (and any API consumer) can poll GET /jobs/runs/{run_id} to see live progress:

@job_registry.job("bulk-invoice-send", timeout=1800)
async def bulk_invoice_send(ctx, **kwargs) -> dict:
    invoices = await Invoice.find(Invoice.status == "pending").to_list()
    total = len(invoices)
    sent = 0

    for i, invoice in enumerate(invoices):
        await send_invoice(invoice)
        sent += 1
        await ctx.update_progress(i + 1, total, f"Sent {sent}/{total}")

    await ctx.log("Done")
    return {"sent": sent}

update_progress(current, total, message) saves directly to the JobRun document, so the admin can watch a long-running job tick forward without polling a log file.

The JobRun model

class JobRun(BaseDocument):
    job_name: str
    job_type: str                       # "scheduled" | "manual" | "chain"
    status: str                         # pending | running | completed | failed | cancelled

    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: int | None

    parameters: dict                    # Input params
    result: dict | None                 # Return value on success
    error: str | None                   # Error message on failure
    error_traceback: str | None

    progress_current: int
    progress_total: int | None
    progress_message: str | None

    parent_job_id: PydanticObjectId | None    # For chained jobs
    chain_index: int | None

    attempt: int                        # Current attempt number
    max_attempts: int

    triggered_by: str | None            # User ID or "scheduler"

Collection: job_runs. Indexes on (job_name, started_at desc) and status.

Cron scheduling

Schedules are stored in MongoDB as JobSchedule documents and evaluated by a background JobScheduler:

class JobSchedule(BaseDocument):
    job_name: str                       # unique
    description: str | None
    cron_expression: str                # "0 3 1 * *"
    is_enabled: bool
    job_type: str                       # Registered job name to run
    parameters: dict
    max_attempts: int
    timeout_seconds: int
    chain: list[str]                    # Jobs to run after this one succeeds
    last_run_at: datetime | None
    last_run_status: str | None
    next_run_at: datetime | None

Create schedules via the REST API or directly with the Beanie model. See Scheduling for the cron syntax and timezone handling.

Distributed locking

When you run multiple API replicas, the scheduler needs to guarantee that only one replica picks up a given due job. The framework uses a MongoDB-backed DistributedLock:

from craft_easy.core.jobs.lock import DistributedLock
from craft_easy.core.jobs.scheduler import JobScheduler

lock = DistributedLock(db, ttl_seconds=3600)
await lock.ensure_indexes()

scheduler = JobScheduler(lock=lock, timezone="Europe/Stockholm")

Lock acquisition is atomic — it relies on a MongoDB unique index. If a replica crashes holding a lock, the lock document expires after ttl_seconds (default 1 hour) and another replica can pick up the job.

If no lock is passed, the scheduler runs without distributed coordination. That is fine for single-replica deployments but risky in HA: always pass a DistributedLock when running more than one instance.
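The acquire-or-skip semantics can be sketched with an in-memory stand-in. In MongoDB the insert is made atomic by the unique index, and a crashed holder's lock is reclaimed once it is older than ttl_seconds; this sketch emulates both with a dict and monotonic timestamps (all names here are illustrative, not the library's API):

```python
import time


class InMemoryLock:
    """Illustrative stand-in for a Mongo unique-index distributed lock."""

    def __init__(self, ttl_seconds: float = 3600) -> None:
        self.ttl = ttl_seconds
        self._locks: dict[str, float] = {}  # lock name -> acquired_at

    def acquire(self, name: str) -> bool:
        now = time.monotonic()
        held_since = self._locks.get(name)
        if held_since is not None and now - held_since < self.ttl:
            return False  # another replica holds a live lock; skip the job
        # In MongoDB this write is a single atomic insert/upsert.
        self._locks[name] = now
        return True

    def release(self, name: str) -> None:
        self._locks.pop(name, None)
```

A replica that fails acquire simply skips the due job; a stale lock becomes acquirable again once the TTL has elapsed.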

Job chains

A schedule can declare a chain — a list of job names to run sequentially after the main job completes successfully:

{
  "job_name": "month-end",
  "job_type": "settlement",
  "cron_expression": "0 3 1 * *",
  "chain": ["invoice-generation", "accounting-export"],
  "is_enabled": true
}

After settlement succeeds, invoice-generation runs with the same parameters, then accounting-export. Each chained run has its parent_job_id set to the triggering run and chain_index set to its position, so you can reconstruct the whole workflow from JobRun records.

If a step in the chain fails, the remaining steps are skipped — the chain stops. Failed chains do not auto-retry beyond the max_attempts of the individual step.
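The stop-on-failure semantics reduce to a sequential loop: run the main job, then each chained job in order, tagging each chained run with its parent and position, and break on the first failure. A simplified sketch (a fake runner stands in for JobRunner, and run records are plain dicts mirroring JobRun fields):

```python
import asyncio


async def run_chain(runner, main_job: str, chain: list[str],
                    parameters: dict) -> list[dict]:
    """Run main_job, then each chained job in order; stop on first failure."""
    runs = []
    parent = await runner(main_job, parameters)
    runs.append(parent)
    if parent["status"] != "completed":
        return runs  # the chain never starts if the main job fails

    for index, job_name in enumerate(chain):
        run = await runner(job_name, parameters)  # same parameters reused
        run["parent_job_id"] = parent["id"]
        run["chain_index"] = index
        runs.append(run)
        if run["status"] != "completed":
            break  # remaining steps are skipped
    return runs
```

With this shape, the full workflow is reconstructable afterwards by querying runs that share a parent_job_id and ordering them by chain_index.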

REST API reference

All endpoints require authentication. Base path: /jobs.

Registry

| Method | Path | Purpose |
| --- | --- | --- |
| GET | /jobs/registry | List every registered job definition |

Response:

[
  {"name": "settlement", "description": "Monthly revenue settlement", "timeout": 600, "max_attempts": 2},
  {"name": "cleanup_expired_tokens", "description": "...", "timeout": 300, "max_attempts": 2}
]

Schedules CRUD

| Method | Path | Purpose |
| --- | --- | --- |
| GET | /jobs/schedules | List schedules |
| POST | /jobs/schedules | Create schedule |
| GET | /jobs/schedules/{id} | Get schedule |
| PATCH | /jobs/schedules/{id} | Update schedule |
| DELETE | /jobs/schedules/{id} | Delete schedule |

Creating a schedule:

curl -X POST "http://localhost:5001/jobs/schedules" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "daily-cleanup",
    "job_type": "cleanup_expired_tokens",
    "cron_expression": "0 3 * * *",
    "is_enabled": true,
    "max_attempts": 2,
    "timeout_seconds": 300
  }'

Runs

| Method | Path | Purpose |
| --- | --- | --- |
| GET | /jobs/runs?job_name=&status=&page= | List runs, filterable |
| GET | /jobs/runs/{run_id} | Get a single run (with progress) |
| POST | /jobs/run/{job_name} | Manually trigger a job |
| POST | /jobs/runs/{run_id}/cancel | Cancel a pending or running job |
| POST | /jobs/check-schedules | Force the scheduler to evaluate due jobs now |

Manually triggering:

curl -X POST "http://localhost:5001/jobs/run/settlement" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"parameters": {"period": "2026-03"}}'

Built-in jobs

craft-easy-api ships with five built-in maintenance jobs that use the framework. They are auto-registered when JOBS_ENABLED=true:

| Job | Purpose | Timeout |
| --- | --- | --- |
| cleanup_expired_tokens | Delete expired authentication sessions | 300s |
| cleanup_audit_log | Delete audit entries older than JOBS_AUDIT_RETENTION_DAYS | 600s |
| cleanup_job_runs | Delete JobRun documents older than JOBS_CLEANUP_DAYS | 300s |
| purge_soft_deleted | Permanently delete soft-deleted documents older than SOFT_DELETE_RETENTION_DAYS | 600s |
| calculate_next_schedules | Recalculate next_run_at on all enabled schedules | 120s |

Schedule them via the REST API:

curl -X POST "http://localhost:5001/jobs/schedules" \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "nightly-cleanup",
    "job_type": "cleanup_expired_tokens",
    "cron_expression": "0 3 * * *",
    "is_enabled": true
  }'

See Built-in Jobs for the full catalogue including the standalone bi-export job.

Retries and error handling

Each job has a max_attempts. When a run fails, the runner waits briefly and retries up to max_attempts - 1 more times. Timeouts count as failures. The final failing attempt persists the exception message and traceback on run.error and run.error_traceback.

Retries share the same JobRun document — run.attempt counts up as the runner retries. This means a successful retry looks identical to a first-try success in reporting: status completed, attempt 2.
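The retry loop can be sketched as follows. This is an illustration of the semantics described above, not the framework's code: asyncio.wait_for stands in for the timeout handling, the run record is a plain dict mirroring JobRun fields, and the backoff parameter is hypothetical:

```python
import asyncio
import traceback


async def run_with_retries(func, *, max_attempts: int = 3,
                           timeout: float = 3600,
                           backoff: float = 1.0) -> dict:
    """Execute func up to max_attempts times; timeouts count as failures."""
    run = {"status": "pending", "attempt": 0, "result": None,
           "error": None, "error_traceback": None}
    for attempt in range(1, max_attempts + 1):
        run["attempt"] = attempt  # same record, attempt counts up
        try:
            run["result"] = await asyncio.wait_for(func(), timeout=timeout)
            run["status"] = "completed"
            run["error"] = None
            return run
        except Exception as exc:  # TimeoutError lands here too
            run["status"] = "failed"
            run["error"] = str(exc)
            run["error_traceback"] = traceback.format_exc()
            if attempt < max_attempts:
                await asyncio.sleep(backoff)  # brief pause before retrying
    # Final attempt failed: error and traceback stay persisted on the run.
    return run
```

A job that fails once and then succeeds ends with status "completed" and attempt 2, matching the reporting behaviour described above.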