Pipelines & Runner¶
Pipelines and the runner are the glue that turns parsers and sources into a running import worker. A pipeline says "for this kind of file, do this work"; a source says "here's where files come from"; and the runner says "check the sources on a schedule and feed each new file through its pipeline".
Pipelines¶
Every pipeline implements the same interface:
```python
class BasePipeline(ABC):
    @property
    def name(self) -> str: ...

    async def process(
        self,
        file: SourceFile,
        dry_run: bool = False,
    ) -> PipelineResult: ...
```
Two concrete pipelines ship with the package:
| Pipeline | Use for |
|---|---|
| `GenericImportPipeline` | CSV / JSON / flat-file imports — row-by-row callback |
| `ReconciliationPipeline` | BgMax / SEPA bank file reconciliation — matched/unmatched callbacks |
GenericImportPipeline¶
```python
import os

from craft_easy_file_import import (
    GenericImportPipeline,
    CSVParser,
    CSVParserConfig,
    ImportApiClient,
)

async def push_row(row: dict) -> None:
    async with ImportApiClient(
        base_url="https://api.example.com",
        token=os.environ["API_TOKEN"],
        tenant_id="tenant_123",
    ) as api:
        await api.post("/products", data=row)

pipeline = GenericImportPipeline(
    pipeline_name="product-sync",
    parser=CSVParser(CSVParserConfig(encoding="utf-8")),
    on_row=push_row,
    encoding="utf-8",
)
```
The `parser` only needs a `parse(content)` method that returns an iterable of dicts, so any of the built-in parsers work, as does a custom one. `on_row` is called once per row; failures are captured in `PipelineResult.errors` and processing continues with the next row.
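To illustrate that contract, here is a hypothetical custom parser for pipe-delimited files. Any object exposing `parse(content)` returning an iterable of dicts fits; the class name and format are illustrative, not part of the library.

```python
from typing import Iterator


class PipeDelimitedParser:
    """Hypothetical parser: first line is the header, fields split on '|'."""

    def __init__(self, encoding: str = "utf-8") -> None:
        self.encoding = encoding

    def parse(self, content: bytes) -> Iterator[dict]:
        lines = content.decode(self.encoding).splitlines()
        if not lines:
            return
        header = lines[0].split("|")
        for line in lines[1:]:
            if not line.strip():
                continue  # skip blank lines
            yield dict(zip(header, line.split("|")))


rows = list(PipeDelimitedParser().parse(b"sku|price\nA-1|9.90\nA-2|4.50"))
# rows == [{"sku": "A-1", "price": "9.90"}, {"sku": "A-2", "price": "4.50"}]
```

An instance of this class could be passed as `parser=` to `GenericImportPipeline` in place of the built-in `CSVParser`.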
ReconciliationPipeline¶
For bank files, the reconciliation pipeline orchestrates the full flow: parse the bank file, match payments, invoke your callbacks.
```python
from craft_easy_file_import import ReconciliationPipeline

pipeline = ReconciliationPipeline(
    file_format="bgmax",  # "bgmax" or "sepa"
    tenant_id="tenant_123",
    on_matched=mark_as_paid,
    on_unmatched=flag_for_manual_review,
    encoding="latin-1",  # BgMax is Latin-1, SEPA is UTF-8
)
```
See Reconciliation for the detailed matching flow and callback signatures.
PipelineResult¶
Every pipeline.process() call returns a PipelineResult:
```python
@dataclass
class PipelineResult:
    pipeline_name: str
    file_name: str
    source: str  # source binding name
    started_at: datetime
    completed_at: datetime | None
    total_records: int
    processed_count: int
    matched_count: int    # reconciliation only
    unmatched_count: int  # reconciliation only
    error_count: int
    errors: list[str]
    dry_run: bool

    @property
    def success(self) -> bool:
        return self.error_count == 0
```
`success` is `True` only when every record completed without an error. For monitoring, check `success` on each result and alert whenever it is `False`.
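A minimal sketch of that monitoring check, relying only on the fields shown above (real `PipelineResult` objects are stubbed here with `SimpleNamespace`):

```python
from types import SimpleNamespace


def failed_pipelines(results) -> list[tuple[str, list[str]]]:
    """Collect (pipeline_name, errors) for every result that did not succeed."""
    return [(r.pipeline_name, r.errors) for r in results if not r.success]


# Stand-ins for real PipelineResult objects:
results = [
    SimpleNamespace(pipeline_name="products", success=True, errors=[]),
    SimpleNamespace(pipeline_name="bgmax", success=False, errors=["row 7: bad amount"]),
]
failures = failed_pipelines(results)
# failures == [("bgmax", ["row 7: bad amount"])]
```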
Dry-run mode¶
Every pipeline honours dry_run=True. In dry-run, the pipeline still parses the file and walks every record, but does not call the on_row / on_matched / on_unmatched callbacks. Use this to validate a new template against a real file before enabling it in production:
```python
result = await pipeline.process(source_file, dry_run=True)
assert result.total_records == 1000
assert result.error_count == 0
```
Sources¶
Sources produce SourceFile objects:
```python
@dataclass
class SourceFile:
    name: str
    content: bytes
    size: int
    retrieved_at: datetime
    source_path: str = ""
```
Two implementations are included.
WatchedDirectorySource¶
Polls a local (or mounted) directory for files matching a glob pattern.
```python
from craft_easy_file_import.sources import WatchedDirectorySource

source = WatchedDirectorySource(
    watch_dir="/mnt/inbox",
    file_pattern="*.csv",
    archive_dir="/mnt/archive",  # optional — files moved here after processing
)
```
If archive_dir is set, files are moved there after the pipeline finishes. Otherwise they stay in watch_dir, so the next poll will pick them up again — useful during development but dangerous in production.
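The source handles archiving for you; a simplified, stdlib-only equivalent of that move step looks roughly like this (paths below are throwaway examples):

```python
import shutil
import tempfile
from pathlib import Path


def archive_file(file_path: Path, archive_dir: Path) -> Path:
    """Move a processed file into archive_dir, creating the directory if needed."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    target = archive_dir / file_path.name
    shutil.move(str(file_path), str(target))
    return target


# Demonstrate on a throwaway directory:
tmp = Path(tempfile.mkdtemp())
inbox, archive = tmp / "inbox", tmp / "archive"
inbox.mkdir()
(inbox / "products.csv").write_text("sku,price\n")
moved = archive_file(inbox / "products.csv", archive)
# The file now lives under archive/ and is gone from inbox/
```

Because the file leaves the watch directory, the next poll cannot pick it up again, which is exactly the dedup guarantee `archive_dir` provides.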
SFTPSource¶
Polls an SFTP server. Requires the optional asyncssh dependency (pip install craft-easy-file-import[sftp]).
```python
from craft_easy_file_import.sources import SFTPSource

source = SFTPSource(
    host="sftp.bank.example.com",
    port=22,
    username="bankfiles",
    key_path="/secrets/sftp.key",  # or password=...
    remote_dir="/out",
    file_pattern="*.bgm",
    archive_dir="/archive",
)
```
The source lists matching files, downloads each one into memory, and archives them on the server after processing. Failed downloads are logged and the file is skipped — the next poll will retry it.
Bindings — connecting sources to pipelines¶
A PipelineBinding is a named pair of (source, pipeline):
```python
from craft_easy_file_import import PipelineBinding

bindings = [
    PipelineBinding(
        name="bank-bgmax",
        source=SFTPSource(host="sftp.bank.example.com", ...),
        pipeline=ReconciliationPipeline(file_format="bgmax", tenant_id="tenant_123", ...),
    ),
    PipelineBinding(
        name="erp-products",
        source=WatchedDirectorySource(watch_dir="/mnt/erp", file_pattern="products*.csv"),
        pipeline=GenericImportPipeline(pipeline_name="products", parser=..., on_row=...),
    ),
]
```
Every binding runs independently — a failure in one binding doesn't affect the others. The name shows up in PipelineResult.source for logging and monitoring.
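A simplified sketch of that per-binding isolation, using nothing beyond the stdlib (the runner's actual implementation may differ):

```python
import asyncio


async def process_bindings(bindings, process_one):
    """Run every binding; an exception in one is recorded, never propagated."""
    outcomes = []
    for binding in bindings:
        try:
            outcomes.append((binding, await process_one(binding)))
        except Exception as exc:  # isolate failures to this binding
            outcomes.append((binding, exc))
    return outcomes


async def demo():
    async def process_one(binding):
        if binding == "bad":
            raise RuntimeError("boom")
        return "ok"

    return await process_bindings(["good", "bad"], process_one)


outcomes = asyncio.run(demo())
# outcomes[0] == ("good", "ok"); outcomes[1][1] is the RuntimeError
```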
The runner¶
Two entry points drive the bindings.
run_poll_cycle — one pass¶
```python
import logging

from craft_easy_file_import import run_poll_cycle

logger = logging.getLogger(__name__)

results = await run_poll_cycle(bindings, dry_run=False)
for r in results:
    if not r.success:
        logger.error("Pipeline %s failed: %s", r.pipeline_name, r.errors)
```
Runs one pass over every binding, processes every new file, returns the results, and exits. This is the right choice for a Kubernetes CronJob, an Azure Container Instance, or a Cloud Run job triggered by Cloud Scheduler: one invocation, one batch, done.
run_loop — continuous polling¶
```python
from craft_easy_file_import import run_loop

def build_bindings() -> list[PipelineBinding]:
    # Re-read configuration, rebuild bindings, etc.
    return [...]

await run_loop(
    build_bindings,
    poll_interval=300,  # seconds between polls
    dry_run=False,
)
```
Polls in an infinite loop. The build_bindings factory is called at the start of every cycle, so you can:
- Reload configuration without restarting the process
- Rotate credentials (new SFTP keys, new API tokens)
- Add or remove pipelines dynamically
- Switch pipelines between tenants
If the factory raises, the cycle is skipped, the error is logged, and the next cycle runs as normal. Exceptions per binding are caught the same way — a single misbehaving pipeline never takes down the worker.
The factory can be sync or async — both are supported.
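One way a runner can accept both kinds of factory, sketched with the stdlib only (the library's own dispatch may differ):

```python
import asyncio
import inspect


async def call_factory(factory):
    """Invoke a bindings factory that may be either sync or async."""
    result = factory()
    if inspect.isawaitable(result):  # async factories return a coroutine
        result = await result
    return result


def sync_factory():
    return ["binding-a"]


async def async_factory():
    return ["binding-b"]


both = asyncio.run(call_factory(sync_factory)), asyncio.run(call_factory(async_factory))
# both == (["binding-a"], ["binding-b"])
```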
Full example: bank file worker¶
```python
import asyncio
import os

from craft_easy_file_import import (
    ReconciliationPipeline,
    PipelineBinding,
    ImportApiClient,
    run_loop,
)
from craft_easy_file_import.sources import SFTPSource

API_URL = os.environ["API_URL"]
API_TOKEN = os.environ["API_TOKEN"]
TENANT_ID = os.environ["TENANT_ID"]


async def on_matched(match):
    async with ImportApiClient(API_URL, API_TOKEN, TENANT_ID) as api:
        await api.post(
            f"/invoices/{match.invoice_id}/payments",
            data={
                "amount": str(match.payment.amount),
                "reference": match.payment.reference,
                "paid_at": match.payment.payment_date.isoformat(),
            },
        )


async def on_unmatched(match):
    async with ImportApiClient(API_URL, API_TOKEN, TENANT_ID) as api:
        await api.post("/unmatched-payments", data={
            "amount": str(match.payment.amount),
            "reference": match.payment.reference,
            "reason": match.reason,
        })


def build_bindings():
    return [
        PipelineBinding(
            name="bgmax",
            source=SFTPSource(
                host="sftp.bank.example.com",
                username="acme",
                key_path="/secrets/sftp.key",
                remote_dir="/out",
                file_pattern="*.bgm",
                archive_dir="/archive",
            ),
            pipeline=ReconciliationPipeline(
                file_format="bgmax",
                tenant_id=TENANT_ID,
                on_matched=on_matched,
                on_unmatched=on_unmatched,
                encoding="latin-1",
            ),
        ),
    ]


async def main():
    await run_loop(build_bindings, poll_interval=300)


if __name__ == "__main__":
    asyncio.run(main())
```
Deploy this as a single-replica long-running worker (Cloud Run, Azure Container App, Kubernetes Deployment). The process only needs outbound network access to the SFTP server and the API — no database, no inbound traffic, no secrets beyond the SFTP key and the API token.