Craft Easy Jobs — Batch Job Specification¶
Version: 1.0 Date: 2026-03-28 Related: specification.md, financial-ecosystem-specification.md
Contents¶
- Vision
- Architecture
- Job definition
- Job chains and dependencies
- Scheduling
- Cloud deployment
- Built-in jobs
- Technical stack
- Implementation plan
1. Vision¶
A batch job framework that integrates with Craft Easy API. Define jobs as Python functions, chain them with dependencies, schedule them, and deploy to Cloud Run Jobs (GCP) or Azure Container Apps Jobs.
# Run a job manually
craft-easy-job run settlement --period 2026-03
# List scheduled jobs
craft-easy-job schedule list
# Run a chain (settlement → invoicing → export)
craft-easy-job chain run month-end
Design principles¶
| Principle | Rule |
|---|---|
| Same models | Jobs import from craft-easy-api — same database, same models |
| Jobs are functions | A job is an async Python function with a decorator |
| Chains are declarative | Define dependencies, the runner handles order |
| Cloud-native | Designed for Cloud Run Jobs / Azure Container Apps Jobs |
| Observable | Every run logged with status, duration, output |
2. Architecture¶
┌──────────────────────────────────────────────────────┐
│ craft-easy-jobs │
│ │
│ ┌─ Job Registry ──────────────────────────────────┐ │
│ │ settlement_job │ │
│ │ invoice_generation_job │ │
│ │ bi_export_job │ │
│ │ gdpr_cleanup_job │ │
│ │ accounting_export_job │ │
│ │ ... (custom project jobs) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ Chain Engine ──────────────────────────────────┐ │
│ │ month-end: settlement → invoicing → export │ │
│ │ daily-sync: bi-export → accounting-export │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ Scheduler ─────────────────────────────────────┐ │
│ │ Cron expressions → triggers jobs/chains │ │
│ │ Cloud Scheduler (GCP) / Azure Timer Trigger │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ Job Runner ────────────────────────────────────┐ │
│ │ Connects to DB, runs job, logs result │ │
│ │ Handles retries, timeouts, error reporting │ │
│ └──────────────────────────────────────────────────┘ │
└──────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ craft-easy-api (models + services) │
│ MongoDB (same database as API) │
└──────────────────────────────────────────────────────┘
3. Job Definition¶
A job is a decorated async function¶
from craft_easy_jobs import job, JobContext
@job(
name="settlement",
description="Calculate settlement for a billing period",
timeout_seconds=600,
retries=2,
)
async def settlement_job(ctx: JobContext):
"""Run settlement for all active tenants."""
period = ctx.params.get("period") # "2026-03"
tenants = await Tenant.find(Tenant.is_enabled == True).to_list()
for tenant in tenants:
ctx.log(f"Processing tenant: {tenant.name}")
await SettlementService.run(tenant.id, period)
ctx.progress(f"Completed {tenant.name}")
return {"tenants_processed": len(tenants)}
JobContext¶
class JobContext:
"""Context available to every job during execution."""
# Parameters passed to the job
params: dict[str, Any]
# The job run record (for logging progress)
run_id: str
job_name: str
# Tenant scope (if job runs per-tenant)
tenant_id: Optional[str] = None
# Logging
def log(self, message: str) -> None: ...
def progress(self, message: str, percentage: Optional[int] = None) -> None: ...
def warning(self, message: str) -> None: ...
def error(self, message: str) -> None: ...
# Check if job should stop (graceful shutdown)
def should_stop(self) -> bool: ...
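A minimal sketch of how the @job decorator could populate a registry — this is an illustrative implementation, not the actual craft_easy_jobs internals; JOB_REGISTRY and JobDefinition are hypothetical names:

```python
# Hypothetical sketch: @job registers the function in a module-level registry.
from dataclasses import dataclass
from typing import Callable

JOB_REGISTRY: dict[str, "JobDefinition"] = {}

@dataclass
class JobDefinition:
    name: str
    func: Callable
    description: str = ""
    timeout_seconds: int = 300
    retries: int = 0
    per_tenant: bool = False

def job(name: str, description: str = "", timeout_seconds: int = 300,
        retries: int = 0, per_tenant: bool = False):
    """Register an async function as a named job."""
    def decorator(func: Callable) -> Callable:
        JOB_REGISTRY[name] = JobDefinition(
            name=name, func=func, description=description,
            timeout_seconds=timeout_seconds, retries=retries,
            per_tenant=per_tenant,
        )
        return func  # the function itself is returned unchanged
    return decorator

@job(name="noop", description="Example job")
async def noop_job(ctx):
    return {"ok": True}
```

Because the decorator returns the function unchanged, jobs stay plain async functions that can also be called directly in tests.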
Tenant-scoped jobs¶
Jobs can run once for the whole system or once per tenant:
@job(name="bi-export", per_tenant=True)
async def bi_export_job(ctx: JobContext):
"""Runs once per tenant that has BI export enabled."""
tenant = await Tenant.get(ctx.tenant_id)
await BIExportService.export(tenant)
When per_tenant=True, the runner automatically iterates over all tenants and calls the job for each one. Failures in one tenant don't stop others.
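The per-tenant fan-out with failure isolation can be sketched like this — run_per_tenant and demo_job are hypothetical stand-ins, not the real runner:

```python
# Sketch: call the job once per tenant; one tenant's failure doesn't stop the rest.
import asyncio

async def run_per_tenant(job_func, tenant_ids: list[str]) -> dict:
    results: dict[str, str] = {}
    for tenant_id in tenant_ids:
        try:
            await job_func(tenant_id)
            results[tenant_id] = "completed"
        except Exception as exc:  # isolate failures per tenant
            results[tenant_id] = f"failed: {exc}"
    return results

async def demo_job(tenant_id: str):
    if tenant_id == "bad":
        raise ValueError("boom")

statuses = asyncio.run(run_per_tenant(demo_job, ["a", "bad", "b"]))
# statuses -> {"a": "completed", "bad": "failed: boom", "b": "completed"}
```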
4. Job Chains and Dependencies¶
Chain definition¶
from craft_easy_jobs import chain, ChainStep
month_end = chain(
name="month-end",
description="Complete month-end processing",
steps=[
ChainStep(job="settlement", params={"period": "{period}"}),
ChainStep(job="invoice-generation", depends_on=["settlement"]),
ChainStep(job="accounting-export", depends_on=["invoice-generation"]),
ChainStep(job="bi-export", depends_on=["settlement"]), # Parallel with invoicing
],
)
Execution order¶
settlement
│
├──────────────────┐
▼ ▼
invoice-generation bi-export ← These run in parallel
│
▼
accounting-export
The chain engine:

1. Runs jobs with no dependencies first
2. When a job completes, checks which jobs have all dependencies satisfied
3. Runs those in parallel
4. Continues until all steps complete or one fails
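The loop above can be sketched as a toy scheduler — this is illustrative, not the actual chain engine; run_chain and the batched completion order are assumptions:

```python
# Toy dependency-resolution loop: run all ready steps in parallel batches.
import asyncio

async def run_chain(steps: dict[str, list[str]], run_step) -> list[str]:
    """steps maps step name -> list of dependency names.
    Returns the completion order (alphabetical within a parallel batch)."""
    done: set[str] = set()
    order: list[str] = []
    while len(done) < len(steps):
        ready = [s for s, deps in steps.items()
                 if s not in done and all(d in done for d in deps)]
        if not ready:
            raise RuntimeError("dependency cycle detected")
        await asyncio.gather(*(run_step(s) for s in ready))  # parallel batch
        done.update(ready)
        order.extend(sorted(ready))
    return order

async def noop(step: str):
    await asyncio.sleep(0)

month_end = {
    "settlement": [],
    "invoice-generation": ["settlement"],
    "bi-export": ["settlement"],
    "accounting-export": ["invoice-generation"],
}
completion = asyncio.run(run_chain(month_end, noop))
# ['settlement', 'bi-export', 'invoice-generation', 'accounting-export']
```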
Failure handling¶
| Strategy | Behavior |
|---|---|
| on_failure="stop" (default) | Chain stops, remaining jobs skipped |
| on_failure="continue" | Chain continues, failed job logged |
| on_failure="retry" | Retry the failed job (up to retries count) |
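The retry strategy amounts to a simple attempt loop — a hypothetical helper for illustration; the real runner's retry policy may differ:

```python
# Sketch of on_failure="retry": re-run a step up to `retries` times.
import asyncio

async def run_with_retries(step, retries: int = 2, retry_delay_seconds: float = 0):
    attempt = 0
    while True:
        try:
            return await step()
        except Exception:
            attempt += 1
            if attempt > retries:
                raise  # out of retries: propagate so the chain applies on_failure
            await asyncio.sleep(retry_delay_seconds)

calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

result = asyncio.run(run_with_retries(flaky, retries=2))
# succeeds on the third attempt (two retries)
```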
5. API-Managed Scheduling¶
Everything is managed through API endpoints — schedules, dependencies, chains, parameters. System owner, tenants, and partners can all manage their jobs through the admin UI.
5.1 API Endpoints (in craft-easy-api)¶
All job management endpoints live in craft-easy-api (requires JOBS_ENABLED=true in settings):
Job Registry (read-only — defined in code):
GET /jobs/registry — List all available jobs
GET /jobs/registry/{job_name} — Get job detail (params, description)
Schedules (full CRUD):
GET /jobs/schedules — List all schedules
POST /jobs/schedules — Create schedule
GET /jobs/schedules/{id} — Get schedule
PATCH /jobs/schedules/{id} — Update (cron, params, enabled, dependencies)
DELETE /jobs/schedules/{id} — Delete schedule
Chains (full CRUD):
GET /jobs/chains — List all chains
POST /jobs/chains — Create chain
GET /jobs/chains/{id} — Get chain (with step graph)
PATCH /jobs/chains/{id} — Update (add/remove/reorder steps)
DELETE /jobs/chains/{id} — Delete chain
Manual Execution:
POST /jobs/run — Run a job immediately
POST /jobs/chains/{id}/run — Run a chain immediately
Run History:
GET /jobs/runs — List runs (filter by job, status, date)
GET /jobs/runs/{id} — Get run detail (status, logs, result)
POST /jobs/runs/{id}/cancel — Cancel a running job
GET /jobs/runs/{id}/logs — Stream logs (SSE)
5.2 Schedule model¶
class JobSchedule(BaseDocument):
"""Scheduled job or chain — managed via API."""
name: str # "Nightly BI export"
description: Optional[str] = None
# What to run
type: str # "job" | "chain"
target: str # Job name or chain ID
# When to run
cron: str # "0 2 * * *"
timezone: str = "Europe/Stockholm"
is_enabled: bool = True
# Parameters (passed to job at runtime)
params: dict = {}
# Supports template variables:
# {today} → 2026-03-28
# {yesterday} → 2026-03-27
# {month} → 2026-03
# {previous_month} → 2026-02
allow_param_override: bool = False # If true, manual runs can override params
# Dependencies — other schedules that must complete first
depends_on_schedules: list[PydanticObjectId] = []
# E.g. "BI export must run after settlement completes"
# The scheduler checks that dependent schedule's last run succeeded
# before triggering this one.
dependency_mode: str = "same_day"
# "same_day" — dependency must have completed successfully today
# "last_run" — dependency's last run (any day) must be successful
# "none" — ignore dependencies (override)
# Scope
tenant_id: Optional[PydanticObjectId] = None # None = system-wide
per_tenant: bool = False # True = runs once per enabled tenant
# Status (readonly)
last_run_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
last_run_at: Optional[datetime] = Field(
default=None, json_schema_extra={"readonly": True}
)
last_run_status: Optional[str] = Field(
default=None, json_schema_extra={"readonly": True}
)
next_run_at: Optional[datetime] = Field(
default=None, json_schema_extra={"readonly": True}
)
class Settings:
name = "job_schedules"
indexes = [
[("is_enabled", 1), ("next_run_at", 1)],
[("tenant_id", 1)],
]
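Resolving the template variables in params could look like the sketch below — the semantics are assumed from the comments in the model above, and resolve_params is a hypothetical helper:

```python
# Sketch: substitute {today}/{yesterday}/{month}/{previous_month} in schedule params.
from datetime import date, timedelta

def resolve_params(params: dict, today: date) -> dict:
    first_of_month = today.replace(day=1)
    values = {
        "today": today.isoformat(),
        "yesterday": (today - timedelta(days=1)).isoformat(),
        "month": today.strftime("%Y-%m"),
        "previous_month": (first_of_month - timedelta(days=1)).strftime("%Y-%m"),
    }
    out = {}
    for key, value in params.items():
        if isinstance(value, str):
            for name, resolved in values.items():
                value = value.replace("{" + name + "}", resolved)
        out[key] = value
    return out

resolved = resolve_params({"period": "{previous_month}"}, date(2026, 3, 28))
# {"period": "2026-02"}
```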
5.3 Chain model (API-managed)¶
Chains are stored in the database, not just in code — so they can be created and modified through the admin UI:
class JobChain(BaseDocument):
"""A chain of jobs with dependencies — managed via API."""
name: str # "month-end"
description: Optional[str] = None
is_system: bool = False # True = built-in, cannot be deleted (but can be modified)
steps: list[ChainStep]
# Default failure mode for all steps (can be overridden per step)
default_on_failure: str = "stop" # "stop" | "continue" | "retry"
class Settings:
name = "job_chains"
class ChainStep(BaseModel):
"""A step in a job chain."""
job: str # Job name from registry
params: dict = {} # Parameters for this step
# Dependencies within the chain
depends_on: list[str] = [] # Job names that must complete before this step
# Failure handling (overrides chain default)
on_failure: Optional[str] = None # "stop" | "continue" | "retry"
retries: int = 0
retry_delay_seconds: int = 60
5.4 Job run history¶
class JobRun(BaseDocument):
"""Record of a job execution."""
job_name: str
chain_id: Optional[PydanticObjectId] = None
chain_run_id: Optional[str] = None # Groups all runs in a chain execution
schedule_id: Optional[PydanticObjectId] = None
status: str # "pending" | "running" | "completed" | "failed" | "cancelled"
# Who triggered it
triggered_by: str # "schedule" | "manual" | "chain" | "dependency"
triggered_by_user: Optional[PydanticObjectId] = None
# Timing
started_at: datetime
completed_at: Optional[datetime] = None
duration_seconds: Optional[float] = None
# Context
tenant_id: Optional[PydanticObjectId] = None
params: dict = {}
# Result
result: Optional[dict] = None
error: Optional[str] = None
# Logs
logs: list[JobLogEntry] = []
class Settings:
name = "job_runs"
indexes = [
[("job_name", 1), ("started_at", -1)],
[("status", 1)],
[("schedule_id", 1), ("started_at", -1)],
[("chain_run_id", 1)],
]
class JobLogEntry(BaseModel):
"""A log line from a job execution."""
timestamp: datetime
level: str # "info" | "progress" | "warning" | "error"
message: str
percentage: Optional[int] = None # 0-100 for progress tracking
5.5 Job registry endpoint¶
Jobs are defined in code (via @job decorator) but the API exposes what's available so the admin UI can show them:
# GET /jobs/registry response:
{
"jobs": [
{
"name": "settlement",
"description": "Calculate settlement for a billing period",
"params": {
"period": {"type": "string", "required": true, "description": "YYYY-MM"},
"tenant_id": {"type": "string", "required": false, "description": "Specific tenant"}
},
"per_tenant": true,
"timeout_seconds": 600,
"retries": 2
},
{
"name": "bi-export",
"description": "Export data to BigQuery/Azure SQL",
"params": {
"target": {"type": "string", "required": false, "enum": ["bigquery", "azure_sql"]}
},
"per_tenant": true,
"timeout_seconds": 1800
}
]
}
This allows the admin UI to:

- Show a dropdown of available jobs when creating a schedule
- Render a form with the correct parameter fields
- Validate parameters before submitting
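The validation step can be sketched against the registry schema shown above — validate_params is a hypothetical helper; the API's actual validation may be stricter:

```python
# Sketch: check manual-run params against a job's registry param schema.
def validate_params(schema: dict, params: dict) -> list[str]:
    """Return a list of human-readable errors (empty list = valid)."""
    errors = []
    for name, spec in schema.items():
        if spec.get("required") and name not in params:
            errors.append(f"missing required param: {name}")
        if name in params and "enum" in spec and params[name] not in spec["enum"]:
            errors.append(f"invalid value for {name}: {params[name]}")
    for name in params:
        if name not in schema:
            errors.append(f"unknown param: {name}")
    return errors

schema = {
    "period": {"type": "string", "required": True},
    "target": {"type": "string", "required": False,
               "enum": ["bigquery", "azure_sql"]},
}
errors = validate_params(schema, {"target": "csv"})
# ['missing required param: period', 'invalid value for target: csv']
```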
5.6 Manual run with parameters¶
POST /jobs/run
{
"job": "settlement",
"params": {
"period": "2026-03",
"tenant_id": "abc123"
}
}
→ Response:
{
"run_id": "xyz789",
"status": "pending",
"message": "Job queued for execution"
}
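Since the response is asynchronous ("Job queued for execution"), a client typically polls the run until it reaches a terminal status. In this sketch, fetch_run stands in for an HTTP GET of /jobs/runs/{id}; the status values follow the JobRun model above:

```python
# Sketch: poll a run until it finishes (fetch_run abstracts the HTTP call).
import time

def wait_for_run(run_id: str, fetch_run, poll_seconds: float = 0,
                 max_polls: int = 100) -> dict:
    terminal = {"completed", "failed", "cancelled"}
    for _ in range(max_polls):
        run = fetch_run(run_id)
        if run["status"] in terminal:
            return run
        time.sleep(poll_seconds)
    raise TimeoutError(f"run {run_id} not finished after {max_polls} polls")

# Fake server responses for illustration:
responses = iter([{"status": "pending"}, {"status": "running"},
                  {"status": "completed", "result": {"tenants_processed": 3}}])
final = wait_for_run("xyz789", lambda run_id: next(responses))
# final["status"] == "completed"
```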
5.7 Dependency example via admin UI¶
Admin UI: Jobs → Schedules → Create
Name: [Daily BI Export ]
Job: [bi-export ▼] ← Dropdown from registry
Cron: [0 3 * * * ] ← Daily 3 AM
Timezone: [Europe/Stockholm ▼]
Parameters: target = [bigquery ▼] ← Rendered from job's param schema
Dependencies:
☑ Must run after: [Nightly Settlement ▼] ← Dropdown of other schedules
Mode: [Same day ▼] ← Must have succeeded today
[Save]
This creates a schedule where BI export only runs if settlement has already completed successfully the same day. If settlement hasn't run or has failed, BI export is skipped and flagged.
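The dependency gate can be sketched as a pure function — the semantics are assumed from the dependency_mode comments in the schedule model; this is not API code:

```python
# Sketch: decide whether a schedule's dependency allows it to run.
from datetime import datetime

def dependency_satisfied(mode: str, last_run_status, last_run_at,
                         now: datetime) -> bool:
    if mode == "none":
        return True  # dependencies explicitly ignored
    if last_run_status != "completed":
        return False  # dependency never ran, or its last run failed
    if mode == "same_day":
        return last_run_at is not None and last_run_at.date() == now.date()
    return True  # "last_run": any successful run counts

now = datetime(2026, 3, 28, 3, 0)
ok = dependency_satisfied("same_day", "completed", datetime(2026, 3, 28, 2, 5), now)
stale = dependency_satisfied("same_day", "completed", datetime(2026, 3, 27, 2, 5), now)
# ok is True; stale is False, so the dependent schedule would be skipped
```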
6. Cloud Deployment¶
GCP Cloud Run Jobs¶
# cloudbuild.yaml — deploy as Cloud Run Job
steps:
- name: gcr.io/cloud-builders/docker
args: ['build', '-t', 'gcr.io/$PROJECT_ID/craft-easy-jobs', '.']
- name: gcr.io/google.com/cloudsdktool/cloud-sdk
args:
- gcloud
- run
- jobs
- create
- craft-easy-settlement
- --image=gcr.io/$PROJECT_ID/craft-easy-jobs
- --command=craft-easy-job,run,settlement
- --region=europe-west1
- --memory=2Gi
- --task-timeout=30m
Cloud Scheduler triggers:
# Monthly settlement on the 1st at 3 AM
gcloud scheduler jobs create http settlement-monthly \
--uri="https://europe-west1-run.googleapis.com/apis/run.googleapis.com/v1/..." \
--schedule="0 3 1 * *" \
--time-zone="Europe/Stockholm"
Azure Container Apps Jobs¶
# Deploy as Azure Container Apps Job
az containerapp job create \
--name craft-easy-settlement \
--resource-group rg-craft-easy \
--image craft-easy-jobs:latest \
--trigger-type Schedule \
--cron-expression "0 3 1 * *" \
--cpu 1.0 \
--memory 2Gi \
--command "craft-easy-job" "run" "settlement"
Same Docker image for both¶
# Dockerfile.jobs
FROM python:3.12-slim
RUN groupadd -r craft && useradd -r -g craft craft
WORKDIR /app
COPY pyproject.toml README.md ./
COPY src/ src/
RUN pip install --no-cache-dir .
USER craft
# Entrypoint is the CLI — command determines which job runs
ENTRYPOINT ["craft-easy-job"]
# Run settlement:
docker run craft-easy-jobs run settlement --period 2026-03
# Run chain:
docker run craft-easy-jobs chain run month-end --period 2026-03
# List jobs:
docker run craft-easy-jobs list
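The CLI entry point behind those commands could be wired with Click, as the technical stack section proposes — command and option names mirror the examples above, but the wiring is an illustrative sketch, not the published package:

```python
# Sketch of the craft-easy-job CLI using Click.
import click
from click.testing import CliRunner

@click.group()
def cli():
    """craft-easy-job: run and inspect batch jobs."""

@cli.command()
@click.argument("job_name")
@click.option("--period", default=None, help="Billing period, e.g. 2026-03")
def run(job_name, period):
    """Run a single job immediately."""
    # A real implementation would look up the job registry and await the job.
    click.echo(f"running {job_name} period={period}")

@cli.group()
def chain():
    """Chain subcommands."""

@chain.command(name="run")
@click.argument("chain_name")
def chain_run(chain_name):
    click.echo(f"running chain {chain_name}")

# Exercise it in-process instead of via the shell:
result = CliRunner().invoke(cli, ["run", "settlement", "--period", "2026-03"])
# result.output -> "running settlement period=2026-03\n"
```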
7. Built-in Jobs¶
Jobs that come with the Craft Easy ecosystem (from craft-easy-api services):
| Job | Description | Schedule | Per-tenant |
|---|---|---|---|
| settlement | Calculate revenue splits and payables | Monthly 1st, 3 AM | Yes |
| invoice-generation | Generate recurring service fee invoices | Monthly 1st, 4 AM | Yes |
| invoice-reminders | Send reminders for overdue invoices | Daily, 8 AM | Yes |
| collection-escalation | Escalate claims (reminder → collection → enforcement) | Daily, 9 AM | Yes |
| bi-export | Export data to BigQuery/Azure SQL | Daily, 2 AM | Yes |
| accounting-export | Sync journal entries to external accounting systems | Daily, 4 AM | Per entity |
| gdpr-cleanup | Auto-depersonalize expired personal data | Daily, 1 AM | Yes |
| token-cleanup | Remove expired auth tokens | Daily, midnight | No |
| audit-log-archive | Archive old audit entries | Monthly, 2 AM | No |
Built-in chains¶
| Chain | Steps | Schedule |
|---|---|---|
| month-end | settlement → invoice-generation → accounting-export | Monthly 1st |
| daily-maintenance | token-cleanup → gdpr-cleanup → bi-export | Daily 1 AM |
| collection-cycle | invoice-reminders → collection-escalation | Daily 8 AM |
8. Technical Stack¶
| Component | Technology | Why |
|---|---|---|
| CLI | Click | Standard Python CLI framework |
| Job registry | Python decorators | Simple, no magic |
| Chain engine | Custom (async, topological sort) | Lightweight, no external deps |
| Schedule storage | MongoDB (via Beanie) | Same DB as API |
| Local scheduler | APScheduler 4 | For dev/small deployments |
| Cloud scheduler | Cloud Scheduler / Azure Timer | For production |
| Locking | MongoDB advisory locks | Prevent duplicate runs |
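The locking row refers to the convention of acquiring a lock by inserting a document keyed on the job name, where a duplicate-key conflict means another runner already holds it. The sketch below shows that pattern against an in-memory dict standing in for the MongoDB collection — AdvisoryLock and its TTL behavior are assumptions, not the real locking.py:

```python
# Sketch: duplicate-run lock via "insert if absent", with a TTL for stale locks.
import time

class AdvisoryLock:
    def __init__(self, store: dict, ttl_seconds: float = 3600):
        self.store = store  # stands in for a Mongo collection with a unique _id
        self.ttl = ttl_seconds

    def acquire(self, name: str) -> bool:
        now = time.time()
        existing = self.store.get(name)
        if existing and now - existing["acquired_at"] < self.ttl:
            return False  # held and not expired: a duplicate run is refused
        self.store[name] = {"acquired_at": now}  # insert/replace = acquire
        return True

    def release(self, name: str) -> None:
        self.store.pop(name, None)

locks = AdvisoryLock(store={})
first = locks.acquire("settlement")   # acquired
second = locks.acquire("settlement")  # refused: duplicate run prevented
locks.release("settlement")
third = locks.acquire("settlement")   # acquired again after release
```

Against real MongoDB, the atomic equivalent of acquire is a conditional insert or upsert that relies on the unique index rather than a read-then-write.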
Package structure¶
craft-easy-jobs/
├── pyproject.toml
├── Dockerfile
├── src/
│ └── craft_easy_jobs/
│ ├── __init__.py # @job, @chain decorators
│ ├── cli.py # Click CLI: craft-easy-job run/schedule/chain
│ ├── runner.py # Job execution engine
│ ├── chain_engine.py # Dependency resolution + parallel execution
│ ├── scheduler.py # Local APScheduler wrapper
│ ├── context.py # JobContext
│ ├── locking.py # MongoDB-based distributed locks
│ ├── models.py # JobSchedule, JobRun, JobLogEntry
│ └── builtin/ # Built-in jobs
│ ├── settlement.py
│ ├── invoicing.py
│ ├── bi_export.py
│ ├── accounting_export.py
│ ├── gdpr_cleanup.py
│ └── maintenance.py
└── tests/
9. Implementation Plan¶
| Phase | What | Weeks | Depends on |
|---|---|---|---|
| 1 | Core: @job decorator, JobContext, runner, CLI (run/list) | 1 | craft-easy-api |
| 2 | Chain engine (dependency resolution, parallel execution) | 1 | Phase 1 |
| 3 | Scheduling (MongoDB storage, APScheduler for local) | 1 | Phase 1 |
| 4 | Job run history + locking | 1 | Phase 1 |
| 5 | Cloud integration (Cloud Run Jobs + Azure Container Apps Jobs) | 1 | Phase 1 |
| 6 | Built-in jobs (as financial modules are built) | Ongoing | Per module |
Total: ~5 weeks for the framework, then built-in jobs added as modules are completed.
Ecosystem overview¶
PyPI packages:
┌──────────────────────────────────────┐
│ craft-easy-api (foundation) │
│ pip install craft-easy-api │
└──────────────┬───────────────────────┘
│ depends on
┌──────────┴──────────┐
│ │
┌───▼──────────────┐ ┌──▼────────────────┐
│ craft-easy-jobs │ │ craft-easy-admin │
│ pip install ... │ │ (React/Expo app) │
│ Batch processing │ │ Universal admin UI │
└──────────────────┘ └───────────────────┘
GitHub repos (easy-software-system/):
├── craft-easy-api ✅ Created + scaffolded
├── craft-easy-admin ✅ Created (empty)
├── craft-easy-jobs ✅ Created (empty)
└── craft-easy-template ✅ Created (empty)
PyPI:
├── craft-easy-api v0.1.0 ✅ Published
├── craft-easy-admin v0.0.1 ✅ Reserved (npm)
└── craft-easy-jobs v0.0.1 ✅ Reserved
Airpark: choice of job framework¶
Decision: airpark-batch uses craft-easy-api's JobScheduler, not craft-easy-jobs.
Rationale¶
craft-easy-api has a built-in job framework with:
- JobSchedule (cron, per-tenant, dependency chains)
- JobRun (execution log with status, duration, output)
- Distributed locking (MongoDB-based) that prevents duplicate runs
- A REST API for administering jobs from the admin app
- Beanie/MongoDB integration using the same models as the API
craft-easy-jobs is an alternative, decorator-based CLI framework that:
- Runs standalone via craft-easy-job run <name>
- Has simpler scheduling
- Lacks a REST API for administration
- Is better suited to simpler projects without an admin app
airpark-batch chose craft-easy-api's system because:
1. The admin app needs to manage jobs (start, stop, view logs)
2. Jobs need to run per tenant (21 jobs × N tenants)
3. Dependency chains (month-end: settlement → invoices → export) are handled declaratively
4. It shares the database and models with airpark-api, so no extra configuration is needed
craft-easy-jobs can still be used alongside it for simpler one-off jobs or debugging, but it is not Airpark's primary job system.