# API Job Framework
Craft Easy's built-in job framework lives inside `craft-easy-api`. It persists every run to MongoDB, tracks progress in real time, supports cron scheduling with distributed locking, and exposes a full REST API for managing jobs and schedules.

For lightweight standalone CLI jobs (Cloud Run, GitHub Actions, ad-hoc scripts), see the `craft-easy-jobs` CLI — the two are complementary and can be used together.
Enable the API job framework in `settings.py`:

```bash
JOBS_ENABLED=true
JOBS_CLEANUP_DAYS=90            # How long to keep JobRun history
JOBS_AUDIT_RETENTION_DAYS=365   # Used by cleanup_audit_log job
```
## Registering jobs

Jobs are ordinary async functions registered with the module-level `job_registry`:

```python
from craft_easy.core.jobs import job_registry

@job_registry.job(
    "settlement",
    description="Monthly revenue settlement",
    timeout=600,
    max_attempts=2,
)
async def settlement(ctx, **kwargs) -> dict:
    period = kwargs.get("period")
    await ctx.log(f"Starting settlement for {period}")
    # ... settlement logic
    return {"processed": 42, "period": period}
```
The decorator takes:

| Parameter | Default | Purpose |
|---|---|---|
| `name` | required | Unique identifier — used by schedules and the REST API |
| `description` | `""` | Human-readable description surfaced in the admin UI |
| `timeout` | `3600` | Max execution time in seconds before the run is killed |
| `max_attempts` | `3` | Total tries including the first — set to `1` to disable retries |
Job functions are async, take a `ctx: JobContext`, and accept arbitrary `**kwargs` as parameters. Return any JSON-serialisable value — it is stored on the `JobRun.result` field.
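To make the registration mechanics concrete, here is a minimal sketch of a decorator-based registry in the same shape as `job_registry`. The `JobRegistry` class here is hypothetical, not the framework's implementation:

```python
import asyncio

class JobRegistry:
    """Hypothetical minimal registry mirroring the decorator's shape."""

    def __init__(self):
        self._jobs = {}  # name -> job definition

    def job(self, name, description="", timeout=3600, max_attempts=3):
        def decorator(func):
            # Record the metadata; the function itself is returned unchanged.
            self._jobs[name] = {
                "func": func,
                "description": description,
                "timeout": timeout,
                "max_attempts": max_attempts,
            }
            return func
        return decorator

    def get(self, name):
        return self._jobs[name]

registry = JobRegistry()

@registry.job("settlement", description="Monthly revenue settlement",
              timeout=600, max_attempts=2)
async def settlement(ctx, **kwargs) -> dict:
    return {"processed": 42, "period": kwargs.get("period")}
```

Because the decorator only records metadata and returns the function unchanged, a registered job can still be called and unit-tested directly.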
## Running a job

```python
from craft_easy.core.jobs import JobRunner

runner = JobRunner()
run = await runner.run(
    "settlement",
    parameters={"period": "2026-03"},
    triggered_by="user_664abc",
    job_type="manual",
)

print(run.status)       # "completed" or "failed"
print(run.duration_ms)
print(run.result)       # {"processed": 42, ...}
```
`JobRunner.run()` returns the persisted `JobRun` document. The full execution history is in Mongo — you can query it later by run ID, job name, or status.
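Conceptually, the runner wraps the job coroutine in a timeout and records the outcome on a run record. A rough sketch, using an in-memory dict where the real framework persists a `JobRun` document; the `run_job` helper and its field names are illustrative only:

```python
import asyncio
import time
import traceback

async def run_job(definition, parameters, triggered_by="manual"):
    """Execute one job with a timeout, recording the outcome on a dict
    (the real runner persists a JobRun document to MongoDB instead)."""
    run = {"status": "running", "parameters": parameters,
           "triggered_by": triggered_by, "result": None, "error": None}
    start = time.monotonic()
    try:
        run["result"] = await asyncio.wait_for(
            definition["func"](None, **parameters),  # ctx omitted in this sketch
            timeout=definition["timeout"],
        )
        run["status"] = "completed"
    except Exception as exc:  # a timeout surfaces here as TimeoutError
        run["status"] = "failed"
        run["error"] = str(exc)
        run["error_traceback"] = traceback.format_exc()
    run["duration_ms"] = int((time.monotonic() - start) * 1000)
    return run
```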
## Progress tracking

The `JobContext` passed to every job updates the `JobRun` document in MongoDB as the job runs. The admin UI (and any API consumer) can poll `GET /jobs/runs/{run_id}` to see live progress:

```python
@job_registry.job("bulk-invoice-send", timeout=1800)
async def bulk_invoice_send(ctx, **kwargs) -> dict:
    invoices = await Invoice.find(Invoice.status == "pending").to_list()
    total = len(invoices)
    sent = 0
    for i, invoice in enumerate(invoices):
        await send_invoice(invoice)
        sent += 1
        await ctx.update_progress(i + 1, total, f"Sent {sent}/{total}")
    await ctx.log("Done")
    return {"sent": sent}
```
`update_progress(current, total, message)` saves directly to the `JobRun` document, so the admin can watch a long-running job tick forward without polling a log file.
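A minimal stand-in for the context object can make the contract clearer. This hypothetical `ProgressContext` stores progress on the object itself, whereas the real `JobContext` writes the same fields to MongoDB:

```python
import asyncio

class ProgressContext:
    """Hypothetical stand-in for JobContext: keeps progress on the object
    so a poller could read it (the real context writes to MongoDB)."""

    def __init__(self):
        self.progress_current = 0
        self.progress_total = None
        self.progress_message = None
        self.logs = []

    async def update_progress(self, current, total, message=None):
        self.progress_current = current
        self.progress_total = total
        self.progress_message = message

    async def log(self, message):
        self.logs.append(message)

async def demo(ctx):
    # Same calling pattern as the bulk_invoice_send example above.
    for i in range(3):
        await ctx.update_progress(i + 1, 3, f"Sent {i + 1}/3")
    await ctx.log("Done")
```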
## The JobRun model

```python
class JobRun(BaseDocument):
    job_name: str
    job_type: str                            # "scheduled" | "manual" | "chain"
    status: str                              # pending | running | completed | failed | cancelled
    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: int | None
    parameters: dict                         # Input params
    result: dict | None                      # Return value on success
    error: str | None                        # Error message on failure
    error_traceback: str | None
    progress_current: int
    progress_total: int | None
    progress_message: str | None
    parent_job_id: PydanticObjectId | None   # For chained jobs
    chain_index: int | None
    attempt: int                             # Current attempt number
    max_attempts: int
    triggered_by: str | None                 # User ID or "scheduler"
```
Collection: `job_runs`. Indexes on `(job_name, started_at desc)` and `status`.
## Cron scheduling

Schedules are stored in MongoDB as `JobSchedule` documents and evaluated by a background `JobScheduler`:

```python
class JobSchedule(BaseDocument):
    job_name: str                 # Unique
    description: str | None
    cron_expression: str          # e.g. "0 3 1 * *"
    is_enabled: bool
    job_type: str                 # Registered job name to run
    parameters: dict
    max_attempts: int
    timeout_seconds: int
    chain: list[str]              # Jobs to run after this one succeeds
    last_run_at: datetime | None
    last_run_status: str | None
    next_run_at: datetime | None
```
Create schedules via the REST API or directly with the Beanie model. See Scheduling for the cron syntax and timezone handling.
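To illustrate how a `next_run_at` value can be derived from a cron expression, here is a hypothetical `next_run` helper. It is deliberately simplistic: only plain numeric fields and `*` are supported, there is no timezone handling, and it uses Python's Monday=0 weekday numbering instead of cron's Sunday=0 — treat it as a sketch, not the framework's scheduler logic:

```python
from datetime import datetime, timedelta

def _matches(field: str, value: int) -> bool:
    # A field is either "*" or a comma-separated list of integers.
    return field == "*" or value in {int(part) for part in field.split(",")}

def next_run(cron: str, after: datetime) -> datetime:
    """Find the first minute after `after` matching a 5-field cron
    expression (minute hour day month weekday), stepping minute by minute."""
    minute, hour, day, month, weekday = cron.split()
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # give up after one year
        if (_matches(minute, candidate.minute)
                and _matches(hour, candidate.hour)
                and _matches(day, candidate.day)
                and _matches(month, candidate.month)
                and _matches(weekday, candidate.weekday())):  # Monday=0 here
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no time matches {cron!r} within a year")
```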
## Distributed locking

When you run multiple API replicas, the scheduler needs to guarantee that only one replica picks up a given due job. The framework uses a MongoDB-backed `DistributedLock`:

```python
from craft_easy.core.jobs.lock import DistributedLock
from craft_easy.core.jobs.scheduler import JobScheduler

lock = DistributedLock(db, ttl_seconds=3600)
await lock.ensure_indexes()

scheduler = JobScheduler(lock=lock, timezone="Europe/Stockholm")
```
Lock acquisition is atomic — it relies on a MongoDB unique index. If a replica crashes while holding a lock, the lock document expires after `ttl_seconds` (default 1 hour) and another replica can pick up the job.

Without a lock, the scheduler has no distributed coordination — fine for single-replica deployments but risky in HA. Always pass a `DistributedLock` when running more than one instance.
## Job chains

A schedule can declare a `chain` — a list of job names to run sequentially after the main job completes successfully:

```json
{
  "job_name": "month-end",
  "job_type": "settlement",
  "cron_expression": "0 3 1 * *",
  "chain": ["invoice-generation", "accounting-export"],
  "is_enabled": true
}
```
After `settlement` succeeds, `invoice-generation` runs with the same parameters, then `accounting-export`. Each chained run has its `parent_job_id` set to the triggering run and `chain_index` set to its position, so you can reconstruct the whole workflow from `JobRun` records.

If a step in the chain fails, the remaining steps are skipped — the chain stops. Failed chains do not auto-retry beyond the `max_attempts` of the individual step.
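A sketch of these chain semantics, with hypothetical names (`run_chain`, plain dicts for run records): each step inherits the parameters, gets `parent_job_id` and `chain_index` stamped on, and the loop breaks at the first failure:

```python
import asyncio

async def run_chain(jobs, chain, parameters, parent_id="run-1"):
    """Run chained jobs in order with shared parameters; stop on the
    first failure, leaving the remaining steps unexecuted."""
    runs = []
    for index, name in enumerate(chain):
        run = {"job_name": name, "parent_job_id": parent_id, "chain_index": index}
        try:
            run["result"] = await jobs[name](None, **parameters)
            run["status"] = "completed"
        except Exception as exc:
            run["status"] = "failed"
            run["error"] = str(exc)
            runs.append(run)
            break  # skip the rest of the chain
        runs.append(run)
    return runs
```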
## REST API reference

All endpoints require authentication. Base path: `/jobs`.

### Registry

| Method | Path | Purpose |
|---|---|---|
| `GET` | `/jobs/registry` | List every registered job definition |
Response:

```json
[
  {"name": "settlement", "description": "Monthly revenue settlement", "timeout": 600, "max_attempts": 2},
  {"name": "cleanup_expired_tokens", "description": "...", "timeout": 300, "max_attempts": 2}
]
```
### Schedules CRUD

| Method | Path | Purpose |
|---|---|---|
| `GET` | `/jobs/schedules` | List schedules |
| `POST` | `/jobs/schedules` | Create schedule |
| `GET` | `/jobs/schedules/{id}` | Get schedule |
| `PATCH` | `/jobs/schedules/{id}` | Update schedule |
| `DELETE` | `/jobs/schedules/{id}` | Delete schedule |
Creating a schedule:

```bash
curl -X POST "http://localhost:5001/jobs/schedules" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "daily-cleanup",
    "job_type": "cleanup_expired_tokens",
    "cron_expression": "0 3 * * *",
    "is_enabled": true,
    "max_attempts": 2,
    "timeout_seconds": 300
  }'
```
### Runs

| Method | Path | Purpose |
|---|---|---|
| `GET` | `/jobs/runs?job_name=&status=&page=` | List runs, filterable |
| `GET` | `/jobs/runs/{run_id}` | Get a single run (with progress) |
| `POST` | `/jobs/run/{job_name}` | Manually trigger a job |
| `POST` | `/jobs/runs/{run_id}/cancel` | Cancel a pending or running job |
| `POST` | `/jobs/check-schedules` | Force the scheduler to evaluate due jobs now |
Manually triggering:

```bash
curl -X POST "http://localhost:5001/jobs/run/settlement" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"parameters": {"period": "2026-03"}}'
```
## Built-in jobs

craft-easy-api ships with five built-in maintenance jobs that use the framework. They are auto-registered when `JOBS_ENABLED=true`:

| Job | Purpose | Timeout |
|---|---|---|
| `cleanup_expired_tokens` | Delete expired authentication sessions | 300s |
| `cleanup_audit_log` | Delete audit entries older than `JOBS_AUDIT_RETENTION_DAYS` | 600s |
| `cleanup_job_runs` | Delete `JobRun` documents older than `JOBS_CLEANUP_DAYS` | 300s |
| `purge_soft_deleted` | Permanently delete soft-deleted documents older than `SOFT_DELETE_RETENTION_DAYS` | 600s |
| `calculate_next_schedules` | Recalculate `next_run_at` on all enabled schedules | 120s |
Schedule them via the REST API (authenticated, like every other endpoint):

```bash
curl -X POST "http://localhost:5001/jobs/schedules" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "nightly-cleanup",
    "job_type": "cleanup_expired_tokens",
    "cron_expression": "0 3 * * *",
    "is_enabled": true
  }'
```
See Built-in Jobs for the full catalogue including the standalone bi-export job.
## Retries and error handling

Each job has a `max_attempts`. When a run fails, the runner waits briefly and retries up to `max_attempts - 1` more times. Timeouts count as failures. The final failing attempt persists the exception message and traceback on `run.error` and `run.error_traceback`.
Retries share the same `JobRun` document — `run.attempt` counts up as the runner retries. A run that eventually succeeds therefore ends with status `completed` just like a first-try success; the only trace of the earlier failures is its `attempt` count (e.g. `attempt: 2`).
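The retry behaviour can be sketched as a loop over attempts (hypothetical `run_with_retries` helper; the real runner persists each state change to the `JobRun` document):

```python
import asyncio

async def run_with_retries(func, parameters, max_attempts=3, delay=0.0):
    """Try a job up to max_attempts times (including the first try).
    The record keeps the final attempt number; only the last error survives."""
    run = {"attempt": 0, "max_attempts": max_attempts,
           "status": "pending", "error": None}
    for attempt in range(1, max_attempts + 1):
        run["attempt"] = attempt
        try:
            run["result"] = await func(**parameters)
            run["status"] = "completed"
            run["error"] = None  # a successful retry clears earlier errors
            return run
        except Exception as exc:
            run["status"] = "failed"
            run["error"] = str(exc)
            if attempt < max_attempts:
                await asyncio.sleep(delay)  # brief pause before retrying
    return run
```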