This tutorial walks through four workspaces of increasing complexity. Each workspace is self-contained -- you can jump to any chapter, but the concepts build on each other:
- Single Ingestion Pipeline (`fruitshop_simple_workspace`) -- a plain dlt pipeline deployed with CLI commands
- Jobs and Deployments (`github_ingest_workspace`) -- job decorators, triggers, and manifest-based deployment
- Transformation Pipelines (`jaffle_shop_workspace`) -- connected pipelines with followup jobs
- Incremental Pipelines (`usgs_earthquakes_workspace`) -- freshness constraints, backfill, and refresh cascade
- Python 3.12+
- uv package manager
- A dltHub Runtime account
- (Optional) A MotherDuck account for cloud destinations
dlt comes from https://github.com/dlt-hub/dlt/archive/refs/heads/feat/workspace-deployment.zip and is added to each `pyproject.toml`. You must set up the runtime client to use the locally running backend (already added to each workspace's `config.toml`):
```toml
[runtime]
log_level = "WARNING" # the system log level of dlt
# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/reference/telemetry
dlthub_telemetry = false
api_base_url = "https://dlthub.dev/api/api"
auth_base_url = "https://dlthub.dev/api/auth"
```

Each workspace is an independent Python project. To work with one:
```shell
cd <workspace_name>
uv sync
source .venv/bin/activate
```

Every workspace uses named destinations (e.g. `fruitshop_destination`, `warehouse`) that resolve to different backends depending on the active profile:
| Profile | Config file | Secrets file | Destination | Use case |
|---|---|---|---|---|
| `dev` | `dev.config.toml` | `secrets.toml` | DuckDB (local) | Local development |
| `prod` | `prod.config.toml` | `prod.secrets.toml` | MotherDuck | Batch jobs on Runtime |
| `access` | `access.config.toml` | `access.secrets.toml` | MotherDuck (read-only) | Interactive notebooks on Runtime |
The `dev` profile is active by default. When you run a batch job on Runtime, it uses `prod`. When you serve an interactive notebook, it uses `access`.
To configure MotherDuck credentials, create the secrets files in each workspace's `.dlt/` directory:
`prod.secrets.toml` (read/write):

```toml
[destination.fruitshop_destination.credentials]
database = "your_database"
password = "your-motherduck-service-token"
```

`access.secrets.toml` (read-only):

```toml
[destination.fruitshop_destination.credentials]
database = "your_database"
password = "your-motherduck-read-only-token"
```

Secrets files (`*.secrets.toml`, `secrets.toml`) are gitignored. Never commit them. Runtime stores your secrets securely when you sync your configuration.
Authenticate once from any workspace:
```shell
uv run dlt runtime login
```

This opens a browser for GitHub OAuth and links your local workspace to your dltHub Runtime account.
Workspace: fruitshop_simple_workspace
This is the simplest possible dlt workspace. It contains a pipeline that loads locally-generated data and a marimo notebook for exploring it. There are no job decorators and no deployment module -- everything is a regular Python script.
This is how you'd deploy an existing dlt project to Runtime without changing any code.
```
fruitshop_simple_workspace/
  fruitshop_pipeline.py    # pipeline that loads fruitshop data
  fruitshop_notebook.py    # marimo notebook for data exploration
  pyproject.toml
  .dlt/
    config.toml            # base config (duckdb destination)
    dev.config.toml        # dev profile
    prod.config.toml       # prod profile (motherduck)
    access.config.toml     # access profile (motherduck, read-only)
```
`fruitshop_pipeline.py` defines four resources -- `customers`, `inventory_categories`, `inventory`, and `purchases` -- grouped into a `fruitshop()` source. The pipeline loads them into a named destination called `fruitshop_destination`:
```python
p = dlt.pipeline(
    pipeline_name="fruitshop",
    destination="fruitshop_destination",
    dataset_name="fruitshop_data",
)
load_info = p.run(fruitshop())
```

The destination name `fruitshop_destination` resolves to DuckDB or MotherDuck depending on which profile is active.
```shell
cd fruitshop_simple_workspace
uv sync
uv run python fruitshop_pipeline.py
```

This runs with the `dev` profile -- data goes into a local DuckDB file.
Upload your code and run the pipeline as a batch job:
```shell
uv run dlt runtime launch fruitshop_pipeline.py
```

This single command:
- Syncs your code and configuration to Runtime
- Creates a batch job from the script and starts it
The batch job runs with the prod profile, so data goes to MotherDuck (or
whichever cloud destination you configured in prod.config.toml).
Add -f to follow logs in your terminal until the run completes:
```shell
uv run dlt runtime launch fruitshop_pipeline.py -f
```

How it works: `launch` and `serve` accept a Python file name as a convenience. Under the hood, the CLI generates a single-job deployment manifest from that file and syncs it to Runtime. This is called an ad-hoc deploy -- no `__deployment__` module is needed. When a workspace grows beyond one or two scripts, you'll want a proper deployment module instead (see Chapter 2).
Deploy the marimo notebook as an interactive app:
```shell
uv run dlt runtime serve fruitshop_notebook.py
```

This deploys the notebook with the `access` profile (read-only credentials), waits until it's running, and opens it in your browser.
```shell
# open the Runtime web dashboard
uv run dlt runtime dashboard

# workspace deployment overview
uv run dlt runtime info

# list all jobs
uv run dlt runtime job list

# stream logs for the latest run
uv run dlt runtime logs fruitshop_pipeline -f

# cancel a stuck run
uv run dlt runtime cancel fruitshop_pipeline
```

When you run a script locally, dlt uses the base `config.toml` merged with `dev.config.toml` -- the `dev` profile is the default.
When Runtime runs a batch job (via `launch`), it uses the `prod` profile: `config.toml` + `prod.config.toml` + `prod.secrets.toml`.

When Runtime runs an interactive job (via `serve`), it uses the `access` profile: `config.toml` + `access.config.toml` + `access.secrets.toml`.
This separation ensures batch pipelines have write access while notebooks only get read-only credentials.
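The profile layering described above can be pictured as an ordered deep-merge of parsed TOML tables, where later layers win. A minimal sketch of the idea (illustrative only -- not dlt's actual config loader; the dicts stand in for parsed config files):

```python
# Illustrative profile layering: later layers override earlier ones, key by key.
def merge_configs(*layers: dict) -> dict:
    merged: dict = {}
    for layer in layers:
        for key, value in layer.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = merge_configs(merged[key], value)  # deep-merge sections
            else:
                merged[key] = value  # scalar override
    return merged

base = {"destination": {"fruitshop_destination": {"type": "duckdb"}}}
prod = {"destination": {"fruitshop_destination": {"type": "motherduck"}}}

# local run: base + dev (dev adds nothing in this sketch)
assert merge_configs(base, {}) == base
# batch job on Runtime: base + prod -- the prod layer overrides the destination type
assert merge_configs(base, prod)["destination"]["fruitshop_destination"]["type"] == "motherduck"
```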
Using launch and serve with a script file is the quickest way to get code
running on Runtime. However, it creates jobs one at a time and doesn't support:
- Scheduled triggers (cron, every N minutes)
- Followup jobs (run B after A succeeds)
- Freshness constraints
- Deploying the entire workspace as a single unit
For all of these, you need job decorators and a deployment module -- which is what the next chapter introduces.
Workspace: github_ingest_workspace
This workspace loads commits and contributors from the GitHub REST API. It introduces the three building blocks of manifest-based deployment:
- Job decorators that attach scheduling and metadata to Python functions
- `__deployment__.py` that declares which jobs exist in the workspace
- `dlt runtime deploy` that syncs the entire job graph to Runtime in one step
```
github_ingest_workspace/
  github_pipeline.py                  # ingestion job with @pipeline decorator
  github_dq_pipeline.py               # data quality batch job with @job decorator
  github_transformations_notebook.py  # marimo: transformation design workflow
  github_dq_notebook.py               # marimo: interactive DQ exploration
  github_report_notebook.py           # marimo: commit analytics dashboard
  __deployment__.py                   # deployment module
  pyproject.toml
  .dlt/
    config.toml
    dev.config.toml
    prod.config.toml
    access.config.toml
```
- `github_pipeline.py` (`load_commits`) -- ingests commits and contributors from the GitHub REST API into the `warehouse` destination. Runs every 5 minutes via `trigger.every("5m")`. All resources use replace mode (full refresh each run).
- `github_dq_pipeline.py` (`run_dq_checks`) -- validates the ingested data by running a suite of checks (non-null keys, valid contributor types, positive contribution counts). The job fails if any check has failures, making it visible in the Runtime dashboard. Runs hourly via a cron schedule.
This workspace includes three marimo notebooks, deployed as interactive jobs.
The transformations notebook and the report notebook form a pipeline:
the transformations notebook produces an analytics-ready commits table that
the report notebook reads.
- `github_transformations_notebook.py` -- a step-by-step tutorial that ingests raw GitHub data, explores the normalized schema, and builds a flattened `commits` transformation using Ibis. It runs both the ingest and transform pipelines inside the notebook, producing a `github_transform` dataset. This notebook is the prerequisite for the report notebook.
- `github_report_notebook.py` -- an analytics dashboard built on the transformed data from `github_transform`. Shows commit activity over time, top contributors, merge ratio trends, commit timing heatmaps, and message length distributions. Requires the transformations notebook to have been run first.
- `github_dq_notebook.py` -- interactive exploration of data quality metrics and checks on the raw ingestion data. Demonstrates `dq.with_metrics()` for profiling (null rates, unique counts, averages) and `dq.CheckSuite` for row-level validation. Works independently of the other two notebooks -- it only needs the ingestion pipeline to have run.
Instead of using launch with a file name, you can decorate functions with
@job or @pipeline from dlt.hub.run. The decorator attaches metadata --
triggers, tags, display name -- that Runtime uses to schedule and present the job.
`dlt.hub.run` provides two decorators for batch jobs:

- `@job` -- a general-purpose batch job (any Python function)
- `@pipeline` -- a batch job bound to a named `dlt.pipeline`

Both produce the same kind of job. `@pipeline` is a convenience that associates the job with a specific pipeline name, so Runtime can link telemetry and datasets.
Note: fine-grained retries and other goodies are coming to `@pipeline` soon.
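Conceptually, a job decorator wraps the function and records metadata for the manifest generator -- the function stays callable as plain Python. A hypothetical sketch of that pattern (the `JobFactory` class and field names here are illustrative, not dlt's actual API):

```python
# Hypothetical sketch: a decorator that attaches trigger/expose metadata to a
# function while keeping it callable. Not dlt's real implementation.
from dataclasses import dataclass, field
from typing import Callable, Optional

@dataclass
class JobFactory:
    func: Callable
    trigger: Optional[object] = None
    expose: dict = field(default_factory=dict)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)  # calling the job still runs the function

def job(trigger=None, expose=None):
    def wrap(func):
        return JobFactory(func, trigger=trigger, expose=expose or {})
    return wrap

@job(trigger="0 * * * *", expose={"tags": ["demo"]})
def my_job():
    return "ran"

assert my_job() == "ran"                  # still callable locally
assert my_job.expose["tags"] == ["demo"]  # metadata available to the manifest generator
```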
Here's the ingestion job in github_pipeline.py:
```python
import dlt
from dlt.hub import run
from dlt.hub.run import trigger

@run.pipeline(
    "github_pipeline",
    trigger=trigger.every("5m"),
    expose={"tags": ["ingest"], "display_name": "GitHub commits ingest"},
)
def load_commits():
    """Load commits and contributors from the GitHub REST API."""
    github_pipeline = dlt.pipeline(
        pipeline_name="github_pipeline",
        destination="warehouse",
        dataset_name="github_data",
    )
    load_info = github_pipeline.run(github_rest_api_source())
    print(load_info)
```

And the data quality batch job in `github_dq_pipeline.py`:
```python
from dlt.hub import run
from dlt.hub.run import trigger

@run.job(
    trigger=trigger.schedule("0 * * * *"),
    expose={"display_name": "GitHub data quality"},
)
def run_dq_checks():
    """Run data quality checks on GitHub data. Fails if any check has failures."""
    # ... check logic ...
    if not all_passed:
        raise RuntimeError("Data quality checks failed")
```

A trigger tells Runtime when to run a job. You can pass a trigger (or a list of triggers) to any job decorator.
| Trigger | Meaning |
|---|---|
| `trigger.every("5m")` | Run every 5 minutes |
| `trigger.schedule("0 * * * *")` | Cron expression (every hour at :00) |
| `trigger.schedule("0 8 * * *")` | Cron expression (daily at 8am UTC) |
| `"*/5 * * * *"` | Shorthand -- bare cron string, auto-detected |
Triggers declared in code are the source of truth. There is no separate CLI command to add or remove schedules -- change the decorator, redeploy.
Tags are labels attached to jobs via the expose parameter. They serve two
purposes:
- Organization -- group related jobs in the Runtime dashboard
- Bulk operations -- trigger, list, or cancel all jobs sharing a tag
For example, the ingestion job above has tags: ["ingest"]. In a larger
workspace you might tag all your ingestion jobs the same way, then trigger
them all at once:
```shell
# trigger every job tagged "ingest"
uv run dlt runtime trigger "tag:ingest"
```

This is useful for backfill scenarios: tag all your ingestion jobs with `"backfill"`, then `dlt runtime trigger "tag:backfill"` fires them all in one command.
__deployment__.py is a Python module that declares everything deployable in
the workspace. Runtime discovers jobs by inspecting its contents:
"""GitHub ingest workspace -- loads and monitors GitHub API data"""
from github_pipeline import load_commits
from github_dq_pipeline import run_dq_checks
import github_transformations_notebook
import github_dq_notebook
import github_report_notebook
__all__ = [
"load_commits",
"run_dq_checks",
"github_transformations_notebook",
"github_dq_notebook",
"github_report_notebook",
]The rules are straightforward:
- Function imports (
from ... import load_commits) produce one job per function. The function must be decorated with@job,@pipeline, or@interactive. - Module imports (
import github_report_notebook) produce one job per module. The framework is auto-detected: marimo notebooks become interactive GUI jobs, FastMCP modules become MCP tool servers, Streamlit apps become dashboards. __all__lists exactly which names to deploy. Only listed names are included.__doc__(the module docstring) becomes the workspace description visible in the Runtime dashboard.
This is the central command for manifest-based deployment. It reads your
__deployment__.py, generates a deployment manifest, and syncs it to Runtime:
```shell
cd github_ingest_workspace
uv run dlt runtime deploy
```

The deploy command:

- Imports `__deployment__.py` and collects all job definitions
- Generates a deployment manifest (a JSON document describing every job, its triggers, entry points, and metadata)
- Syncs your code and configuration to Runtime
- Sends the manifest to Runtime for reconciliation
Runtime compares the new manifest against the currently deployed jobs and classifies each one:
| Status | Meaning |
|---|---|
| added | New job -- will be created |
| updated | Job definition changed -- will be updated |
| unchanged | No changes -- left as-is |
| archived | Job was in the previous manifest but not in this one -- triggers disabled, history preserved |
This is declarative: you describe what should exist, Runtime figures out the diff.
Removing a job from __deployment__.py doesn't delete it -- it archives it, preserving
run history and logs.
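The reconciliation step can be sketched as a dictionary diff between the deployed state and the new manifest. This is illustrative only -- Runtime's real diffing works on full job definitions, not plain dicts:

```python
# Sketch of declarative reconciliation: classify each job by comparing the
# new manifest against what is currently deployed.
def reconcile(deployed: dict, manifest: dict) -> dict:
    statuses = {}
    for name, definition in manifest.items():
        if name not in deployed:
            statuses[name] = "added"
        elif deployed[name] != definition:
            statuses[name] = "updated"
        else:
            statuses[name] = "unchanged"
    for name in deployed:
        if name not in manifest:
            statuses[name] = "archived"  # removed from manifest: archive, don't delete
    return statuses

deployed = {"load_commits": {"trigger": "5m"}, "old_job": {"trigger": "1h"}}
manifest = {"load_commits": {"trigger": "10m"}, "run_dq_checks": {"trigger": "0 * * * *"}}

assert reconcile(deployed, manifest) == {
    "load_commits": "updated",
    "run_dq_checks": "added",
    "old_job": "archived",
}
```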
```shell
# see what would change without applying
uv run dlt runtime deploy --dry-run

# dump the full expanded manifest as YAML
uv run dlt runtime deploy --show-manifest
```

After deploying, jobs with triggers run automatically on their schedule. You can also run them manually:
```shell
# launch a specific job by name
uv run dlt runtime launch load_commits

# launch with log streaming
uv run dlt runtime launch load_commits -f

# trigger all jobs tagged "ingest"
uv run dlt runtime trigger "tag:ingest"

# trigger all jobs that have a schedule trigger
uv run dlt runtime trigger "schedule:*"

# preview which jobs would be triggered (without creating runs)
uv run dlt runtime trigger "tag:ingest" --dry-run

# serve one of the notebooks
uv run dlt runtime serve github_report_notebook
```

The trigger command accepts selectors -- fnmatch patterns that match against job triggers and tags. This lets you fire groups of jobs without naming each one.
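Selector matching of this kind can be reproduced with the standard library's `fnmatch`. A sketch under the assumption that each job advertises attribute strings like `tag:ingest` and `schedule:...` (the attribute format is inferred from the examples above; the helper itself is hypothetical):

```python
# Illustrative selector matching: a job is selected if any of its advertised
# attribute strings matches the fnmatch pattern.
from fnmatch import fnmatch

def select(jobs: dict, selector: str) -> list:
    return [
        name
        for name, attrs in jobs.items()
        if any(fnmatch(attr, selector) for attr in attrs)
    ]

jobs = {
    "load_commits": ["tag:ingest", "schedule:*/5 * * * *"],
    "run_dq_checks": ["schedule:0 * * * *"],
}

assert select(jobs, "tag:ingest") == ["load_commits"]                   # by tag
assert select(jobs, "schedule:*") == ["load_commits", "run_dq_checks"]  # any scheduled job
```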
```shell
# list all deployed jobs
uv run dlt runtime job list

# list only jobs tagged "ingest"
uv run dlt runtime job "tag:ingest" list

# detailed info for a specific job
uv run dlt runtime job load_commits info

# stream logs
uv run dlt runtime logs load_commits -f

# cancel the latest run of a specific job
uv run dlt runtime cancel load_commits

# cancel all running jobs matching a selector
uv run dlt runtime cancel "tag:ingest"

# workspace deployment overview
uv run dlt runtime info
```

This workspace runs each job independently -- the DQ checks don't know whether ingestion has finished. In the next chapter, we connect pipelines with followup triggers so that transformations run automatically after ingestion succeeds.
Workspace: jaffle_shop_workspace
This workspace demonstrates how to connect multiple pipelines so they run as a chain. An ingestion pipeline loads raw data from the Jaffle Shop API and a local parquet file; a transformation pipeline computes customer-level aggregations and loads them into the remote warehouse. The transformation runs automatically after ingestion succeeds -- no polling, no cron guessing.
```
jaffle_shop_workspace/
  jaffle_ingestion.py        # ingest job: REST API + parquet -> local DuckDB
  jaffle_transformations.py  # transform job: Ibis aggregations -> remote warehouse
  payments.parquet           # sample payments data
  __deployment__.py          # exports both jobs + an inline @run.interactive MCP server
  pyproject.toml
  .dlt/
    config.toml
    dev.config.toml
    prod.config.toml
    access.config.toml
```
```
jaffle_ingestion.py                     jaffle_transformations.py
┌────────────────────┐                  ┌────────────────────────┐
│ REST API           │                  │ customer_orders        │
│ (customers,        │   on success     │ customer_payments      │
│ products, orders)  ├─────────────────►│                        │
│ + payments.parquet │                  │ -> remote warehouse    │
│ -> local DuckDB    │                  │ (MotherDuck)           │
└────────────────────┘                  └────────────────────────┘
schedule: hourly                        trigger: ingest.success
```
Both jobs use replace mode -- every run is a full refresh. This keeps the setup simple: no incremental cursors, no merge keys, no stale-data concerns.
The key concept in this workspace is the followup trigger. Instead of giving the transform job its own schedule, we tell it to run whenever ingestion succeeds:
```python
from jaffle_ingestion import ingest_jaffle

@run.pipeline(
    "jaffle_transform",
    trigger=ingest_jaffle.success,
    expose={"display_name": "Jaffle Shop transform"},
)
def transform_jaffle():
    ...
```

`ingest_jaffle.success` is a trigger string that resolves to `job.success:jobs.jaffle_ingestion.ingest_jaffle`. When Runtime sees ingestion complete successfully, it immediately fires the transform.
Every decorated job (a JobFactory instance) exposes these trigger properties:
| Property | Fires when |
|---|---|
| `job.success` | The job completes successfully |
| `job.fail` | The job fails |
| `job.completed` | A tuple (success, fail) -- useful with multiple triggers |
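These properties can be thought of as formatting the trigger strings shown above from the job's module and function name. A hypothetical sketch (this `JobRef` class is illustrative, not dlt's actual `JobFactory`; only the trigger-string format is taken from the text):

```python
# Illustrative: expose trigger strings as properties derived from the job path.
class JobRef:
    def __init__(self, module: str, name: str):
        self._path = f"jobs.{module}.{name}"

    @property
    def success(self) -> str:
        return f"job.success:{self._path}"

    @property
    def fail(self) -> str:
        return f"job.fail:{self._path}"

    @property
    def completed(self) -> tuple:
        return (self.success, self.fail)  # both outcomes

ingest_jaffle = JobRef("jaffle_ingestion", "ingest_jaffle")
assert ingest_jaffle.success == "job.success:jobs.jaffle_ingestion.ingest_jaffle"
assert ingest_jaffle.completed == (ingest_jaffle.success, ingest_jaffle.fail)
```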
A job can have more than one trigger. Pass a list:
```python
@run.pipeline(
    "jaffle_transform",
    trigger=[trigger.schedule("0 8 * * *"), ingest_jaffle.success],
)
def transform_jaffle():
    ...
```

This job runs either on its daily 8am schedule or immediately after ingestion succeeds -- whichever comes first.
When a job has multiple triggers, you might need to know which one caused the
current run. Declare a run_context parameter:
```python
from dlt.hub.run import TJobRunContext

@run.pipeline(
    "jaffle_transform",
    trigger=[ingest_jaffle.success, some_other_job.success],
)
def transform_jaffle(run_context: TJobRunContext):
    if run_context["trigger"] == ingest_jaffle.success:
        print("Triggered by ingestion")
    else:
        print("Triggered by something else")
```

`TJobRunContext` is a dict injected by the Runtime launcher. It includes:
| Key | Type | Description |
|---|---|---|
| `run_id` | `str` | Unique identifier for this run |
| `trigger` | `str` | The trigger string that fired (e.g. `job.success:jobs.jaffle_ingestion.ingest_jaffle`) |
| `refresh` | `bool` | Whether this run carries a refresh signal (see Chapter 4) |
The execute parameter controls how Runtime manages a running job. The most
common setting is a timeout -- a maximum wall-clock duration after which
Runtime terminates the run.
The ingest job in this workspace has a 6-hour timeout using the string shorthand:
```python
@run.pipeline(
    jaffle_ingest_pipe,
    trigger=trigger.schedule("0 * * * *"),
    execute={"timeout": "6h"},
)
def ingest_jaffle():
    ...
```

The transform job uses the full dict form with a custom grace period (2 hours timeout, 1 minute grace):
```python
@run.pipeline(
    "jaffle_transform",
    trigger=ingest_jaffle.success,
    execute={"timeout": {"timeout": 7200, "grace_period": 60}},  # 2h, 1min grace
)
def transform_jaffle():
    ...
```

The timeout field accepts either a human-readable string (`"6h"`, `"30m"`, `"90s"`) or a `TTimeoutSpec` dict with `timeout` and `grace_period` in seconds.
When the timeout expires, Runtime sends a termination signal to the job process. The grace period is the window for the job to finish in-flight work (flush buffers, commit pending loads) before Runtime hard-kills the process. If the job exits cleanly within the grace period, the run counts as a normal completion.
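Normalizing the two accepted forms into seconds is straightforward. A sketch, assuming the unit suffixes `s`/`m`/`h` and the 30-second default grace period from the table below (illustrative -- dlt's real `TTimeoutSpec` handling may differ):

```python
# Illustrative timeout normalization: string shorthand or dict form
# -> (timeout_seconds, grace_period_seconds).
import re

def parse_timeout(spec, default_grace: float = 30.0) -> tuple:
    if isinstance(spec, dict):
        return float(spec["timeout"]), float(spec.get("grace_period", default_grace))
    match = re.fullmatch(r"(\d+)([smh])", spec)  # e.g. "90s", "30m", "6h"
    if not match:
        raise ValueError(f"bad timeout: {spec!r}")
    value, unit = int(match.group(1)), match.group(2)
    return value * {"s": 1, "m": 60, "h": 3600}[unit], default_grace

assert parse_timeout("6h") == (21600, 30.0)
assert parse_timeout({"timeout": 7200, "grace_period": 60}) == (7200.0, 60.0)
```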
| Field | Type | Default | Description |
|---|---|---|---|
| `timeout` | `float` | none | Max wall-clock duration in seconds |
| `grace_period` | `float` | 30.0 | Seconds for graceful shutdown before hard kill |
There are no retries at the platform level -- retry logic belongs in your
pipeline code (e.g. via dlt's built-in retry support), where you know whether
a partial load is safe to resume.
The transformation functions use the @dlt.hub.transformation decorator, which
marks an Ibis expression (or SQL query) as a dlt resource that reads from an
existing dataset:
```python
@dlt.hub.transformation(write_disposition="replace")
def customer_orders(dataset: dlt.Dataset) -> typing.Iterator[ir.Table]:
    """Aggregate statistics about previous customer orders."""
    orders = dataset.table("orders").to_ibis()
    yield orders.group_by("customer_id").aggregate(
        first_order=orders.ordered_at.min(),
        most_recent_order=orders.ordered_at.max(),
        number_of_orders=orders.id.count(),
    )
```

Multiple transformations can be grouped into a source and run through a pipeline:
```python
@dlt.source
def customers_metrics(raw_dataset: dlt.Dataset) -> list:
    return [
        customer_orders(raw_dataset),
        customer_payments(raw_dataset),
    ]

jaffle_transform_pipe = dlt.pipeline(
    "jaffle_transform",
    destination="jaffleshop_transformation_destination",
)
jaffle_transform_pipe.run(
    customers_metrics(jaffle_ingest_pipe.dataset())
)
```

When source and destination share the same engine (both DuckDB, or both MotherDuck), dlt executes the query as pure SQL with zero data movement.
Interactive jobs are long-running HTTP services -- notebooks, dashboards, REST
APIs, MCP servers. Declare them with @run.interactive. The function returns
the framework instance (a FastMCP, a Starlette app, etc.) and the job
launcher detects what was returned and runs it.
This workspace ships a small MCP server defined right inside __deployment__.py:
```python
@run.interactive(
    section="jaffle_mcp",
    interface="mcp",
    idle_timeout="30m",
    expose={"display_name": "Jaffle Shop MCP"},
)
def jaffle_mcp():
    """MCP server exposing a row_counts tool over the Jaffle Shop datasets."""
    from fastmcp import FastMCP

    mcp = FastMCP("jaffle-shop-data")

    @mcp.tool
    def row_counts() -> dict:
        # ... query both pipeline datasets and return row counts ...
        return results

    return mcp
```

A few things worth noting:

- `fastmcp` is imported inside the function, so the deployment manifest generator doesn't need fastmcp installed -- only the runtime that actually starts the job does.
- The function returns the `FastMCP` instance. The launcher detects this return value and delegates to the MCP launcher to start the server.
- `section="jaffle_mcp"` overrides the default section (which would be the containing module name, `__deployment__`). This keeps the job's config path short and predictable.
- `idle_timeout="30m"` asks Runtime to recycle the server if it sits idle for 30 minutes. Useful for interactive jobs that shouldn't run forever.
Serve it with `uv run dlt runtime serve jaffle_mcp`.
Jobs read configuration through dlt's standard config system under the
[jobs.<section>.<spec>] section. The MCP server is a good example -- we
switch its transport to SSE in .dlt/config.toml:
```toml
[jobs.jaffle_mcp.mcp]
transport = "sse"
```

`jaffle_mcp` is the section we set on the decorator. `mcp` is the `McpConfiguration` spec section read by the MCP launcher. Because config files are profile-aware, you can override the transport per profile by adding the same section to `dev.config.toml` or `prod.config.toml`. Any job-specific settings your code reads from `dlt.config.value` follow the same pattern.
```shell
cd jaffle_shop_workspace
uv sync

# preview the deployment
uv run dlt runtime deploy --dry-run

# deploy
uv run dlt runtime deploy

# run the pipeline by pipeline name
uv run dlt runtime run-pipeline jaffle_ingest -f
```

After ingestion completes, watch for the transform to start:

```shell
uv run dlt runtime logs transform_jaffle -f
```

All jobs in this workspace use replace mode -- every run loads everything from scratch. In the next chapter, we add incremental loading with cursor-based pagination, freshness constraints that prevent transforms from running on stale data, and a backfill mechanism that cascades a full refresh through the entire pipeline graph.
Workspace: usgs_earthquakes_workspace
This workspace ingests real-time earthquake data from the USGS, transforms it into analytics tables, and ships a dashboard. It demonstrates dlt's most advanced deployment features: scheduler-driven intervals (pipelines receive start/end bounds from Runtime instead of persisting their own cursor state), freshness constraints between jobs, a backfill job that cascades a refresh signal without loading data, dependency groups, and timezone-aware cron scheduling.
```
usgs_earthquakes_workspace/
  usgs/
    __init__.py          # stateless dlt source: takes (interval_start, interval_end)
    settings.py          # API URLs and constants
    transformations.py   # Ibis transformations: daily stats + severity classification
  usgs_pipeline.py       # 5 jobs: backfill (cascade + setup), daily, 2 transforms, clock
  usgs_dashboard.py      # marimo dashboard with 6 charts
  __deployment__.py
  pyproject.toml
  .dlt/
    config.toml
    dev.config.toml      # epoch + destination for dev
    prod.config.toml
    access.config.toml
```
```
backfill_usgs (manual, cascade + setup only)
  ══╦══> usgs_daily (cron */3 + followup, freshness gate, scheduler interval)
    ║      ┆
    ║      ┆ freshness: usgs_daily.is_fresh
    ║      ┆
    ║      ├╌╌╌> transform_earthquakes (every 5m, scheduler interval)
    ║      └╌╌╌> transform_feeds_summary (every 5m, replace)
    ║
clock (detached 1-minute heartbeat)
```

- Solid arrow (`══>`) is a trigger + freshness gate: the daily job fires on its own cron and immediately after backfill succeeds, but only after backfill has completed at least once.
- Dotted arrows (`╌╌>`) are freshness gates only: transforms run on their own 5-minute cron but wait until the most recent ingest interval is complete before processing. This prevents transforms from observing half-loaded data.
The earthquakes source is a function of (interval_start, interval_end).
It holds no cursor state between runs -- the bounds are passed in explicitly
by the caller, never read from pipeline.state:
```python
@dlt.source
def source(interval_start: datetime, interval_end: datetime):
    incremental_interval = dlt.sources.incremental(
        "time",
        initial_value=interval_start,
        end_value=interval_end,
        range_end="closed",
    )

    @dlt.resource(write_disposition="merge", primary_key="id")
    def earthquakes(
        time: dlt.sources.incremental[datetime] = incremental_interval,
    ) -> Iterable[TDataItem]:
        params = {
            "starttime": time.start_value.isoformat(),
            "minmagnitude": MIN_MAGNITUDE,
        }
        if time.end_value is not None:
            params["endtime"] = time.end_value.isoformat()
        payload = requests.get(FDSN_EVENT_URL, params=params).json()
        for feature in payload.get("features", []):
            yield _flatten_feature(feature)

    return [earthquakes, feeds_summary]
```

What each piece does:

- `initial_value` + `end_value` both set: binds the incremental window to the caller-supplied bounds. Because both endpoints are pinned, dlt does not fall back to pipeline state.
- `range_end="closed"`: both endpoints inclusive, matching the FDSN service's own inclusive `endtime` semantics.
- Cursor pushdown: `time.start_value` / `time.end_value` feed straight into the FDSN API as `starttime` / `endtime` query params, so only the requested slice is fetched from the server -- no over-fetch, no client-side filtering.
- `merge` + `primary_key="id"`: earthquake records can be revised by USGS. Merge-by-id updates existing rows in place so replaying overlapping windows stays idempotent.
Why stateless matters: every run is a pure function of its inputs. That opens the door to future parallel backfills (multiple workers processing different slices concurrently, no cursor contention) and makes every run easy to reason about and safe to re-run.
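Because each run is a pure function of `(interval_start, interval_end)`, a backfill range can be cut into independent slices and handed to separate workers. A sketch of that windowing idea (the `slice_range` helper is hypothetical, not a dlt API):

```python
# Illustrative: split a backfill range into independent, contiguous slices.
# Each slice could be processed by a separate worker with no shared cursor.
from datetime import datetime, timedelta

def slice_range(start: datetime, end: datetime, step: timedelta) -> list:
    slices, cursor = [], start
    while cursor < end:
        upper = min(cursor + step, end)
        slices.append((cursor, upper))
        cursor = upper
    return slices

slices = slice_range(datetime(2026, 1, 1), datetime(2026, 1, 10), timedelta(days=4))
assert slices == [
    (datetime(2026, 1, 1), datetime(2026, 1, 5)),
    (datetime(2026, 1, 5), datetime(2026, 1, 9)),
    (datetime(2026, 1, 9), datetime(2026, 1, 10)),  # last slice is clipped to the end
]
```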
The feeds_summary resource is the opposite: write_disposition="replace"
with no cursor. The "significant events of the past 30 days" feed is a
small snapshot rebuilt on every run.
When Runtime fires a scheduled job, it populates run_context with the
[interval_start, interval_end] window the job should process:
- Normal run: the window covers the cron tick that just elapsed. If the job missed previous ticks (downtime, freshness gate blocking, ...), the scheduler extends `interval_start` back to where the last successful run ended. Windows are always continuous -- no gaps, no dropped data.
- On refresh: Runtime resets the interval pointer back to the configured start of the overall range, so the next run reprocesses the full history. The job doesn't have to override `interval_start` manually.
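The two rules can be condensed into a small window function -- a sketch of the described behavior, not Runtime's actual scheduler:

```python
# Illustrative window computation: continuous windows, reset on refresh.
from datetime import datetime

def next_window(interval_start, last_success_end, now, refresh=False):
    if refresh or last_success_end is None:
        return (interval_start, now)  # first run or full-history reprocess
    return (last_success_end, now)    # continuous: pick up where we left off

epoch = datetime(2020, 1, 1)
now = datetime(2026, 4, 14)

assert next_window(epoch, None, now) == (epoch, now)  # initial run starts at interval.start
assert next_window(epoch, datetime(2026, 4, 13), now) == (datetime(2026, 4, 13), now)
assert next_window(epoch, datetime(2026, 4, 13), now, refresh=True) == (epoch, now)
```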
Here's how usgs_daily uses the scheduler-supplied window:
```python
@run.pipeline(
    usgs_ing_pipeline,
    interval={"start": USGS_EPOCH},
    trigger=["*/3 * * * *", backfill_usgs.success],
    require={"timezone": "Europe/Berlin"},
)
def usgs_daily(run_context: TJobRunContext, epoch: str = None):
    if run_context["refresh"]:
        usgs_ing_pipeline.refresh = "drop_sources"
        # optionally narrow the refresh window for faster dev testing
        if epoch:
            run_context["interval_start"] = datetime.fromisoformat(epoch)
    _load_ingest(
        run_context["interval_start"],
        run_context["interval_end"],
        ["earthquakes", "feeds_summary"],
    )
```

No state lookups, no cursor restoration, no SQL filter rendering. The scheduler hands over the window; the job forwards it to the stateless source.
The `interval=` parameter tells the scheduler the overall time range the job covers:

```python
interval={"start": USGS_EPOCH}
```

`interval.start` is where the data begins. Together with the cron trigger it defines the full set of discrete windows the job should eventually cover. `interval.end` is optional -- when omitted it defaults to "now" on every run.
The scheduler uses interval.start for two things:
- Initial run: the first window starts at `interval.start`.
- Refresh: when a refresh signal arrives, the scheduler resets the interval pointer back to `interval.start` so the entire range gets reprocessed.
Transform jobs in this workspace also declare interval={"start": USGS_EPOCH}
for the same reason -- the scheduler needs to know where transformable data
begins. Note that transforms don't need to override the window on refresh:
the amount of data they process is fully controlled by what the ingest job
loaded.
The epoch function parameter on usgs_daily is a dev-profile convenience.
When set, it overrides the scheduler's reset-to-start on refresh with a
more recent date, so local test runs finish fast instead of re-ingesting
from the very beginning:
```toml
# .dlt/dev.config.toml -- dev-only, narrows refresh window for fast testing
[jobs.usgs_pipeline]
epoch = "2026-04-05T00:00:00+00:00"
```

In production, `epoch` is `None` (no override in `prod.config.toml`), so refreshes use the full `interval.start` range as Runtime provides it.
The [jobs.usgs_pipeline] section in the config above applies to every
job defined in usgs_pipeline.py because the default config section for a
job is the containing module name. A single section, many jobs:
```toml
[jobs.usgs_pipeline]
epoch = "2026-04-05T00:00:00+00:00"
```

Every decorated function in `usgs_pipeline.py` -- `backfill_usgs`, `usgs_daily`, `transform_earthquakes`, `transform_feeds_summary` -- picks up this `epoch` unless a per-job section (`[jobs.usgs_pipeline.usgs_daily]`) overrides it. Only `usgs_daily` actually declares an `epoch` parameter, so the value is silently ignored by the others.
See the "Job configuration via dlt.config.value" subsection in the
Reference for the general pattern.
usgs_daily declares require={"timezone": "Europe/Berlin"}. This tells
Runtime to interpret the job's cron expressions in that IANA timezone --
*/3 * * * * still ticks every 3 minutes, but midnight-based cron
expressions (0 0 * * *) fire at midnight Berlin time rather than UTC.
The scheduler's interval_start / interval_end are UTC datetimes in
run_context, but they align to tick boundaries in the declared timezone.
So a daily job with timezone="Europe/Berlin" and cron 0 0 * * * receives
intervals like [2026-04-13T22:00:00Z, 2026-04-14T22:00:00Z] during
daylight saving -- the Berlin midnight.
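The Berlin-midnight arithmetic can be checked with the standard library's `zoneinfo` (this only verifies the timezone math, nothing dlt-specific):

```python
# During daylight saving, 2026-04-14 00:00 Europe/Berlin is 2026-04-13 22:00 UTC.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

berlin_midnight = datetime(2026, 4, 14, 0, 0, tzinfo=ZoneInfo("Europe/Berlin"))
as_utc = berlin_midnight.astimezone(timezone.utc)

assert as_utc == datetime(2026, 4, 13, 22, 0, tzinfo=timezone.utc)
```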
Set timezone on every scheduled job that cares about local-time ticks;
transforms in this workspace declare the same Berlin timezone for the same
reason. See the Reference for the full list of require options.
A freshness constraint is a gate: "don't run me until the upstream job's last completed interval has fully covered my scheduled interval."

```python
@run.pipeline(
    usgs_ing_pipeline,
    interval={"start": USGS_EPOCH},
    trigger=["*/3 * * * *", backfill_usgs.success],
    require={"timezone": "Europe/Berlin"},
)
def usgs_daily(run_context: TJobRunContext, epoch: str = None):
    ...
```

Freshness is not a trigger. The distinction matters:
| Concept | Semantics |
|---|---|
| Trigger (`trigger=upstream.success`) | "Run me immediately when upstream finishes" (event-driven) |
| Freshness (`freshness=upstream.is_fresh`) | "Run me on my own schedule, but only when upstream is done" (gate) |
In this workspace:
- `usgs_daily` doesn't use freshness -- it depends on `backfill_usgs` via a followup trigger (`backfill_usgs.success`), and its `interval.start` handles the "first run" boundary.
- Both transform jobs declare `freshness=[usgs_daily.is_fresh]` -- even though they have their own 5-minute cron, they wait until the most recent ingest interval is complete. This prevents `transform_feeds_summary` from observing the brief mid-load window where `feeds_summary` (a replace resource) has been dropped but not yet rewritten.
The `refresh=` parameter on a job decorator controls how a refresh signal propagates downstream. When a job runs with `run_context["refresh"] == True`, it is responsible for reacting to that flag in its function body.
| Policy | Behavior |
|---|---|
| `refresh="always"` | Every successful run cascades a refresh signal to all downstream jobs (originator) |
| `refresh="auto"` (default) | Passes through if received, otherwise no-op (transparent) |
| `refresh="block"` | Stops the signal -- downstream jobs never receive it |
The backfill job is the cascade originator. It does not load data -- its sole job is to fire the refresh signal and do one-shot setup (drop sources, initialize destination state, warm caches, etc.):
```python
@run.job(
    expose={"tags": ["backfill"], "display_name": "USGS backfill cascade"},
    refresh="always",
)
def backfill_usgs(epoch=USGS_EPOCH):
    """Cascade a refresh signal to every downstream job and do initial setup.

    Does not load data -- downstream jobs reprocess from interval.start."""
    print("initial setup for epoch:", epoch)
    # place one-shot setup here (schema init, destination warmup, ...)
```

When backfill succeeds, Runtime clears `prev_completed_run` on all reachable downstream jobs (a BFS walk, stopped by `block` policies). Those jobs then start with `run_context["refresh"] = True` and react accordingly:
| Job | `refresh=` | Reaction in body |
|---|---|---|
| `backfill_usgs` | `always` | Originates the cascade. One-shot setup only -- loads no data. |
| `usgs_daily` | `auto` | Sets `pipeline.refresh = "drop_sources"`. Runtime resets the scheduler interval to `interval.start`; an optional dev-only `epoch` override narrows the window for fast testing. |
| `transform_earthquakes` | `auto` | Sets `pipeline.refresh = "drop_resources"`. Runtime resets the interval; the transform re-aggregates whatever the ingest loaded. |
| `transform_feeds_summary` | `auto` | Sets `pipeline.refresh = "drop_resources"`. The replace transform naturally rebuilds from the refreshed ingest data. |
Why `drop_sources` vs `drop_resources`:

- `drop_sources` (ingest) wipes every table owned by the source, plus the source-level state. For our stateless source that means just the tables -- there's no cursor to clear -- but the table drop ensures the next run rebuilds cleanly.
- `drop_resources` (transforms) drops only the transform's own output table(s) and the associated resource state, and does not touch other resources. If transformation resources are ever consolidated into a single transformation source, this ensures that only the resources handled by the specified job are dropped.
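The cascade walk can be sketched as a breadth-first traversal over the job graph. This is illustrative code, not Runtime internals; it assumes a `block` job neither refreshes itself nor propagates the signal, which is one reading of the policy table:

```python
from collections import deque

def refresh_targets(graph: dict, policies: dict, origin: str) -> list[str]:
    """BFS over downstream jobs, stopping at refresh="block" nodes;
    returns the jobs whose prev_completed_run should be cleared."""
    targets, seen = [], {origin}
    queue = deque(graph.get(origin, []))
    while queue:
        job = queue.popleft()
        if job in seen:
            continue
        seen.add(job)
        if policies.get(job, "auto") == "block":
            continue  # signal stops here; do not descend further
        targets.append(job)
        queue.extend(graph.get(job, []))
    return targets

graph = {
    "backfill_usgs": ["usgs_daily"],
    "usgs_daily": ["transform_earthquakes", "transform_feeds_summary"],
}
print(refresh_targets(graph, {"backfill_usgs": "always"}, "backfill_usgs"))
# -> ['usgs_daily', 'transform_earthquakes', 'transform_feeds_summary']
```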
Transform jobs don't need their own cursor or state lookup. They read the same `run_context["interval_start"]` / `interval_end` the ingest job saw and forward it to the Ibis transformation:

```python
@run.pipeline(
    usgs_eq_stats_pipeline,
    trigger=trigger.schedule("*/5 * * * *"),
    interval={"start": USGS_EPOCH},
    freshness=[usgs_daily.is_fresh],
    require={"dependency_groups": ["ibis"], "timezone": "Europe/Berlin"},
    expose={"tags": ["transform"]},
)
def transform_earthquakes(run_context: TJobRunContext):
    if run_context["refresh"]:
        usgs_eq_stats_pipeline.refresh = "drop_resources"
    time_window = (run_context["interval_start"], run_context["interval_end"])
    usgs_eq_stats_pipeline.run(
        earthquake_daily_stats(usgs_ing_pipeline.dataset(), time_window)
    )
```

The transform doesn't need an `epoch` override -- the amount of data it processes is fully controlled by what the ingest job loaded. Its `interval={"start": USGS_EPOCH}` tells the scheduler where transformable data begins, but on refresh, Runtime resets the window automatically.
The Ibis transformation applies the window as a filter:

```python
@dlt.hub.transformation(
    table_name="earthquake_daily_stats",
    write_disposition="merge",
    primary_key=["day", "region"],
)
def earthquake_daily_stats(
    dataset: dlt.Dataset,
    time_window: Optional[Tuple[datetime, datetime]] = None,
) -> typing.Iterator[ir.Table]:
    eq = dataset.table("earthquakes").to_ibis()
    if time_window is not None:
        start, end = time_window
        eq = eq.filter((eq.time >= start) & (eq.time <= end))
    yield (
        eq.mutate(day=eq.time.cast("date"), region=...)
        .group_by(["day", "region"])
        .aggregate(event_count=ibis._.count(), ...)
    )
```

The merge disposition with `primary_key=["day", "region"]` makes overlapping windows idempotent -- partial-day rows on slice boundaries are updated in place if the scheduler sends an extended window after a missed tick.
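The idempotency argument can be checked with a toy in-memory model of merge semantics -- a sketch, not dlt's loader:

```python
def merge(table: list[dict], rows: list[dict],
          primary_key: tuple = ("day", "region")) -> list[dict]:
    """Minimal merge semantics: rows with an existing primary key
    overwrite in place, rows with new keys are appended."""
    index = {tuple(r[k] for k in primary_key): i for i, r in enumerate(table)}
    for row in rows:
        key = tuple(row[k] for k in primary_key)
        if key in index:
            table[index[key]] = row  # update the partial-day row in place
        else:
            index[key] = len(table)
            table.append(row)
    return table

stats = [{"day": "2026-04-13", "region": "CA", "event_count": 3}]
# an extended window re-sends the boundary day with the full count
merge(stats, [{"day": "2026-04-13", "region": "CA", "event_count": 7}])
print(stats)  # still one row, event_count == 7
```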
The transform jobs need `ibis-framework` but the ingest jobs don't. Rather than adding ibis to the workspace's main dependencies, we declare it as a dependency group in `pyproject.toml`:

```toml
[dependency-groups]
ibis = ["ibis-framework[duckdb]"]
```

Then tell Runtime to install it only for the jobs that need it:
```python
@run.pipeline(
    usgs_eq_stats_pipeline,
    trigger=trigger.every("5m"),
    freshness=[usgs_daily.is_fresh],
    require={"dependency_groups": ["ibis"]},
)
def transform_earthquakes(run_context: TJobRunContext):
    ...
```

Runtime composes the execution environment from the workspace's base dependencies plus the job's declared `dependency_groups`. Ingest jobs get a leaner environment without ibis, while transform jobs get the full analytical stack.
```sh
cd usgs_earthquakes_workspace
uv sync

# preview the deployment
uv run dlt runtime deploy --dry-run

# deploy the full job graph
uv run dlt runtime deploy

# kick off the initial setup + cascade refresh to all downstream jobs.
# backfill_usgs itself does not load data -- it just fires the signal.
# downstream jobs then reprocess from `epoch` on their next run.
uv run dlt runtime trigger "tag:backfill"

# after the cascade, cron takes over:
#   usgs_daily fires every 3 minutes, processing the scheduler-supplied window
#   transforms fire every 5 minutes (gated on daily freshness)

# monitor
uv run dlt runtime logs backfill_usgs -f
uv run dlt runtime logs transform_earthquakes -f

# force another full refresh at any time
uv run dlt runtime launch backfill_usgs --refresh

# force a full refresh of the transformation layer only
# both transform jobs are tagged "transform" -- the selector fires both at once
uv run dlt runtime trigger "tag:transform" --refresh
```

No profiles, no uv, no pyproject -- just raw OSS scripts and notebooks. It still works.
For the full reference on decorator arguments, triggers, module conventions, and manifest auto-generation, see REFERENCE.md.