BI Export

Operational data in MongoDB is excellent for transactions but painful for analytics — wide joins and time-series reports are slow, and BI tools expect columnar storage. The BI export module streams your Craft Easy resources to external warehouses incrementally, with GDPR filtering and schema inference built in.

Two targets are shipped out of the box:

Target      Module                                         Dependencies
BigQuery    craft_easy.core.bi_export.targets.bigquery     google-cloud-bigquery
Azure SQL   craft_easy.core.bi_export.targets.azure_sql    pyodbc + ODBC Driver 18

Enable BI export routes in settings.py:

BI_EXPORT_ENABLED=true

The export pipeline

For every configured resource, the service runs this loop:

  1. Look up the last sync timestamp for the target table (stored in the target system itself — no extra metadata in Mongo).
  2. Query the resource for documents with updated_at > last_sync_timestamp.
  3. Apply GDPR filtering per the resource's config.
  4. Flatten nested fields according to the flatten rules.
  5. Infer schema from the first batch and create or evolve the target table.
  6. Upsert rows in batches (default 500) keyed by _id.
  7. Return a dict with exported/skipped counts and any errors.

Export is idempotent — running the same config twice yields the same result because upserts are keyed by _id and the last-sync timestamp is stored in the target itself.

Configuring a resource for export

from craft_easy.core.bi_export.config import BIExportConfig

invoices_config = BIExportConfig(
    resource="invoices",
    target_table="invoices_fact",
    gdpr_mode="exclude",
    flatten_depth=1,
    flatten_fields={
        "customer.name": "customer_name",
        "customer.org_number": "customer_org_nr",
    },
    exclude_fields=["internal_notes", "draft_payload"],
)

Configuration fields

Field            Purpose
resource         MongoDB collection name
target_table     Destination table name in the warehouse
gdpr_mode        How to treat GDPR-tagged fields: exclude, anonymize, hash
schedule         Cron expression (used by the scheduled job — see below)
flatten_depth    Auto-flatten nested dicts up to this depth (default 1)
flatten_fields   Explicit mapping from dot-path to flat column name
exclude_fields   Fields to drop before export
include_fields   Whitelist — if set, only these fields are exported
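To make the flatten_fields mapping concrete, here is a minimal sketch of dot-path flattening; apply_flatten_fields is an illustrative helper, not the module's actual function:

```python
def apply_flatten_fields(doc: dict, mapping: dict[str, str]) -> dict:
    """Resolve each dot-path and emit its value under the flat column name."""
    # Keep top-level scalar fields as-is; nested dicts are handled via the mapping
    out = {k: v for k, v in doc.items() if not isinstance(v, dict)}
    for path, column in mapping.items():
        value = doc
        for part in path.split("."):
            # Missing intermediate keys resolve to None rather than raising
            value = value.get(part) if isinstance(value, dict) else None
        out[column] = value
    return out
```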

GDPR modes explained

Mode        Effect
exclude     GDPR-tagged fields are removed from the exported row entirely
anonymize   Field values are replaced with "***"
hash        Field values are replaced with the first 16 chars of their SHA-256 hash — enables joins without leaking PII

GDPR tags come from the model itself — see GDPR for how fields are marked.
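The three modes reduce to a single per-row filter. This sketch assumes a precomputed set of tagged field names; apply_gdpr is an illustrative stand-in for the model-driven implementation:

```python
import hashlib

def apply_gdpr(row: dict, tagged: set[str], mode: str) -> dict:
    """Apply the configured gdpr_mode to every GDPR-tagged field in a row."""
    out = {}
    for field, value in row.items():
        if field not in tagged:
            out[field] = value
        elif mode == "anonymize":
            out[field] = "***"
        elif mode == "hash":
            # First 16 hex chars of SHA-256: a stable join key with no readable PII
            out[field] = hashlib.sha256(str(value).encode()).hexdigest()[:16]
        # mode == "exclude": drop the field entirely
    return out
```

Because the hash is deterministic, the same customer hashes to the same key in every exported table, so cross-table joins still work.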

Exporting to BigQuery

from craft_easy.core.bi_export.service import BIExportService
from craft_easy.core.bi_export.targets.bigquery import BigQueryTarget

target = BigQueryTarget(
    project_id="my-gcp-project",
    dataset_id="analytics",
    location="europe-north1",
    credentials_path="/secrets/bq-sa.json",
)

svc = BIExportService()

async with target:
    result = await svc.export_to_target(
        config=invoices_config,
        target=target,
        tenant_id=None,            # None = all tenants
        batch_size=500,
    )

print(f"Exported {result['rows_upserted']} rows in {result['duration_ms']} ms")

BigQuery type mapping:

Inferred type   BigQuery
string          STRING
integer         INT64
float           FLOAT64
boolean         BOOL
datetime        TIMESTAMP
json            JSON
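Schema inference (step 5 of the pipeline) roughly amounts to mapping the Python types seen in the first batch onto the table above. This sketch assumes exact-type matching with a JSON fallback for nested structures; it is not the module's actual inference code:

```python
from datetime import datetime

# Exact-type lookup; type(True) is bool, so booleans never collide with ints
BQ_TYPES = {str: "STRING", int: "INT64", float: "FLOAT64",
            bool: "BOOL", datetime: "TIMESTAMP"}

def infer_bq_schema(batch: list[dict]) -> dict[str, str]:
    """Infer a column -> BigQuery type map from the first batch of rows."""
    schema: dict[str, str] = {}
    for row in batch:
        for col, val in row.items():
            if col in schema or val is None:
                continue  # first non-null value seen wins
            # Nested dicts, lists, and anything unrecognised fall back to JSON
            schema[col] = BQ_TYPES.get(type(val), "JSON")
    return schema
```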

Exporting to Azure SQL

from craft_easy.core.bi_export.targets.azure_sql import AzureSQLTarget

# Password auth
target = AzureSQLTarget(
    connection_string=(
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=tcp:myserver.database.windows.net,1433;"
        "Database=analytics;Uid=bi_user;Pwd=...;Encrypt=yes;"
        "TrustServerCertificate=no;Connection Timeout=30;"
    ),
)

# Or Managed Identity
target = AzureSQLTarget(
    connection_string=(
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=tcp:myserver.database.windows.net,1433;"
        "Database=analytics;"
        "Authentication=ActiveDirectoryMsi;Encrypt=yes;"
    ),
)

Azure SQL type mapping:

Inferred type   SQL Server
string          NVARCHAR(MAX)
integer         BIGINT
float           FLOAT
boolean         BIT
datetime        DATETIME2
json            NVARCHAR(MAX)

Upserts are emitted as MERGE statements matching on _id.
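The emitted statement has roughly the following shape. build_merge is an illustrative sketch of the SQL text only (the real target binds values through pyodbc `?` placeholders); it is not the module's actual emitter:

```python
def build_merge(table: str, columns: list[str], key: str = "_id") -> str:
    """Build a SQL Server MERGE that updates on key match, inserts otherwise."""
    placeholders = ", ".join("?" for _ in columns)
    col_list = ", ".join(columns)
    updates = ", ".join(f"target.{c} = source.{c}" for c in columns if c != key)
    inserts = ", ".join(f"source.{c}" for c in columns)
    return (
        f"MERGE {table} AS target "
        f"USING (VALUES ({placeholders})) AS source ({col_list}) "
        f"ON target.{key} = source.{key} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({inserts});"
    )
```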

Writing a custom target

Subclass ExportTarget:

from datetime import datetime

from craft_easy.core.bi_export.targets.base import ExportTarget, ExportResult

class ClickHouseTarget(ExportTarget):
    async def connect(self) -> None: ...
    async def close(self) -> None: ...

    async def create_table_if_not_exists(
        self, table_name: str, schema: dict[str, str]
    ) -> None: ...

    async def upsert_rows(
        self, table_name: str, rows: list[dict], key_field: str = "_id"
    ) -> ExportResult: ...

    async def get_last_sync_timestamp(
        self, table_name: str
    ) -> datetime | None: ...

The service then treats your target identically to the built-in ones.

Ad-hoc export endpoints

In addition to the scheduled incremental export, the module exposes REST endpoints for on-demand extracts. These are the go-to for quick dashboards, spreadsheet exports, and BI tools that pull data on a schedule:

GET /bi/export/{resource}?format=json|csv&date_from=...&date_to=...&fields=...
    Download the full resource as JSON or CSV

GET /bi/summary/{resource}?group_by=status&metric=count
    Aggregate by a field (count, sum, avg)

GET /bi/dashboard
    Resource counts + recent activity from the audit log

GET /bi/report/revenue?period=2026-04
    Total/refunded/net revenue for a month, broken down by provider and resource type

Example:

# Download April's claims as CSV
curl "http://localhost:5001/bi/export/claims?format=csv&date_from=2026-04-01&date_to=2026-04-30" \
  -H "Authorization: Bearer $TOKEN" \
  -o claims-april.csv

# Revenue report for March
curl "http://localhost:5001/bi/report/revenue?period=2026-03" \
  -H "Authorization: Bearer $TOKEN"

Running exports on a schedule

The export service is designed to run inside the built-in bi-export job — see Built-in Jobs. A typical configuration:

{
  "target_type": "bigquery",
  "project_id": "my-gcp-project",
  "dataset_id": "analytics",
  "location": "europe-north1",
  "credentials_path": "/secrets/bq-sa.json",
  "configs": [
    {
      "resource": "invoices",
      "target_table": "invoices_fact",
      "gdpr_mode": "exclude",
      "flatten_depth": 1
    },
    {
      "resource": "payments",
      "target_table": "payments_fact",
      "gdpr_mode": "hash"
    }
  ]
}

Trigger the job via the /jobs/run/bi-export endpoint, or schedule it with a cron expression via /jobs/schedules. The default built-in schedule runs at 02:00 in the tenant's timezone.