# Pipeline Authoring Guide
This guide explains how to author an ETLPlus pipeline YAML, using examples/configs/pipeline.yml as
a reference.
## Overview
ETLPlus focuses on simple, JSON-first ETL. The pipeline file is a declarative description that the
built-in CLI runner can execute directly, or that your own runner (a script, Makefile, or CI job)
can parse and execute using the ETLPlus primitives: extract, validate, transform, and load.
CLI note: ETLPlus uses Typer for command parsing. The legacy argparse parser has been removed. Use
the documented etlplus commands and flags (check etlplus --help) when wiring your runner.
## Running a Pipeline from YAML (CLI)
Use the built-in `etlplus run` command to execute jobs defined in a pipeline YAML. The command reads
your config, resolves vars and env placeholders, then runs the requested job:
```bash
# List jobs with the check command
etlplus check --config examples/configs/pipeline.yml --jobs
# Run a specific job
etlplus run --config examples/configs/pipeline.yml --job file_to_file_customers
# Run another job from the same config
etlplus run --config examples/configs/pipeline.yml --job api_to_file_github_repos
```
For scripted usage inside a larger Python project, prefer importing the Python API directly (e.g.,
extract, transform, validate, load) instead of invoking the CLI subprocess.
## Top-Level Structure
A pipeline file typically includes:
```yaml
name: ETLPlus Demo Pipeline
version: "1"
profile:
  default_target: local
  env:
    GITHUB_ORG: dagitali
    GITHUB_TOKEN: "${GITHUB_TOKEN}"
vars:
  data_dir: in
  out_dir: out
```
- `profile.env` is a convenient place to document expected environment variables. Resolve them in
  your runner before invoking ETLPlus functions. Treat repository-local env loaders as a development
  convenience; the intended runtime contract is still environment-injected configuration from the
  invoking shell, container, CI job, or scheduler.
- `vars` collects reusable paths/values for templating.

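If your runner resolves placeholders itself before calling ETLPlus functions, a recursive substitution over the parsed config is enough. The sketch below is illustrative only: `substitute` is a hypothetical helper, it assumes your runner has already merged `vars` and environment values into one flat mapping (how ETLPlus layers them is not shown here), and it relies on the standard library's `string.Template.safe_substitute`, which leaves unknown `${...}` placeholders untouched.

```python
from collections.abc import Mapping
from string import Template
from typing import Any


def substitute(value: Any, mapping: Mapping[str, str]) -> Any:
    """Recursively replace ${NAME} placeholders in a parsed config.

    Hypothetical runner helper, not part of ETLPlus. Placeholders that are
    missing from `mapping` are left exactly as written.
    """
    if isinstance(value, str):
        return Template(value).safe_substitute(mapping)
    if isinstance(value, Mapping):
        return {key: substitute(item, mapping) for key, item in value.items()}
    if isinstance(value, list):
        return [substitute(item, mapping) for item in value]
    return value  # numbers, booleans and None pass through unchanged
```

For example, `substitute({"path": "${data_dir}/customers.csv"}, {"data_dir": "in"})` returns `{"path": "in/customers.csv"}`. Because `Template` identifiers cannot contain colons, `${secret:...}` tokens also pass through this helper unchanged.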
## APIs
Declare HTTP APIs and endpoints under `apis`. You can define headers, endpoints, and pagination:
```yaml
apis:
  github:
    base_url: "https://api.github.com"
    headers:
      Accept: application/vnd.github+json
      Authorization: "Bearer ${GITHUB_TOKEN}"
    endpoints:
      org_repos:
        path: "/orgs/${GITHUB_ORG}/repos"
        query_params:
          per_page: 100
          type: public
        pagination:
          type: page # page | offset | cursor
          page_param: page
          size_param: per_page
          start_page: 1
          page_size: 100
        rate_limit:
          max_per_sec: 2
```
Note: Use query_params for URL query string pairs (e.g., ?key=value). Older keys like params
or query are not supported to avoid ambiguity with body/form fields.
### Profiles, base_path, and auth
For per-environment settings, define named profiles under an API. Each profile can include:
- `base_url` (required): scheme + host (optionally with a path)
- `base_path` (optional): path prefix that’s composed after `base_url`
- `headers`: default headers for that profile
- `auth`: provider-specific auth block (shape is pass-through)

Example:
```yaml
apis:
  github:
    profiles:
      default:
        base_url: "https://api.github.com"
        base_path: "/v1"
        auth:
          type: bearer
          token: "${GITHUB_TOKEN}"
        headers:
          Accept: application/vnd.github+json
          Authorization: "Bearer ${GITHUB_TOKEN}"
    endpoints:
      org_repos:
        path: "/orgs/${GITHUB_ORG}/repos"
```
At runtime, the model computes an effective base URL by composing base_url and base_path. If you
build an HTTP client from the config, prefer using the composed URL. For convenience, the
ApiConfig model exposes:
- `effective_base_url()`: returns `base_url` + `base_path` (when present)
- `build_endpoint_url(endpoint)`: composes the full URL from `base_url`, `base_path`, and the
  endpoint’s `path`

Header precedence:
1. `profiles.<name>.defaults.headers` (lowest)
2. `profiles.<name>.headers`
3. API top-level `headers` (highest)

Pagination tips (mirrors etlplus.api):
- Page/offset styles: use `page_param`, `size_param`, `start_page`, and `page_size`.
- Cursor style: specify `cursor_param` and `cursor_path` (e.g., `data.nextCursor`).
- Extract records from nested payloads with `records_path` (e.g., `data.items`).
- Rate limiting: set `rate_limit.sleep_seconds` or `rate_limit.max_per_sec` on the API or endpoint
  to define default pacing. Job runners merge `jobs[].extract.options.rate_limit` over those
  defaults and forward the merged mapping into
  `EndpointClient.paginate(..., rate_limit_overrides=...)`, so you can temporarily slow down or
  speed up a single job without editing the shared API profile. The paginator enforces that
  effective delay via a shared `RateLimiter` before each page fetch.

Client helpers (etlplus.api.EndpointClient) now return the JSONRecords alias (a list of
JSONDict) so pipelines and custom runners can rely on typed payloads when aggregating paginated
responses.
See etlplus/api/README.md for the code-level pagination API.
### Runner behavior with base_path (sources and targets)
When you reference an API service and endpoint in a pipeline (whether in a source or an API target),
the runner composes the request URL using the API model’s helpers, which honor any configured
base_path automatically.
Example:
```yaml
apis:
  myapi:
    profiles:
      default:
        base_url: "https://api.example.com"
        base_path: "/v1"
    endpoints:
      list_items:
        path: "/items"
sources:
  - name: list_items_source
    type: api
    service: myapi
    endpoint: list_items
```
At runtime, the request is issued to:
https://api.example.com/v1/items
No extra wiring is needed — the composed base URL (including base_path) is used under the hood
when the job runs.
## Databases
Prefer managed database endpoints plus runtime-injected credentials for deployable pipelines.
Managed databases are a first-class configuration path: ETLPlus expects the same connector surface
to work with env-injected credentials and hosted database endpoints. Localhost DSNs, SQLite files,
and Docker-backed databases are useful fixtures for development and CI smoke checks, but they are
not the canonical operating model.
Declare connection defaults or named connections you’ll use in sources/targets:
```yaml
databases:
  mssql:
    default:
      driver: "ODBC Driver 18 for SQL Server"
      server: "${MSSQL_SERVER}" # localhost,1433
      database: "${MSSQL_DATABASE}"
      username: "${MSSQL_USER}"
      password: "${MSSQL_PASSWORD}"
      trusted_connection: true
      options:
        encrypt: "yes"
        trust_server_certificate: "yes"
        connection_timeout: 30
        application_name: "ETLPlus"
  postgres:
    warehouse:
      host: "${PGHOST}"
      port: "${PGPORT}"
      database: "${PGDATABASE}"
      username: "${PGUSER}"
      password: "${PGPASSWORD}"
      options:
        sslmode: require
        connection_timeout: 30
        application_name: "ETLPlus"
  sqlite:
    local:
      database: "./${data_dir}/demo.db"
      options:
        timeout: 30
```
Note: Database extract/load in ETLPlus is minimal today; consider this a placeholder for orchestration that calls into DB clients.
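In a deployable pipeline the `${PG...}` placeholders above are filled from the runtime environment. If your own runner needs a SQLAlchemy-style connection string rather than discrete fields, a sketch like the one below builds one from the same variables. The helper name and the `postgresql://user:password@host:port/database` shape are assumptions for illustration, not an ETLPlus API; the important detail is that credentials must be percent-encoded.

```python
import os
from collections.abc import Mapping
from urllib.parse import quote, urlencode


def postgres_url(env: Mapping[str, str] = os.environ, **options: str) -> str:
    """Build a PostgreSQL URL from PG* variables (hypothetical helper)."""
    required = ("PGHOST", "PGPORT", "PGDATABASE", "PGUSER", "PGPASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    # safe="" also escapes '/', so characters like @ : / cannot break the URL.
    user = quote(env["PGUSER"], safe="")
    password = quote(env["PGPASSWORD"], safe="")
    url = f"postgresql://{user}:{password}@{env['PGHOST']}:{env['PGPORT']}/{env['PGDATABASE']}"
    return f"{url}?{urlencode(options)}" if options else url
```

Extra keyword arguments become query parameters, so `postgres_url(sslmode="require")` mirrors the `options.sslmode` setting shown in the YAML.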
For BigQuery-oriented configs, install the optional extra first:
```bash
pip install "etlplus[database-bigquery]"
```
Then you can keep the normal type: database connector shape and add the provider-specific fields:
```yaml
sources:
  - name: warehouse_events
    type: database
    provider: bigquery
    project: analytics-project
    dataset: warehouse
    table: events
```
If you already have a SQLAlchemy-style BigQuery connection string, you can still provide it through
connection_string and omit project/dataset.
For Snowflake-oriented configs, install the matching optional extra:
```bash
pip install "etlplus[database-snowflake]"
```
Then keep the same type: database connector shape and add the Snowflake-specific fields:
```yaml
sources:
  - name: warehouse_events_snowflake
    type: database
    provider: snowflake
    account: acme.us-east-1
    database: analytics
    schema: public
    warehouse: transforming
    table: events
```
If you already have a SQLAlchemy-style Snowflake connection string, you can still provide it through
connection_string and omit the provider-specific fields.
## File Systems
Point to local/cloud locations and logical folders:
```yaml
file_systems:
  local:
    base_path: "./${data_dir}"
    folders:
      in: "./${data_dir}"
      out: "./${out_dir}"
  s3:
    bucket: "my-etlplus-bucket"
    prefix: "data/"
    region: "us-east-1"
    profile: "${AWS_PROFILE}"
```
Prefer provider SDK credential chains or runtime-injected environment variables over checked-in
secrets. For remote-storage and cloud-database jobs, run etlplus check --readiness --config ...
to confirm the relevant extras and common provider-credential hints.
## Sources
Define where data comes from:
```yaml
sources:
  - name: customers_csv
    type: file # file | database | api
    format: csv # json | csv | xml | yaml
    path: "${data_dir}/customers.csv"
    options:
      header: true
      delimiter: ","
      encoding: utf-8
  - name: remote_customers_csv
    type: file
    format: csv
    path: "s3://my-etlplus-bucket/customers.csv"
    options:
      delimiter: ","
      encoding: utf-8
  - name: github_repos
    type: api
    service: github # reference into apis
    endpoint: org_repos
```
Source-level query_params (direct form):
```yaml
sources:
  - name: users_api
    type: api
    url: "https://api.example.com/v1/users"
    headers:
      Authorization: "Bearer ${TOKEN}"
    query_params:
      active: true
      page: 1
```
Tip: You can also override query parameters per job using
jobs[].extract.options.query_params: { ... }.
Rate limit overrides follow the same pattern: populate jobs[].extract.options.rate_limit with
either sleep_seconds or max_per_sec to override an API or endpoint default for that specific
job. Those values are merged into the client configuration and forwarded to
EndpointClient.paginate(..., rate_limit_overrides=...), ensuring only that job’s paginator is sped
up or slowed down.
File source notes:
- File connector `path` values can be local paths or supported remote URIs such as `s3://...`,
  Azure Blob/Data Lake URIs, and HTTP/HTTPS URLs.
- Local paths are useful for quick iteration, but remote object storage and managed filesystems are
  also first-class targets for the same connector surface.
- Connector-level `options` are forwarded to file reads. Job-level `jobs[].extract.options` values
  override connector-level file options for that job.
- ETLPlus still infers the format from the filename extension (`.csv`, `.json`, `.xml`, `.yaml`).
  However, `--source-format` and `--target-format` override that inference in the Typer CLI, so you
  can point at files without extensions or with misleading suffixes and force the desired parser or
  writer without renaming the file first.

Note: When using a service + endpoint in a source, URL composition (including base_path) is
handled automatically. See “Runner behavior with base_path (sources and targets)” in the APIs
section.
## Validations
Validation rule sets map field names to rules, mirroring etlplus.ops.validate.FieldRules:
```yaml
validations:
  customers_basic:
    CustomerId:
      required: true
      type: integer
      min: 1
    Email:
      type: string
      maxLength: 320
```
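The rule keys above (`required`, `type`, `min`, `maxLength`) are easiest to picture as checks on one record. The sketch below is a hypothetical, simplified checker written for illustration; it is not `etlplus.ops.validate`, its type table is an assumption, and its error strings are invented.

```python
from collections.abc import Callable, Mapping
from typing import Any

# Hypothetical type checks; bool is excluded from the numeric types on purpose.
TYPE_CHECKS: dict[str, Callable[[Any], bool]] = {
    "string": lambda v: isinstance(v, str),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
}


def check_record(
    record: Mapping[str, Any], rules: Mapping[str, Mapping[str, Any]]
) -> dict[str, list[str]]:
    """Return {field: [problems]} for one record; an empty dict means it passed."""
    errors: dict[str, list[str]] = {}
    for name, rule in rules.items():
        value = record.get(name)
        problems: list[str] = []
        if value is None:
            if rule.get("required"):
                problems.append("required")
        elif "type" in rule and not TYPE_CHECKS[rule["type"]](value):
            problems.append(f"expected {rule['type']}")
        else:
            # Range and length checks only run once the type looks right.
            if "min" in rule and value < rule["min"]:
                problems.append(f"less than min {rule['min']}")
            if "maxLength" in rule and len(value) > rule["maxLength"]:
                problems.append(f"longer than maxLength {rule['maxLength']}")
        if problems:
            errors[name] = problems
    return errors
```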
For document-oriented checks outside the in-pipeline ruleset model, the stable CLI also supports
schema-based validation through etlplus validate --schema. Use --schema-format jsonschema for
JSON or YAML payloads, --schema-format frictionless for CSV payloads, and --schema-format xsd
for XML payloads. When the source or schema path already makes the schema family clear, ETLPlus can
infer it without --schema-format; ambiguous inline text and STDIN cases still require the explicit
format. CSV validation failures keep stable row-aware keys such as row[3].email, so the same
errors and field_errors contract remains useful when uniqueness or required-cell constraints
fail.
## Transforms
Transformation pipelines follow etlplus.ops.transform.transform() shapes exactly:
```yaml
transforms:
  clean_customers:
    filter: { field: Email, op: contains, value: "@" }
    map:
      FirstName: first_name
      LastName: last_name
    select: [CustomerId, first_name, last_name, Email, Status]
    sort:
      - { field: first_name, reverse: false }
      - last_name
  summarize_customers:
    aggregate:
      - { field: CustomerId, func: count, alias: row_count }
      - { field: CustomerId, func: max, alias: max_id }
```
Transform semantics to keep in mind:
- `etlplus.ops.transform.transform()` applies steps in the fixed order `aggregate`, `filter`, `map`,
  `select`, `sort`, regardless of YAML key order.
- When an `aggregate` step is present, the transform result is a single mapping containing the
  merged aggregate outputs. That makes aggregate transforms ideal for summaries, but it also means
  row-wise cleanup steps are not applied afterward.
- When you provide multiple `sort` specs, they are applied sequentially. Because each step sorts the
  output of the previous one, later sort specs become the higher-precedence keys.
- For custom Python orchestration, the same per-step behavior is available through the public
  modules under `etlplus.ops.transformations`.

## Targets
Where your data lands:
```yaml
targets:
  - name: customers_json_out
    type: file
    format: json
    path: "${out_dir}/customers_clean.json"
  - name: customers_json_archive
    type: file
    format: json
    path: "s3://my-etlplus-bucket/archive/customers_clean.json"
    options:
      encoding: utf-8
  - name: webhook_out
    type: api
    url: "https://httpbin.org/post"
    method: post
    headers:
      Content-Type: application/json
```
Note: API targets that reference a service + endpoint also honor base_path via the same runner
behavior described in the APIs section.
File target notes:
- File target `path` values can be local paths or supported remote URIs.
- Connector-level `options` are forwarded to file writes. Job-level `jobs[].load.overrides` values
  override connector-level file options for that job.

Service + endpoint target example:
```yaml
apis:
  myapi:
    profiles:
      default:
        base_url: "https://api.example.com"
        base_path: "/v1"
    endpoints:
      ingest:
        path: "/ingest"
targets:
  - name: ingest_out
    type: api
    service: myapi
    endpoint: ingest
    method: post
    headers:
      Content-Type: application/json
```
## Connector Parsing and Extension
Under the hood, source and target entries are parsed via a single tolerant constructor that looks at
the type field and builds a concrete connector dataclass:
- `type: file` → `ConnectorFile`
- `type: database` → `ConnectorDb`
- `type: api` → `ConnectorApi`

Details:
- The pipeline loader uses a unified path for both `sources` and `targets`.
- Unknown or malformed entries are skipped rather than failing the whole load (keeping pipeline
  authoring permissive).
- The connector kind is also available as a type-safe literal in code as
  `etlplus.connector.ConnectorType` (values: `"file" | "database" | "api"`).
- To add new connector kinds in the future, implement a new dataclass in `etlplus.connector` and
  extend the internal parser to handle its `type` value.

## Jobs
Jobs orchestrate the flow end-to-end. Each job can reference a source, a validation ruleset, a
transform pipeline, and a target:
```yaml
jobs:
  - name: file_to_file_customers
    depends_on: [seed_customers]
    extract: { source: customers_csv }
    validate: { ruleset: customers_basic }
    transform: { pipeline: clean_customers }
    load: { target: customers_json_out }
  - name: seed_customers
    extract: { source: seed_customers_csv }
    load: { target: customers_db_out }
```
Notes:
- `depends_on` is optional and can be a string or list of job names.
- Jobs without dependencies run first when ordered as a DAG.
- `etlplus run --job <name>` executes the selected job plus its dependency closure.
- `etlplus run --all` executes every configured job in DAG order.

## Running Pipelines (CLI and Python)
Once you have a pipeline YAML, you can run jobs either from the command line or directly from Python.
### CLI: etlplus check (Inspect) and etlplus run (Execute)
List jobs, show a summary, or check the dependency graph of a pipeline file:
```bash
etlplus check --config examples/configs/pipeline.yml --jobs
etlplus check --config examples/configs/pipeline.yml --summary
etlplus check --config examples/configs/pipeline.yml --graph
```
Run a specific job end-to-end (extract → validate → transform → load):
```bash
etlplus run --config examples/configs/pipeline.yml --job file_to_file_customers
```
Run all configured jobs in DAG order:
```bash
etlplus run --config examples/configs/pipeline.yml --all
```
Notes:
- These commands read the same YAML schema described in this guide.
- `check --graph` validates cycles and missing dependencies without executing jobs.
- Environment-variable substitution (e.g. `${GITHUB_TOKEN}`) is applied the same way as when
  loading configs via the Python API.
- For more details on the orchestration implementation, see the `etlplus.ops.run` docstrings.

### Python: etlplus.ops.run.run
To trigger a job programmatically, use the high-level runner function exposed by the package:
```python
from etlplus.ops.run import run as run_job

result = run_job(
    job="file_to_file_customers",
    config_path="examples/configs/pipeline.yml",
)
print(result["status"], result.get("records"))
```
The run() function returns the final load result as a JSONDict envelope, which typically
includes status, message, and implementation-specific metadata such as record counts.
## Minimal working example
```yaml
name: "Quickstart"
vars:
  data_dir: examples/data
  out_dir: examples
sources:
  - name: sample
    type: file
    format: json
    path: "${data_dir}/sample.json"
transforms:
  tidy:
    filter: { field: age, op: gt, value: 25 }
    select: [name, email]
validations:
  basic:
    name: { type: string, required: true }
    email: { type: string, required: true }
targets:
  - name: sample_out
    type: file
    format: json
    path: "${out_dir}/sample_output.json"
jobs:
  - name: run
    extract: { source: sample }
    validate: { ruleset: basic }
    transform: { pipeline: tidy }
    load: { target: sample_out }
```
## Tips
- Use environment variables for secrets and org-specific values; resolve them in your runner.
- ETLPlus also supports additive secret tokens during config substitution:
  - `${secret:NAME}` or `${secret:env:NAME}` resolves from the active environment mapping.
  - `${secret:file:path.to.key}` resolves from a local JSON or YAML mapping file referenced by
    `ETLPLUS_SECRETS_FILE`.
  - Missing secret tokens remain unchanged so `etlplus check --readiness` and strict config
    diagnostics can still report unresolved substitution requirements.
- Apply safety caps for API pagination (`max_pages`, `max_records`) when running in CI.
- Validation controls: set `severity: warn|error` and `phase: before_transform|after_transform|both`.
- Keep pipelines composable; factor common transforms into named pipelines reused across jobs.

Example secret token usage:
```yaml
profile:
  env:
    API_BASE_URL: https://api.example.test
sources:
  - name: remote_payload
    type: api
    api: upstream
    endpoint: customers
    headers:
      Authorization: "Bearer ${secret:API_TOKEN}"
targets:
  - name: warehouse_file
    type: file
    format: json
    path: "${secret:file:paths.customer_export}"
```
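With a local secrets file, point `ETLPLUS_SECRETS_FILE` at it:
```bash
export ETLPLUS_SECRETS_FILE="$PWD/.etlplus-secrets.json"
```
where `.etlplus-secrets.json` contains:
```json
{
  "paths": {
    "customer_export": "./out/customers.json"
  }
}
```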
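The token grammar from the Tips list can be illustrated with a small regex-based resolver. This is a hypothetical sketch, not ETLPlus's substitution code: the function names are invented, it reads only JSON secrets files (YAML would need a third-party parser), and, as the Tips describe, unresolved tokens are left exactly as written.

```python
import json
import os
import re
from collections.abc import Mapping
from typing import Any

# Matches ${secret:NAME}, ${secret:env:NAME}, and ${secret:file:dotted.key}.
SECRET_TOKEN = re.compile(r"\$\{secret:(?:(env|file):)?([^}]+)\}")


def lookup_dotted(data: Any, dotted: str) -> Any:
    """Walk 'paths.customer_export' through nested mappings; None if absent."""
    for key in dotted.split("."):
        if not isinstance(data, Mapping) or key not in data:
            return None
        data = data[key]
    return data


def load_secrets_file(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Read the JSON file named by ETLPLUS_SECRETS_FILE, or return {}."""
    path = env.get("ETLPLUS_SECRETS_FILE")
    if not path:
        return {}
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def resolve_secrets(text: str, env: Mapping[str, str], secrets: Mapping[str, Any]) -> str:
    """Replace secret tokens; unresolved tokens are kept verbatim."""
    def replace(match: re.Match[str]) -> str:
        source, key = match.group(1) or "env", match.group(2)
        value = env.get(key) if source == "env" else lookup_dotted(secrets, key)
        return match.group(0) if value is None else str(value)

    return SECRET_TOKEN.sub(replace, text)
```

With the secrets file above loaded, `resolve_secrets("${secret:file:paths.customer_export}", os.environ, secrets)` returns `./out/customers.json`.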
For the HTTP client and pagination API, see etlplus/api/README.md.
## Design notes: Mapping inputs, dict outputs
ETLPlus config constructors (e.g., `ApiConfig.from_obj`, `Config.from_dict`) accept
`Mapping[str, Any]` rather than `dict[str, Any]` for inputs. Why?
- Flexibility: callers can pass any mapping-like object (e.g., YAML loaders that return custom
  mappings) without copying into a `dict` first.
- Clear intent: inputs are treated as read-only; we normalize to a concrete `dict` only for internal
  storage.
- Lower coupling: depending on the standard `Mapping` protocol avoids import cycles and keeps modules
  cohesive.

Practically, you can pass a plain dict everywhere and it will work.
### Merge Semantics (Python 3.13)
We use the dict union operator for clarity:
- `a | b` creates a merged copy with `b` taking precedence.
- `a |= b` updates `a` in-place with `b`’s keys.

Header precedence (lowest → highest):
1. `profiles.<name>.defaults.headers`
2. `profiles.<name>.headers`
3. API top-level `headers`

### Extending Config Shapes
When adding new config objects or fields:
- Prefer `@dataclass(slots=True)` for models.
- Add a `@classmethod from_obj(cls, obj: Mapping[str, Any]) -> Self` that is tolerant of missing
  optional keys and performs minimal type normalization (e.g., cast header values to `str`).
- Keep inputs as `Mapping[...]` (non-mutating) and store concrete `dict` internally.
- Reuse small helpers for repeated casts (e.g., `headers: dict[str, str]`).

Contributors: for the repo-wide typing approach (TypedDicts as editor hints, Mapping[str, Any]
inputs, and overloads imported only under TYPE_CHECKING), see
CONTRIBUTING.md#typing-philosophy.