Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/pkg/pipeline/task/archive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The task receives records from its input channel, applies the archiving operatio

During **unpack**, the sanitized base filename of each extracted entry is stored in the record context under the key `CATERPILLAR_ARCHIVE_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`).

The full entry path within the archive is also stored under `CATERPILLAR_FILE_PATH_WRITE`, sanitized with the same rules so separators collapse to underscores (e.g. `sub/dir/Report 1.CSV` → `sub_dir_report_1.csv`). Use it to give each unpacked entry a destination name that encodes its path inside the archive.

## Configuration Fields

| Field | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/archive/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (t *tarArchive) Read() {
log.Fatal(err)
}
rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), textutil.SlugifyFileName(filepath.Base(header.Name)))
rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(header.Name))
t.SendData(rc.Context, buf, t.OutputChan)
}

Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/archive/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (z *zipArchive) Read() {
if f.FileInfo().Mode().IsRegular() {

rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), textutil.SlugifyFileName(filepath.Base(f.Name)))
rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(f.Name))

fs, err := f.Open()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/pipeline/task/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ The task automatically determines its mode based on the presence of input/output

In read mode, the sanitized base filename is stored in the record context under the key `CATERPILLAR_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`).

The full source path is also stored under `CATERPILLAR_FILE_PATH_WRITE`, sanitized with the same rules. Because slashes and other separators collapse to underscores, the value is a single flat filename segment with no directory structure (e.g. `s3://prod-bucket/reports/type=X/2026-06-10/sub/Report (1).tsv` → `s3_prod_bucket_reports_type_x_2026_06_10_sub_report_1.tsv`). Use it when you need a unique destination name that encodes the whole source path. The extension is preserved, so don't re-append it in the destination `path`.

## Configuration Fields

| Field | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (f *file) readFile(output chan<- *record.Record) error {
// Create a default record with context
rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path)))
rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(path))

// let's write content to output channel
f.SendData(rc.Context, content, output)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/pipeline/task/sftp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Like the `file` task, the role is **inferred from the channels**:

| The task has… | Role | What it does |
|---------------|------|--------------|
| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. |
| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`), and the full sanitized source path in `CATERPILLAR_FILE_PATH_WRITE`, so a downstream task can name what it writes. |
| **an input** | **sink — upload** | Writes each incoming record's data to `path`, used as-is per record. To name files from the source, template `path` — e.g. `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`. |

It cannot be both: configuring the task with both an input and an output is an error.
Expand Down Expand Up @@ -120,4 +120,4 @@ tasks:

**`task_concurrency` opens multiple connections.** Setting `task_concurrency` above `1` makes the task open one SSH connection per worker and transfer files in parallel. This is faster, but it increases memory use (more files in memory at once), and some servers limit the number of connections per user.

**Nested directories are not preserved.** Each file is identified by its base name only. If you download files from nested folders with a recursive glob (for example `/data/**/*.csv`), they all land in the single destination directory, and files with the same name overwrite each other. To keep a folder structure, use a separate `file → sftp` pair for each folder, with the target path set for each one.
**Nested directories are not preserved.** If you template the destination with `CATERPILLAR_FILE_NAME_WRITE`, each file is identified by its base name only: download files from nested folders with a recursive glob (for example `/data/**/*.csv`) and they all land in the single destination directory, where files with the same name overwrite each other. To avoid collisions, template with `CATERPILLAR_FILE_PATH_WRITE` instead — it encodes the full source path into the (flat) filename, so same-named files from different folders stay distinct. To keep an actual folder structure on the target, use a separate `file → sftp` pair for each folder, with the target path set for each one.
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/sftp/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) er

rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(pathpkg.Base(p)))
rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(p))
s.SendData(rc.Context, data, output)
}

Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type contextKeyFile string
const (
CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE"
CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE"
CtxKeyFilePathWrite contextKeyFile = "CATERPILLAR_FILE_PATH_WRITE"
)

type Task interface {
Expand Down
Loading