craft-easy-file-import (standalone)

craft-easy-file-import is a separate Python package that complements the API import engine. It is designed for long-running polling workers, bank reconciliation, and standalone file processing in containers where you don't want the full API process.

pip install craft-easy-file-import

When to use which

Scenario                                                     API engine   Standalone package
User uploads a file through the admin UI                     Yes          No
Need per-row rejection + manual correction flow              Yes          No
Need the records persisted via Craft Easy CRUD + hooks       Yes          No
Poll an SFTP server every 5 minutes for new files            No           Yes
Watch a local directory for dropped files                    No           Yes
Parse BgMax or SEPA bank files and match payments            No           Yes
Run as a dedicated Cloud Run / Azure Container App worker    No           Yes
Minimal dependencies; no Mongo or FastAPI                    No           Yes

The two packages can be used together. A common deployment has the standalone package polling an SFTP folder for bank files and pushing matched payments into the API via ImportApiClient, while the API's own engine handles the interactive uploads from the admin UI.

Architecture

┌──────────────┐    ┌─────────────┐    ┌──────────────┐
│   sources    │──▶ │  pipelines  │──▶ │   runner     │
│ SFTP / dir   │    │ generic /   │    │ poll cycle / │
│              │    │ reconcile   │    │ loop         │
└──────────────┘    └─────────────┘    └──────────────┘
                    ┌──────────────────┐
                    │ ImportApiClient  │──▶ Craft Easy API
                    └──────────────────┘

Four building blocks cover every use case:

  • Parsers — decode a file format into Python dicts (CSV, JSON, flat-file, BgMax, SEPA)
  • Pipelines — orchestrate the handling of a parsed file (generic row-by-row or bank reconciliation)
  • Sources — where files come from (SFTP, watched directory)
  • Runner — polls sources and feeds files through pipelines on a schedule

Minimal example

import asyncio
import os
from craft_easy_file_import import (
    CSVParser,
    CSVParserConfig,
    GenericImportPipeline,
    PipelineBinding,
    ImportApiClient,
    run_loop,
)
from craft_easy_file_import.sources import WatchedDirectorySource

async def handle_row(row: dict) -> None:
    async with ImportApiClient(
        base_url="https://api.example.com",
        token=os.environ["API_TOKEN"],
        tenant_id="tenant_123",
    ) as api:
        await api.post("/products", data=row)

async def main():
    parser = CSVParser(CSVParserConfig(encoding="utf-8"))
    pipeline = GenericImportPipeline(
        pipeline_name="product-sync",
        parser=parser,
        on_row=handle_row,
    )
    source = WatchedDirectorySource(
        watch_dir="/mnt/inbox",
        file_pattern="products-*.csv",
        archive_dir="/mnt/archive",
    )
    bindings = [
        PipelineBinding(name="products", source=source, pipeline=pipeline),
    ]

    await run_loop(lambda: bindings, poll_interval=300)

asyncio.run(main())

This process watches /mnt/inbox for new CSV files every 5 minutes, parses each row, POSTs it to /products with a bearer token, and archives the processed file.

What's in the package

from craft_easy_file_import import (
    # Generic parsers
    CSVParser, CSVParserConfig,
    JSONParser, JSONParserConfig,
    FlatFileParser, FlatFileConfig, PostType, PostField,

    # Pipelines
    BasePipeline,
    GenericImportPipeline,
    ReconciliationPipeline,
    PipelineResult,
    SourceFile,

    # API client
    ImportApiClient,

    # Runner
    PipelineBinding,
    run_poll_cycle,
    run_loop,
)

# Banking parsers live in a submodule
from craft_easy_file_import.parsers.banking import (
    BgMaxParser, BgMaxPayment, BgMaxResult,
    SEPAParser, SEPAPayment, SEPAResult,
)

See Bank Parsers, Reconciliation, and Pipelines & Runner for the details of each building block.

The ImportApiClient

A minimal async HTTP client for pushing parsed data back into a Craft Easy API. It sets Authorization: Bearer <token>, optionally X-Tenant-Id, and handles JSON and multipart uploads:

from craft_easy_file_import import ImportApiClient

async with ImportApiClient(
    base_url="https://api.example.com",
    token="...",
    tenant_id="tenant_123",
    timeout=30.0,
) as api:
    # POST JSON
    result = await api.post("/products", data={"sku": "ABC", "name": "..."})

    # PATCH
    result = await api.patch("/products/123", data={"price": 199})

    # GET with params
    result = await api.get("/products", params={"sku": "ABC"})

    # Multipart file upload
    with open("report.pdf", "rb") as f:
        result = await api.upload_file(
            "/documents",
            file_name="report.pdf",
            file_content=f.read(),
            form_data={"category": "invoice"},
        )

The client is an async context manager — always use it with async with so the underlying httpx.AsyncClient is closed cleanly.

Dependencies

By design, the package has a minimal base: httpx for HTTP, Python stdlib for parsing. Optional extras enable additional features:

Extra   Installs                  Enables
sftp    asyncssh                  SFTPSource for polling remote servers
dev     pytest, pytest-asyncio    Test suite
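To pull in an extra, quote the requirement so the brackets survive your shell (the extra name sftp comes from the table above):

```shell
# Install with the SFTP extra so SFTPSource and asyncssh are available.
pip install "craft-easy-file-import[sftp]"
```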

The package does not depend on craft-easy-api, Beanie, Mongo, or FastAPI. You can run it in a 50 MB container with nothing but Python and httpx.

Deployment patterns

Kubernetes CronJob / Azure Container Instance: run run_poll_cycle() once per invocation. Simplest pattern; no long-running process.

Long-running worker: use run_loop(factory, poll_interval=300) inside an async def main(). Scales cleanly in Kubernetes, Cloud Run (min-instances=1), or a systemd service. The factory callable is re-invoked each cycle, so you can dynamically reload configuration (new SFTP credentials, new pipelines) without restarting.
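The reload behaviour is easy to see with a stdlib-only stand-in for run_loop (a sketch, not the package's implementation): the factory is simply called again at the top of each cycle, so anything it reads fresh — environment variables, a config file — takes effect on the next cycle.

```python
import asyncio

async def run_loop_sketch(factory, cycles: int) -> list:
    """Stand-in for run_loop: call the factory once per cycle.

    The real runner would feed each binding's files through its pipeline
    and then sleep for poll_interval; this only shows the reload shape.
    """
    results = []
    for _ in range(cycles):
        bindings = factory()      # re-evaluated every cycle -> live config
        results.append(bindings)
        await asyncio.sleep(0)    # real code: await asyncio.sleep(poll_interval)
    return results

# Mutable config read at call time, so edits apply on the next cycle.
config = {"pattern": "products-*.csv"}

def make_bindings():
    return [("products", config["pattern"])]

out = asyncio.run(run_loop_sketch(make_bindings, cycles=2))
```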

Event-driven: skip the runner entirely and invoke pipeline.process(source_file, dry_run=False) from your own event handler (e.g. an Azure Blob Storage trigger). You get the parsing + callback logic without the polling loop.
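The shape of such a handler, with stdlib stubs standing in for the real SourceFile and pipeline (the package's actual SourceFile fields should be checked against its API — only the process(source_file, dry_run=...) call is documented above):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class SourceFileStub:
    # Stand-in for the package's SourceFile; real field names may differ.
    name: str
    content: bytes

class PipelineStub:
    # Stand-in exposing the documented process(source_file, dry_run=...) call.
    async def process(self, source_file: SourceFileStub, dry_run: bool = False) -> dict:
        return {"file": source_file.name, "rows": source_file.content.count(b"\n")}

async def on_blob_uploaded(name: str, data: bytes, pipeline) -> dict:
    # Event handler (e.g. a blob-storage trigger): wrap the payload and hand
    # it straight to the pipeline -- no polling runner involved.
    return await pipeline.process(SourceFileStub(name, data), dry_run=False)

result = asyncio.run(
    on_blob_uploaded("products-1.csv", b"sku,name\nA,Widget\n", PipelineStub())
)
```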