From ac18420188144f5f7273dd1a7883014c2879aca7 Mon Sep 17 00:00:00 2001 From: narayan-pattern Date: Wed, 10 Jun 2026 15:47:18 +0530 Subject: [PATCH] FEAT: Add CATERPILLAR_FILE_PATH_WRITE context key Expose the full source path (slugified into a flat, slash-free filename segment) as a record context value, alongside the existing CATERPILLAR_FILE_NAME_WRITE (base name only). Set wherever a read emits records: file (S3 + local), sftp download, and archive (tar/zip) unpack. Value is textutil.SlugifyFileName(), so separators collapse to underscores and the extension is preserved. Lets a write destination encode the whole source path, which also avoids same-name collisions when reading nested folders with a recursive glob. Docs updated for the file, sftp, and archive tasks. Co-Authored-By: Claude Opus 4.8 --- internal/pkg/pipeline/task/archive/README.md | 2 ++ internal/pkg/pipeline/task/archive/tar.go | 1 + internal/pkg/pipeline/task/archive/zip.go | 1 + internal/pkg/pipeline/task/file/README.md | 2 ++ internal/pkg/pipeline/task/file/file.go | 1 + internal/pkg/pipeline/task/sftp/README.md | 4 ++-- internal/pkg/pipeline/task/sftp/operations.go | 1 + internal/pkg/pipeline/task/task.go | 1 + 8 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md index 38944d6..84fe9d3 100644 --- a/internal/pkg/pipeline/task/archive/README.md +++ b/internal/pkg/pipeline/task/archive/README.md @@ -18,6 +18,8 @@ The task receives records from its input channel, applies the archiving operatio During **unpack**, the sanitized base filename of each extracted entry is stored in the record context under the key `CATERPILLAR_ARCHIVE_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +The full entry path within the archive is also stored under `CATERPILLAR_FILE_PATH_WRITE`, sanitized with the same rules so separators collapse to underscores (e.g. `sub/dir/Report 1.CSV` → `sub_dir_report_1.csv`). Use it to give each unpacked entry a destination name that encodes its path inside the archive. + ## Configuration Fields | Field | Type | Default | Description | diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index e2d1198..c534a43 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -50,6 +50,7 @@ func (t *tarArchive) Read() { log.Fatal(err) } rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), textutil.SlugifyFileName(filepath.Base(header.Name))) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(header.Name)) t.SendData(rc.Context, buf, t.OutputChan) } diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index bf0b694..d1c8697 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -41,6 +41,7 @@ func (z *zipArchive) Read() { if f.FileInfo().Mode().IsRegular() { rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), textutil.SlugifyFileName(filepath.Base(f.Name))) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(f.Name)) fs, err := f.Open() if err != nil { diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index 93d1f9c..12beca3 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -22,6 +22,8 @@ The task automatically determines its mode based on the presence of input/output In read mode, the sanitized base filename is stored in the record context under the key `CATERPILLAR_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +The full source path is also stored under `CATERPILLAR_FILE_PATH_WRITE`, sanitized with the same rules. Because slashes and other separators collapse to underscores, the value is a single flat filename segment with no directory structure (e.g. `s3://prod-bucket/reports/type=X/2026-06-10/sub/Report (1).tsv` → `s3_prod_bucket_reports_type_x_2026_06_10_sub_report_1.tsv`). Use it when you need a unique destination name that encodes the whole source path. The extension is preserved, so don't re-append it in the destination `path`. + ## Configuration Fields | Field | Type | Default | Description | diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 633e364..1bc55a3 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -146,6 +146,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path))) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(path)) // let's write content to output channel f.SendData(rc.Context, content, output) diff --git a/internal/pkg/pipeline/task/sftp/README.md b/internal/pkg/pipeline/task/sftp/README.md index 4d3288a..f4c1818 100644 --- a/internal/pkg/pipeline/task/sftp/README.md +++ b/internal/pkg/pipeline/task/sftp/README.md @@ -17,7 +17,7 @@ Like the `file` task, the role is **inferred from the channels**: | The task has… | Role | What it does | |---------------|------|--------------| -| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. | +| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`), and the full sanitized source path in `CATERPILLAR_FILE_PATH_WRITE`, so a downstream task can name what it writes. | | **an input** | **sink — upload** | Writes each incoming record's data to `path`, used as-is per record. To name files from the source, template `path` — e.g. `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`. | It cannot be both: configuring the task with both an input and an output is an error. @@ -120,4 +120,4 @@ tasks: **`task_concurrency` opens multiple connections.** Setting `task_concurrency` above `1` makes the task open one SSH connection per worker and transfer files in parallel. This is faster, but it increases memory use (more files in memory at once), and some servers limit the number of connections per user. -**Nested directories are not preserved.** Each file is identified by its base name only. If you download files from nested folders with a recursive glob (for example `/data/**/*.csv`), they all land in the single destination directory, and files with the same name overwrite each other. To keep a folder structure, use a separate `file → sftp` pair for each folder, with the target path set for each one. +**Nested directories are not preserved.** If you template the destination with `CATERPILLAR_FILE_NAME_WRITE`, each file is identified by its base name only: download files from nested folders with a recursive glob (for example `/data/**/*.csv`) and they all land in the single destination directory, where files with the same name overwrite each other. To avoid collisions, template with `CATERPILLAR_FILE_PATH_WRITE` instead — it encodes the full source path into the (flat) filename, so same-named files from different folders stay distinct. To keep an actual folder structure on the target, use a separate `file → sftp` pair for each folder, with the target path set for each one. diff --git a/internal/pkg/pipeline/task/sftp/operations.go b/internal/pkg/pipeline/task/sftp/operations.go index 5eecfd9..f1eab28 100644 --- a/internal/pkg/pipeline/task/sftp/operations.go +++ b/internal/pkg/pipeline/task/sftp/operations.go @@ -96,6 +96,7 @@ func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) er rc := &record.Record{Context: ctx} rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(pathpkg.Base(p))) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFileName(p)) s.SendData(rc.Context, data, output) } diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 749c36b..c4ace7a 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -26,6 +26,7 @@ type contextKeyFile string const ( CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE" CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" + CtxKeyFilePathWrite contextKeyFile = "CATERPILLAR_FILE_PATH_WRITE" ) type Task interface {