Craft Easy Jobs — Batch Job Specification

Version: 1.0
Date: 2026-03-28
Related: specification.md, financial-ecosystem-specification.md


Contents

  1. Vision
  2. Architecture
  3. Job definition
  4. Job chains and dependencies
  5. Scheduling
  6. Cloud deployment
  7. Built-in jobs
  8. Technical stack
  9. Implementation plan

1. Vision

A batch job framework that integrates with Craft Easy API. Define jobs as Python functions, chain them with dependencies, schedule them, and deploy to Cloud Run Jobs (GCP) or Azure Container Apps Jobs.

# Run a job manually
craft-easy-job run settlement --period 2026-03

# List scheduled jobs
craft-easy-job schedule list

# Run a chain (settlement → invoicing → export)
craft-easy-job chain run month-end

Design principles

| Principle | Rule |
|---|---|
| Same models | Jobs import from craft-easy-api — same database, same models |
| Jobs are functions | A job is an async Python function with a decorator |
| Chains are declarative | Define dependencies, the runner handles order |
| Cloud-native | Designed for Cloud Run Jobs / Azure Container Apps Jobs |
| Observable | Every run logged with status, duration, output |

2. Architecture

┌──────────────────────────────────────────────────────┐
│                  craft-easy-jobs                      │
│                                                       │
│  ┌─ Job Registry ──────────────────────────────────┐ │
│  │  settlement_job                                  │ │
│  │  invoice_generation_job                          │ │
│  │  bi_export_job                                   │ │
│  │  gdpr_cleanup_job                                │ │
│  │  accounting_export_job                           │ │
│  │  ... (custom project jobs)                       │ │
│  └──────────────────────────────────────────────────┘ │
│                                                       │
│  ┌─ Chain Engine ──────────────────────────────────┐ │
│  │  month-end: settlement → invoicing → export     │ │
│  │  daily-sync: bi-export → accounting-export      │ │
│  └──────────────────────────────────────────────────┘ │
│                                                       │
│  ┌─ Scheduler ─────────────────────────────────────┐ │
│  │  Cron expressions → triggers jobs/chains         │ │
│  │  Cloud Scheduler (GCP) / Azure Timer Trigger     │ │
│  └──────────────────────────────────────────────────┘ │
│                                                       │
│  ┌─ Job Runner ────────────────────────────────────┐ │
│  │  Connects to DB, runs job, logs result           │ │
│  │  Handles retries, timeouts, error reporting      │ │
│  └──────────────────────────────────────────────────┘ │
└──────────────────────┬────────────────────────────────┘
┌──────────────────────────────────────────────────────┐
│              craft-easy-api (models + services)       │
│              MongoDB (same database as API)           │
└──────────────────────────────────────────────────────┘

3. Job Definition

A job is a decorated async function

from craft_easy_jobs import job, JobContext

@job(
    name="settlement",
    description="Calculate settlement for a billing period",
    timeout_seconds=600,
    retries=2,
)
async def settlement_job(ctx: JobContext):
    """Run settlement for all active tenants."""
    period = ctx.params.get("period")  # "2026-03"

    tenants = await Tenant.find(Tenant.is_enabled == True).to_list()

    for tenant in tenants:
        ctx.log(f"Processing tenant: {tenant.name}")
        await SettlementService.run(tenant.id, period)
        ctx.progress(f"Completed {tenant.name}")

    return {"tenants_processed": len(tenants)}

JobContext

class JobContext:
    """Context available to every job during execution."""

    # Parameters passed to the job
    params: dict[str, Any]

    # The job run record (for logging progress)
    run_id: str
    job_name: str

    # Tenant scope (if job runs per-tenant)
    tenant_id: Optional[str] = None

    # Logging
    def log(self, message: str) -> None: ...
    def progress(self, message: str, percentage: Optional[int] = None) -> None: ...
    def warning(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...

    # Check if job should stop (graceful shutdown)
    def should_stop(self) -> bool: ...
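
A long-running job can poll should_stop() to exit gracefully on shutdown. A sketch of the pattern (the process_items helper and its items are illustrative, not part of the framework):

```python
# Sketch: cooperative cancellation inside a job body.
# ctx.should_stop() returns True once the runner has received a shutdown signal.
async def process_items(ctx, items):
    processed = 0
    for item in items:
        if ctx.should_stop():
            ctx.warning(f"Stopping early after {processed} items")
            break
        # ... process one item here ...
        processed += 1
    return {"processed": processed}
```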

Tenant-scoped jobs

Jobs can run once for the whole system or once per tenant:

@job(name="bi-export", per_tenant=True)
async def bi_export_job(ctx: JobContext):
    """Runs once per tenant that has BI export enabled."""
    tenant = await Tenant.get(ctx.tenant_id)
    await BIExportService.export(tenant)

When per_tenant=True, the runner automatically iterates over all tenants and calls the job for each one. Failures in one tenant don't stop others.
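
The per-tenant fan-out described above might look roughly like this inside the runner (a sketch; the job function, context factory, and tenant list are passed in as plain callables and dicts for illustration):

```python
# Sketch: fan a per_tenant job out over tenants, isolating failures per tenant
# so that one tenant failing does not stop the others.
async def run_per_tenant(job_fn, make_ctx, tenants):
    results = {}
    for tenant in tenants:
        ctx = make_ctx(tenant_id=tenant["id"])
        try:
            results[tenant["id"]] = {"status": "completed",
                                     "result": await job_fn(ctx)}
        except Exception as exc:
            # Record the failure and continue with the remaining tenants
            results[tenant["id"]] = {"status": "failed", "error": str(exc)}
    return results
```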


4. Job Chains and Dependencies

Chain definition

from craft_easy_jobs import chain, ChainStep

month_end = chain(
    name="month-end",
    description="Complete month-end processing",
    steps=[
        ChainStep(job="settlement", params={"period": "{period}"}),
        ChainStep(job="invoice-generation", depends_on=["settlement"]),
        ChainStep(job="accounting-export", depends_on=["invoice-generation"]),
        ChainStep(job="bi-export", depends_on=["settlement"]),  # Parallel with invoicing
    ],
)

Execution order

settlement
    ├──────────────────┐
    ▼                  ▼
invoice-generation   bi-export        ← These run in parallel
accounting-export

The chain engine:

1. Runs jobs with no dependencies first
2. When a job completes, checks which jobs now have all dependencies satisfied
3. Runs those in parallel
4. Continues until all steps complete or one fails
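
That loop can be sketched with a simple ready-set algorithm (ignoring retries and failure modes; steps are plain dicts here for illustration):

```python
import asyncio

# Sketch: run chain steps in dependency order, parallelising independent steps.
# Each step is {"job": name, "depends_on": [names]}; run_job is any async callable.
async def run_chain(steps, run_job):
    done, remaining = set(), list(steps)
    while remaining:
        # A step is ready once all of its dependencies have completed
        ready = [s for s in remaining if set(s.get("depends_on", [])) <= done]
        if not ready:
            raise RuntimeError("Cycle or unsatisfiable dependency in chain")
        await asyncio.gather(*(run_job(s["job"]) for s in ready))
        done |= {s["job"] for s in ready}
        remaining = [s for s in remaining if s["job"] not in done]
    return done
```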

Failure handling

| Strategy | Behavior |
|---|---|
| on_failure="stop" (default) | Chain stops, remaining jobs skipped |
| on_failure="continue" | Chain continues, failed job logged |
| on_failure="retry" | Retry the failed job (up to retries count) |

ChainStep(
    job="settlement",
    on_failure="retry",
    retries=3,
    retry_delay_seconds=60,
)
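
The retry strategy might be implemented roughly as follows (a sketch; the real runner would also log each attempt):

```python
import asyncio

# Sketch: retry a failing step up to `retries` extra attempts, waiting
# retry_delay_seconds between attempts.
async def run_with_retries(step_fn, retries=0, retry_delay_seconds=60):
    attempt = 0
    while True:
        try:
            return await step_fn()
        except Exception:
            if attempt >= retries:
                raise  # out of attempts: propagate so the chain can stop/continue
            attempt += 1
            await asyncio.sleep(retry_delay_seconds)
```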

5. API-Managed Scheduling

Everything is managed through API endpoints — schedules, dependencies, chains, parameters. System owner, tenants, and partners can all manage their jobs through the admin UI.

5.1 API Endpoints (in craft-easy-api)

All job management endpoints live in craft-easy-api (requires JOBS_ENABLED=true in settings):

Job Registry (read-only — defined in code):
GET    /jobs/registry                              — List all available jobs
GET    /jobs/registry/{job_name}                    — Get job detail (params, description)

Schedules (full CRUD):
GET    /jobs/schedules                             — List all schedules
POST   /jobs/schedules                             — Create schedule
GET    /jobs/schedules/{id}                        — Get schedule
PATCH  /jobs/schedules/{id}                        — Update (cron, params, enabled, dependencies)
DELETE /jobs/schedules/{id}                        — Delete schedule

Chains (full CRUD):
GET    /jobs/chains                                — List all chains
POST   /jobs/chains                                — Create chain
GET    /jobs/chains/{id}                           — Get chain (with step graph)
PATCH  /jobs/chains/{id}                           — Update (add/remove/reorder steps)
DELETE /jobs/chains/{id}                           — Delete chain

Manual Execution:
POST   /jobs/run                                   — Run a job immediately
POST   /jobs/chains/{id}/run                       — Run a chain immediately

Run History:
GET    /jobs/runs                                  — List runs (filter by job, status, date)
GET    /jobs/runs/{id}                             — Get run detail (status, logs, result)
POST   /jobs/runs/{id}/cancel                      — Cancel a running job
GET    /jobs/runs/{id}/logs                        — Stream logs (SSE)

5.2 Schedule model

class JobSchedule(BaseDocument):
    """Scheduled job or chain — managed via API."""
    name: str  # "Nightly BI export"
    description: Optional[str] = None

    # What to run
    type: str  # "job" | "chain"
    target: str  # Job name or chain ID

    # When to run
    cron: str  # "0 2 * * *"
    timezone: str = "Europe/Stockholm"
    is_enabled: bool = True

    # Parameters (passed to job at runtime)
    params: dict = {}
    # Supports template variables:
    # {today} → 2026-03-28
    # {yesterday} → 2026-03-27
    # {month} → 2026-03
    # {previous_month} → 2026-02
    allow_param_override: bool = False  # If true, manual runs can override params

    # Dependencies — other schedules that must complete first
    depends_on_schedules: list[PydanticObjectId] = []
    # E.g. "BI export must run after settlement completes"
    # The scheduler checks that dependent schedule's last run succeeded
    # before triggering this one.

    dependency_mode: str = "same_day"
    # "same_day" — dependency must have completed successfully today
    # "last_run" — dependency's last run (any day) must be successful
    # "none" — ignore dependencies (override)

    # Scope
    tenant_id: Optional[PydanticObjectId] = None  # None = system-wide
    per_tenant: bool = False  # True = runs once per enabled tenant

    # Status (readonly)
    last_run_id: Optional[PydanticObjectId] = Field(
        default=None, json_schema_extra={"readonly": True}
    )
    last_run_at: Optional[datetime] = Field(
        default=None, json_schema_extra={"readonly": True}
    )
    last_run_status: Optional[str] = Field(
        default=None, json_schema_extra={"readonly": True}
    )
    next_run_at: Optional[datetime] = Field(
        default=None, json_schema_extra={"readonly": True}
    )

    class Settings:
        name = "job_schedules"
        indexes = [
            [("is_enabled", 1), ("next_run_at", 1)],
            [("tenant_id", 1)],
        ]
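
The template variables listed in the params comment could be resolved at trigger time along these lines (a hypothetical helper, not part of the spec; only the four documented variables are supported):

```python
from datetime import date, timedelta

# Sketch: expand {today}/{yesterday}/{month}/{previous_month} in schedule params.
def resolve_params(params, today=None):
    today = today or date.today()
    first_of_month = today.replace(day=1)
    values = {
        "today": today.isoformat(),
        "yesterday": (today - timedelta(days=1)).isoformat(),
        "month": today.strftime("%Y-%m"),
        # Day before the 1st of this month is always in the previous month
        "previous_month": (first_of_month - timedelta(days=1)).strftime("%Y-%m"),
    }
    return {k: v.format(**values) if isinstance(v, str) else v
            for k, v in params.items()}
```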

5.3 Chain model (API-managed)

Chains are stored in the database, not just in code — so they can be created and modified through the admin UI:

class JobChain(BaseDocument):
    """A chain of jobs with dependencies — managed via API."""
    name: str  # "month-end"
    description: Optional[str] = None
    is_system: bool = False  # True = built-in, cannot be deleted (but can be modified)

    steps: list[ChainStep]

    # Default failure mode for all steps (can be overridden per step)
    default_on_failure: str = "stop"  # "stop" | "continue" | "retry"

    class Settings:
        name = "job_chains"


class ChainStep(BaseModel):
    """A step in a job chain."""
    job: str  # Job name from registry
    params: dict = {}  # Parameters for this step

    # Dependencies within the chain
    depends_on: list[str] = []  # Job names that must complete before this step

    # Failure handling (overrides chain default)
    on_failure: Optional[str] = None  # "stop" | "continue" | "retry"
    retries: int = 0
    retry_delay_seconds: int = 60

5.4 Job run history

class JobRun(BaseDocument):
    """Record of a job execution."""
    job_name: str
    chain_id: Optional[PydanticObjectId] = None
    chain_run_id: Optional[str] = None  # Groups all runs in a chain execution
    schedule_id: Optional[PydanticObjectId] = None

    status: str  # "pending" | "running" | "completed" | "failed" | "cancelled"

    # Who triggered it
    triggered_by: str  # "schedule" | "manual" | "chain" | "dependency"
    triggered_by_user: Optional[PydanticObjectId] = None

    # Timing
    started_at: datetime
    completed_at: Optional[datetime] = None
    duration_seconds: Optional[float] = None

    # Context
    tenant_id: Optional[PydanticObjectId] = None
    params: dict = {}

    # Result
    result: Optional[dict] = None
    error: Optional[str] = None

    # Logs
    logs: list[JobLogEntry] = []

    class Settings:
        name = "job_runs"
        indexes = [
            [("job_name", 1), ("started_at", -1)],
            [("status", 1)],
            [("schedule_id", 1), ("started_at", -1)],
            [("chain_run_id", 1)],
        ]


class JobLogEntry(BaseModel):
    """A log line from a job execution."""
    timestamp: datetime
    level: str  # "info" | "progress" | "warning" | "error"
    message: str
    percentage: Optional[int] = None  # 0-100 for progress tracking

5.5 Job registry endpoint

Jobs are defined in code (via @job decorator) but the API exposes what's available so the admin UI can show them:

# GET /jobs/registry response:
{
    "jobs": [
        {
            "name": "settlement",
            "description": "Calculate settlement for a billing period",
            "params": {
                "period": {"type": "string", "required": true, "description": "YYYY-MM"},
                "tenant_id": {"type": "string", "required": false, "description": "Specific tenant"}
            },
            "per_tenant": true,
            "timeout_seconds": 600,
            "retries": 2
        },
        {
            "name": "bi-export",
            "description": "Export data to BigQuery/Azure SQL",
            "params": {
                "target": {"type": "string", "required": false, "enum": ["bigquery", "azure_sql"]}
            },
            "per_tenant": true,
            "timeout_seconds": 1800
        }
    ]
}

This allows the admin UI to:

- Show a dropdown of available jobs when creating a schedule
- Render a form with the correct parameter fields
- Validate parameters before submitting

5.6 Manual run with parameters

POST /jobs/run
{
    "job": "settlement",
    "params": {
        "period": "2026-03",
        "tenant_id": "abc123"
    }
}

→ Response:
{
    "run_id": "xyz789",
    "status": "pending",
    "message": "Job queued for execution"
}

5.7 Dependency example via admin UI

Admin UI: Jobs → Schedules → Create

Name:           [Daily BI Export              ]
Job:            [bi-export ▼]                    ← Dropdown from registry
Cron:           [0 3 * * *                    ]  ← Daily 3 AM
Timezone:       [Europe/Stockholm ▼]
Parameters:     target = [bigquery ▼]            ← Rendered from job's param schema

Dependencies:
  ☑ Must run after: [Nightly Settlement ▼]       ← Dropdown of other schedules
  Mode: [Same day ▼]                             ← Must have succeeded today

[Save]

This creates a schedule where the BI export only runs if settlement has already completed successfully the same day. If settlement has not run yet, or has failed, the BI export is skipped and flagged.
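
The same-day gate can be sketched as a small predicate over the dependency's last run (a hypothetical helper; run records follow the JobRun shape from section 5.4):

```python
from datetime import date

# Sketch: decide whether a schedule may fire, given its dependency's last run.
# last_run is a dict like {"status": ..., "completed_at": datetime}, or None
# if the dependency has never run. mode matches dependency_mode in JobSchedule.
def dependency_satisfied(last_run, mode, today=None):
    if mode == "none":
        return True  # dependencies explicitly ignored
    if last_run is None or last_run["status"] != "completed":
        return False
    if mode == "last_run":
        return True  # any successful run, regardless of day
    if mode == "same_day":
        today = today or date.today()
        return last_run["completed_at"].date() == today
    raise ValueError(f"Unknown dependency_mode: {mode}")
```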


6. Cloud Deployment

GCP Cloud Run Jobs

# cloudbuild.yaml — deploy as Cloud Run Job
steps:
  - name: gcr.io/cloud-builders/docker
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/craft-easy-jobs', '.']
  - name: gcr.io/google.com/cloudsdktool/cloud-sdk
    args:
      - gcloud
      - run
      - jobs
      - create
      - craft-easy-settlement
      - --image=gcr.io/$PROJECT_ID/craft-easy-jobs
      - --command=craft-easy-job,run,settlement
      - --region=europe-west1
      - --memory=2Gi
      - --task-timeout=30m

Cloud Scheduler triggers:

# Monthly settlement on the 1st at 3 AM
gcloud scheduler jobs create http settlement-monthly \
  --uri="https://europe-west1-run.googleapis.com/apis/run.googleapis.com/v1/..." \
  --schedule="0 3 1 * *" \
  --time-zone="Europe/Stockholm"

Azure Container Apps Jobs

# Deploy as Azure Container Apps Job
az containerapp job create \
  --name craft-easy-settlement \
  --resource-group rg-craft-easy \
  --image craft-easy-jobs:latest \
  --trigger-type Schedule \
  --cron-expression "0 3 1 * *" \
  --cpu 1.0 \
  --memory 2Gi \
  --command "craft-easy-job" "run" "settlement"

Same Docker image for both

# Dockerfile.jobs
FROM python:3.12-slim
RUN groupadd -r craft && useradd -r -g craft craft
WORKDIR /app
COPY pyproject.toml README.md ./
COPY src/ src/
RUN pip install --no-cache-dir .
USER craft

# Entrypoint is the CLI — command determines which job runs
ENTRYPOINT ["craft-easy-job"]
# Run settlement:
docker run craft-easy-jobs run settlement --period 2026-03

# Run chain:
docker run craft-easy-jobs chain run month-end --period 2026-03

# List jobs:
docker run craft-easy-jobs list

7. Built-in Jobs

Jobs that come with the Craft Easy ecosystem (from craft-easy-api services):

| Job | Description | Schedule | Per-tenant |
|---|---|---|---|
| settlement | Calculate revenue splits and payables | Monthly 1st, 3 AM | Yes |
| invoice-generation | Generate recurring service fee invoices | Monthly 1st, 4 AM | Yes |
| invoice-reminders | Send reminders for overdue invoices | Daily, 8 AM | Yes |
| collection-escalation | Escalate claims (reminder → collection → enforcement) | Daily, 9 AM | Yes |
| bi-export | Export data to BigQuery/Azure SQL | Daily, 2 AM | Yes |
| accounting-export | Sync journal entries to external accounting systems | Daily, 4 AM | Per entity |
| gdpr-cleanup | Auto-depersonalize expired personal data | Daily, 1 AM | Yes |
| token-cleanup | Remove expired auth tokens | Daily, midnight | No |
| audit-log-archive | Archive old audit entries | Monthly, 2 AM | No |

Built-in chains

| Chain | Steps | Schedule |
|---|---|---|
| month-end | settlement → invoice-generation → accounting-export | Monthly 1st |
| daily-maintenance | token-cleanup → gdpr-cleanup → bi-export | Daily 1 AM |
| collection-cycle | invoice-reminders → collection-escalation | Daily 8 AM |

8. Technical Stack

| Component | Technology | Why |
|---|---|---|
| CLI | Click | Standard Python CLI framework |
| Job registry | Python decorators | Simple, no magic |
| Chain engine | Custom (async, topological sort) | Lightweight, no external deps |
| Schedule storage | MongoDB (via Beanie) | Same DB as API |
| Local scheduler | APScheduler 4 | For dev/small deployments |
| Cloud scheduler | Cloud Scheduler / Azure Timer | For production |
| Locking | MongoDB advisory locks | Prevent duplicate runs |
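
The advisory lock amounts to a compare-and-set with a TTL. A sketch using an in-memory dict as a stand-in for the MongoDB collection (in production the check-and-write would be a single atomic find_one_and_update with upsert, so two runners cannot both acquire):

```python
from datetime import datetime, timedelta, timezone

# Sketch: advisory lock with a TTL so a crashed runner's lock eventually expires.
# `store` maps job_name -> lock expiry; it stands in for a MongoDB collection.
class JobLock:
    def __init__(self, store, ttl_seconds=3600):
        self.store = store
        self.ttl = timedelta(seconds=ttl_seconds)

    def acquire(self, job_name, now=None):
        now = now or datetime.now(timezone.utc)
        expiry = self.store.get(job_name)
        if expiry is not None and expiry > now:
            return False  # another runner holds a live lock
        self.store[job_name] = now + self.ttl  # take (or take over a stale) lock
        return True

    def release(self, job_name):
        self.store.pop(job_name, None)
```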

Package structure

craft-easy-jobs/
├── pyproject.toml
├── Dockerfile
├── src/
│   └── craft_easy_jobs/
│       ├── __init__.py         # @job, @chain decorators
│       ├── cli.py              # Click CLI: craft-easy-job run/schedule/chain
│       ├── runner.py           # Job execution engine
│       ├── chain_engine.py     # Dependency resolution + parallel execution
│       ├── scheduler.py        # Local APScheduler wrapper
│       ├── context.py          # JobContext
│       ├── locking.py          # MongoDB-based distributed locks
│       ├── models.py           # JobSchedule, JobRun, JobLogEntry
│       └── builtin/            # Built-in jobs
│           ├── settlement.py
│           ├── invoicing.py
│           ├── bi_export.py
│           ├── accounting_export.py
│           ├── gdpr_cleanup.py
│           └── maintenance.py
└── tests/

9. Implementation Plan

| Phase | What | Weeks | Depends on |
|---|---|---|---|
| 1 | Core: @job decorator, JobContext, runner, CLI (run/list) | 1 | craft-easy-api |
| 2 | Chain engine (dependency resolution, parallel execution) | 1 | Phase 1 |
| 3 | Scheduling (MongoDB storage, APScheduler for local) | 1 | Phase 1 |
| 4 | Job run history + locking | 1 | Phase 1 |
| 5 | Cloud integration (Cloud Run Jobs + Azure Container Apps Jobs) | 1 | Phase 1 |
| 6 | Built-in jobs (as financial modules are built) | Ongoing | Per module |

Total: ~5 weeks for the framework, then built-in jobs added as modules are completed.


Ecosystem overview

PyPI packages:
┌──────────────────────────────────────┐
│  craft-easy-api      (foundation)    │
│  pip install craft-easy-api          │
└──────────────┬───────────────────────┘
               │ depends on
    ┌──────────┴──────────┐
    │                     │
┌───▼──────────────┐  ┌──▼────────────────┐
│ craft-easy-jobs  │  │ craft-easy-admin   │
│ pip install ...  │  │ (React/Expo app)   │
│ Batch processing │  │ Universal admin UI │
└──────────────────┘  └───────────────────┘

GitHub repos (easy-software-system/):
├── craft-easy-api          ✅ Created + scaffolded
├── craft-easy-admin        ✅ Created (empty)
├── craft-easy-jobs         ✅ Created (empty)
└── craft-easy-template     ✅ Created (empty)

PyPI:
├── craft-easy-api    v0.1.0  ✅ Published
├── craft-easy-admin  v0.0.1  ✅ Reserved (npm)
└── craft-easy-jobs   v0.0.1  ✅ Reserved

Airpark: choice of job framework

Decision: airpark-batch uses craft-easy-api's JobScheduler, not craft-easy-jobs.

Rationale

craft-easy-api has a built-in job framework with:

- JobSchedule (cron, per-tenant, dependency chains)
- JobRun (execution log with status, duration, output)
- Distributed locking (MongoDB-based) — prevents duplicate runs
- A REST API for managing jobs from the admin app
- Integration with Beanie/MongoDB — same models as the API

craft-easy-jobs is an alternative, decorator-based CLI framework that:

- Runs standalone via craft-easy-job run <name>
- Has simpler scheduling
- Lacks a REST API for administration
- Is better suited to simpler projects without an admin app

airpark-batch chose craft-easy-api's system because:

1. The admin app needs to manage jobs (start, stop, view logs)
2. Jobs need to run per tenant (21 jobs × N tenants)
3. Dependency chains (month-end: settlement → invoices → export) are handled declaratively
4. It uses the same database and models as airpark-api — no extra configuration

craft-easy-jobs can still be used alongside it for simpler one-off jobs or debugging, but it is not the primary job system for Airpark.