Scheduling

Scheduling lives in the API job framework (craft-easy-api). The standalone craft-easy-jobs CLI has no built-in scheduler — it is triggered by whatever external system decides to run it (cron, Cloud Scheduler, GitHub Actions). This page covers the API scheduler.

The JobSchedule model

A schedule binds a registered job to a cron expression. The scheduler evaluates every enabled schedule and triggers any whose next_run_at has passed:

class JobSchedule(BaseDocument):
    job_name: str                       # unique — schedule identifier
    description: str | None
    cron_expression: str                # standard 5-field cron
    is_enabled: bool

    job_type: str                       # Registered job to run
    parameters: dict                    # Parameters to pass to the job
    max_attempts: int                   # Override the job's default
    timeout_seconds: int                # Override the job's default

    chain: list[str]                    # Job names to run after success

    last_run_at: datetime | None
    last_run_status: str | None
    next_run_at: datetime | None        # Calculated from cron_expression

Schedules are persisted in the job_schedules collection with indexes on job_name and (is_enabled, next_run_at) so the scheduler can efficiently find due jobs.

Creating a schedule

Via the REST API:

curl -X POST "http://localhost:5001/jobs/schedules" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "nightly-cleanup",
    "description": "Delete expired tokens every night at 03:00",
    "job_type": "cleanup_expired_tokens",
    "cron_expression": "0 3 * * *",
    "is_enabled": true,
    "max_attempts": 2,
    "timeout_seconds": 300,
    "parameters": {}
  }'

Or directly with Beanie:

from craft_easy.models.job import JobSchedule

schedule = JobSchedule(
    job_name="nightly-cleanup",
    description="Delete expired tokens every night at 03:00",
    job_type="cleanup_expired_tokens",
    cron_expression="0 3 * * *",
    is_enabled=True,
)
await schedule.insert()

Once inserted, the scheduler picks it up on the next check cycle — no restart or re-registration needed.

Cron expression syntax

The scheduler accepts standard 5-field cron expressions in the order minute hour day-of-month month day-of-week:

┌──────────────  minute       (0-59)
│ ┌────────────  hour         (0-23)
│ │ ┌──────────  day of month (1-31)
│ │ │ ┌────────  month        (1-12)
│ │ │ │ ┌──────  day of week  (0-6, 0=Monday)
│ │ │ │ │
* * * * *

Supported operators

Syntax    Meaning               Example
*         Every value           * * * * *  (every minute)
N         Exact value           0 3 * * *  (03:00 daily)
N,M,P     Multiple values       0,15,30,45 * * * *  (:00, :15, :30, :45 every hour)
N-M       Range                 0 9-17 * * *  (hourly, 9 AM through 5 PM)
*/N       Step                  */10 * * * *  (every 10 minutes)
N/M       Step starting at N    15/30 * * * *  (:15 and :45 every hour)
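Under these rules, each field expands to a set of allowed values. A minimal sketch of that expansion — this mirrors the operator semantics documented above, not the library's actual parser:

```python
def parse_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field (e.g. "*/10" or "0,15,30,45") into the set of
    matching values within the field's valid range [lo, hi]."""
    values: set[int] = set()
    for part in field.split(","):
        if "/" in part:
            base, step_str = part.split("/")
            step = int(step_str)
        else:
            base, step = part, 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start, end = (int(x) for x in base.split("-"))
        else:
            start = int(base)
            end = hi if step > 1 else start   # "N/M" steps from N up to the max
        values.update(range(start, end + 1, step))
    return values

print(sorted(parse_field("15/30", 0, 59)))  # [15, 45]
```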

Common expressions

Expression      Meaning
0 3 * * *       03:00 every day
*/15 * * * *    Every 15 minutes
0 * * * *       Top of every hour
0 9 * * 0-4     09:00 Monday through Friday
0 0 1 * *       Midnight on the 1st of every month
0 3 1 * *       03:00 on the 1st of every month
0 0 * * 6       Midnight every Sunday
30 2 * * 5      02:30 every Saturday

Day of week is ISO: 0 = Monday, 6 = Sunday. This differs from POSIX cron (where 0 = Sunday). If you are migrating schedules from crontab, double-check weekday values.
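If you are translating crontab entries by hand, the weekday mapping is a simple shift. This helper is illustrative only, not part of the library:

```python
def posix_to_iso_dow(posix_dow: int) -> int:
    """Map a POSIX cron weekday (0=Sunday .. 6=Saturday) to this
    scheduler's ISO convention (0=Monday .. 6=Sunday)."""
    return (posix_dow - 1) % 7

print(posix_to_iso_dow(0))  # 6 — POSIX Sunday becomes 6 here
print(posix_to_iso_dow(1))  # 0 — POSIX Monday becomes 0 here
```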

Timezones

All cron expressions are evaluated in the scheduler's configured timezone, which defaults to Europe/Stockholm. This matters because cron is wall-clock-based — "0 3 * * *" means 03:00 local time, and the actual UTC timestamp shifts twice a year with daylight saving.

Configure the timezone when constructing the scheduler:

from craft_easy.core.jobs.scheduler import JobScheduler

scheduler = JobScheduler(
    timezone="Europe/Stockholm",
)

Supported values are any IANA timezone: Europe/Stockholm, UTC, America/New_York, Asia/Tokyo, etc.
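To sanity-check what a configured zone does to the wall clock, you can inspect its UTC offsets with the standard-library zoneinfo module (Python 3.9+) — this is a stdlib demonstration, not part of the craft-easy API:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

tz = ZoneInfo("Europe/Stockholm")

# The same 03:00 wall-clock time has a different UTC offset either side of DST
winter = datetime(2025, 1, 15, 3, 0, tzinfo=tz)   # CET,  UTC+1
summer = datetime(2025, 7, 15, 3, 0, tzinfo=tz)   # CEST, UTC+2

print(winter.utcoffset())  # 1:00:00
print(summer.utcoffset())  # 2:00:00
```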

DST transitions

During the "spring forward" transition (02:00 → 03:00), any schedule targeting a missing minute is skipped that day — the next run is pushed to the following day. During "fall back" (03:00 → 02:00), the ambiguous hour is run once. The scheduler compares its last-run timestamp against the calculated next-run to avoid double execution.
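The spring-forward gap can be reproduced with stdlib zoneinfo, independently of the scheduler: in Europe/Stockholm, 02:30 on 30 March 2025 does not exist on the wall clock, and round-tripping it through UTC lands on the far side of the jump:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

tz = ZoneInfo("Europe/Stockholm")

# 02:30 on the spring-forward day is a nonexistent wall-clock time
gap = datetime(2025, 3, 30, 2, 30, tzinfo=tz)
roundtrip = gap.astimezone(timezone.utc).astimezone(tz)

print(roundtrip.hour, roundtrip.minute)  # 3 30 — the gap time resolves past the jump
```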

Running the scheduler

The scheduler has two entry points:

from craft_easy.core.jobs.scheduler import JobScheduler
from craft_easy.core.jobs.lock import DistributedLock

lock = DistributedLock(db, ttl_seconds=3600)
await lock.ensure_indexes()

scheduler = JobScheduler(lock=lock, timezone="Europe/Stockholm")

# One pass — evaluate schedules, trigger due jobs, return the triggered runs
runs = await scheduler.check_and_run_due_jobs()

# Catch up on schedules missed during downtime
missed = await scheduler.handle_missed_runs()

The usual deployment pattern is to start a background task that calls check_and_run_due_jobs() once per minute:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def scheduler_loop():
    while True:
        try:
            await scheduler.check_and_run_due_jobs()
        except Exception:
            logger.exception("Scheduler cycle failed")
        await asyncio.sleep(60)

# Must be called from within a running event loop, e.g. an app startup hook
asyncio.create_task(scheduler_loop())

Pair that with handle_missed_runs() on startup and you have a robust scheduler that survives restarts without silently dropping jobs.

Forcing evaluation from the API

For testing or operator-driven recovery, the REST API exposes a manual trigger:

curl -X POST "http://localhost:5001/jobs/check-schedules" \
  -H "Authorization: Bearer $TOKEN"

This runs one cycle of check_and_run_due_jobs() and returns the triggered runs.

Distributed locking

When running multiple API replicas, the scheduler must ensure that only one replica triggers a given due job. Pass a DistributedLock:

from craft_easy.core.jobs.lock import DistributedLock

lock = DistributedLock(db, ttl_seconds=3600)
await lock.ensure_indexes()

scheduler = JobScheduler(lock=lock)

Locks are backed by a MongoDB unique index on lock_key. When a replica tries to run a job named settlement, it first acquires the lock job:settlement. If the lock is already held, the replica skips that job for this cycle. If the lock holder crashes, the lock expires after ttl_seconds and another replica can take over.

The lock TTL should be longer than the longest expected job runtime so a still-running job isn't overtaken by a second replica. 1 hour (default) is a reasonable baseline.

Intervals vs cron

The scheduler only supports cron expressions. If you want "every 30 minutes" semantics, express it with */30 * * * *. If you need sub-minute granularity, you cannot use cron — invoke the job via your own async loop or use an external scheduler.

Updating next_run_at after code changes

When you deploy a change to a job's timeout or max attempts — or adjust cron_expression in the database — the next_run_at field may be out of sync. The built-in calculate_next_schedules job recomputes every enabled schedule:

curl -X POST "http://localhost:5001/jobs/run/calculate_next_schedules" \
  -H "Authorization: Bearer $TOKEN"

Schedule it once a day as an insurance policy:

{
  "job_name": "refresh-schedules",
  "job_type": "calculate_next_schedules",
  "cron_expression": "0 4 * * *",
  "is_enabled": true
}