Craft Easy API — Fullständig Systemspecifikation¶
Version: 1.0 Datum: 2026-03-27 Status: Utkast Föregångare: ESI API (Eve/Flask, ~52K LOC)
Innehållsförteckning¶
- Vision och mål
- Nulägesanalys — vad som porteras vs byggs nytt
- Teknisk stack och beroenden
- Systemarkitektur och mappstruktur
- Datamodeller (Beanie/Pydantic)
- CRUD Resource-system
- Autentisering
- Rättighetssystem
- Cascade-operationer
- Middleware, ETag, audit, felhantering
- Säkerhet
- Testning och kvalitet
- Migrationsstrategi och fasplan
- Deployment och DevOps
- Utvecklingsmiljö
- Admin-schema (generisk CRUD-admin)
Appendix - A: Konfigurationsreferens
Infrastrukturanalys (hosting, priser, DR) finns i separat dokument:
hosting.md
1. Vision och mål¶
1.1 Vision¶
Craft Easy API är ett produktionsklart bassystem för REST API:er byggt på FastAPI och MongoDB. Det ger utvecklare ett komplett fundament — autentisering, rättigheter, CRUD, cascade-operationer, audit trail — utan att behöva bygga från grunden.
Målet är att bassystemet ska vara så värdefullt att det kan bli en open source community-lösning, liknande vad Python Eve var men byggt med moderna verktyg och bättre arkitektur.
1.2 Designprinciper¶
| Princip | Beskrivning |
|---|---|
| Allt är valfritt | Varje modul (auth, access, audit, cascade) kan slås av/på via konfiguration |
| Explicit framför implicit | Ingen magi — alla beteenden syns i koden |
| Schema-first | Pydantic-modeller är sanningskällan för validering, dokumentation och databasschema |
| Secure by default | Säkerhet är på från start; man väljer aktivt att stänga av |
| Composition over inheritance | Dependency injection och middleware framför djupa klassarv |
| Zero boilerplate CRUD | En modell + en rad kod = komplett REST endpoint med validering, pagination, filtrering |
| API-managed everything | Alla konfigurationer (SFTP, jobb, scheman, avtal, kostnadstyper) hanteras genom API:t — aldrig bara i kod eller config-filer |
| Community-ready | Ren separation mellan bas och systemspecifik kod |
1.3 Mål för version 1.0¶
- [ ] Komplett CRUD-system med auto-genererade endpoints
- [ ] JWT-autentisering (ES512) med toggle on/off
- [ ] OAuth2 med pluggbara providers
- [ ] Feature-baserat rättighetssystem med accessgrupper
- [ ] Cascade delete och update med kedjestöd
- [ ] ETag-baserad concurrency control
- [ ] Operationslogg (audit trail)
- [ ] Automatisk OpenAPI-dokumentation
- [ ] Rate limiting per route och per användare
- [ ] 100% typat — inga
Anyellerdictutan modell - [ ] >90% testtäckning på kärnmoduler
1.4 Icke-mål (medvetet borttaget från Eve-systemet)¶
| Borttaget | Motivering |
|---|---|
| Dynamiska endpoints (custom--endpoints) | Varje system definierar sina endpoints statiskt i kod |
| Dynamiska attribut (custom--attributes) | Fält definieras i Pydantic-modeller |
| Multi-tenant databasrouting | En instans = en databas = ett system |
| System-registrering (esi-systems) | Ingen central systemhantering |
x-{system}-- prefix |
Endpoints heter vad de heter |
| Cerberus-validering | Ersätts av Pydantic v2 |
2. Nulägesanalys¶
2.1 Nuvarande system (ESI API på Eve)¶
| Dimension | Nuvarande |
|---|---|
| Ramverk | Eve 2.2.0 (Flask/WSGI) |
| Validering | Cerberus 1.3.2 |
| Databas | MongoDB via PyMongo 4.8 |
| Auth | JWT (ES512), OAuth2 (8 providers), Email/SMS/BankID |
| Rättigheter | 3-lager: resurs/metod, attributnivå, datafilter |
| Multi-tenant | Fullständig DB-isolering per system |
| Dynamiskt | Custom endpoints + custom attributes runtime |
| Storlek | ~52K LOC, 348 filer |
| Community | Eve är i princip övergivet |
2.2 Migrationskarta¶
Porteras (kärnlogik återanvänds)¶
| Komponent | Nuvarande implementation | Ny implementation |
|---|---|---|
| JWT-hantering | TokenService med ES512 |
core.auth.jwt — samma algoritm, Pydantic-modeller för payload |
| OAuth2-flöden | Authlib + Flask routes | core.auth.oauth2 — Authlib + FastAPI routes |
| Cascade delete | CascadeDeleteService (deny/null/delete) |
core.cascade.delete — samma logik, Beanie queries |
| Cascade update | CascadeUpdateService (subscriber, max depth 10) |
core.cascade.update — samma logik, async |
| Organisationshierarki | Eve DOMAIN-dicts | Beanie Document-modeller |
| Access rights ackumulering | _accumulate_user_access_rights() |
core.access.accumulator — samma merge-logik |
| ETag concurrency | Eve inbyggt | core.middleware.etag — manuell implementation |
| Oplog | Eve inbyggt | core.audit.oplog — Beanie middleware |
Byggs nytt¶
| Komponent | Motivering |
|---|---|
| CRUD Router Factory | Ersätter Eve:s automatiska CRUD |
| Feature-baserat rättighetssystem | Ersätter endpoint/metod-matrisen |
| Pydantic-modeller | Ersätter Cerberus-schema + Eve DOMAIN |
| Dependency Injection pipeline | Ersätter Flask before_request hooks |
| Konfigurationssystem | Pydantic Settings med feature flags |
| Rate limiting | Ny implementation med slidingwindow |
| Request/Response middleware | ASGI middleware pipeline |
| Testramverk | pytest + httpx async TestClient |
Tas bort permanent¶
| Komponent | Nuvarande LOC | Motivering |
|---|---|---|
| Custom endpoints builder | ~2000 | Statiska modeller istället |
| Custom attributes builder | ~1500 | Pydantic-fält istället |
| Multi-tenant DB routing | ~800 | En instans = en DB |
| System registration | ~600 | Borttaget koncept |
| DomainService (dynamisk schema) | ~3000 | Ersätts av Pydantic |
| Totalt borttaget | ~8000 | ~15% av nuvarande kodbas |
3. Teknisk stack¶
3.1 Kärn-beroenden¶
| Paket | Version | Syfte | Motivering |
|---|---|---|---|
| fastapi | >=0.115 | HTTP-ramverk | Bästa DI, automatisk OpenAPI, async |
| uvicorn | >=0.34 | ASGI-server | Snabbast, stöd för HTTP/2 |
| pydantic | >=2.10 | Validering & serialisering | Snabbare än v1, bättre felmeddelanden |
| pydantic-settings | >=2.7 | Konfiguration | Env vars + .env + defaults |
| beanie | >=1.27 | MongoDB ODM | Async, Pydantic-native, indexhantering |
| motor | >=3.6 | Async MongoDB-driver | Krävs av Beanie |
| pyjwt[crypto] | >=2.10 | JWT encoding/decoding | ES512-stöd, beprövat |
| authlib | >=1.4 | OAuth2-klient | Samma som nuvarande, fungerar väl |
| cryptography | >=44.0 | Kryptografi | ECDSA-nycklar, hashing |
| passlib[bcrypt] | >=1.7 | Lösenordshashing | bcrypt med auto-upgrade |
3.2 Infrastruktur-beroenden¶
| Paket | Version | Syfte |
|---|---|---|
| gunicorn | >=23.0 | Processhantering (prod) |
| prometheus-fastapi-instrumentator | >=7.0 | Metrics |
| structlog | >=24.0 | Strukturerad loggning |
| sentry-sdk[fastapi] | >=2.0 | Felrapportering (valfritt) |
| slowapi | >=0.1.9 | Rate limiting |
3.3 Utvecklingsberoenden¶
| Paket | Version | Syfte |
|---|---|---|
| pytest | >=8.0 | Test runner |
| pytest-asyncio | >=0.24 | Async test support |
| httpx | >=0.28 | Async HTTP-klient (tester) |
| coverage | >=7.0 | Kodtäckning |
| ruff | >=0.8 | Linting + formatering |
| mypy | >=1.13 | Statisk typanalys |
| mongomock-motor | >=0.0.34 | MongoDB mock för unit tests |
| faker | >=33.0 | Testdata-generering |
| freezegun | >=1.4 | Tidskontroll i tester |
3.4 Python-version¶
Minimum: Python 3.12 — för:
- type-satser (PEP 695)
- Förbättrade felmeddelanden
- asyncio.TaskGroup
- Performance-förbättringar i Pydantic v2
4. Systemarkitektur¶
4.1 Övergripande arkitektur¶
┌─────────────────────────────────────────────────┐
│ Klienter │
│ (Webb, Mobil, API-konsumenter) │
└────────────────────┬────────────────────────────┘
│ HTTPS
┌────────────────────▼────────────────────────────┐
│ Reverse Proxy (nginx/Caddy) │
│ TLS termination, rate limit L1 │
└────────────────────┬────────────────────────────┘
│
┌────────────────────▼────────────────────────────┐
│ Uvicorn / Gunicorn │
│ ASGI server, worker management │
└────────────────────┬────────────────────────────┘
│
┌────────────────────▼────────────────────────────┐
│ FastAPI Application │
│ ┌──────────────────────────────────────────┐ │
│ │ ASGI Middleware Stack │ │
│ │ ┌─ CORS ──────────────────────────────┐ │ │
│ │ │ ┌─ Request ID ──────────────────┐ │ │ │
│ │ │ │ ┌─ Structured Logging ────┐ │ │ │ │
│ │ │ │ │ ┌─ Rate Limiting ───┐ │ │ │ │ │
│ │ │ │ │ │ ┌─ ETag ──────┐ │ │ │ │ │ │
│ │ │ │ │ │ │ Router │ │ │ │ │ │ │
│ │ │ │ │ │ └─────────────┘ │ │ │ │ │ │
│ │ │ │ │ └───────────────────┘ │ │ │ │ │
│ │ │ │ └─────────────────────────┘ │ │ │ │
│ │ │ └───────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Dependency Injection │ │
│ │ ┌────────┐ ┌──────────┐ ┌────────────┐ │ │
│ │ │ Auth │ │ Access │ │ Services │ │ │
│ │ │ Guard │ │ Guard │ │ (DB, etc) │ │ │
│ │ └────────┘ └──────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Route Handlers │ │
│ │ ┌─ CRUD Router Factory ──────────────┐ │ │
│ │ │ GET / POST / GET /:id │ │ │
│ │ │ PATCH /:id PUT /:id DELETE /:id │ │ │
│ │ └────────────────────────────────────┘ │ │
│ │ ┌─ Custom Routes ────────────────────┐ │ │
│ │ │ Auth endpoints, webhooks, etc. │ │ │
│ │ └────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Core Services │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Cascade │ │ Audit │ │ Events │ │ │
│ │ │ Service │ │ Logger │ │ System │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────┘ │
└────────────────────┬────────────────────────────┘
│
┌────────────────────▼────────────────────────────┐
│ MongoDB (Beanie/Motor) │
└──────────────────────────────────────────────────┘
4.2 Mappstruktur¶
craft-easy-api/
│
├── pyproject.toml # Projektdefinition, beroenden
├── README.md # Quick start guide
├── LICENSE # MIT eller Apache 2.0
├── .env.example # Mall för miljövariabler
│
├── src/
│ └── craft_easy/ # Huvudpaket (importbart: from craft_easy import ...)
│ │
│ ├── __init__.py # Version, package metadata
│ ├── app.py # App factory: create_app()
│ ├── settings.py # Pydantic Settings — alla feature flags
│ │
│ ├── core/ # Kärnmoduler — ramverkets hjärta
│ │ ├── __init__.py
│ │ │
│ │ ├── auth/ # Autentisering
│ │ │ ├── __init__.py # AuthModule — toggle on/off
│ │ │ ├── jwt.py # JWTService: encode, decode, refresh
│ │ │ ├── tokens.py # TokenPayload, TokenType modeller
│ │ │ ├── dependencies.py # Depends(): get_current_user, require_auth, optional_auth
│ │ │ ├── password.py # Lösenordshashing (bcrypt)
│ │ │ └── oauth2/ # OAuth2 providers
│ │ │ ├── __init__.py # OAuth2Module — pluggbar provider-registrering
│ │ │ ├── base.py # BaseOAuth2Provider (abstrakt)
│ │ │ ├── google.py
│ │ │ ├── microsoft.py
│ │ │ ├── github.py
│ │ │ └── routes.py # /auth/oauth2/{provider} endpoints
│ │ │
│ │ ├── access/ # Rättighetssystem
│ │ │ ├── __init__.py # AccessModule — toggle on/off
│ │ │ ├── features.py # Feature-register och definitions
│ │ │ ├── guards.py # Depends(): require_feature, require_any_feature
│ │ │ ├── accumulator.py # Merge access rights från multipla grupper
│ │ │ ├── filters.py # Datafilter (begränsa synliga dokument)
│ │ │ └── attribute_access.py # Fältnivå-access (read/write/none)
│ │ │
│ │ ├── crud/ # CRUD Resource-system
│ │ │ ├── __init__.py
│ │ │ ├── resource.py # Resource-klass: modell → komplett CRUD router
│ │ │ ├── pagination.py # Cursor-baserad + offset pagination
│ │ │ ├── filtering.py # Query parameter → MongoDB filter
│ │ │ ├── sorting.py # Sorteringslogik
│ │ │ ├── projection.py # Fältval (sparse responses)
│ │ │ └── serialization.py # Response serialisering
│ │ │
│ │ ├── cascade/ # Cascade-operationer
│ │ │ ├── __init__.py
│ │ │ ├── delete.py # CascadeDeleteService (deny/null/delete)
│ │ │ ├── update.py # CascadeUpdateService (subscriber, max depth 10)
│ │ │ └── registry.py # Registrering av cascade-regler per modell
│ │ │
│ │ ├── audit/ # Audit trail
│ │ │ ├── __init__.py # AuditModule — toggle on/off
│ │ │ ├── oplog.py # Operationslogg (insert/update/delete)
│ │ │ └── models.py # AuditEntry modell
│ │ │
│ │ ├── middleware/ # ASGI middleware
│ │ │ ├── __init__.py
│ │ │ ├── cors.py # CORS-konfiguration
│ │ │ ├── etag.py # ETag generation och If-Match validering
│ │ │ ├── request_id.py # X-Request-ID injection
│ │ │ ├── logging.py # Strukturerad request/response loggning
│ │ │ └── rate_limit.py # Rate limiting (sliding window)
│ │ │
│ │ ├── hooks/ # Livscykel-hooks
│ │ │ ├── __init__.py
│ │ │ ├── types.py # HookType enum, hook signaturer
│ │ │ ├── registry.py # Hook-registrering per resurs
│ │ │ └── executor.py # Hook-exekvering med felhantering
│ │ │
│ │ ├── errors/ # Felhantering
│ │ │ ├── __init__.py
│ │ │ ├── exceptions.py # Domänspecifika exceptions
│ │ │ └── handlers.py # Exception → HTTP response mapping
│ │ │
│ │ └── events/ # Event-system (intern pub/sub)
│ │ ├── __init__.py
│ │ ├── bus.py # EventBus: publish/subscribe
│ │ └── types.py # Event-typer
│ │
│ ├── models/ # Bas-datamodeller (Beanie Documents)
│ │ ├── __init__.py
│ │ ├── base.py # BaseDocument — gemensamma fält och beteenden
│ │ ├── user.py # User
│ │ ├── organization.py # Organization + hierarki
│ │ ├── access_group.py # AccessGroup, AccessGroupTemplate, AccessGroupCluster
│ │ ├── auth.py # UserAuthentication, AuthenticationCode
│ │ ├── user_settings.py # UserSettings
│ │ └── audit.py # AuditEntry (oplog)
│ │
│ ├── routes/ # Manuella routes (ej CRUD-genererade)
│ │ ├── __init__.py
│ │ ├── auth.py # /auth/* (login, logout, refresh, oauth2)
│ │ ├── health.py # /health, /ready
│ │ └── admin.py # /admin/* (systemhantering)
│ │
│ └── utils/ # Hjälpfunktioner
│ ├── __init__.py
│ ├── objectid.py # PydanticObjectId helpers
│ └── datetime.py # UTC datetime helpers
│
├── tests/
│ ├── conftest.py # Gemensamma fixtures (app, db, auth tokens)
│ ├── unit/ # Unit tests (mocked dependencies)
│ │ ├── core/
│ │ │ ├── test_jwt.py
│ │ │ ├── test_access.py
│ │ │ ├── test_cascade.py
│ │ │ └── ...
│ │ └── models/
│ │ └── ...
│ ├── integration/ # Integration tests (riktig databas)
│ │ ├── test_crud.py
│ │ ├── test_auth_flow.py
│ │ ├── test_cascade_flow.py
│ │ └── ...
│ └── e2e/ # End-to-end (full app)
│ └── ...
│
├── docs/ # Dokumentation
│ ├── getting-started.md
│ ├── configuration.md
│ ├── authentication.md
│ ├── authorization.md
│ └── extending.md
│
└── examples/ # Exempelprojekt
└── airpark/ # Airpark som referensimplementation
├── models/
├── routes/
└── app.py
4.3 Paketering och distribution¶
Strategi: PyPI-paket + projekt-template¶
Bassystemet publiceras som ett pip-installerbart paket. Nya projekt skapas från en cookiecutter-template som scaffoldar rätt struktur.
┌────────────────────────┐
│ craft-easy-api (PyPI) │
│ Ramverket │
│ pip install ... │
└───────────┬────────────┘
│ importerar
┌─────────────────────┼─────────────────────┐
│ │ │
┌─────────▼─────────┐ ┌───────▼──────────┐ ┌───────▼──────────┐
│ airpark-api/ │ │ hobby-projekt/ │ │ annat-system/ │
│ (eget repo) │ │ (eget repo) │ │ (eget repo) │
│ pyproject.toml: │ │ pyproject.toml: │ │ pyproject.toml: │
│ craft-easy-api>=1 │ │ craft-easy-api>=1 │ │ craft-easy-api>=1 │
└────────────────────┘ └───────────────────┘ └──────────────────┘
Varför PyPI-paket istället för fork?
| Fork | PyPI-paket | |
|---|---|---|
| Buggfix i basen | Git merge i varje projekt | pip install --upgrade |
| Anpassning | Ändra direkt (divergering) | Hooks, config, subklasser |
| 4 projekt efter 1 år | 4 unika kodbaser | 4 projekt, samma bas |
| Community | Kan inte bidra tillbaka | Standard open source |
| Versioner | Git-kaos | 1.0.0, 1.1.0, 2.0.0 |
craft-easy-api/pyproject.toml¶
[project]
name = "craft-easy-api"
version = "1.0.0"
description = "Production-ready base system for REST APIs with FastAPI + MongoDB"
readme = "README.md"
license = "MIT"
requires-python = ">=3.12"
authors = [
{name = "ESI", email = "dev@esi.se"},
]
keywords = ["fastapi", "mongodb", "rest", "api", "framework"]
classifiers = [
"Framework :: FastAPI",
"Programming Language :: Python :: 3.12",
"Topic :: Internet :: WWW/HTTP :: HTTP Servers",
]
dependencies = [
"fastapi>=0.115",
"uvicorn>=0.34",
"pydantic>=2.10",
"pydantic-settings>=2.7",
"beanie>=1.27",
"motor>=3.6",
"pyjwt[crypto]>=2.10",
"authlib>=1.4",
"cryptography>=44.0",
"passlib[bcrypt]>=1.7",
"structlog>=24.0",
"slowapi>=0.1.9",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.24",
"httpx>=0.28",
"coverage>=7.0",
"ruff>=0.8",
"mypy>=1.13",
"mongomock-motor>=0.0.34",
"faker>=33.0",
"freezegun>=1.4",
]
[project.scripts]
craft-easy = "craft_easy.cli:main" # CLI: craft-easy init, craft-easy run
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/craft_easy"]
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.mypy]
python_version = "3.12"
strict = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
Nytt projekt: cookiecutter-template¶
# Skapa nytt projekt
pip install cookiecutter
cookiecutter gh:easy-software-system/craft-easy-api-template
# Frågor:
# project_name [My API]: Airpark API
# project_slug [airpark-api]:
# package_name [airpark]:
# description [A REST API built on Craft Easy]: Parking system API
# author [ESI]:
# python_version [3.12]:
Genererar:
airpark-api/
├── pyproject.toml # Depends on craft-easy-api
├── Dockerfile
├── docker-compose.yml
├── .env.example
├── .env.development
├── .github/
│ └── workflows/
│ ├── ci.yml # Test + lint
│ └── deploy.yml # Build + push + deploy
├── src/
│ └── airpark/
│ ├── __init__.py
│ ├── app.py # create_app() — din app
│ ├── settings.py # AirparkSettings(BaseSettings)
│ ├── models/
│ │ └── __init__.py # Dina modeller här
│ └── routes/
│ └── __init__.py # Dina custom routes här
└── tests/
├── conftest.py
└── test_health.py # Fungerar direkt
airpark-api/pyproject.toml (genererad)¶
[project]
name = "airpark-api"
version = "0.1.0"
description = "Parking system API"
requires-python = ">=3.12"
dependencies = [
"craft-easy-api>=1.0",
]
[project.optional-dependencies]
dev = [
"craft-easy-api[dev]>=1.0", # Inkluderar pytest, ruff, mypy etc.
]
airpark-api/src/airpark/app.py (genererad)¶
from craft_easy.app import create_base_app
from craft_easy.core.crud import Resource
from craft_easy.models.organization import Consortium, Region, Organization, Department, Unit
from craft_easy.models.user import User
from craft_easy.models.access_group import AccessGroup
from .settings import AirparkSettings
settings = AirparkSettings()
app = create_base_app(settings)
# Bas-resurser — inkluderade i ramverket
app.include_router(Resource(Consortium, "consortiums").build())
app.include_router(Resource(Region, "regions").build())
app.include_router(Resource(Organization, "organizations").build())
app.include_router(Resource(Department, "departments").build())
app.include_router(Resource(Unit, "units").build())
app.include_router(Resource(User, "users").build())
app.include_router(Resource(AccessGroup, "access-groups").build())
# Dina resurser — lägg till här
# from .models.parking_zone import ParkingZone, ParkingZoneCreate
# app.include_router(Resource(ParkingZone, "parking-zones", create_schema=ParkingZoneCreate).build())
CLI: craft-easy (framtida)¶
# Skapa nytt projekt (alternativ till cookiecutter)
craft-easy init airpark-api
# Starta dev-server
craft-easy run
# Generera ny modell
craft-easy model parking-zone --fields "name:str capacity:int zone_type:str"
# Köra tester
craft-easy test
Publicering¶
# Privat: GitHub Packages eller Azure Artifacts
pip install craft-easy-api --index-url https://github.com/...
# Publikt (om/när community-ready):
pip install craft-easy-api # Från PyPI
Under utvecklingsfasen kan du installera direkt från GitHub:
# I projektets pyproject.toml:
dependencies = [
"craft-easy-api @ git+https://github.com/easy-software-system/craft-easy-api.git@main",
]
# Eller specifik version/tag:
dependencies = [
"craft-easy-api @ git+https://github.com/easy-software-system/craft-easy-api.git@v1.0.0",
]
Ingen PyPI-publicering nödvändig förrän paketet är moget nog.
5. Datamodeller¶
5.1 BaseDocument — gemensam bas för alla modeller¶
# src/craft_easy/models/base.py
from datetime import datetime, UTC
from typing import Optional, ClassVar
from beanie import Document, before_event, Insert, Replace, Update
from pydantic import Field
from bson import ObjectId
class BaseDocument(Document):
"""
Bas för alla dokument i systemet.
Ger automatisk tidsstämpling, ETag via revision_id,
och deklarativ cascade-konfiguration.
"""
# Automatiska tidsstämplar
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
# Soft delete (valfritt via Settings)
is_deleted: bool = Field(default=False, exclude=True)
deleted_at: Optional[datetime] = Field(default=None, exclude=True)
# Extern integration
external_id: Optional[str] = Field(default=None, max_length=200)
# --- Cascade-konfiguration (deklarativ) ---
class CascadeConfig:
"""
Deklarativ cascade-konfiguration per modell.
Override i subklasser.
"""
delete: dict[str, str] = {} # fält → "deny" | "null" | "delete"
update: dict[str, str] = {} # lokalt_fält → "relation.källfält"
# --- Beanie lifecycle hooks ---
@before_event(Insert)
async def set_created(self):
now = datetime.now(UTC)
self.created_at = now
self.updated_at = now
@before_event(Replace, Update)
async def set_updated(self):
self.updated_at = datetime.now(UTC)
class Settings:
# Beanie settings — override per modell
use_revision = True # ETag via revision_id
use_state_management = True # Spåra ändringar för cascade
# Index på tidsstämplar
indexes = [
[("created_at", -1)],
[("updated_at", -1)],
]
5.2 Organisationshierarki¶
# src/craft_easy/models/organization.py
from typing import Optional
from beanie import PydanticObjectId, Indexed
from pydantic import Field, field_validator
from .base import BaseDocument
class Consortium(BaseDocument):
"""Toppnivå i organisationshierarkin."""
name: Indexed(str, unique=True)
description: Optional[str] = None
id_number: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2}-\d{5,30}$")
class Settings:
name = "consortiums" # Collection-namn i MongoDB
class CascadeConfig:
delete = {} # Deny via referens från Region
class Region(BaseDocument):
"""Andra nivån — tillhör ett Consortium."""
name: Indexed(str)
description: Optional[str] = None
id_number: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2}-\d{5,30}$")
# Förälder
consortium_id: PydanticObjectId
consortium_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class Settings:
name = "regions"
class CascadeConfig:
delete = {
"consortium_id": "deny", # Kan inte ta bort consortium om regioner finns
}
update = {
"consortium_name": "consortium_id.name", # Auto-synka namn
}
class Organization(BaseDocument):
"""Tredje nivån — tillhör en Region."""
name: Indexed(str)
description: Optional[str] = None
id_number: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2}-\d{0,30}$")
# Direkt förälder
region_id: PydanticObjectId
region_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Ärvda (readonly, cascade-uppdaterade)
consortium_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
consortium_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class Settings:
name = "organizations"
indexes = [
[("region_id", 1)],
[("consortium_id", 1)],
]
class CascadeConfig:
delete = {
"region_id": "deny",
}
update = {
"region_name": "region_id.name",
"consortium_id": "region_id.consortium_id",
"consortium_name": "region_id.consortium_name",
}
class Department(BaseDocument):
"""Fjärde nivån — tillhör en Organization."""
name: Indexed(str)
description: Optional[str] = None
id_number: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2}-\d{0,30}$")
# Direkt förälder
organization_id: PydanticObjectId
organization_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Ärvda
region_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
region_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
consortium_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
consortium_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class Settings:
name = "departments"
indexes = [
[("organization_id", 1)],
]
class CascadeConfig:
delete = {"organization_id": "deny"}
update = {
"organization_name": "organization_id.name",
"region_id": "organization_id.region_id",
"region_name": "organization_id.region_name",
"consortium_id": "organization_id.consortium_id",
"consortium_name": "organization_id.consortium_name",
}
class Unit(BaseDocument):
"""Lägsta nivån (löv) — tillhör en Department. Stödjer sub-units."""
name: Indexed(str)
description: Optional[str] = None
id_number: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2}-\d{0,30}$")
# Direkt förälder
department_id: PydanticObjectId
department_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Självreferens för sub-units
parent_unit_id: Optional[PydanticObjectId] = None
parent_unit_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Ärvda
organization_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
organization_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
region_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
region_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
consortium_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
consortium_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class Settings:
name = "units"
indexes = [
[("department_id", 1)],
[("parent_unit_id", 1)],
]
class CascadeConfig:
delete = {
"department_id": "deny",
"parent_unit_id": "null",
}
update = {
"department_name": "department_id.name",
"parent_unit_name": "parent_unit_id.name",
"organization_id": "department_id.organization_id",
"organization_name": "department_id.organization_name",
"region_id": "department_id.region_id",
"region_name": "department_id.region_name",
"consortium_id": "department_id.consortium_id",
"consortium_name": "department_id.consortium_name",
}
5.3 User-modell¶
# src/craft_easy/models/user.py
from datetime import datetime
from typing import Optional, Annotated
from beanie import PydanticObjectId, Indexed
from pydantic import Field, EmailStr, model_validator
from .base import BaseDocument
class DataAccessEntry(BaseModel):
"""En accessgrupp-tilldelning med valfri giltighetstid."""
access_group_id: PydanticObjectId
access_group_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
class ApiKeyIp(BaseModel):
ip: str = Field(pattern=r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
description: Optional[str] = None
class ApiKeyPublicKey(BaseModel):
public_key: str
description: Optional[str] = None
class User(BaseDocument):
"""Användare i systemet."""
# Identitet
name: Indexed(str)
email: Optional[Annotated[EmailStr, Indexed(unique=True, sparse=True)]] = None
phone: Optional[Annotated[str, Indexed(unique=True, sparse=True)]] = Field(
default=None, min_length=12, max_length=12, pattern=r"^\+\d{11}$"
)
description: Optional[str] = None
# Organisatorisk placering — Entry Point (exakt en av dessa)
entry_consortium_id: Optional[PydanticObjectId] = None
entry_region_id: Optional[PydanticObjectId] = None
entry_organization_id: Optional[PydanticObjectId] = None
entry_department_id: Optional[PydanticObjectId] = None
entry_unit_id: Optional[PydanticObjectId] = None
# Ärvda hierarkifält (readonly, cascade-uppdaterade)
consortium_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
consortium_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
region_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
region_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
organization_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
organization_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
department_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
department_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
unit_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
unit_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Chef
manager_id: Optional[PydanticObjectId] = None
manager_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
# Access
data_access: list[DataAccessEntry] = Field(default_factory=list)
# OAuth
email_oauth: Optional[Annotated[EmailStr, Indexed(unique=True, sparse=True)]] = None
oauth_provider: Optional[str] = None
# API-åtkomst (maskin-till-maskin)
api_code: Optional[Annotated[str, Indexed(unique=True, sparse=True)]] = Field(
default=None, min_length=5, max_length=200
)
api_code_ips: list[ApiKeyIp] = Field(default_factory=list)
api_code_public_keys: list[ApiKeyPublicKey] = Field(default_factory=list)
# BankID
swedish_mobile_bankid: Optional[str] = Field(
default=None, pattern=r"^\d{12}$"
)
# Flaggor
is_enabled: bool = True
system_user: bool = False
public_user: bool = False
@model_validator(mode="after")
def validate_single_entry_point(self) -> "User":
"""Exakt en entry point måste sättas."""
entry_points = [
self.entry_consortium_id,
self.entry_region_id,
self.entry_organization_id,
self.entry_department_id,
self.entry_unit_id,
]
set_count = sum(1 for ep in entry_points if ep is not None)
if set_count != 1:
raise ValueError("Exactly one entry point must be set")
return self
class Settings:
name = "users"
indexes = [
[("organization_id", 1)],
[("department_id", 1)],
[("unit_id", 1)],
[("is_enabled", 1)],
]
class CascadeConfig:
delete = {
"entry_unit_id": "null",
"entry_department_id": "deny",
"entry_organization_id": "deny",
"entry_region_id": "deny",
"entry_consortium_id": "deny",
"manager_id": "null",
"data_access.access_group_id": "delete", # Ta bort entry från listan
}
update = {
"manager_name": "manager_id.name",
"unit_name": "unit_id.name",
"department_name": "department_id.name",
"organization_name": "organization_id.name",
"region_name": "region_id.name",
"consortium_name": "consortium_id.name",
"data_access.access_group_name": "data_access.access_group_id.name",
}
5.4 Access Group-modeller¶
# src/craft_easy/models/access_group.py
from typing import Optional
from beanie import PydanticObjectId, Indexed
from pydantic import Field, BaseModel
from .base import BaseDocument
# --- Sub-modeller ---
class ScopeEntry(BaseModel):
"""En referens i en scope-lista (t.ex. organizations, units)."""
id: PydanticObjectId
name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class UserScopeEntry(BaseModel):
"""Användarreferens i scope."""
user_id: PydanticObjectId
user_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
class ResourceAccess(BaseModel):
"""Rättighetsdefinition för en resurs."""
methods: list[str] = Field(default_factory=list) # GET, POST, PATCH, PUT, DELETE
attribute_access: dict[str, str] = Field(default_factory=dict) # fält → read|write|none
full_attribute_access: bool = False
filters: dict[str, list[PydanticObjectId]] = Field(default_factory=dict)
full_filter_access: bool = False
features: list[str] = Field(default_factory=list) # NYT: feature-lista
# --- Huvudmodeller ---
class AccessGroupTemplate(BaseDocument):
"""Mall för standardiserade accessgrupp-konfigurationer."""
name: Indexed(str, unique=True)
description: Optional[str] = None
access_rights: dict[str, ResourceAccess] = Field(default_factory=dict)
update_access_groups: bool = False
class Settings:
name = "access_group_templates"
class AccessGroupCluster(BaseDocument):
"""Grupperar templates för automatisk tilldelning till organisationsenheter."""
name: Indexed(str, unique=True)
description: Optional[str] = None
templates: list[ScopeEntry] = Field(default_factory=list)
update_access_groups: bool = False
class Settings:
name = "access_group_clusters"
class CascadeConfig:
delete = {
"templates.id": "delete", # Ta bort entry när template raderas
}
update = {
"templates.name": "templates.id.name",
}
class AccessGroup(BaseDocument):
"""
Central rättighetskontainer.
Definierar vilka resurser, metoder, attribut och data en grupp användare har tillgång till.
"""
name: Indexed(str)
description: Optional[str] = None
# Länk till template/cluster
template_id: Optional[PydanticObjectId] = None
template_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
created_by_cluster_id: Optional[PydanticObjectId] = Field(
default=None, json_schema_extra={"readonly": True}
)
created_by_cluster_name: Optional[str] = Field(
default=None, json_schema_extra={"readonly": True}
)
# Entry point — exakt en (var gruppen "bor")
entry_consortium_id: Optional[PydanticObjectId] = None
entry_region_id: Optional[PydanticObjectId] = None
entry_organization_id: Optional[PydanticObjectId] = None
entry_department_id: Optional[PydanticObjectId] = None
entry_unit_id: Optional[PydanticObjectId] = None
# Access scope — vilken data gruppen ser
consortiums: list[ScopeEntry] = Field(default_factory=list)
regions: list[ScopeEntry] = Field(default_factory=list)
organizations: list[ScopeEntry] = Field(default_factory=list)
departments: list[ScopeEntry] = Field(default_factory=list)
units: list[ScopeEntry] = Field(default_factory=list)
users: list[UserScopeEntry] = Field(default_factory=list)
managers: list[UserScopeEntry] = Field(default_factory=list)
# --- RÄTTIGHETER ---
access_rights: dict[str, ResourceAccess] = Field(default_factory=dict)
# NYT: Feature-baserade rättigheter (utöver resurs/metod)
features: list[str] = Field(default_factory=list)
class Settings:
name = "access_groups"
indexes = [
[("entry_organization_id", 1)],
[("entry_unit_id", 1)],
]
class CascadeConfig:
delete = {
"created_by_cluster_id": "delete",
"entry_consortium_id": "delete",
"entry_region_id": "delete",
"entry_organization_id": "delete",
"entry_department_id": "delete",
"entry_unit_id": "delete",
"consortiums.id": "delete",
"regions.id": "delete",
"organizations.id": "delete",
"departments.id": "delete",
"units.id": "delete",
}
update = {
"template_name": "template_id.name",
"created_by_cluster_name": "created_by_cluster_id.name",
"consortiums.name": "consortiums.id.name",
"regions.name": "regions.id.name",
"organizations.name": "organizations.id.name",
"departments.name": "departments.id.name",
"units.name": "units.id.name",
"users.user_name": "users.user_id.name",
"managers.user_name": "managers.user_id.name",
}
5.5 Auth-modeller¶
# src/craft_easy/models/auth.py
from datetime import datetime
from typing import Optional
from beanie import PydanticObjectId, Indexed
from pydantic import Field
from .base import BaseDocument
class UserAuthentication(BaseDocument):
"""Aktiva token-sessioner (för refresh-spårning)."""
user_id: PydanticObjectId
user_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
token_id: Indexed(str, unique=True)
origin_app: Optional[str] = None
refresh_count: int = 0
class Settings:
name = "user_authentications"
class CascadeConfig:
delete = {"user_id": "delete"}
update = {"user_name": "user_id.name"}
class AuthenticationCode(BaseDocument):
"""Engångskoder för email/SMS-inloggning."""
user_id: PydanticObjectId
user_name: Optional[str] = Field(default=None, json_schema_extra={"readonly": True})
code: str
application: str
method: str # "email" | "sms"
target: str # email-adress eller telefonnummer
is_used: bool = False
send_count: int = 1
expires_at: datetime
class Settings:
name = "authentication_codes"
indexes = [
[("expires_at", 1)], # TTL index
[("user_id", 1), ("application", 1), ("is_used", 1)],
]
class CascadeConfig:
delete = {"user_id": "delete"}
update = {"user_name": "user_id.name"}
class AuditEntry(BaseDocument):
"""Operationslogg — varje mutation loggas."""
resource: str # Collection-namn
item_id: PydanticObjectId # Dokument-ID
operation: str # "insert" | "update" | "replace" | "delete"
user_id: Optional[PydanticObjectId] = None
user_name: Optional[str] = None
ip: Optional[str] = None
changes: Optional[dict] = None # Diff för update/replace
snapshot: Optional[dict] = None # Fullt dokument vid delete
class Settings:
name = "audit_log"
indexes = [
[("resource", 1), ("item_id", 1)],
[("user_id", 1)],
[("created_at", -1)],
]
6. CRUD Resource-system¶
6.1 Resource-klassen — hjärtat i bassystemet¶
Resource-klassen genererar en komplett CRUD APIRouter från en Beanie-modell, med stöd för hooks, access control, cascade-operationer och filtrering.
# src/craft_easy/core/crud/resource.py
from typing import Type, Callable, Optional, Any
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from beanie import Document, PydanticObjectId
from pydantic import BaseModel
from ..auth.dependencies import get_optional_user, get_required_user
from ..access.guards import FeatureGuard
from ..hooks.types import HookType
from ..hooks.registry import HookRegistry
class Resource:
"""
Deklarativ CRUD-resursdefinition.
Användning:
users = Resource(
model=User,
prefix="users",
create_schema=UserCreate,
update_schema=UserUpdate,
features={
"list": "users.list",
"read": "users.read",
"create": "users.create",
"update": "users.update",
"delete": "users.delete",
},
)
@users.hook(HookType.BEFORE_CREATE)
async def populate_hierarchy(data: dict, context: HookContext) -> dict:
# Slå upp org-hierarkin och sätt readonly-fält
...
return data
app.include_router(users.build())
"""
def __init__(
self,
model: Type[Document],
prefix: str,
create_schema: Optional[Type[BaseModel]] = None,
update_schema: Optional[Type[BaseModel]] = None,
response_schema: Optional[Type[BaseModel]] = None,
tags: Optional[list[str]] = None,
features: Optional[dict[str, str]] = None,
auth_required: bool = True,
default_sort: str = "-created_at",
max_page_size: int = 100,
enable_put: bool = True,
):
self.model = model
self.prefix = prefix
self.create_schema = create_schema
self.update_schema = update_schema
self.response_schema = response_schema
self.tags = tags or [prefix]
self.features = features or {}
self.auth_required = auth_required
self.default_sort = default_sort
self.max_page_size = max_page_size
self.enable_put = enable_put
self._hooks = HookRegistry()
# --- Hook-dekoratorer ---
def hook(self, hook_type: HookType):
"""
Dekorator för att registrera livscykel-hooks.
@resource.hook(HookType.BEFORE_CREATE)
async def my_hook(data: dict, context: HookContext) -> dict:
return data
"""
def decorator(fn: Callable):
self._hooks.register(hook_type, fn)
return fn
return decorator
def before_create(self, fn: Callable):
"""Shorthand för @resource.hook(HookType.BEFORE_CREATE)"""
return self.hook(HookType.BEFORE_CREATE)(fn)
def after_create(self, fn: Callable):
return self.hook(HookType.AFTER_CREATE)(fn)
def before_update(self, fn: Callable):
return self.hook(HookType.BEFORE_UPDATE)(fn)
def after_update(self, fn: Callable):
return self.hook(HookType.AFTER_UPDATE)(fn)
def before_delete(self, fn: Callable):
return self.hook(HookType.BEFORE_DELETE)(fn)
def after_delete(self, fn: Callable):
return self.hook(HookType.AFTER_DELETE)(fn)
def after_fetch(self, fn: Callable):
return self.hook(HookType.AFTER_FETCH)(fn)
# --- Router-generering ---
def build(self) -> APIRouter:
"""Bygger en komplett APIRouter med alla CRUD-operationer."""
router = APIRouter(prefix=f"/{self.prefix}", tags=self.tags)
model = self.model
hooks = self._hooks
features = self.features
create_schema = self.create_schema or model
update_schema = self.update_schema or model
response_schema = self.response_schema
max_page_size = self.max_page_size
default_sort = self.default_sort
# Dependencies
auth_dep = get_required_user if self.auth_required else get_optional_user
def feature_dep(operation: str):
feat = features.get(operation)
if feat:
return Depends(FeatureGuard(feat))
return Depends(lambda: None)
# ---- GET / (lista) ----
@router.get("/")
async def list_items(
request: Request,
response: Response,
page: int = Query(1, ge=1),
per_page: int = Query(25, ge=1, le=max_page_size),
sort: Optional[str] = Query(default_sort),
fields: Optional[str] = Query(None, description="Kommaseparerade fält att inkludera"),
user=Depends(auth_dep),
_access=feature_dep("list"),
):
# Bygg filter från query params
filters = _build_filters(request.query_params, model)
# Applicera access-filter om rättighetssystem är aktivt
if hasattr(request.state, "access_filters"):
filters = _merge_access_filters(filters, request.state.access_filters)
query = model.find(filters)
# Sortering
if sort:
direction = -1 if sort.startswith("-") else 1
field = sort.lstrip("-+")
query = query.sort([(field, direction)])
# Projektion
if fields:
field_list = [f.strip() for f in fields.split(",")]
query = query.project({f: 1 for f in field_list})
# Pagination
total = await query.count()
items = await query.skip((page - 1) * per_page).limit(per_page).to_list()
# After-fetch hooks
for item in items:
await hooks.execute(HookType.AFTER_FETCH, item=item, user=user)
# Pagination headers
response.headers["X-Total-Count"] = str(total)
response.headers["X-Page"] = str(page)
response.headers["X-Per-Page"] = str(per_page)
response.headers["X-Total-Pages"] = str(-(-total // per_page)) # Ceil division
return {"items": items, "total": total, "page": page, "per_page": per_page}
# ---- GET /{id} (enskilt dokument) ----
@router.get("/{item_id}")
async def get_item(
item_id: PydanticObjectId,
response: Response,
user=Depends(auth_dep),
_access=feature_dep("read"),
):
item = await model.get(item_id)
if not item:
raise HTTPException(404, detail=f"{model.__name__} not found")
await hooks.execute(HookType.AFTER_FETCH, item=item, user=user)
if hasattr(item, "revision_id") and item.revision_id:
response.headers["ETag"] = f'"{item.revision_id}"'
return item
# ---- POST / (skapa) ----
@router.post("/", status_code=201)
async def create_item(
data: create_schema,
response: Response,
user=Depends(auth_dep),
_access=feature_dep("create"),
):
item_data = data.model_dump(exclude_unset=True)
# Before-create hooks
item_data = await hooks.execute(
HookType.BEFORE_CREATE, data=item_data, user=user
)
# Strip readonly fields
item_data = _strip_readonly(item_data, model)
item = model(**item_data)
await item.insert()
# After-create hooks (cascade update, audit, etc.)
await hooks.execute(HookType.AFTER_CREATE, item=item, user=user)
if hasattr(item, "revision_id") and item.revision_id:
response.headers["ETag"] = f'"{item.revision_id}"'
return item
# ---- PATCH /{id} (delvis uppdatering) ----
@router.patch("/{item_id}")
async def patch_item(
item_id: PydanticObjectId,
data: update_schema,
request: Request,
response: Response,
user=Depends(auth_dep),
_access=feature_dep("update"),
):
item = await model.get(item_id)
if not item:
raise HTTPException(404, detail=f"{model.__name__} not found")
# ETag-kontroll
_verify_etag(request, item)
update_data = data.model_dump(exclude_unset=True)
# Before-update hooks
update_data = await hooks.execute(
HookType.BEFORE_UPDATE, data=update_data, item=item, user=user
)
# Strip readonly fields
update_data = _strip_readonly(update_data, model)
# Spara tidigare state för cascade
previous_state = item.model_dump()
await item.set(update_data)
# After-update hooks (cascade, audit)
await hooks.execute(
HookType.AFTER_UPDATE, item=item, previous=previous_state, user=user
)
if hasattr(item, "revision_id") and item.revision_id:
response.headers["ETag"] = f'"{item.revision_id}"'
return item
# ---- PUT /{id} (full ersättning) ----
if self.enable_put:
@router.put("/{item_id}")
async def replace_item(
item_id: PydanticObjectId,
data: create_schema,
request: Request,
response: Response,
user=Depends(auth_dep),
_access=feature_dep("update"),
):
item = await model.get(item_id)
if not item:
raise HTTPException(404, detail=f"{model.__name__} not found")
_verify_etag(request, item)
replace_data = data.model_dump()
replace_data = await hooks.execute(
HookType.BEFORE_UPDATE, data=replace_data, item=item, user=user
)
replace_data = _strip_readonly(replace_data, model)
previous_state = item.model_dump()
await item.set(replace_data)
await hooks.execute(
HookType.AFTER_UPDATE, item=item, previous=previous_state, user=user
)
if hasattr(item, "revision_id") and item.revision_id:
response.headers["ETag"] = f'"{item.revision_id}"'
return item
# ---- DELETE /{id} ----
@router.delete("/{item_id}", status_code=204)
async def delete_item(
item_id: PydanticObjectId,
request: Request,
user=Depends(auth_dep),
_access=feature_dep("delete"),
):
item = await model.get(item_id)
if not item:
raise HTTPException(404, detail=f"{model.__name__} not found")
_verify_etag(request, item)
# Before-delete hooks (cascade deny check)
await hooks.execute(HookType.BEFORE_DELETE, item=item, user=user)
snapshot = item.model_dump()
await item.delete()
# After-delete hooks (cascade null/delete, audit)
await hooks.execute(
HookType.AFTER_DELETE, item=item, snapshot=snapshot, user=user
)
return router
# --- Interna hjälpfunktioner ---
def _verify_etag(request: Request, item: Document):
"""Verifiera If-Match header mot dokumentets revision."""
if_match = request.headers.get("If-Match")
if not if_match:
raise HTTPException(428, detail="If-Match header required")
expected = f'"{item.revision_id}"'
if if_match != expected and if_match != str(item.revision_id):
raise HTTPException(412, detail="ETag mismatch — document has been modified")
def _strip_readonly(data: dict, model: Type[Document]) -> dict:
"""Ta bort fält markerade som readonly."""
readonly_fields = set()
for field_name, field_info in model.model_fields.items():
extra = field_info.json_schema_extra or {}
if extra.get("readonly"):
readonly_fields.add(field_name)
return {k: v for k, v in data.items() if k not in readonly_fields}
def _build_filters(query_params: dict, model: Type[Document]) -> dict:
"""Konvertera query parameters till MongoDB-filter."""
skip_params = {"page", "per_page", "sort", "fields"}
filters = {}
for key, value in query_params.items():
if key in skip_params:
continue
# Stöd för operatorer: field__gt=5, field__in=a,b,c
if "__" in key:
field, op = key.rsplit("__", 1)
mongo_ops = {
"gt": "$gt", "gte": "$gte", "lt": "$lt", "lte": "$lte",
"ne": "$ne", "in": "$in", "nin": "$nin",
"exists": "$exists", "regex": "$regex",
}
if op in mongo_ops:
if op in ("in", "nin"):
value = value.split(",")
elif op == "exists":
value = value.lower() == "true"
filters[field] = {mongo_ops[op]: value}
else:
filters[key] = value
return filters
6.2 Registrering — "en rad per resurs"¶
# Systemspecifik app.py (t.ex. airpark-api/src/airpark/app.py)
from craft_easy.app import create_base_app
from craft_easy.core.crud import Resource
from craft_easy.models.organization import Consortium, Region, Organization, Department, Unit
from craft_easy.models.user import User
from craft_easy.models.access_group import AccessGroup, AccessGroupTemplate, AccessGroupCluster
from .models.parking_zone import ParkingZone, ParkingZoneCreate, ParkingZoneUpdate
from .models.booking import Booking, BookingCreate, BookingUpdate
from .settings import AirparkSettings
settings = AirparkSettings()
app = create_base_app(settings)
# --- Bas-resurser (från craft-easy) ---
app.include_router(Resource(Consortium, "consortiums").build())
app.include_router(Resource(Region, "regions").build())
app.include_router(Resource(Organization, "organizations").build())
app.include_router(Resource(Department, "departments").build())
app.include_router(Resource(Unit, "units").build())
app.include_router(Resource(User, "users").build())
app.include_router(Resource(AccessGroup, "access-groups").build())
app.include_router(Resource(AccessGroupTemplate, "access-group-templates").build())
app.include_router(Resource(AccessGroupCluster, "access-group-clusters").build())
# --- Airpark-specifika resurser ---
zones = Resource(
ParkingZone, "parking-zones",
create_schema=ParkingZoneCreate,
update_schema=ParkingZoneUpdate,
features={
"list": "parking.zones.list",
"read": "parking.zones.read",
"create": "parking.zones.create",
"update": "parking.zones.update",
"delete": "parking.zones.delete",
},
)
@zones.before_create
async def set_zone_defaults(data: dict, **kwargs) -> dict:
data.setdefault("status", "active")
data.setdefault("capacity", 0)
return data
app.include_router(zones.build())
bookings_resource = Resource(
Booking, "bookings",
create_schema=BookingCreate,
update_schema=BookingUpdate,
features={"list": "bookings.list", "create": "bookings.create"},
)
app.include_router(bookings_resource.build())
7. Autentisering¶
7.1 Arkitektur¶
Autentiseringen är en fristående modul som kan slås av helt via AUTH_ENABLED=false. När den är av skippar alla auth-dependencies och returnerar en anonym användare.
┌──────────────────────────────────────────┐
│ AuthModule │
│ ┌─────────────────────┐ │
│ │ Settings │ │
│ │ AUTH_ENABLED: bool │ │
│ │ JWT_ALGORITHM: str │ │
│ │ JWT_EXPIRY: int │ │
│ │ OAUTH2_PROVIDERS: []│ │
│ └─────────────────────┘ │
│ │
│ ┌───────────┐ ┌──────────────────────┐ │
│ │ JWTService│ │ OAuth2Module │ │
│ │ encode() │ │ ┌─ Google ────────┐ │ │
│ │ decode() │ │ ├─ Microsoft ─────┤ │ │
│ │ refresh() │ │ ├─ GitHub ────────┤ │ │
│ └───────────┘ │ └─ Custom... ─────┘ │ │
│ └──────────────────────┘ │
│ │
│ ┌──────────────────────────────────────┐│
│ │ Dependencies (FastAPI Depends) ││
│ │ get_current_user() ││
│ │ get_optional_user() ││
│ │ require_auth() ││
│ │ require_token_type(TokenType.User) ││
│ └──────────────────────────────────────┘│
└──────────────────────────────────────────┘
7.2 Token-system¶
# src/craft_easy/core/auth/tokens.py
from enum import Enum
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class TokenType(str, Enum):
USER = "UserToken"
PRE_AUTH = "PreAuthenticationToken"
PUBLIC = "PublicUserToken"
SERVICE = "ServiceToken" # Nytt: för maskin-till-maskin
QR = "QrToken"
class TokenPayload(BaseModel):
"""JWT payload — alla fält typade och validerade."""
# Identitet
token_id: str # Unikt token-ID (UUID)
token_type: TokenType
user_id: Optional[str] = None
user_name: Optional[str] = None
# Organisatorisk kontext
organization_id: Optional[str] = None
department_id: Optional[str] = None
unit_id: Optional[str] = None
# Metadata
ip: Optional[str] = None
origin_app: Optional[str] = None
refresh_count: int = 0
is_system_user: bool = False
api_code: Optional[str] = None
# OAuth2
oauth_provider: Optional[str] = None
# Tidsstämplar
exp: datetime # Expiration
iat: Optional[datetime] = None # Issued at
# Utökningsbart
extra: dict = {} # Systemspecifika extra-fält
7.3 JWT Service¶
# src/craft_easy/core/auth/jwt.py
import uuid
from datetime import datetime, timedelta, UTC
from typing import Optional
import jwt
from pydantic import BaseModel
from ...settings import Settings
from .tokens import TokenPayload, TokenType
class JWTService:
"""
JWT-hantering med ES512.
Stateless — inga instansvariabler förutom nycklar.
"""
def __init__(self, settings: Settings):
self.algorithm = settings.JWT_ALGORITHM # "ES512"
self.private_key = settings.JWT_PRIVATE_KEY
self.public_key = settings.JWT_PUBLIC_KEY
self.user_expiry = timedelta(minutes=settings.JWT_USER_EXPIRY_MINUTES)
self.refresh_max_lifetime = timedelta(
minutes=settings.JWT_REFRESH_MAX_LIFETIME_MINUTES
)
def create_user_token(
self,
user_id: str,
user_name: str,
ip: str,
origin_app: str,
organization_id: Optional[str] = None,
department_id: Optional[str] = None,
unit_id: Optional[str] = None,
is_system_user: bool = False,
api_code: Optional[str] = None,
extra: Optional[dict] = None,
) -> tuple[str, TokenPayload]:
"""Skapa en ny UserToken."""
now = datetime.now(UTC)
payload = TokenPayload(
token_id=str(uuid.uuid4()),
token_type=TokenType.USER,
user_id=user_id,
user_name=user_name,
organization_id=organization_id or "",
department_id=department_id or "",
unit_id=unit_id or "",
ip=ip,
origin_app=origin_app,
refresh_count=0,
is_system_user=is_system_user,
api_code=api_code,
exp=now + self.user_expiry,
iat=now,
extra=extra or {},
)
token_string = jwt.encode(
payload.model_dump(mode="json"),
self.private_key,
algorithm=self.algorithm,
)
return token_string, payload
def decode(self, token: str, verify_exp: bool = True) -> TokenPayload:
"""Avkoda och validera en JWT."""
try:
data = jwt.decode(
token,
self.public_key,
algorithms=[self.algorithm],
options={"verify_exp": verify_exp},
)
return TokenPayload(**data)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError as e:
raise AuthenticationError(f"Invalid token: {e}")
def refresh(self, token: str, ip: str) -> tuple[str, TokenPayload]:
"""
Förnya en token.
- Avkodar utan expiration check
- Verifierar att total livstid inte överstiger max
- Ökar refresh_count
- Returnerar ny token med nytt exp
"""
payload = self.decode(token, verify_exp=False)
# Kontrollera total livstid
if payload.iat:
age = datetime.now(UTC) - payload.iat
if age > self.refresh_max_lifetime:
raise AuthenticationError("Token refresh lifetime exceeded")
# Kontrollera IP (om inte system user)
if not payload.is_system_user and payload.ip != ip:
raise AuthenticationError("IP mismatch on refresh")
# Ny token
payload.refresh_count += 1
payload.exp = datetime.now(UTC) + self.user_expiry
token_string = jwt.encode(
payload.model_dump(mode="json"),
self.private_key,
algorithm=self.algorithm,
)
return token_string, payload
7.4 Auth Dependencies¶
# src/craft_easy/core/auth/dependencies.py
from typing import Optional
from fastapi import Depends, HTTPException, Request
from fastapi.security import APIKeyHeader
from ...settings import get_settings, Settings
from .jwt import JWTService
from .tokens import TokenPayload, TokenType
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def get_jwt_service(settings: Settings = Depends(get_settings)) -> JWTService:
return JWTService(settings)
async def get_current_user(
request: Request,
token: Optional[str] = Depends(api_key_header),
jwt_service: JWTService = Depends(get_jwt_service),
settings: Settings = Depends(get_settings),
) -> Optional[TokenPayload]:
"""
Huvudsaklig auth dependency.
Returnerar TokenPayload eller None beroende på konfiguration.
"""
# Auth disabled → returnera None
if not settings.AUTH_ENABLED:
return None
if not token:
return None
payload = jwt_service.decode(token)
# Spara i request state för access control
request.state.token = payload
request.state.user_id = payload.user_id
return payload
async def get_required_user(
user: Optional[TokenPayload] = Depends(get_current_user),
settings: Settings = Depends(get_settings),
) -> TokenPayload:
"""Kräver autentiserad användare. 401 om ingen token."""
if not settings.AUTH_ENABLED:
# Auth disabled — returnera dummy-token
return TokenPayload(
token_id="anonymous",
token_type=TokenType.USER,
exp=datetime.max,
)
if user is None:
raise HTTPException(401, detail="Authentication required")
return user
async def get_optional_user(
user: Optional[TokenPayload] = Depends(get_current_user),
) -> Optional[TokenPayload]:
"""Autentisering valfri — returnerar None om ingen token."""
return user
def require_token_type(*allowed_types: TokenType):
"""Factory för att kräva specifik token-typ."""
async def dependency(user: TokenPayload = Depends(get_required_user)):
if user.token_type not in allowed_types:
raise HTTPException(
403,
detail=f"Token type {user.token_type} not allowed. "
f"Required: {[t.value for t in allowed_types]}",
)
return user
return Depends(dependency)
7.5 OAuth2 Provider-system¶
# src/craft_easy/core/auth/oauth2/base.py
from abc import ABC, abstractmethod
from typing import Optional
from pydantic import BaseModel
from authlib.integrations.starlette_client import OAuth
class OAuth2UserInfo(BaseModel):
"""Normaliserad användarinfo från OAuth2-provider."""
email: str
name: Optional[str] = None
picture: Optional[str] = None
provider: str
provider_user_id: str
class BaseOAuth2Provider(ABC):
"""Abstrakt bas för OAuth2-providers."""
name: str
@abstractmethod
def register(self, oauth: OAuth, settings: dict) -> None:
"""Registrera providern med Authlib OAuth-klienten."""
...
@abstractmethod
async def get_user_info(self, token: dict) -> OAuth2UserInfo:
"""Hämta normaliserad användarinfo från provider-token."""
...
# src/craft_easy/core/auth/oauth2/google.py
class GoogleProvider(BaseOAuth2Provider):
name = "google"
def register(self, oauth: OAuth, settings: dict) -> None:
oauth.register(
name="google",
client_id=settings["client_id"],
client_secret=settings["client_secret"],
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"},
)
async def get_user_info(self, token: dict) -> OAuth2UserInfo:
userinfo = token.get("userinfo", {})
return OAuth2UserInfo(
email=userinfo["email"],
name=userinfo.get("name"),
picture=userinfo.get("picture"),
provider="google",
provider_user_id=userinfo["sub"],
)
7.6 Auth Routes¶
# src/craft_easy/routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from ..core.auth.jwt import JWTService
from ..core.auth.dependencies import get_jwt_service, get_required_user
from ..models.auth import UserAuthentication
from ..models.user import User
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post("/login/api-code")
async def login_api_code(
request: Request,
jwt_service: JWTService = Depends(get_jwt_service),
):
"""
Autentisering med API-kod (maskin-till-maskin).
Klienten skickar JWT signerad med sin privata nyckel.
"""
api_key = request.headers.get("X-API-Key")
if not api_key:
raise HTTPException(401, "X-API-Key header required")
# Dekoda klientens JWT för att få api_code
# Validera signaturen mot användarens publika nyckel
# ... (porteras från nuvarande authenticate/api-code)
@router.post("/login/email/{email}")
async def login_email_init(email: str, application: str):
"""Initiera email-inloggning — skicka engångskod."""
# ... (porteras från nuvarande authenticates/email)
@router.post("/login/email/{email}/verify")
async def login_email_verify(email: str, code: str, application: str):
"""Verifiera email-kod och returnera token."""
# ...
@router.post("/refresh")
async def refresh_token(
request: Request,
jwt_service: JWTService = Depends(get_jwt_service),
user=Depends(get_required_user),
):
"""Förnya en befintlig token."""
old_token = request.headers.get("X-API-Key")
ip = request.client.host
new_token, payload = jwt_service.refresh(old_token, ip)
# Uppdatera refresh_count i DB
await UserAuthentication.find_one(
UserAuthentication.token_id == payload.token_id
).update({"$inc": {"refresh_count": 1}})
return {"token": new_token, "expires": payload.exp.isoformat()}
@router.delete("/logout")
async def logout(user=Depends(get_required_user)):
"""Invalidera token (ta bort från DB)."""
await UserAuthentication.find_one(
UserAuthentication.token_id == user.token_id
).delete()
return {"status": "logged_out"}
8. Rättighetssystem¶
8.1 Designfilosofi — Feature-baserat¶
Det nya rättighetssystemet bygger på features istället för endpoint/metod-matriser. En feature är en namngiven förmåga (t.ex. "bookings.create", "reports.export", "admin.users.manage").
Fördelar jämfört med nuvarande system:
- Enklare att förstå: "Kan denna användare exportera rapporter?" → require_feature("reports.export")
- Enklare att konfigurera: En lista med feature-strängar istället för en matris av endpoints × metoder × attribut
- Samma mekanism för API-endpoints och UI-element
- Behåller nuvarande styrka: attribut-access och datafilter
8.2 Feature-register¶
# src/craft_easy/core/access/features.py
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class FeatureDefinition:
"""Definition av en feature."""
name: str # "bookings.create"
description: str # "Create new bookings"
category: str # "bookings"
depends_on: list[str] = field(default_factory=list) # Beroenden
class FeatureRegistry:
"""
Centralt register över alla features i systemet.
Används för validering och dokumentation.
"""
def __init__(self):
self._features: dict[str, FeatureDefinition] = {}
def register(
self,
name: str,
description: str,
category: Optional[str] = None,
depends_on: Optional[list[str]] = None,
) -> FeatureDefinition:
cat = category or name.rsplit(".", 1)[0]
feat = FeatureDefinition(
name=name,
description=description,
category=cat,
depends_on=depends_on or [],
)
self._features[name] = feat
return feat
def validate(self, feature_names: list[str]) -> list[str]:
"""Validera att alla features finns registrerade."""
unknown = [f for f in feature_names if f not in self._features]
if unknown:
raise ValueError(f"Unknown features: {unknown}")
return feature_names
def resolve_dependencies(self, feature_names: list[str]) -> set[str]:
"""Expandera features med deras beroenden."""
resolved = set()
stack = list(feature_names)
while stack:
name = stack.pop()
if name in resolved:
continue
resolved.add(name)
feat = self._features.get(name)
if feat:
stack.extend(feat.depends_on)
return resolved
def all_features(self) -> list[FeatureDefinition]:
return list(self._features.values())
def categories(self) -> dict[str, list[FeatureDefinition]]:
cats: dict[str, list[FeatureDefinition]] = {}
for feat in self._features.values():
cats.setdefault(feat.category, []).append(feat)
return cats
# Global singleton
feature_registry = FeatureRegistry()
# Bas-features (registreras av craft-easy)
feature_registry.register("users.list", "List users")
feature_registry.register("users.read", "Read user details")
feature_registry.register("users.create", "Create users")
feature_registry.register("users.update", "Update users")
feature_registry.register("users.delete", "Delete users")
# ... etc. för alla bas-resurser
8.3 Access Guards¶
# src/craft_easy/core/access/guards.py
from fastapi import Depends, HTTPException, Request
from ..auth.dependencies import get_required_user
from ..auth.tokens import TokenPayload
from .accumulator import AccessAccumulator
from ...settings import get_settings, Settings
class FeatureGuard:
"""
FastAPI Dependency som kontrollerar att användaren har en specifik feature.
Användning:
@router.get("/reports")
async def get_reports(
_: None = Depends(FeatureGuard("reports.export"))
):
...
"""
def __init__(self, required_feature: str):
self.required_feature = required_feature
async def __call__(
self,
request: Request,
user: TokenPayload = Depends(get_required_user),
settings: Settings = Depends(get_settings),
):
# Access control disabled → tillåt allt
if not settings.ACCESS_CONTROL_ENABLED:
return
# System users → full access
if user.is_system_user:
return
# Hämta användarens effektiva features
effective_features = await AccessAccumulator.get_effective_features(user.user_id)
if self.required_feature not in effective_features:
raise HTTPException(
403,
detail=f"Missing required feature: {self.required_feature}",
)
# Spara access-kontext i request state (för datafilter etc.)
request.state.access_rights = await AccessAccumulator.get_access_rights(
user.user_id, request.url.path
)
class RequireAnyFeature:
"""Kräver minst en av flera features."""
def __init__(self, *features: str):
self.features = set(features)
async def __call__(
self,
request: Request,
user: TokenPayload = Depends(get_required_user),
settings: Settings = Depends(get_settings),
):
if not settings.ACCESS_CONTROL_ENABLED or user.is_system_user:
return
effective = await AccessAccumulator.get_effective_features(user.user_id)
if not self.features.intersection(effective):
raise HTTPException(
403,
detail=f"Requires one of: {sorted(self.features)}",
)
class RequireAllFeatures:
"""Kräver alla angivna features."""
def __init__(self, *features: str):
self.features = set(features)
async def __call__(
self,
request: Request,
user: TokenPayload = Depends(get_required_user),
settings: Settings = Depends(get_settings),
):
if not settings.ACCESS_CONTROL_ENABLED or user.is_system_user:
return
effective = await AccessAccumulator.get_effective_features(user.user_id)
missing = self.features - effective
if missing:
raise HTTPException(
403,
detail=f"Missing features: {sorted(missing)}",
)
8.4 Access Accumulator¶
# src/craft_easy/core/access/accumulator.py
from datetime import datetime, UTC
from typing import Optional
from beanie import PydanticObjectId
from ...models.user import User
from ...models.access_group import AccessGroup
class AccessAccumulator:
"""
Ackumulerar rättigheter från alla användarens aktiva accessgrupper.
Merge-regler (samma som nuvarande system):
- Metoder: union (om NÅGON grupp ger GET → användaren har GET)
- Attribut-access: write > read > none
- Filter: OR (union av alla filter-värden)
- Features: union
"""
@staticmethod
async def get_effective_features(user_id: str) -> set[str]:
"""Hämta alla features användaren har via sina accessgrupper."""
user = await User.get(PydanticObjectId(user_id))
if not user:
return set()
now = datetime.now(UTC)
# Filtrera aktiva accessgrupp-tilldelningar
active_group_ids = [
entry.access_group_id
for entry in user.data_access
if (entry.valid_from is None or entry.valid_from <= now)
and (entry.valid_until is None or entry.valid_until > now)
]
if not active_group_ids:
return set()
# Hämta alla grupper
groups = await AccessGroup.find(
{"_id": {"$in": active_group_ids}}
).to_list()
# Ackumulera features
features: set[str] = set()
for group in groups:
features.update(group.features)
# Features från access_rights (bakåtkompatibelt)
for resource_name, resource_access in group.access_rights.items():
features.update(resource_access.features)
return features
@staticmethod
async def get_access_rights(
user_id: str, resource: str
) -> Optional[dict]:
"""
Hämta ackumulerade rättigheter för en specifik resurs.
Returnerar merged access rights från alla aktiva grupper.
"""
user = await User.get(PydanticObjectId(user_id))
if not user:
return None
now = datetime.now(UTC)
active_group_ids = [
entry.access_group_id
for entry in user.data_access
if (entry.valid_from is None or entry.valid_from <= now)
and (entry.valid_until is None or entry.valid_until > now)
]
groups = await AccessGroup.find(
{"_id": {"$in": active_group_ids}}
).to_list()
# Extrahera resource-namn från URL-path
resource_name = resource.strip("/").split("/")[0]
# Merge
merged = {
"methods": set(),
"attribute_access": {},
"filters": {},
"full_attribute_access": False,
"full_filter_access": False,
}
for group in groups:
ra = group.access_rights.get(resource_name)
if not ra:
continue
# Metoder: union
merged["methods"].update(ra.methods)
# Full access flaggor: OR
if ra.full_attribute_access:
merged["full_attribute_access"] = True
if ra.full_filter_access:
merged["full_filter_access"] = True
# Attribut-access: write > read > none
for attr, level in ra.attribute_access.items():
current = merged["attribute_access"].get(attr, "none")
if level == "write" or (level == "read" and current != "write"):
merged["attribute_access"][attr] = level
# Filter: union
for field, values in ra.filters.items():
existing = merged["filters"].get(field, [])
merged["filters"][field] = list(set(existing + values))
merged["methods"] = list(merged["methods"])
return merged
9. Cascade-operationer¶
9.1 Cascade Delete¶
# src/craft_easy/core/cascade/delete.py
from typing import Type
from beanie import Document, PydanticObjectId
from fastapi import HTTPException
from .registry import cascade_registry
class CascadeDeleteService:
"""
Hanterar cascade-delete med tre strategier:
- "deny": Förhindra borttagning om refererade dokument finns
- "null": Sätt referensen till null
- "delete": Ta bort refererade dokument rekursivt
"""
@staticmethod
async def execute(model: Type[Document], item_id: PydanticObjectId):
"""
Kör cascade delete-logik FÖRE borttagning (deny)
och EFTER borttagning (null, delete).
"""
config = getattr(model, "CascadeConfig", None)
if not config or not config.delete:
return
collection_name = model.Settings.name
# Hitta alla modeller som refererar till denna collection
subscribers = cascade_registry.get_delete_subscribers(collection_name)
for sub in subscribers:
field = sub["field"]
action = sub["action"]
subscriber_model = sub["model"]
# Bygg query — stöd för nested paths (t.ex. "data_access.access_group_id")
if "." in field:
# Nested array field
parts = field.split(".")
query = {f"{parts[0]}.{parts[1]}": item_id}
else:
query = {field: item_id}
if action == "deny":
# Kontrollera om refererade dokument finns
count = await subscriber_model.find(query).count()
if count > 0:
raise HTTPException(
422,
detail=(
f"Cannot delete {collection_name}/{item_id}: "
f"referenced by {count} {subscriber_model.Settings.name} document(s) "
f"via field '{field}'"
),
)
elif action == "null":
if "." in field:
# Nested: pull from array
parts = field.split(".")
await subscriber_model.find(query).update_many(
{"$pull": {parts[0]: {parts[1]: item_id}}}
)
else:
await subscriber_model.find(query).update_many(
{"$set": {field: None}}
)
elif action == "delete":
if "." in field:
parts = field.split(".")
await subscriber_model.find(query).update_many(
{"$pull": {parts[0]: {parts[1]: item_id}}}
)
else:
# Rekursiv delete — cascade vidare
docs = await subscriber_model.find(query).to_list()
for doc in docs:
await CascadeDeleteService.execute(
subscriber_model, doc.id
)
await doc.delete()
9.2 Cascade Update¶
# src/craft_easy/core/cascade/update.py
from typing import Type, Any
from beanie import Document, PydanticObjectId
from .registry import cascade_registry
MAX_CHAIN_DEPTH = 10
class CascadeUpdateService:
"""
Hanterar cascade-uppdateringar (subscriber-mönster).
När ett dokument uppdateras, propagerar ändringar till
alla dokument som prenumererar på dess fält.
Kedja: Max 10 nivåer djup för att förhindra oändliga loopar.
"""
@staticmethod
async def populate_readonly_fields(
model: Type[Document], data: dict
) -> dict:
"""
FÖRE insert/update: Slå upp och populera readonly-fält
baserat på cascade_update-konfigurationen.
Exempel: Om data innehåller region_id, slå upp region.name
och sätt region_name automatiskt.
"""
config = getattr(model, "CascadeConfig", None)
if not config or not config.update:
return data
for local_field, source_path in config.update.items():
# Parsa source_path: "region_id.name" → (relation_field="region_id", source_field="name")
parts = source_path.split(".")
if len(parts) == 2:
relation_field, source_field = parts
# Kontrollera om relation-fältet finns i data eller redan är satt
ref_id = data.get(relation_field)
if ref_id is None:
continue
# Slå upp det relaterade dokumentet
ref_model = cascade_registry.get_model_for_field(
model.Settings.name, relation_field
)
if ref_model:
ref_doc = await ref_model.get(ref_id)
if ref_doc:
value = getattr(ref_doc, source_field, None)
data[local_field] = value
return data
@staticmethod
async def publish_updates(
model: Type[Document],
item_id: PydanticObjectId,
changes: dict[str, Any],
chain_depth: int = 0,
):
"""
EFTER update: Propagera ändringar till prenumeranter.
Om Organization.name ändras → uppdatera alla Departments som har
organization_name cascade-uppdaterad.
"""
if chain_depth >= MAX_CHAIN_DEPTH:
return
collection_name = model.Settings.name
# Hitta alla som prenumererar på ändringar i denna collection
subscribers = cascade_registry.get_update_subscribers(collection_name)
for sub in subscribers:
local_field = sub["local_field"] # t.ex. "organization_name"
relation_field = sub["relation_field"] # t.ex. "organization_id"
source_field = sub["source_field"] # t.ex. "name"
subscriber_model = sub["model"]
# Kontrollera om det ändrade fältet matchar source_field
if source_field not in changes:
continue
new_value = changes[source_field]
# Hitta alla dokument som refererar till det uppdaterade dokumentet
if "." in relation_field:
# Nested array: t.ex. "organizations.id"
parts = relation_field.split(".")
query = {f"{parts[0]}.{parts[1]}": item_id}
update = {f"$set": {f"{parts[0]}.$.{local_field.split('.')[-1]}": new_value}}
else:
query = {relation_field: item_id}
update = {"$set": {local_field: new_value}}
# Uppdatera alla prenumeranter
result = await subscriber_model.find(query).update_many(update)
# Kedja: om prenumerantens uppdaterade fält OCKSÅ har prenumeranter
if result.modified_count > 0:
updated_docs = await subscriber_model.find(query).to_list()
for doc in updated_docs:
await CascadeUpdateService.publish_updates(
subscriber_model,
doc.id,
{local_field: new_value},
chain_depth=chain_depth + 1,
)
9.3 Cascade Registry¶
# src/craft_easy/core/cascade/registry.py
from typing import Type
from beanie import Document
class CascadeRegistry:
"""
Central registrering av cascade-regler.
Byggs automatiskt vid app-start genom att inspektera
alla modellers CascadeConfig.
"""
def __init__(self):
self._delete_subscribers: dict[str, list[dict]] = {} # collection → subscribers
self._update_subscribers: dict[str, list[dict]] = {}
self._models: dict[str, Type[Document]] = {} # collection → model class
self._field_relations: dict[str, dict[str, Type[Document]]] = {}
def register_model(self, model: Type[Document]):
"""Registrera en modell och dess cascade-konfiguration."""
collection = model.Settings.name
self._models[collection] = model
config = getattr(model, "CascadeConfig", None)
if not config:
return
# Registrera delete-regler
for field, action in getattr(config, "delete", {}).items():
# Fastställ vilken collection fältet pekar på
target_collection = self._resolve_target(model, field)
if target_collection:
self._delete_subscribers.setdefault(target_collection, []).append({
"model": model,
"field": field,
"action": action,
})
# Registrera update-regler
for local_field, source_path in getattr(config, "update", {}).items():
parts = source_path.split(".")
if len(parts) >= 2:
relation_field = parts[0]
source_field = ".".join(parts[1:])
target_collection = self._resolve_target(model, relation_field)
if target_collection:
self._update_subscribers.setdefault(target_collection, []).append({
"model": model,
"local_field": local_field,
"relation_field": relation_field,
"source_field": source_field,
})
def _resolve_target(self, model: Type[Document], field: str) -> str | None:
"""Avgör vilken collection ett fält pekar på via Beanie Link eller naming convention."""
# Konvention: field som slutar på _id pekar på collection
# T.ex. organization_id → organizations
base_field = field.split(".")[0] # Hantera nested som "data_access.access_group_id"
if base_field.endswith("_id"):
entity = base_field[:-3] # "organization"
# Sök bland registrerade modeller
for coll_name in self._models:
if coll_name.rstrip("s") == entity or coll_name == entity + "s":
return coll_name
return None
def get_delete_subscribers(self, collection: str) -> list[dict]:
return self._delete_subscribers.get(collection, [])
def get_update_subscribers(self, collection: str) -> list[dict]:
return self._update_subscribers.get(collection, [])
def get_model_for_field(self, collection: str, field: str) -> Type[Document] | None:
target = self._resolve_target(self._models[collection], field)
return self._models.get(target) if target else None
# Global singleton
cascade_registry = CascadeRegistry()
10. Middleware, ETag, Audit, Felhantering¶
10.1 Middleware-stack¶
Middleware exekveras i ordning (ytterst → innerst vid request, innerst → ytterst vid response):
# src/craft_easy/app.py
from fastapi import FastAPI
from .core.middleware.cors import setup_cors
from .core.middleware.request_id import RequestIdMiddleware
from .core.middleware.logging import StructuredLoggingMiddleware
from .core.middleware.rate_limit import setup_rate_limiting
from .core.middleware.etag import ETagMiddleware
from .core.errors.handlers import register_error_handlers
from .settings import Settings
def create_base_app(settings: Settings) -> FastAPI:
app = FastAPI(
title=settings.APP_TITLE,
version=settings.APP_VERSION,
docs_url="/docs" if settings.DOCS_ENABLED else None,
redoc_url="/redoc" if settings.DOCS_ENABLED else None,
)
# Middleware (ordning: första = ytterst)
setup_cors(app, settings)
app.add_middleware(RequestIdMiddleware)
app.add_middleware(StructuredLoggingMiddleware)
if settings.RATE_LIMIT_ENABLED:
setup_rate_limiting(app, settings)
if settings.ETAG_ENABLED:
app.add_middleware(ETagMiddleware)
# Error handlers
register_error_handlers(app)
# Health endpoints
from .routes.health import router as health_router
app.include_router(health_router)
# Auth routes (om auth är aktivt)
if settings.AUTH_ENABLED:
from .routes.auth import router as auth_router
app.include_router(auth_router)
# Startup: initialisera DB, registrera modeller
@app.on_event("startup")
async def startup():
await init_database(settings)
return app
10.2 Request ID Middleware¶
# src/craft_easy/core/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class RequestIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id
response: Response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
10.3 ETag Middleware¶
# src/craft_easy/core/middleware/etag.py
import hashlib
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class ETagMiddleware(BaseHTTPMiddleware):
"""
Hanterar ETag för GET-responses (If-None-Match → 304).
PATCH/PUT/DELETE ETag-validering sker i Resource-klassen
(kräver dokumentets revision_id, inte response-hash).
"""
async def dispatch(self, request: Request, call_next):
response: Response = await call_next(request)
# Bara för GET
if request.method != "GET":
return response
# Om ETag redan satt av route handler (revision_id) — använd det
if "ETag" in response.headers:
etag = response.headers["ETag"]
else:
# Generera ETag från response body
body = b""
async for chunk in response.body_iterator:
body += chunk
etag = f'"{hashlib.md5(body).hexdigest()}"'
response.headers["ETag"] = etag
# Återskapa body iterator
response.body = body
# Conditional GET: If-None-Match
if_none_match = request.headers.get("If-None-Match")
if if_none_match and if_none_match == etag:
return Response(status_code=304, headers={"ETag": etag})
return response
10.4 Audit (Operationslogg)¶
# src/craft_easy/core/audit/oplog.py
from typing import Optional
from beanie import PydanticObjectId
from ...models.auth import AuditEntry
from ..auth.tokens import TokenPayload
from ...settings import get_settings
class AuditLogger:
"""Loggar alla mutationer till audit_log collection."""
@staticmethod
async def log(
resource: str,
item_id: PydanticObjectId,
operation: str,
user: Optional[TokenPayload] = None,
changes: Optional[dict] = None,
snapshot: Optional[dict] = None,
):
settings = get_settings()
if not settings.AUDIT_LOG_ENABLED:
return
entry = AuditEntry(
resource=resource,
item_id=item_id,
operation=operation,
user_id=PydanticObjectId(user.user_id) if user and user.user_id else None,
user_name=user.user_name if user else None,
ip=user.ip if user else None,
changes=changes,
snapshot=snapshot,
)
await entry.insert()
10.5 Felhantering¶
# src/craft_easy/core/errors/exceptions.py
class ESIBaseError(Exception):
"""Bas för alla Craft Easy-fel."""
def __init__(self, message: str, status_code: int = 500, detail: dict | None = None):
self.message = message
self.status_code = status_code
self.detail = detail or {}
super().__init__(message)
class AuthenticationError(ESIBaseError):
def __init__(self, message: str = "Authentication failed"):
super().__init__(message, status_code=401)
class AuthorizationError(ESIBaseError):
def __init__(self, message: str = "Access denied", feature: str | None = None):
super().__init__(message, status_code=403, detail={"feature": feature})
class NotFoundError(ESIBaseError):
def __init__(self, resource: str, item_id: str):
super().__init__(f"{resource} {item_id} not found", status_code=404)
class ConflictError(ESIBaseError):
def __init__(self, message: str = "Resource conflict"):
super().__init__(message, status_code=409)
class ETagMismatchError(ESIBaseError):
def __init__(self):
super().__init__("ETag mismatch — document has been modified", status_code=412)
class ETagRequiredError(ESIBaseError):
def __init__(self):
super().__init__("If-Match header required", status_code=428)
class CascadeDenyError(ESIBaseError):
def __init__(self, resource: str, item_id: str, referenced_by: str, count: int):
super().__init__(
f"Cannot delete {resource}/{item_id}: referenced by {count} {referenced_by}",
status_code=422,
)
class ValidationError(ESIBaseError):
def __init__(self, errors: list[dict]):
super().__init__("Validation failed", status_code=422, detail={"errors": errors})
# src/craft_easy/core/errors/handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError as PydanticValidationError
from .exceptions import ESIBaseError
def register_error_handlers(app: FastAPI):
@app.exception_handler(ESIBaseError)
async def esi_error_handler(request: Request, exc: ESIBaseError):
return JSONResponse(
status_code=exc.status_code,
content={
"error": type(exc).__name__,
"message": exc.message,
"detail": exc.detail,
"request_id": getattr(request.state, "request_id", None),
},
)
@app.exception_handler(PydanticValidationError)
async def pydantic_error_handler(request: Request, exc: PydanticValidationError):
return JSONResponse(
status_code=422,
content={
"error": "ValidationError",
"message": "Request validation failed",
"detail": {"errors": exc.errors()},
"request_id": getattr(request.state, "request_id", None),
},
)
11. Säkerhet¶
11.1 OWASP Top 10 — Åtgärder¶
| OWASP Risk | Åtgärd i Craft Easy |
|---|---|
| A01: Broken Access Control | Feature-baserat rättighetssystem, attribut-access, datafilter, ETag |
| A02: Cryptographic Failures | ES512 (ECDSA) för JWT, bcrypt för lösenord, TLS enforced i prod |
| A03: Injection | Pydantic-validering på ALL input, parameteriserade MongoDB-queries (Beanie) |
| A04: Insecure Design | Secure by default — auth och access PÅ från start |
| A05: Security Misconfiguration | Pydantic Settings med validering, inga defaults som är osäkra |
| A06: Vulnerable Components | Pinnade dependencies, Dependabot, pip-audit i CI |
| A07: Auth Failures | Rate limiting på auth-endpoints, IP-binding, token-rotation |
| A08: Data Integrity Failures | ETag concurrency, audit trail, cascade deny |
| A09: Logging & Monitoring | Strukturerad loggning (structlog), Prometheus metrics, audit log |
| A10: SSRF | Ingen URL-fetching från user input, OAuth2 via konfigurerade providers |
11.2 Rate Limiting¶
# src/craft_easy/core/middleware/rate_limit.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI
from ...settings import Settings
def setup_rate_limiting(app: FastAPI, settings: Settings):
limiter = Limiter(
key_func=get_remote_address,
default_limits=[settings.RATE_LIMIT_DEFAULT], # t.ex. "100/minute"
storage_uri=settings.RATE_LIMIT_STORAGE_URI, # Redis URI eller "memory://"
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Striktare limits på auth-endpoints
# Konfigureras per route via @limiter.limit("5/minute")
11.3 CORS¶
# src/craft_easy/core/middleware/cors.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from ...settings import Settings
def setup_cors(app: FastAPI, settings: Settings):
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS, # Explicit lista, aldrig "*" i prod
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["X-API-Key", "If-Match", "X-Request-ID", "Content-Type"],
expose_headers=["ETag", "X-Total-Count", "X-Page", "X-Per-Page", "X-Total-Pages", "X-Request-ID"],
max_age=3600,
)
11.4 Secrets Management¶
# src/craft_easy/settings.py (relevanta delar)
from pydantic_settings import BaseSettings
from pydantic import Field, SecretStr
from typing import Optional
class Settings(BaseSettings):
"""
Central konfiguration.
Alla hemligheter via miljövariabler — aldrig i kod.
"""
# --- App ---
APP_TITLE: str = "Craft Easy API"
APP_VERSION: str = "1.0.0"
ENVIRONMENT: str = "development" # development | test | production
DEBUG: bool = False
DOCS_ENABLED: bool = True
# --- Database ---
MONGODB_URI: SecretStr = Field(default="mongodb://localhost:27017")
DATABASE_NAME: str = "esi_development"
# --- Auth ---
AUTH_ENABLED: bool = True
JWT_ALGORITHM: str = "ES512"
JWT_PRIVATE_KEY: SecretStr # Ingen default — måste sättas
JWT_PUBLIC_KEY: str # Publik nyckel kan vara i klartext
JWT_USER_EXPIRY_MINUTES: int = 30
JWT_REFRESH_MAX_LIFETIME_MINUTES: int = 43200 # 30 dagar
# --- Access Control ---
ACCESS_CONTROL_ENABLED: bool = True
SYSTEM_USERS: list[str] = Field(default_factory=list) # API codes med full access
# --- OAuth2 ---
OAUTH2_PROVIDERS: list[str] = Field(default_factory=list)
OAUTH2_GOOGLE_CLIENT_ID: Optional[str] = None
OAUTH2_GOOGLE_CLIENT_SECRET: Optional[SecretStr] = None
OAUTH2_MICROSOFT_CLIENT_ID: Optional[str] = None
OAUTH2_MICROSOFT_CLIENT_SECRET: Optional[SecretStr] = None
# ... etc.
# --- Features ---
AUDIT_LOG_ENABLED: bool = True
CASCADE_ENABLED: bool = True
ETAG_ENABLED: bool = True
RATE_LIMIT_ENABLED: bool = True
RATE_LIMIT_DEFAULT: str = "100/minute"
RATE_LIMIT_STORAGE_URI: str = "memory://"
# --- CORS ---
CORS_ORIGINS: list[str] = Field(default=["http://localhost:3000"])
# --- Logging ---
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "json" # json | console
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"case_sensitive": True,
}
11.5 Säkerhetshärdning — checklista¶
- [ ]
JWT_PRIVATE_KEYhar inga defaults — kraschar vid start om ej satt - [ ]
CORS_ORIGINSär explicit lista — aldrig["*"]i produktion - [ ] Rate limiting aktivt på
/auth/*routes (5/minut default) - [ ] Lösenord hashas med bcrypt (work factor 12)
- [ ] Alla MongoDB-queries via Beanie ODM (parameteriserade)
- [ ] Inga
eval(),exec(), ellersubprocessmed user input - [ ]
SecretStrför alla hemliga konfigurationsvärden - [ ] Audit log på alla mutationer
- [ ] IP-binding på tokens (konfigurerbart)
- [ ] Token expiry: 30 min default, max 30 dagars refresh-livstid
- [ ] ETag required på alla PATCH/PUT/DELETE
- [ ] Request ID på alla responses (för spårbarhet)
- [ ] Security headers via reverse proxy (HSTS, CSP, X-Frame-Options)
12. Testning och kvalitet¶
12.1 Teststrategi¶
┌──────────────┐
│ E2E Tests │ ~10% — Full app + DB
│ (httpx) │
┌┴──────────────┴┐
│ Integration │ ~30% — Real DB, multiple modules
│ Tests │
┌┴────────────────┴┐
│ Unit Tests │ ~60% — Mocked dependencies
│ (pytest) │
└───────────────────┘
12.2 Test-fixtures¶
# tests/conftest.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from mongomock_motor import AsyncMongoMockClient
from craft_easy.app import create_base_app
from craft_easy.settings import Settings
from craft_easy.models.user import User
from craft_easy.models.organization import (
Consortium, Region, Organization, Department, Unit,
)
from craft_easy.models.access_group import AccessGroup
from craft_easy.core.auth.jwt import JWTService
@pytest.fixture
def test_settings():
return Settings(
ENVIRONMENT="test",
MONGODB_URI="mongodb://localhost:27017",
DATABASE_NAME="esi_test",
AUTH_ENABLED=True,
ACCESS_CONTROL_ENABLED=True,
JWT_PRIVATE_KEY="<test-private-key>",
JWT_PUBLIC_KEY="<test-public-key>",
CORS_ORIGINS=["*"],
)
@pytest_asyncio.fixture
async def app(test_settings):
app = create_base_app(test_settings)
# Initiera test-DB
client = AsyncIOMotorClient(test_settings.MONGODB_URI.get_secret_value())
db = client[test_settings.DATABASE_NAME]
await init_beanie(
database=db,
document_models=[User, Consortium, Region, Organization, Department, Unit, AccessGroup],
)
yield app
# Cleanup
await client.drop_database(test_settings.DATABASE_NAME)
client.close()
@pytest_asyncio.fixture
async def client(app):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
@pytest_asyncio.fixture
async def auth_headers(test_settings):
"""Generera auth headers med giltig JWT för tester."""
jwt_service = JWTService(test_settings)
token, _ = jwt_service.create_user_token(
user_id="000000000000000000000001",
user_name="Test User",
ip="127.0.0.1",
origin_app="test",
is_system_user=True,
)
return {"X-API-Key": token}
12.3 Exempeltester¶
# tests/integration/test_crud.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_create_and_get_consortium(client: AsyncClient, auth_headers: dict):
# Create
response = await client.post(
"/consortiums",
json={"name": "Test Consortium"},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Consortium"
assert "id" in data
etag = response.headers["ETag"]
# Get
response = await client.get(
f"/consortiums/{data['id']}",
headers=auth_headers,
)
assert response.status_code == 200
assert response.json()["name"] == "Test Consortium"
@pytest.mark.asyncio
async def test_patch_requires_etag(client: AsyncClient, auth_headers: dict):
# Create
response = await client.post(
"/consortiums",
json={"name": "Original"},
headers=auth_headers,
)
item_id = response.json()["id"]
# Patch without ETag → 428
response = await client.patch(
f"/consortiums/{item_id}",
json={"name": "Updated"},
headers=auth_headers,
)
assert response.status_code == 428
@pytest.mark.asyncio
async def test_cascade_deny_delete(client: AsyncClient, auth_headers: dict):
# Create consortium → region
c = await client.post("/consortiums", json={"name": "C"}, headers=auth_headers)
c_id = c.json()["id"]
c_etag = c.headers["ETag"]
await client.post(
"/regions",
json={"name": "R", "consortium_id": c_id},
headers=auth_headers,
)
# Try delete consortium → 422 (deny)
response = await client.delete(
f"/consortiums/{c_id}",
headers={**auth_headers, "If-Match": c_etag},
)
assert response.status_code == 422
assert "referenced by" in response.json()["message"]
12.4 Kvalitetskrav¶
| Krav | Mål | Verktyg |
|---|---|---|
| Kodtäckning | >90% på core/ |
coverage + pytest-cov |
| Typning | 100% — inga Any |
mypy --strict |
| Linting | Noll varningar | ruff check |
| Formatering | Konsekvent | ruff format |
| Säkerhet | Inga kända CVE:er | pip-audit |
| Docstrings | Alla publika API:er | ruff docstring-regler |
12.5 CI Pipeline¶
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
services:
mongodb:
image: mongo:7
ports: [27017:27017]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- run: pip install -e ".[dev]"
- run: ruff check src/ tests/
- run: ruff format --check src/ tests/
- run: mypy src/
- run: pytest --cov=src/craft_easy --cov-report=xml -v
- run: pip-audit
13. Migrationsstrategi¶
13.1 Fasplan¶
Fas 1: Fundament Fas 2: Kärnlogik Fas 3: Portering Fas 4: Validering
(4 veckor) (5 veckor) (3 veckor) (2 veckor)
───────────────────── ────────────────────── ───────────────────── ──────────────────
┌─ Projektstruktur │ ┌─ CRUD Resource │ ┌─ Portera auth │ ┌─ Lasttester │
├─ Settings │ ├─ Feature guards │ ├─ Portera OAuth2 │ ├─ Säkerhetsaudit │
├─ BaseDocument │ ├─ Access accumulator │ ├─ Portera cascade │ ├─ API-kompattest │
├─ App factory │ ├─ Cascade delete │ ├─ Portera hooks │ ├─ Dokumentation │
├─ JWT Service │ ├─ Cascade update │ ├─ Referens-impl. │ └─ Release prep │
├─ Middleware stack │ ├─ Audit logger │ │ (Airpark) │
├─ Error handling │ ├─ Hook system │ └─ Seed migration │
└─ Health endpoints │ └─ Cascade registry │ │
13.2 Fas 1: Fundament (vecka 1-4)¶
Mål: Fungerande FastAPI-app med modeller, JWT och middleware.
| Vecka | Leverans | Definition of Done |
|---|---|---|
| 1 | Projektstruktur, pyproject.toml, Settings, app factory | uvicorn startar, /health svarar 200 |
| 2 | BaseDocument, organisationsmodeller (Consortium→Unit) | Modeller registrerade i Beanie, index skapade |
| 3 | JWT Service (encode/decode/refresh), auth dependencies | Token kan skapas och valideras, tester gröna |
| 4 | Middleware (CORS, RequestID, logging, ETag), error handlers | Middleware-pipeline fungerar end-to-end |
13.3 Fas 2: Kärnlogik (vecka 5-9)¶
Mål: Komplett CRUD, rättigheter, cascade och audit.
| Vecka | Leverans | Definition of Done |
|---|---|---|
| 5 | Resource-klass med GET (list + filter + pagination), GET item, POST | CRUD för Consortium fungerar med tester |
| 6 | PATCH, PUT, DELETE med ETag, readonly-stripping | Full CRUD med concurrency control |
| 7 | Feature guards, access accumulator, feature registry | Feature-baserad access fungerar end-to-end |
| 8 | Cascade delete (deny/null/delete), cascade registry | Cascade deny blockerar, null/delete fungerar |
| 9 | Cascade update (subscriber + kedja), audit logger, hook system | Namnändringar propagerar, audit loggas |
13.4 Fas 3: Portering (vecka 10-12)¶
Mål: Auth-flöden och specifik affärslogik porterad.
| Vecka | Leverans | Definition of Done |
|---|---|---|
| 10 | Auth routes (API code, email, refresh, logout), OAuth2 providers | Inloggning fungerar via API code och email |
| 11 | OAuth2 (Google, Microsoft), portera hooks från Eve | OAuth2 redirect-flow fungerar |
| 12 | Airpark referens-implementation, seed-data migration | Airpark-API startar och kan hantera data |
13.5 Fas 4: Validering (vecka 13-14)¶
Mål: Produktionsredo.
| Vecka | Leverans | Definition of Done |
|---|---|---|
| 13 | Lasttester (locust), säkerhetsaudit, API-kompatibilitetstester | Prestanda ≥ Eve-systemet, inga OWASP-findings |
| 14 | Dokumentation (getting started, configuration, extending), release | README, docs/, PyPI-publicering |
13.6 Parallell drift¶
Under migrationen kan båda systemen köra parallellt:
Databaserna är desamma — båda systemen läser/skriver till samma MongoDB. Beanie och PyMongo är kompatibla.
13.7 Risker och mitigering¶
| Risk | Sannolikhet | Konsekvens | Mitigering |
|---|---|---|---|
| Beanie ODM saknar funktion vi behöver | Medium | Medium | Fallback till raw Motor-queries |
| Cascade-logik mer komplex än estimerat | Hög | Hög | Börja tidigt (fas 2), utökade tester |
| OAuth2 state-hantering skiljer sig (ASGI vs WSGI) | Medium | Låg | Authlib stödjer Starlette nativt |
| Prestanda sämre än förväntat | Låg | Medium | Profiling från start, async överallt |
| MongoDB-index saknas/skiljer sig | Låg | Hög | Beanie hanterar index deklarativt |
14. Deployment och DevOps¶
14.1 Container¶
# Dockerfile
FROM python:3.12-slim AS base
# Säkerhet: non-root user
RUN groupadd -r esi && useradd -r -g esi esi
WORKDIR /app
# Dependencies (cacheas separat)
COPY pyproject.toml ./
RUN pip install --no-cache-dir .
# Applikationskod
COPY src/ src/
# Byt till non-root
USER esi
# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"
# Start
CMD ["gunicorn", "craft_easy.app:create_base_app()", \
"--worker-class", "uvicorn.workers.UvicornWorker", \
"--workers", "4", \
"--bind", "0.0.0.0:8000", \
"--timeout", "120", \
"--access-logfile", "-"]
14.2 Azure Static Web Apps + Azure Container Apps¶
┌──────────────────────────────────────────────┐
│ Azure Container Apps │
│ │
│ ┌─ craft-easy-api ────────────────────────┐ │
│ │ Container: python:3.12-slim │ │
│ │ Replicas: 2-10 (autoscale) │ │
│ │ CPU: 0.5-2 cores │ │
│ │ Memory: 1-4 GB │ │
│ │ Port: 8000 │ │
│ └───────────────────────────────────────┘ │
│ │
│ Environment Variables: │
│ ├─ MONGODB_URI (secret) │
│ ├─ JWT_PRIVATE_KEY (secret) │
│ ├─ JWT_PUBLIC_KEY │
│ ├─ ENVIRONMENT=production │
│ └─ ... │
└──────────────────┬───────────────────────────┘
│
┌──────────────────▼───────────────────────────┐
│ Azure Cosmos DB (MongoDB API) │
│ Tier: vCore M40 │
└──────────────────────────────────────────────┘
14.3 CD Pipeline¶
# .github/workflows/deploy.yml
name: Deploy
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build & push container
uses: azure/docker-login@v1
with:
login-server: ${{ secrets.ACR_LOGIN_SERVER }}
username: ${{ secrets.ACR_USERNAME }}
password: ${{ secrets.ACR_PASSWORD }}
- run: |
docker build -t ${{ secrets.ACR_LOGIN_SERVER }}/craft-easy-api:${{ github.sha }} .
docker push ${{ secrets.ACR_LOGIN_SERVER }}/craft-easy-api:${{ github.sha }}
- name: Deploy to Azure Container Apps
uses: azure/container-apps-deploy-action@v1
with:
containerAppName: craft-easy-api
resourceGroup: rg-esi-prod
imageToDeploy: ${{ secrets.ACR_LOGIN_SERVER }}/craft-easy-api:${{ github.sha }}
14.4 Monitoring¶
| Verktyg | Syfte | Integration |
|---|---|---|
| Prometheus | Metrics (request latency, error rates, DB query time) | prometheus-fastapi-instrumentator |
| Grafana | Dashboard | Prometheus data source |
| Sentry | Error tracking | sentry-sdk[fastapi] |
| structlog | Structured logging → Azure Log Analytics | JSON format till stdout |
| Azure Monitor | Infrastructure metrics | Container Apps inbyggt |
15. Utvecklingsmiljö¶
15.1 Princip¶
Cosmos DB vCore kör riktig MongoDB-motor under huven. Lokal utveckling behöver därför bara en vanlig MongoDB-instans. Ingen emulator, inget specialverktyg — bara MongoDB 7.
15.2 Docker Compose — komplett dev-stack¶
# docker-compose.yml
services:
mongodb:
image: mongo:7
ports:
- "27017:27017"
volumes:
- mongodb_data:/data/db
# Ingen auth lokalt — enklast möjligt
api:
build: .
ports:
- "8000:8000"
volumes:
- ./src:/app/src # Hot reload — ändra kod utan rebuild
environment:
- MONGODB_URI=mongodb://mongodb:27017
- DATABASE_NAME=${DATABASE_NAME:-myproject-dev}
- AUTH_ENABLED=false
- ACCESS_CONTROL_ENABLED=false
- ENVIRONMENT=development
- DEBUG=true
- LOG_FORMAT=console
depends_on:
- mongodb
command: >
uvicorn craft_easy.app:create_base_app
--host 0.0.0.0
--port 8000
--reload
--reload-dir /app/src
volumes:
mongodb_data:
15.3 Starta utvecklingsmiljön¶
# Första gången
docker compose up -d
# API: http://localhost:8000
# Swagger UI: http://localhost:8000/docs
# ReDoc: http://localhost:8000/redoc
# MongoDB: mongodb://localhost:27017
15.4 Utan Docker — direkt på maskinen¶
# 1. Starta MongoDB (om installerad lokalt)
mongod --dbpath /tmp/mongodata
# 2. Skapa virtualenv
python3.12 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
# 3. Starta API med hot reload
MONGODB_URI=mongodb://localhost:27017 \
DATABASE_NAME=myproject-dev \
AUTH_ENABLED=false \
uvicorn craft_easy.app:create_base_app --reload --port 8000
15.5 Miljöfiler per kontext¶
# .env.development — lokal utveckling
MONGODB_URI=mongodb://localhost:27017
DATABASE_NAME=myproject-dev
AUTH_ENABLED=false
ACCESS_CONTROL_ENABLED=false
ENVIRONMENT=development
DEBUG=true
DOCS_ENABLED=true
LOG_FORMAT=console
CORS_ORIGINS=["http://localhost:3000","http://localhost:5173"]
# .env.test — Azure, Cosmos vCore Free (delad)
MONGODB_URI=mongodb+srv://cosmos-free.mongocluster.cosmos.azure.com/...
DATABASE_NAME=myproject-test
AUTH_ENABLED=true
ACCESS_CONTROL_ENABLED=true
ENVIRONMENT=test
DEBUG=false
DOCS_ENABLED=true
LOG_FORMAT=json
# .env.production — Azure, Cosmos vCore M30 (dedikerad)
MONGODB_URI=mongodb+srv://cosmos-prod.mongocluster.cosmos.azure.com/...
DATABASE_NAME=myproject-prod
AUTH_ENABLED=true
ACCESS_CONTROL_ENABLED=true
ENVIRONMENT=production
DEBUG=false
DOCS_ENABLED=false
LOG_FORMAT=json
JWT_PRIVATE_KEY=<from-key-vault>
JWT_PUBLIC_KEY=<from-key-vault>
RATE_LIMIT_ENABLED=true
15.6 Databasklient — mongosh¶
# Lokalt
mongosh mongodb://localhost:27017/myproject-dev
# Cosmos vCore (test)
mongosh "mongodb+srv://cosmos-free.mongocluster.cosmos.azure.com/" \
--tls --authenticationDatabase admin \
--username <user> --password <pass>
Samma mongosh-kommandon fungerar mot båda. Cosmos vCore stödjer mongosh fullt.
15.7 VS Code-tillägg¶
| Tillägg | Syfte |
|---|---|
| MongoDB for VS Code | Anslut till lokal MongoDB och Cosmos, kör queries direkt i editorn |
| REST Client | Testa API-endpoints med .http-filer |
| Python (ms-python) | Debugger, IntelliSense, type checking |
| Docker (ms-azuretools) | Hantera containers |
| Azure Databases | Browsa Cosmos DB direkt |
15.8 Testverktyg¶
# Kör alla tester mot lokal MongoDB
pytest
# Kör med coverage
pytest --cov=src/craft_easy --cov-report=html
# Kör bara unit tests (snabbt, mockad DB)
pytest tests/unit/
# Kör integration tests (kräver MongoDB)
pytest tests/integration/
# Typkontroll
mypy src/
# Lint + format
ruff check src/ tests/
ruff format src/ tests/
15.9 Flöde: Lokal → Test → Produktion¶
┌─────────────────┐ ┌──────────────────┐ ┌───────────────────┐
│ Development │ │ Test │ │ Production │
│ │ │ │ │ │
│ Docker MongoDB │ │ Cosmos Free │ │ Cosmos M30 + HA │
│ Auth: OFF │────▶│ Auth: ON │────▶│ Auth: ON │
│ Access: OFF │push │ Access: ON │merge│ Access: ON │
│ Hot reload: ON │ │ Auto-deploy │ │ Auto-deploy │
│ │ │ (GitHub Actions) │ │ (GitHub Actions) │
│ localhost:8000 │ │ test.api.app.com │ │ api.app.com │
└─────────────────┘ └──────────────────┘ └───────────────────┘
│ │ │
mongodb:// mongodb+srv:// mongodb+srv://
localhost:27017 cosmos-free... cosmos-prod...
Samma kodbas, samma Docker-image. Skillnaden är bara env-variabler.
16. Admin-schema (generisk CRUD-admin)¶
16.1 Koncept¶
Craft Easy API exponerar ett admin-schema (/admin/schema) som gör att en generisk admin-klient kan bygga komplett CRUD-gränssnitt automatiskt — utan att känna till den specifika applikationen.
Admin UI (generisk) → GET /admin/schema → Bygger allt automatiskt
│
└─ Peka på vilken Craft Easy API-instans som helst
16.2 AdminConfig på Resource¶
from craft_easy.core.crud import Resource
from craft_easy.core.admin import AdminConfig, FieldGroup, FieldConfig
zones = Resource(
model=ParkingZone,
prefix="parking-zones",
create_schema=ParkingZoneCreate,
update_schema=ParkingZoneUpdate,
admin=AdminConfig(
icon="parking",
label="Parking Zones",
label_plural="Parking Zones",
description="Manage parking zones and capacity",
# Listvy
list_fields=["name", "capacity", "status", "organization_name"],
search_fields=["name", "description"],
filter_fields=["status", "organization_id"],
sort_default="-created_at",
sort_fields=["name", "capacity", "created_at"],
# Formulär
form_layout=[
FieldGroup("General", ["name", "description", "capacity"]),
FieldGroup("Location", ["latitude", "longitude", "address"]),
FieldGroup("Organization", ["organization_id"]),
],
# Fält-overrides
field_overrides={
"organization_id": FieldConfig(
widget="relation",
relation_endpoint="/organizations",
relation_label_field="name",
relation_search_field="name",
),
"status": FieldConfig(
widget="select",
options=[
{"value": "active", "label": "Active", "color": "green"},
{"value": "inactive", "label": "Inactive", "color": "gray"},
{"value": "maintenance", "label": "Maintenance", "color": "yellow"},
],
),
"description": FieldConfig(widget="textarea", rows=4),
"capacity": FieldConfig(widget="number", min=0, max=10000, step=1),
},
),
)
16.3 AdminConfig-modeller¶
# src/craft_easy/core/admin.py
from typing import Optional, Any
from pydantic import BaseModel
class FieldConfig(BaseModel):
"""UI-metadata för ett enskilt fält."""
widget: Optional[str] = None # text, textarea, number, select, relation,
# date, datetime, boolean, email, url, color,
# json, markdown, media
label: Optional[str] = None # Override av fältnamn
placeholder: Optional[str] = None
help_text: Optional[str] = None
hidden: bool = False # Dölj i formulär
readonly: bool = False # Visa men kan inte ändras
# Number
min: Optional[float] = None
max: Optional[float] = None
step: Optional[float] = None
# Textarea
rows: Optional[int] = None
# Select
options: Optional[list[dict | str]] = None
# Relation (dropdown som hämtar från annan endpoint)
relation_endpoint: Optional[str] = None
relation_label_field: Optional[str] = "name"
relation_search_field: Optional[str] = "name"
relation_value_field: Optional[str] = "_id"
# Media
accept: Optional[str] = None # "image/*", ".pdf", etc.
max_size_mb: Optional[float] = None
class FieldGroup(BaseModel):
"""Grupp av fält i formuläret."""
label: str
fields: list[str]
collapsible: bool = False
collapsed: bool = False
class AdminConfig(BaseModel):
"""Admin UI-konfiguration för en Resource."""
# Identitet
icon: Optional[str] = None
label: Optional[str] = None
label_plural: Optional[str] = None
description: Optional[str] = None
# Listvy
list_fields: Optional[list[str]] = None # None = alla fält
search_fields: list[str] = []
filter_fields: list[str] = []
sort_default: str = "-created_at"
sort_fields: list[str] = []
page_size: int = 25
# Formulär
form_layout: Optional[list[FieldGroup]] = None # None = auto-layout
field_overrides: dict[str, FieldConfig] = {}
# Beteende
allow_create: bool = True
allow_edit: bool = True
allow_delete: bool = True
allow_export: bool = False
allow_bulk_delete: bool = False
16.4 Auto-generering från Pydantic¶
Om AdminConfig inte sätts genereras ett schema automatiskt från Pydantic-modellen:
# src/craft_easy/core/admin.py
def auto_admin_config(model: Type[Document]) -> dict:
"""Generera admin-schema från Pydantic-modell."""
fields = {}
for name, field_info in model.model_fields.items():
if name.startswith("_"):
continue
field = {
"type": _pydantic_type_to_string(field_info.annotation),
"required": field_info.is_required(),
"widget": _infer_widget(name, field_info),
}
# Readonly
extra = field_info.json_schema_extra or {}
if extra.get("readonly"):
field["readonly"] = True
# Enum/Literal → select widget
if hasattr(field_info.annotation, "__args__"):
field["widget"] = "select"
field["options"] = list(field_info.annotation.__args__)
# Regex → pattern
metadata = field_info.metadata or []
for m in metadata:
if hasattr(m, "pattern"):
field["pattern"] = m.pattern
fields[name] = field
return fields
def _infer_widget(name: str, field_info) -> str:
"""Gissa widget baserat på fältnamn och typ."""
type_str = str(field_info.annotation)
if "bool" in type_str:
return "boolean"
if "datetime" in type_str:
return "datetime"
if "int" in type_str or "float" in type_str:
return "number"
if "EmailStr" in type_str:
return "email"
if name.endswith("_id") and "ObjectId" in type_str:
return "relation"
if "description" in name or "notes" in name:
return "textarea"
return "text"
16.5 /admin/schema endpoint¶
# src/craft_easy/routes/admin.py
from fastapi import APIRouter, Depends
from ..core.auth.dependencies import get_required_user
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/schema")
async def get_admin_schema(user=Depends(get_required_user)):
"""Returnerar komplett admin-schema för alla registrerade resurser."""
from ..core.crud.registry import resource_registry
resources = []
for resource in resource_registry.all():
schema = resource.build_admin_schema()
resources.append(schema)
return {
"version": "1.0",
"title": settings.APP_TITLE,
"resources": resources,
}
Fullständig admin-specifikation: admin-specification.md
Appendix A: Konfigurationsreferens¶
Alla feature flags med defaults:
# .env.example
# --- App ---
APP_TITLE=My API
APP_VERSION=1.0.0
ENVIRONMENT=development
DEBUG=false
DOCS_ENABLED=true
# --- Database ---
MONGODB_URI=mongodb://localhost:27017
DATABASE_NAME=my_system_development
# --- Auth (REQUIRED) ---
AUTH_ENABLED=true
JWT_ALGORITHM=ES512
JWT_PRIVATE_KEY=<required>
JWT_PUBLIC_KEY=<required>
JWT_USER_EXPIRY_MINUTES=30
JWT_REFRESH_MAX_LIFETIME_MINUTES=43200
# --- Access Control ---
ACCESS_CONTROL_ENABLED=true
SYSTEM_USERS=[]
# --- OAuth2 ---
OAUTH2_PROVIDERS=[]
OAUTH2_GOOGLE_CLIENT_ID=
OAUTH2_GOOGLE_CLIENT_SECRET=
# --- Features ---
AUDIT_LOG_ENABLED=true
CASCADE_ENABLED=true
ETAG_ENABLED=true
RATE_LIMIT_ENABLED=true
RATE_LIMIT_DEFAULT=100/minute
RATE_LIMIT_STORAGE_URI=memory://
# --- CORS ---
CORS_ORIGINS=["http://localhost:3000"]
# --- Logging ---
LOG_LEVEL=INFO
LOG_FORMAT=console
Infrastrukturanalys (hosting, priser, DR) finns i separat dokument: hosting.md