Pipelines & Runner

Pipelines and the runner are the glue that turns parsers and sources into a running import worker. A pipeline says "for this kind of file, do this work"; a source says "here's where files come from"; and the runner says "check the sources on a schedule and feed each new file through its pipeline".

Pipelines

Every pipeline implements the same interface:

class BasePipeline(ABC):
    @property
    def name(self) -> str: ...

    async def process(
        self,
        file: SourceFile,
        dry_run: bool = False,
    ) -> PipelineResult: ...

Two concrete pipelines ship with the package:

Pipeline                  Use for
GenericImportPipeline     CSV / JSON / flat-file imports — row-by-row callback
ReconciliationPipeline    BgMax / SEPA bank file reconciliation — matched/unmatched callbacks

GenericImportPipeline

import os

from craft_easy_file_import import (
    GenericImportPipeline,
    CSVParser,
    CSVParserConfig,
    ImportApiClient,
)

async def push_row(row: dict) -> None:
    async with ImportApiClient(
        base_url="https://api.example.com",
        token=os.environ["API_TOKEN"],
        tenant_id="tenant_123",
    ) as api:
        await api.post("/products", data=row)

pipeline = GenericImportPipeline(
    pipeline_name="product-sync",
    parser=CSVParser(CSVParserConfig(encoding="utf-8")),
    on_row=push_row,
    encoding="utf-8",
)

parser only needs a parse(content) method that returns an iterable of dicts — so any of the built-in parsers work, as does a custom one. on_row is called once per row; if it raises, the error is captured in PipelineResult.errors and processing continues with the next row.
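Because parser is duck-typed, writing a custom one is straightforward. A minimal sketch for a hypothetical pipe-delimited format (the class name and format are illustrative, not part of the package):

```python
class PipeDelimitedParser:
    """Parses 'a|b|c' lines: the first line is the header, the rest are rows."""

    def __init__(self, encoding: str = "utf-8") -> None:
        self.encoding = encoding

    def parse(self, content) -> list[dict]:
        # Accept bytes or str; SourceFile.content arrives as bytes.
        if isinstance(content, bytes):
            content = content.decode(self.encoding)
        lines = [line for line in content.splitlines() if line]
        header = lines[0].split("|")
        return [dict(zip(header, line.split("|"))) for line in lines[1:]]
```

Pass an instance as parser= exactly as you would a built-in parser.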

ReconciliationPipeline

For bank files, the reconciliation pipeline orchestrates the full flow: parse the bank file, match payments, invoke your callbacks.

from craft_easy_file_import import ReconciliationPipeline

pipeline = ReconciliationPipeline(
    file_format="bgmax",                 # "bgmax" or "sepa"
    tenant_id="tenant_123",
    on_matched=mark_as_paid,
    on_unmatched=flag_for_manual_review,
    encoding="latin-1",                  # BgMax is Latin-1, SEPA is UTF-8
)

See Reconciliation for the detailed matching flow and callback signatures.

PipelineResult

Every pipeline.process() call returns a PipelineResult:

@dataclass
class PipelineResult:
    pipeline_name: str
    file_name: str
    source: str                          # source binding name
    started_at: datetime
    completed_at: datetime | None
    total_records: int
    processed_count: int
    matched_count: int                   # reconciliation only
    unmatched_count: int                 # reconciliation only
    error_count: int
    errors: list[str]
    dry_run: bool

    @property
    def success(self) -> bool:
        return self.error_count == 0

success is True only when every record completed without an error. For monitoring, check success on each result and alert as soon as one comes back False.

Dry-run mode

Every pipeline honours dry_run=True. In dry-run, the pipeline still parses the file and walks every record, but does not call the on_row / on_matched / on_unmatched callbacks. Use this to validate a new template against a real file before enabling it in production:

result = await pipeline.process(source_file, dry_run=True)
assert result.total_records == 1000
assert result.error_count == 0

Sources

Sources produce SourceFile objects:

@dataclass
class SourceFile:
    name: str
    content: bytes
    size: int
    retrieved_at: datetime
    source_path: str = ""

Two implementations are included.

WatchedDirectorySource

Polls a local (or mounted) directory for files matching a glob pattern.

from craft_easy_file_import.sources import WatchedDirectorySource

source = WatchedDirectorySource(
    watch_dir="/mnt/inbox",
    file_pattern="*.csv",
    archive_dir="/mnt/archive",       # optional — files moved here after processing
)

If archive_dir is set, files are moved there after the pipeline finishes. Otherwise they stay in watch_dir, so the next poll will pick them up again — useful during development but dangerous in production.

SFTPSource

Polls an SFTP server. Requires the optional asyncssh dependency (pip install craft-easy-file-import[sftp]).

from craft_easy_file_import.sources import SFTPSource

source = SFTPSource(
    host="sftp.bank.example.com",
    port=22,
    username="bankfiles",
    key_path="/secrets/sftp.key",     # or password=...
    remote_dir="/out",
    file_pattern="*.bgm",
    archive_dir="/archive",
)

The source lists matching files, downloads each one into memory, and archives them on the server after processing. Failed downloads are logged and the file is skipped — the next poll will retry it.

Bindings — connecting sources to pipelines

A PipelineBinding is a named pair of (source, pipeline):

from craft_easy_file_import import PipelineBinding

bindings = [
    PipelineBinding(
        name="bank-bgmax",
        source=SFTPSource(host="sftp.bank.example.com", ...),
        pipeline=ReconciliationPipeline(file_format="bgmax", tenant_id="tenant_123", ...),
    ),
    PipelineBinding(
        name="erp-products",
        source=WatchedDirectorySource(watch_dir="/mnt/erp", file_pattern="products*.csv"),
        pipeline=GenericImportPipeline(pipeline_name="products", parser=..., on_row=...),
    ),
]

Every binding runs independently — a failure in one binding doesn't affect the others. The name shows up in PipelineResult.source for logging and monitoring.
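That isolation comes from wrapping each binding in its own try/except. Illustratively (this is not the library's actual code, and process here is a stand-in for running one binding's poll):

```python
import logging

logger = logging.getLogger("runner")

async def poll_once(bindings, process):
    """Run `process(binding)` for each binding in turn.

    A binding that raises is logged and skipped; the others still run.
    """
    results = []
    for binding in bindings:
        try:
            results.append(await process(binding))
        except Exception:
            logger.exception("binding failed")
    return results
```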

The runner

Two entry points drive the bindings.

run_poll_cycle — one pass

from craft_easy_file_import import run_poll_cycle

results = await run_poll_cycle(bindings, dry_run=False)

for r in results:
    if not r.success:
        logger.error("Pipeline %s failed: %s", r.pipeline_name, r.errors)

Runs one pass over every binding, processes every new file, and returns the results. This is the right choice for a Kubernetes CronJob, an Azure Container Instance, or a Cloud Run job triggered by Cloud Scheduler — one invocation, one batch, done.
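For scheduled one-shot jobs it is worth mapping the results to a process exit code, since a nonzero exit is usually the cheapest alerting hook a scheduler offers. A small helper (the function is illustrative, not part of the package):

```python
def exit_code(results) -> int:
    """Return 0 when every PipelineResult succeeded, 1 otherwise.

    CronJob-style schedulers treat a nonzero exit as a failed job,
    so this makes pipeline failures visible to the scheduler.
    """
    return 0 if all(r.success for r in results) else 1
```

At the end of the job's main(), call sys.exit(exit_code(results)).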

run_loop — continuous polling

from craft_easy_file_import import run_loop

def build_bindings() -> list[PipelineBinding]:
    # Re-read configuration, rebuild bindings, etc.
    return [...]

await run_loop(
    build_bindings,
    poll_interval=300,       # seconds between polls
    dry_run=False,
)

Polls in an infinite loop. The build_bindings factory is called at the start of every cycle, so you can:

  • Reload configuration without restarting the process
  • Rotate credentials (new SFTP keys, new API tokens)
  • Add or remove pipelines dynamically
  • Switch pipelines between tenants

If the factory raises, the cycle is skipped, the error is logged, and the next cycle runs as normal. Exceptions per binding are caught the same way — a single misbehaving pipeline never takes down the worker.

The factory can be sync or async — both are supported.
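One way a runner can accept both forms is to call the factory and await the result only when it is awaitable; an illustrative sketch, not the library's actual code:

```python
import inspect

async def call_factory(factory):
    # Works for both `def build_bindings()` and `async def build_bindings()`:
    # a coroutine function returns an awaitable, a plain function returns
    # the list directly.
    bindings = factory()
    if inspect.isawaitable(bindings):
        bindings = await bindings
    return bindings
```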

Full example: bank file worker

import asyncio
import os
from craft_easy_file_import import (
    ReconciliationPipeline,
    PipelineBinding,
    ImportApiClient,
    run_loop,
)
from craft_easy_file_import.sources import SFTPSource

API_URL = os.environ["API_URL"]
API_TOKEN = os.environ["API_TOKEN"]
TENANT_ID = os.environ["TENANT_ID"]

async def on_matched(match):
    async with ImportApiClient(API_URL, API_TOKEN, TENANT_ID) as api:
        await api.post(
            f"/invoices/{match.invoice_id}/payments",
            data={
                "amount": str(match.payment.amount),
                "reference": match.payment.reference,
                "paid_at": match.payment.payment_date.isoformat(),
            },
        )

async def on_unmatched(match):
    async with ImportApiClient(API_URL, API_TOKEN, TENANT_ID) as api:
        await api.post("/unmatched-payments", data={
            "amount": str(match.payment.amount),
            "reference": match.payment.reference,
            "reason": match.reason,
        })

def build_bindings():
    return [
        PipelineBinding(
            name="bgmax",
            source=SFTPSource(
                host="sftp.bank.example.com",
                username="acme",
                key_path="/secrets/sftp.key",
                remote_dir="/out",
                file_pattern="*.bgm",
                archive_dir="/archive",
            ),
            pipeline=ReconciliationPipeline(
                file_format="bgmax",
                tenant_id=TENANT_ID,
                on_matched=on_matched,
                on_unmatched=on_unmatched,
                encoding="latin-1",
            ),
        ),
    ]

async def main():
    await run_loop(build_bindings, poll_interval=300)

if __name__ == "__main__":
    asyncio.run(main())

Deploy this as a single-replica long-running worker (Cloud Run, Azure Container App, Kubernetes Deployment). The process only needs outbound network access to the SFTP server and the API — no database, no inbound traffic, no secrets beyond the SFTP key and the API token.