Job Decorators & Context¶
This page is the reference for the @job() decorator, the chain() builder, and the JobContext interface used by the standalone craft-easy-jobs CLI. The API framework uses a slightly different JobContext — see API Job Framework for that variant.
@job()¶
```python
from craft_easy_jobs import job

def job(
    name: str,
    description: str = "",
    timeout_seconds: int = 300,
    retries: int = 0,
    per_tenant: bool = False,
    scope: str = "tenant",
    params_schema: dict | None = None,
) -> Callable
```
| Parameter | Default | Purpose |
|---|---|---|
| `name` | required | Unique identifier — the name used by `craft-easy-job run <name>` |
| `description` | `""` | Human-readable description shown in `craft-easy-job list` |
| `timeout_seconds` | `300` | Max execution time before the runner kills the job |
| `retries` | `0` | Number of retry attempts on failure (`0` = run once) |
| `per_tenant` | `False` | If `True`, the runner invokes the job once per tenant in the scope |
| `scope` | `"tenant"` | `tenant`, `parent_tenant`, or `system` (see scopes) |
| `params_schema` | `None` | JSON schema for the expected parameters — used for validation and docs |
A job function is async, takes ctx: JobContext as its only positional argument, and returns any JSON-serialisable value (or None):
```python
from craft_easy_jobs import job, JobContext

@job(
    name="settlement",
    description="Monthly revenue settlement",
    timeout_seconds=600,
    retries=2,
    params_schema={
        "period": {"type": "string", "pattern": "^\\d{4}-\\d{2}$"},
    },
)
async def settlement_job(ctx: JobContext):
    period = ctx.params["period"]
    await ctx.log(f"Starting {period}")
    # ... work ...
    return {"period": period, "processed": 42}
```
JobContext¶
JobContext is what every job function receives. It carries parameters, tenant info, and a small set of logging and control methods.
```python
class JobContext:
    run_id: str
    job_name: str
    params: dict[str, Any]
    tenant_id: str | None
    scope: str
    sub_tenant_ids: list[str]

    async def log(self, message: str) -> None: ...
    async def progress(self, message: str, percentage: int | None = None) -> None: ...
    async def warning(self, message: str) -> None: ...
    async def error(self, message: str) -> None: ...
    def should_stop(self) -> bool: ...
    def request_stop(self) -> None: ...
    def iter_tenant_ids(self) -> list[str]: ...
    def get_logs(self) -> list[dict]: ...
```
Properties¶
| Property | Purpose |
|---|---|
| `run_id` | Unique identifier for this execution |
| `job_name` | The registered job name |
| `params` | Parameters passed to the runner (from CLI `--params` or programmatic call) |
| `tenant_id` | The primary tenant (set if scope is `tenant` or `parent_tenant`) |
| `scope` | `"tenant"`, `"parent_tenant"`, or `"system"` |
| `sub_tenant_ids` | Resolved descendant tenant IDs (populated when scope is `parent_tenant`) |
Logging methods¶
All logging methods are async and write to both the in-memory log buffer (available via get_logs()) and the configured Python logger:
```python
@job(name="export")
async def export_job(ctx: JobContext):
    await ctx.log("Starting export")                    # info
    await ctx.warning("Retrying stuck record")          # warning
    await ctx.error("Failed to write row 42")           # error (job continues)
    await ctx.progress("Processing...", percentage=50)  # progress with %
```
log() is for normal status messages. warning() and error() are for non-fatal issues that the operator should see — the job continues after these. To actually fail a job, raise an exception.
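The distinction between reporting an error and failing can be sketched without the library. The stub context and `run_once` wrapper below are illustrative stand-ins, not part of `craft_easy_jobs`; they only show the pattern of a runner catching a raised exception and marking the run failed:

```python
# Illustrative sketch: ctx.error() reports a non-fatal problem and the job
# continues; raising an exception fails the run. StubContext and run_once
# are hypothetical helpers, not part of craft_easy_jobs.
import asyncio

class StubContext:
    async def error(self, message: str) -> None:
        print(f"[error] {message}")

async def validate_job(ctx):
    # Non-fatal problem: report it and keep going.
    await ctx.error("Row 42 is malformed, skipping")
    # Fatal problem: raise to fail the whole run.
    raise ValueError("input file is empty")

async def run_once(job_fn):
    ctx = StubContext()
    try:
        result = await job_fn(ctx)
        return {"status": "completed", "result": result, "error": None}
    except Exception as e:
        return {"status": "failed", "result": None, "error": str(e)}

outcome = asyncio.run(run_once(validate_job))
print(outcome["status"], "-", outcome["error"])   # → failed - input file is empty
```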
Progress reporting¶
```python
total = len(items)
for i, item in enumerate(items):
    await process(item)
    await ctx.progress(
        f"Processed {i+1}/{total}",
        percentage=int((i + 1) / total * 100),
    )
```
Progress entries are just log entries with level="progress" and an optional percentage. They are surfaced by craft-easy-job run as live output.
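Since every `progress()` call lands in the log buffer, reporting on each item of a large dataset can flood it. One standalone way to throttle (the `progress_points` helper below is illustrative, not part of the library) is to precompute which iterations should report:

```python
# Illustrative sketch: report progress at most every `step` items,
# plus always once at completion.
def progress_points(total: int, step: int = 100) -> list[int]:
    """1-based item counts at which to call ctx.progress()."""
    if total <= 0:
        return []
    points = list(range(step, total + 1, step))
    if not points or points[-1] != total:
        points.append(total)  # always report completion
    return points

print(progress_points(250, step=100))   # → [100, 200, 250]
```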
Cooperative cancellation¶
Long-running jobs should check should_stop() periodically. When the runner receives a termination signal (SIGTERM, SIGINT) or an operator triggers a cancel, it calls request_stop() on the context. The job is expected to exit gracefully:
```python
@job(name="bulk-export", timeout_seconds=3600)
async def bulk_export(ctx: JobContext):
    items = await fetch_items()
    exported = 0
    for item in items:
        if ctx.should_stop():
            await ctx.warning(f"Stopped early at {exported}/{len(items)}")
            break
        await export_item(item)
        exported += 1
    return {"exported": exported, "total": len(items)}
```
The runner only signals stop; it does not forcibly cancel the task (except on hard timeout). Code that ignores should_stop() will keep running until the timeout expires.
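The handshake can be demonstrated in isolation. The `StopFlag` class below is a minimal stand-in for the context's stop flag, not the library's `JobContext`; the job polls the flag each iteration and exits cleanly once it is set:

```python
# Illustrative stand-in for the should_stop()/request_stop() handshake:
# the runner flips a flag; the job polls it and exits cooperatively.
import asyncio

class StopFlag:
    def __init__(self):
        self._stop = False
    def should_stop(self) -> bool:
        return self._stop
    def request_stop(self) -> None:
        self._stop = True

async def cooperative_job(ctx: StopFlag, items: list[int]) -> int:
    done = 0
    for item in items:
        if ctx.should_stop():
            break
        await asyncio.sleep(0)   # yield control, as real work would
        done += 1
        if done == 3:
            ctx.request_stop()   # simulate the runner cancelling mid-run
    return done

processed = asyncio.run(cooperative_job(StopFlag(), list(range(10))))
print(processed)   # → 3
```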
Iterating tenants¶
For per_tenant or parent_tenant-scoped jobs, use iter_tenant_ids() to get the full list of tenant IDs the job should process:
```python
@job(name="org-rollup", scope="parent_tenant")
async def org_rollup(ctx: JobContext):
    processed = 0
    for tenant_id in ctx.iter_tenant_ids():
        if ctx.should_stop():
            break
        await rollup_tenant(tenant_id)
        processed += 1
    return {"processed": processed}
```
For scope="tenant", iter_tenant_ids() returns [ctx.tenant_id]. For scope="parent_tenant", it returns the parent plus all sub-tenants. For scope="system", it returns an empty list.
JobResult¶
The runner returns a JobResult with everything you need for logging and post-run analysis:
```python
@dataclass
class JobResult:
    job_name: str
    run_id: str
    status: str                     # "completed", "failed", or "running"
    started_at: datetime
    completed_at: datetime | None
    duration_seconds: float | None
    result: Any                     # Return value of the job function
    error: str | None               # Exception message on failure
    logs: list[dict]                # All log entries with timestamps
```
Each log entry is:
```python
{
    "timestamp": "2026-04-05T10:30:45.123456+00:00",
    "level": "info",    # "info" | "progress" | "warning" | "error"
    "message": "...",
    "percentage": 42,   # int or None
}
```
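Because `JobResult.logs` is a plain list of dicts in this shape, post-run analysis is ordinary Python. A standalone sketch (the sample entries below are made up to match the documented shape):

```python
# Illustrative sketch: summarising a run's log entries by level.
logs = [
    {"timestamp": "2026-04-05T10:30:45+00:00", "level": "info",
     "message": "start", "percentage": None},
    {"timestamp": "2026-04-05T10:30:46+00:00", "level": "progress",
     "message": "halfway", "percentage": 50},
    {"timestamp": "2026-04-05T10:30:47+00:00", "level": "error",
     "message": "row 42 failed", "percentage": None},
]

def summarize(entries: list[dict]) -> dict[str, int]:
    counts: dict[str, int] = {}
    for entry in entries:
        counts[entry["level"]] = counts.get(entry["level"], 0) + 1
    return counts

print(summarize(logs))   # → {'info': 1, 'progress': 1, 'error': 1}
```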
chain() and ChainStep¶
The standalone package can describe chains of jobs even though it doesn't execute them — the API framework reads chain fields on JobSchedule documents and runs the sequence. Use chain() as a design-time tool to express dependencies cleanly:
```python
from craft_easy_jobs import chain, ChainStep

month_end = chain(
    name="month-end",
    description="Monthly close-of-business",
    steps=[
        ChainStep(job="settlement"),
        ChainStep(job="invoicing", depends_on=["settlement"]),
        ChainStep(job="accounting-export", depends_on=["invoicing"]),
    ],
)
```
ChainStep fields¶
```python
@dataclass
class ChainStep:
    job: str                            # Job name to run
    params: dict[str, Any] = field(default_factory=dict)
    depends_on: list[str] = field(default_factory=list)  # Job names that must succeed first
    on_failure: str = "stop"            # "stop" | "continue" | "retry"
    retries: int = 0
    retry_delay_seconds: int = 60
```
depends_on is declarative — a runner that executes chains uses it to topologically sort the steps. on_failure can branch behaviour per step: stop (the default) halts the chain, continue ignores the failure and runs the next step, and retry re-runs the step after retry_delay_seconds.
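The ordering a chain runner would derive from `depends_on` can be sketched in isolation. The `Step` dataclass and `topo_order` function below are illustrative stand-ins, not the library's executor:

```python
# Illustrative sketch: topologically order chain steps by depends_on.
from dataclasses import dataclass, field

@dataclass
class Step:
    job: str
    depends_on: list[str] = field(default_factory=list)

def topo_order(steps: list[Step]) -> list[str]:
    remaining = {s.job: set(s.depends_on) for s in steps}
    order: list[str] = []
    while remaining:
        # Steps whose dependencies have all completed are ready to run.
        ready = sorted(j for j, deps in remaining.items() if deps <= set(order))
        if not ready:
            raise ValueError("dependency cycle or unknown job in depends_on")
        for j in ready:
            order.append(j)
            del remaining[j]
    return order

print(topo_order([
    Step("accounting-export", depends_on=["invoicing"]),
    Step("invoicing", depends_on=["settlement"]),
    Step("settlement"),
]))   # → ['settlement', 'invoicing', 'accounting-export']
```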
Parameter validation with params_schema¶
params_schema accepts any JSON schema dict. The runner does not enforce it by default — validation is up to the caller, but providing a schema has two benefits:
- `craft-easy-job list` shows the expected parameters.
- If you wire up a JSON Schema validator in your runner or CI, you can reject bad parameters before the job starts.
```python
@job(
    name="bi-export",
    params_schema={
        "type": "object",
        "required": ["target_type", "configs"],
        "properties": {
            "target_type": {"type": "string", "enum": ["bigquery", "azure_sql"]},
            "configs": {"type": "array", "items": {"type": "object"}},
            "batch_size": {"type": "integer", "minimum": 1, "default": 500},
        },
    },
)
async def bi_export(ctx: JobContext):
    ...
```
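If you do not want to pull in a full JSON Schema validator, even a minimal pre-run check catches the most common mistakes. The `check_params` helper below is an illustrative sketch that only verifies required keys and primitive types; real setups would use a proper JSON Schema library instead:

```python
# Illustrative sketch: minimal pre-run check of params against a schema dict.
# Only required keys and primitive types are checked; a real validator
# would also handle enum, minimum, nested schemas, etc.
TYPE_MAP = {"string": str, "integer": int, "array": list,
            "object": dict, "boolean": bool}

def check_params(schema: dict, params: dict) -> list[str]:
    errors = []
    for key in schema.get("required", []):
        if key not in params:
            errors.append(f"missing required parameter: {key}")
    for key, spec in schema.get("properties", {}).items():
        expected = TYPE_MAP.get(spec.get("type"))
        if key in params and expected and not isinstance(params[key], expected):
            errors.append(f"{key}: expected {spec['type']}")
    return errors

schema = {
    "required": ["target_type"],
    "properties": {
        "target_type": {"type": "string"},
        "batch_size": {"type": "integer"},
    },
}
print(check_params(schema, {"batch_size": "500"}))
# → ['missing required parameter: target_type', 'batch_size: expected integer']
```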
Complete example¶
```python
from craft_easy_jobs import job, chain, ChainStep, JobContext

@job(
    name="settlement",
    description="Calculate settlement for a billing period",
    timeout_seconds=1800,
    retries=2,
    per_tenant=True,
    params_schema={
        "period": {"type": "string", "pattern": "^\\d{4}-\\d{2}$"},
    },
)
async def settlement(ctx: JobContext):
    period = ctx.params["period"]
    await ctx.log(f"Starting settlement for tenant={ctx.tenant_id} period={period}")

    from my_models import Tenant, Settlement

    tenants = await Tenant.find(Tenant.is_active == True).to_list()
    settled = 0
    for i, tenant in enumerate(tenants):
        if ctx.should_stop():
            await ctx.warning(f"Stopped at {settled}/{len(tenants)}")
            break
        try:
            s = Settlement(tenant_id=tenant.id, period=period,
                           amount=await calc(tenant, period))
            await s.insert()
            settled += 1
            await ctx.progress(
                f"Settled {i+1}/{len(tenants)}",
                percentage=int((i + 1) / len(tenants) * 100),
            )
        except Exception as e:
            await ctx.error(f"Failed for {tenant.name}: {e}")
    await ctx.log(f"Done: {settled}/{len(tenants)}")
    return {"period": period, "settled": settled, "total": len(tenants)}

# Define the month-end chain — one declarative source of truth
month_end = chain(
    name="month-end",
    description="Month-end closing workflow",
    steps=[
        ChainStep(job="settlement"),
        ChainStep(job="invoicing", depends_on=["settlement"]),
        ChainStep(job="reporting", depends_on=["invoicing"], on_failure="continue"),
    ],
)
```
Run with:

```shell
craft-easy-job run settlement --params '{"period": "2026-04"}'
```