craft-easy-file-import (standalone)¶
craft-easy-file-import is a separate Python package that complements the API import engine. It is designed for long-running polling workers, bank reconciliation, and standalone file processing in containers where you don't want the full API process.
When to use which¶
| Scenario | API engine | Standalone package |
|---|---|---|
| User uploads a file through the admin UI | Yes | No |
| Need per-row rejection + manual correction flow | Yes | No |
| Need the records persisted via Craft Easy CRUD + hooks | Yes | No |
| Poll an SFTP server every 5 minutes for new files | — | Yes |
| Watch a local directory for dropped files | — | Yes |
| Parse BgMax or SEPA bank files and match payments | — | Yes |
| Run as a dedicated Cloud Run / Azure Container App worker | — | Yes |
| Minimal dependencies; no Mongo or FastAPI | — | Yes |
The two packages can be used together. A common deployment has the standalone package polling an SFTP folder for bank files and pushing matched payments into the API via ImportApiClient, while the API's own engine handles the interactive uploads from the admin UI.
Architecture¶
```
┌──────────────┐     ┌─────────────┐     ┌──────────────┐
│   sources    │──▶  │  pipelines  │──▶  │    runner    │
│  SFTP / dir  │     │  generic /  │     │ poll cycle / │
│              │     │  reconcile  │     │     loop     │
└──────────────┘     └─────────────┘     └──────────────┘
                                                │
                                                ▼
                                    ┌──────────────────┐
                                    │ ImportApiClient  │──▶  Craft Easy API
                                    └──────────────────┘
```
Four building blocks cover every use case:
- Parsers — decode a file format into Python dicts (CSV, JSON, flat-file, BgMax, SEPA)
- Pipelines — orchestrate the handling of a parsed file (generic row-by-row or bank reconciliation)
- Sources — where files come from (SFTP, watched directory)
- Runner — polls sources and feeds files through pipelines on a schedule
Minimal example¶
```python
import asyncio
import os

from craft_easy_file_import import (
    CSVParser,
    CSVParserConfig,
    GenericImportPipeline,
    PipelineBinding,
    ImportApiClient,
    run_loop,
)
from craft_easy_file_import.sources import WatchedDirectorySource


async def handle_row(row: dict) -> None:
    async with ImportApiClient(
        base_url="https://api.example.com",
        token=os.environ["API_TOKEN"],
        tenant_id="tenant_123",
    ) as api:
        await api.post("/products", data=row)


async def main():
    parser = CSVParser(CSVParserConfig(encoding="utf-8"))
    pipeline = GenericImportPipeline(
        pipeline_name="product-sync",
        parser=parser,
        on_row=handle_row,
    )
    source = WatchedDirectorySource(
        watch_dir="/mnt/inbox",
        file_pattern="products-*.csv",
        archive_dir="/mnt/archive",
    )
    bindings = [
        PipelineBinding(name="products", source=source, pipeline=pipeline),
    ]
    await run_loop(lambda: bindings, poll_interval=300)


asyncio.run(main())
```
This process watches /mnt/inbox for new CSV files every 5 minutes, parses each row, POSTs it to /products with a bearer token, and archives the processed file.
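For reference, a file matched by products-*.csv with a header line would yield one dict per data row, keyed by the header — the shape that handle_row receives. The column names below are illustrative, not mandated by the package; the sketch uses the stdlib csv module to mirror what a header-based CSV parser produces:

```python
import csv
import io

# Hypothetical contents of /mnt/inbox/products-2024-01.csv.
# The column names are an example, not a required schema.
sample = """sku,name,price
ABC-1,Widget,199
ABC-2,Gadget,249
"""

# One dict per data row, keyed by the header line.
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0])  # {'sku': 'ABC-1', 'name': 'Widget', 'price': '199'}
```

Note that CSV values arrive as strings; any type coercion (e.g. price to a number) belongs in your on_row callback.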
What's in the package¶
```python
from craft_easy_file_import import (
    # Generic parsers
    CSVParser, CSVParserConfig,
    JSONParser, JSONParserConfig,
    FlatFileParser, FlatFileConfig, PostType, PostField,
    # Pipelines
    BasePipeline,
    GenericImportPipeline,
    ReconciliationPipeline,
    PipelineResult,
    SourceFile,
    # API client
    ImportApiClient,
    # Runner
    PipelineBinding,
    run_poll_cycle,
    run_loop,
)

# Banking parsers live in a submodule
from craft_easy_file_import.parsers.banking import (
    BgMaxParser, BgMaxPayment, BgMaxResult,
    SEPAParser, SEPAPayment, SEPAResult,
)
```
See Bank Parsers, Reconciliation, and Pipelines & Runner for the details of each building block.
The ImportApiClient¶
A minimal async HTTP client for pushing parsed data back into a Craft Easy API. It sets Authorization: Bearer <token>, optionally X-Tenant-Id, and handles JSON and multipart uploads:
```python
from craft_easy_file_import import ImportApiClient

async with ImportApiClient(
    base_url="https://api.example.com",
    token="...",
    tenant_id="tenant_123",
    timeout=30.0,
) as api:
    # POST JSON
    result = await api.post("/products", data={"sku": "ABC", "name": "..."})

    # PATCH
    result = await api.patch("/products/123", data={"price": 199})

    # GET with params
    result = await api.get("/products", params={"sku": "ABC"})

    # Multipart file upload
    with open("report.pdf", "rb") as f:
        result = await api.upload_file(
            "/documents",
            file_name="report.pdf",
            file_content=f.read(),
            form_data={"category": "invoice"},
        )
```
The client is an async context manager — always use it with async with so the underlying httpx.AsyncClient is closed cleanly.
Dependencies¶
By design, the base install is minimal: httpx for HTTP and the Python stdlib for parsing. Optional extras add the rest:
| Extra | Installs | Enables |
|---|---|---|
| sftp | asyncssh | SFTPSource for polling remote servers |
| dev | pytest, pytest-asyncio | Test suite |
The package does not depend on craft-easy-api, Beanie, Mongo, or FastAPI. You can run it in a 50 MB container with nothing but Python and httpx.
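Installation follows the usual extras syntax (assuming the distribution is published under the same name on your package index):

```shell
# Base install: httpx is the only runtime dependency
pip install craft-easy-file-import

# With SFTP polling support (pulls in asyncssh)
pip install "craft-easy-file-import[sftp]"
```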
Deployment patterns¶
Kubernetes CronJob / Azure Container Instance: run run_poll_cycle() once per invocation. Simplest pattern; no long-running process.
Long-running worker: use run_loop(factory, poll_interval=300) inside an async def main(). Scales cleanly in Kubernetes, Cloud Run (min-instances=1), or a systemd service. The factory callable is re-invoked each cycle, so you can dynamically reload configuration (new SFTP credentials, new pipelines) without restarting.
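The factory-per-cycle contract can be illustrated with a stdlib-only toy loop. This is a sketch of the pattern, not the package's run_loop implementation; it only shows why passing a callable, rather than a fixed list, lets each cycle see fresh configuration:

```python
import asyncio

# Toy stand-in for run_loop's contract: the bindings factory is called
# once per poll cycle, so configuration changes between cycles are picked
# up without restarting the process.
async def toy_run_loop(factory, cycles, poll_interval=0):
    seen = []
    for _ in range(cycles):
        seen.append(factory())        # re-invoked every cycle
        await asyncio.sleep(poll_interval)
    return seen

config = {"pipelines": ["bank-files"]}

def make_bindings():
    # In a real worker this might re-read SFTP credentials or pipeline
    # config from disk or a secrets store.
    return list(config["pipelines"])

async def main():
    first = await toy_run_loop(make_bindings, cycles=1)
    config["pipelines"].append("product-sync")   # "config change" between cycles
    second = await toy_run_loop(make_bindings, cycles=1)
    return first, second

first, second = asyncio.run(main())
print(first, second)   # [['bank-files']] [['bank-files', 'product-sync']]
```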
Event-driven: skip the runner entirely and invoke pipeline.process(source_file, dry_run=False) from your own event handler (e.g. an Azure Blob Storage trigger). You get the parsing + callback logic without the polling loop.