BI Export¶
Operational data in MongoDB is excellent for transactions but painful for analytics: wide joins and time-series reports are slow, and BI tools expect columnar storage. The BI export module streams your Craft Easy resources to external warehouses incrementally, with GDPR filtering and schema inference built in.
Two targets are shipped out of the box:
| Target | Module | Dependencies |
|---|---|---|
| BigQuery | `craft_easy.core.bi_export.targets.bigquery` | `google-cloud-bigquery` |
| Azure SQL | `craft_easy.core.bi_export.targets.azure_sql` | `pyodbc` + ODBC Driver 18 |
Enable the BI export routes in `settings.py` as you would any other Craft Easy module.
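How modules are registered depends on your project layout; a minimal sketch, assuming Craft Easy picks up modules from a `MODULES` list in `settings.py` (the setting name is an assumption, not confirmed by this document):

```python
# settings.py — hypothetical sketch; the actual setting name may differ
# in your project. The intent is to register the BI export module so
# its routes and targets are mounted.
MODULES = [
    "craft_easy.core.bi_export",
]
```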
The export pipeline¶
For every configured resource, the service runs this loop:
- Look up the last sync timestamp for the target table (stored in the target system itself — no extra metadata in Mongo).
- Query the resource for documents with `updated_at > last_sync_timestamp`.
- Apply GDPR filtering per the resource's config.
- Flatten nested fields according to the flatten rules.
- Infer schema from the first batch and create or evolve the target table.
- Upsert rows in batches (default 500) keyed by `_id`.
- Return a `dict` with exported/skipped counts and any errors.
Export is idempotent — running the same config twice yields the same result because upserts are keyed by _id and the last-sync timestamp is stored in the target itself.
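The loop above can be sketched as follows. This is illustrative, not the actual `BIExportService` internals: the function name and signature are invented, and the GDPR filtering and flattening steps are elided.

```python
import asyncio
from datetime import datetime

async def export_resource(config, target, docs, batch_size=500):
    """Illustrative sketch of the incremental export loop — not the
    real BIExportService. GDPR filtering and flattening are elided."""
    # The watermark lives in the target system, not in Mongo.
    last_sync = await target.get_last_sync_timestamp(config["target_table"])
    # Incremental filter: only documents changed since the last run.
    changed = [d for d in docs if last_sync is None or d["updated_at"] > last_sync]
    exported = 0
    for i in range(0, len(changed), batch_size):
        # Upserts are keyed on _id, which makes re-runs idempotent.
        await target.upsert_rows(config["target_table"], changed[i:i + batch_size])
        exported += len(changed[i:i + batch_size])
    return {"rows_upserted": exported, "errors": []}
```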
Configuring a resource for export¶
```python
from craft_easy.core.bi_export.config import BIExportConfig

invoices_config = BIExportConfig(
    resource="invoices",
    target_table="invoices_fact",
    gdpr_mode="exclude",
    flatten_depth=1,
    flatten_fields={
        "customer.name": "customer_name",
        "customer.org_number": "customer_org_nr",
    },
    exclude_fields=["internal_notes", "draft_payload"],
)
```
Configuration fields¶
| Field | Purpose |
|---|---|
| `resource` | MongoDB collection name |
| `target_table` | Destination table name in the warehouse |
| `gdpr_mode` | How to treat GDPR-tagged fields: `exclude`, `anonymize`, `hash` |
| `schedule` | Cron expression (used by the scheduled job — see below) |
| `flatten_depth` | Auto-flatten nested dicts up to this depth (default 1) |
| `flatten_fields` | Explicit mapping from dot-path to flat column name |
| `exclude_fields` | Fields to drop before export |
| `include_fields` | Whitelist — if set, only these fields are exported |
GDPR modes explained¶
| Mode | Effect |
|---|---|
| `exclude` | GDPR-tagged fields are removed from the exported row entirely |
| `anonymize` | Field values are replaced with `"***"` |
| `hash` | Field values are replaced with the first 16 chars of their SHA-256 hash — enables joins without leaking PII |
GDPR tags come from the model itself — see GDPR for how fields are marked.
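The `hash` mode's join-friendliness follows from hashing being deterministic: equal inputs always produce equal tokens. A minimal sketch (the exact encoding, and whether the module salts the hash, are assumptions):

```python
import hashlib

def hash_pii(value: str) -> str:
    """Sketch of the `hash` GDPR mode: first 16 hex chars of SHA-256.
    Encoding and (absence of) salting are assumptions, not confirmed."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:16]
```

Because the same customer ID hashes to the same token in every exported table, analysts can still join fact tables on the hashed column without ever seeing the underlying PII.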
Exporting to BigQuery¶
```python
from craft_easy.core.bi_export.service import BIExportService
from craft_easy.core.bi_export.targets.bigquery import BigQueryTarget

target = BigQueryTarget(
    project_id="my-gcp-project",
    dataset_id="analytics",
    location="europe-north1",
    credentials_path="/secrets/bq-sa.json",
)

svc = BIExportService()
async with target:
    result = await svc.export_to_target(
        config=invoices_config,
        target=target,
        tenant_id=None,  # None = all tenants
        batch_size=500,
    )

print(f"Exported {result['rows_upserted']} rows in {result['duration_ms']} ms")
```
BigQuery type mapping:
| Inferred type | BigQuery |
|---|---|
| string | `STRING` |
| integer | `INT64` |
| float | `FLOAT64` |
| boolean | `BOOL` |
| datetime | `TIMESTAMP` |
| json | `JSON` |
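Schema inference from the first batch can be sketched against the mapping table above. The function names are hypothetical; the module's real inference presumably handles more edge cases (nulls, mixed types, nested arrays):

```python
from datetime import datetime

def infer_bigquery_type(value) -> str:
    """Map a Python value to a BigQuery column type per the table above."""
    if isinstance(value, bool):      # must precede int: bool subclasses int
        return "BOOL"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, datetime):
        return "TIMESTAMP"
    if isinstance(value, (dict, list)):
        return "JSON"
    return "STRING"

def infer_schema(rows: list[dict]) -> dict[str, str]:
    """First-seen value wins for each field across the sample batch."""
    schema: dict[str, str] = {}
    for row in rows:
        for field, value in row.items():
            schema.setdefault(field, infer_bigquery_type(value))
    return schema
```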
Exporting to Azure SQL¶
```python
from craft_easy.core.bi_export.targets.azure_sql import AzureSQLTarget

# Password auth
target = AzureSQLTarget(
    connection_string=(
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=tcp:myserver.database.windows.net,1433;"
        "Database=analytics;Uid=bi_user;Pwd=...;Encrypt=yes;"
        "TrustServerCertificate=no;Connection Timeout=30;"
    ),
)

# Or Managed Identity
target = AzureSQLTarget(
    connection_string=(
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=tcp:myserver.database.windows.net,1433;"
        "Database=analytics;"
        "Authentication=ActiveDirectoryMsi;Encrypt=yes;"
    ),
)
```
Azure SQL type mapping:
| Inferred type | SQL Server |
|---|---|
| string | `NVARCHAR(MAX)` |
| integer | `BIGINT` |
| float | `FLOAT` |
| boolean | `BIT` |
| datetime | `DATETIME2` |
| json | `NVARCHAR(MAX)` |
Upserts are emitted as MERGE statements matching on _id.
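A MERGE keyed on `_id` has roughly this shape. This is a hand-written sketch of how such a statement could be generated, not the module's actual SQL: the bracket quoting and `?` parameter style (pyodbc convention) are assumptions.

```python
def build_merge_sql(table: str, columns: list[str], key: str = "_id") -> str:
    """Sketch of an upsert MERGE for SQL Server keyed on `key`.
    Quoting and parameter style are assumptions, not the module's output."""
    placeholders = ", ".join("?" for _ in columns)
    col_list = ", ".join(f"[{c}]" for c in columns)
    updates = ", ".join(f"target.[{c}] = source.[{c}]" for c in columns if c != key)
    values = ", ".join(f"source.[{c}]" for c in columns)
    return (
        f"MERGE [{table}] AS target "
        f"USING (SELECT {placeholders}) AS source ({col_list}) "
        f"ON target.[{key}] = source.[{key}] "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({values});"
    )
```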
Writing a custom target¶
Subclass `ExportTarget`:
```python
from datetime import datetime

from craft_easy.core.bi_export.targets.base import ExportTarget, ExportResult

class ClickHouseTarget(ExportTarget):
    async def connect(self) -> None: ...

    async def close(self) -> None: ...

    async def create_table_if_not_exists(
        self, table_name: str, schema: dict[str, str]
    ) -> None: ...

    async def upsert_rows(
        self, table_name: str, rows: list[dict], key_field: str = "_id"
    ) -> ExportResult: ...

    async def get_last_sync_timestamp(
        self, table_name: str
    ) -> datetime | None: ...
```
The service then treats your target identically to the built-in ones.
Ad-hoc export endpoints¶
In addition to the scheduled incremental export, the module exposes REST endpoints for on-demand extracts. These are the natural fit for quick dashboards, spreadsheet exports, and BI tools that pull data on a schedule:
| Endpoint | Purpose |
|---|---|
| `GET /bi/export/{resource}?format=json\|csv&date_from=...&date_to=...&fields=...` | Download the full resource as JSON or CSV |
| `GET /bi/summary/{resource}?group_by=status&metric=count` | Aggregate by a field (count, sum, avg) |
| `GET /bi/dashboard` | Resource counts + recent activity from the audit log |
| `GET /bi/report/revenue?period=2026-04` | Total/refunded/net revenue for a month, broken down by provider and resource type |
Examples:

```shell
# Download April's claims as CSV
curl "http://localhost:5001/bi/export/claims?format=csv&date_from=2026-04-01&date_to=2026-04-30" \
  -H "Authorization: Bearer $TOKEN" \
  -o claims-april.csv

# Revenue report for March
curl "http://localhost:5001/bi/report/revenue?period=2026-03" \
  -H "Authorization: Bearer $TOKEN"
```
Running exports on a schedule¶
The export service is designed to run inside the built-in bi-export job — see Built-in Jobs. A typical configuration:
```json
{
  "target_type": "bigquery",
  "project_id": "my-gcp-project",
  "dataset_id": "analytics",
  "location": "europe-north1",
  "credentials_path": "/secrets/bq-sa.json",
  "configs": [
    {
      "resource": "invoices",
      "target_table": "invoices_fact",
      "gdpr_mode": "exclude",
      "flatten_depth": 1
    },
    {
      "resource": "payments",
      "target_table": "payments_fact",
      "gdpr_mode": "hash"
    }
  ]
}
```
Trigger the job via the /jobs/run/bi-export endpoint, or schedule it with a cron expression via /jobs/schedules. The default built-in schedule runs at 02:00 in the tenant's timezone.