Skip to content

Job Decorators & Context

This page is the reference for the @job() decorator, the chain() builder, and the JobContext interface used by the standalone craft-easy-jobs CLI. The API framework uses a slightly different JobContext — see API Job Framework for that variant.

@job()

from craft_easy_jobs import job

def job(
    name: str,
    description: str = "",
    timeout_seconds: int = 300,
    retries: int = 0,
    per_tenant: bool = False,
    scope: str = "tenant",
    params_schema: dict | None = None,
) -> Callable
Parameter Default Purpose
name required Unique identifier — the name used by craft-easy-job run <name>
description "" Human-readable description shown in craft-easy-job list
timeout_seconds 300 Max execution time before the runner kills the job
retries 0 Number of retry attempts on failure (0 = run once)
per_tenant False If True, the runner invokes the job once per tenant in the scope
scope "tenant" tenant, parent_tenant, or system (see scopes)
params_schema {} JSON schema for the expected parameters — used for validation and docs

A job function is async, takes ctx: JobContext as its only positional argument, and returns any JSON-serialisable value (or None):

from craft_easy_jobs import job, JobContext

@job(
    name="settlement",
    description="Monthly revenue settlement",
    timeout_seconds=600,
    retries=2,
    params_schema={
        "period": {"type": "string", "pattern": "^\\d{4}-\\d{2}$"},
    },
)
async def settlement_job(ctx: JobContext):
    period = ctx.params["period"]
    await ctx.log(f"Starting {period}")
    # ... work ...
    return {"period": period, "processed": 42}

JobContext

JobContext is what every job function receives. It carries parameters, tenant info, and a small set of logging and control methods.

class JobContext:
    run_id: str
    job_name: str
    params: dict[str, Any]
    tenant_id: str | None
    scope: str
    sub_tenant_ids: list[str]

    async def log(self, message: str) -> None: ...
    async def progress(self, message: str, percentage: int | None = None) -> None: ...
    async def warning(self, message: str) -> None: ...
    async def error(self, message: str) -> None: ...

    def should_stop(self) -> bool: ...
    def request_stop(self) -> None: ...

    def iter_tenant_ids(self) -> list[str]: ...
    def get_logs(self) -> list[dict]: ...

Properties

Property Purpose
run_id Unique identifier for this execution
job_name The registered job name
params Parameters passed to the runner (from CLI --params or programmatic call)
tenant_id The primary tenant (set if scope is tenant or parent_tenant)
scope "tenant", "parent_tenant", or "system"
sub_tenant_ids Resolved descendant tenant IDs (populated when scope is parent_tenant)

Logging methods

All logging methods are async and write to both the in-memory log buffer (available via get_logs()) and the configured Python logger:

@job(name="export")
async def export_job(ctx: JobContext):
    await ctx.log("Starting export")              # info
    await ctx.warning("Retrying stuck record")    # warning
    await ctx.error("Failed to write row 42")     # error (job continues)
    await ctx.progress("Processing...", percentage=50)  # progress with %

log() is for normal status messages. warning() and error() are for non-fatal issues that the operator should see — the job continues after these. To actually fail a job, raise an exception.

Progress reporting

total = len(items)
for i, item in enumerate(items):
    await process(item)
    await ctx.progress(
        f"Processed {i+1}/{total}",
        percentage=int((i + 1) / total * 100),
    )

Progress entries are just log entries with level="progress" and an optional percentage. They are surfaced by craft-easy-job run as live output.

Cooperative cancellation

Long-running jobs should check should_stop() periodically. When the runner receives a termination signal (SIGTERM, SIGINT) or an operator triggers a cancel, it calls request_stop() on the context. The job is expected to exit gracefully:

@job(name="bulk-export", timeout_seconds=3600)
async def bulk_export(ctx: JobContext):
    items = await fetch_items()
    exported = 0

    for i, item in enumerate(items):
        if ctx.should_stop():
            await ctx.warning(f"Stopped early at {exported}/{len(items)}")
            break
        await export_item(item)
        exported += 1

    return {"exported": exported, "total": len(items)}

The runner only signals stop; it does not forcibly cancel the task (except on hard timeout). Code that ignores should_stop() will keep running until the timeout expires.

Iterating tenants

For per_tenant or parent_tenant-scoped jobs, use iter_tenant_ids() to get the full list of tenant IDs the job should process:

@job(name="org-rollup", scope="parent_tenant")
async def org_rollup(ctx: JobContext):
    for tenant_id in ctx.iter_tenant_ids():
        if ctx.should_stop():
            break
        await rollup_tenant(tenant_id)
    return {"processed": len(ctx.iter_tenant_ids())}

For scope="tenant", iter_tenant_ids() returns [ctx.tenant_id]. For scope="parent_tenant", it returns the parent plus all sub-tenants. For scope="system", it returns an empty list.

JobResult

The runner returns a JobResult with everything you need for logging and post-run analysis:

@dataclass
class JobResult:
    job_name: str
    run_id: str
    status: str                       # "completed", "failed", or "running"
    started_at: datetime
    completed_at: datetime | None
    duration_seconds: float | None
    result: Any                       # Return value of the job function
    error: str | None                 # Exception message on failure
    logs: list[dict]                  # All log entries with timestamps

Each log entry is:

{
    "timestamp": "2026-04-05T10:30:45.123456+00:00",
    "level": "info" | "progress" | "warning" | "error",
    "message": "...",
    "percentage": 42 | None,
}

chain() and ChainStep

The standalone package can describe chains of jobs even though it doesn't execute them — the API framework reads chain fields on JobSchedule documents and runs the sequence. Use chain() as a design-time tool to express dependencies cleanly:

from craft_easy_jobs import chain, ChainStep

month_end = chain(
    name="month-end",
    description="Monthly close-of-business",
    steps=[
        ChainStep(job="settlement"),
        ChainStep(job="invoicing", depends_on=["settlement"]),
        ChainStep(job="accounting-export", depends_on=["invoicing"]),
    ],
)

ChainStep fields

@dataclass
class ChainStep:
    job: str                          # Job name to run
    params: dict[str, Any] = {}
    depends_on: list[str] = []        # Job names that must succeed first
    on_failure: str = "stop"          # "stop" | "continue" | "retry"
    retries: int = 0
    retry_delay_seconds: int = 60

depends_on is declarative — a runner that executes chains uses it to topologically sort the steps. on_failure can branch behaviour per step: stop (the default) halts the chain, continue ignores the failure and runs the next step, and retry re-runs the step after retry_delay_seconds.

Parameter validation with params_schema

params_schema accepts any JSON schema dict. The runner does not enforce it by default — validation is up to the caller, but providing a schema has two benefits:

  1. craft-easy-job list shows the expected parameters.
  2. If you wire up a JSON Schema validator in your runner or CI, you can reject bad parameters before the job starts.
@job(
    name="bi-export",
    params_schema={
        "type": "object",
        "required": ["target_type", "configs"],
        "properties": {
            "target_type": {"type": "string", "enum": ["bigquery", "azure_sql"]},
            "configs": {"type": "array", "items": {"type": "object"}},
            "batch_size": {"type": "integer", "minimum": 1, "default": 500},
        },
    },
)
async def bi_export(ctx: JobContext):
    ...

Complete example

from craft_easy_jobs import job, chain, ChainStep, JobContext

@job(
    name="settlement",
    description="Calculate settlement for a billing period",
    timeout_seconds=1800,
    retries=2,
    per_tenant=True,
    params_schema={
        "period": {"type": "string", "pattern": "^\\d{4}-\\d{2}$"},
    },
)
async def settlement(ctx: JobContext):
    period = ctx.params["period"]
    await ctx.log(f"Starting settlement for tenant={ctx.tenant_id} period={period}")

    from my_models import Tenant, Settlement
    tenants = await Tenant.find(Tenant.is_active == True).to_list()

    settled = 0
    for i, tenant in enumerate(tenants):
        if ctx.should_stop():
            await ctx.warning(f"Stopped at {settled}/{len(tenants)}")
            break
        try:
            s = Settlement(tenant_id=tenant.id, period=period, amount=await calc(tenant, period))
            await s.insert()
            settled += 1
            await ctx.progress(
                f"Settled {i+1}/{len(tenants)}",
                percentage=int((i+1) / len(tenants) * 100),
            )
        except Exception as e:
            await ctx.error(f"Failed for {tenant.name}: {e}")

    await ctx.log(f"Done: {settled}/{len(tenants)}")
    return {"period": period, "settled": settled, "total": len(tenants)}


# Define the month-end chain — one declarative source of truth
month_end = chain(
    name="month-end",
    description="Month-end closing workflow",
    steps=[
        ChainStep(job="settlement"),
        ChainStep(job="invoicing", depends_on=["settlement"]),
        ChainStep(job="reporting", depends_on=["invoicing"], on_failure="continue"),
    ],
)

Run with:

craft-easy-job run settlement \
  --tenant-id tenant_664abc \
  --params '{"period": "2026-03"}'