From c733e1a2c17a931e58582d7ed9ec6a19c7a94fa6 Mon Sep 17 00:00:00 2001 From: snehal ahire Date: Tue, 23 Jun 2026 22:16:48 +0530 Subject: [PATCH 1/3] FEAT: Add CATERPILLAR_FILE_PATH_WRITE context key Expose the sanitized full source path as a record context value on the file task's read mode, alongside the existing CATERPILLAR_FILE_NAME_WRITE (base name only). Path segments are slugified individually with "/" preserved between them, so the directory hierarchy survives and can be used in destination paths to avoid same-name collisions when reading nested folders with a recursive glob (e.g. reportType=X/**/**.tsv). Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/pkg/pipeline/task/file/README.md | 5 ++- internal/pkg/pipeline/task/file/file.go | 1 + internal/pkg/pipeline/task/task.go | 1 + internal/pkg/textutil/slugify.go | 45 +++++++++++++++++++++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index 93d1f9c..74f4818 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -20,7 +20,10 @@ The file task operates in two modes depending on whether an input channel is pro The task automatically determines its mode based on the presence of input/output channels. -In read mode, the sanitized base filename is stored in the record context under the key `CATERPILLAR_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +In read mode, two values are stored in each record's context: + +- `CATERPILLAR_FILE_NAME_WRITE` — the sanitized base filename. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +- `CATERPILLAR_FILE_PATH_WRITE` — the sanitized full source path with directory hierarchy preserved. Each segment is slugified the same way; the final segment keeps its extension; URL schemes such as `s3://bucket/` are stripped (e.g. `s3://my-bucket/ReportType=A/Folder 1/data.CSV` → `reporttype_a/folder_1/data.csv`). Reference it in the destination of a downstream write task to avoid collisions when reading nested directories with a recursive glob. ## Configuration Fields diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 633e364..660395f 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -146,6 +146,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path))) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFilePath(path)) // let's write content to output channel f.SendData(rc.Context, content, output) diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 749c36b..776a067 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -25,6 +25,7 @@ type contextKeyFile string const ( CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE" + CtxKeyFilePathWrite contextKeyFile = "CATERPILLAR_FILE_PATH_WRITE" CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" ) diff --git a/internal/pkg/textutil/slugify.go b/internal/pkg/textutil/slugify.go index 2203647..0a2901e 100644 --- a/internal/pkg/textutil/slugify.go +++ b/internal/pkg/textutil/slugify.go @@ -19,6 +19,51 @@ func Slugify(name string) string { return strings.ToLower(result) } +// SlugifyFilePath slugifies each segment of a path while preserving the +// "/" separator between segments. A leading ":///" prefix +// (e.g. "s3://my-bucket/") is stripped so only the path within the source +// location remains; what remains is split on "/", interior segments are +// slugified via Slugify, and the final segment is slugified via +// SlugifyFileName so its extension is preserved. Backslashes are +// normalized to forward slashes, and empty segments (leading "/", "//", +// interior segments that slugify to "") are dropped. +// e.g. "s3://my-bucket/ReportType=A/data 1.CSV" -> "reporttype_a/data_1.csv" +func SlugifyFilePath(path string) string { + if path == "" { + return "" + } + if i := strings.Index(path, "://"); i > 0 { + rest := path[i+3:] + if j := strings.Index(rest, "/"); j >= 0 { + path = rest[j:] + } else { + path = "" + } + } + path = strings.ReplaceAll(path, "\\", "/") + parts := strings.Split(path, "/") + nonEmpty := parts[:0] + for _, p := range parts { + if p != "" { + nonEmpty = append(nonEmpty, p) + } + } + if len(nonEmpty) == 0 { + return "" + } + out := make([]string, 0, len(nonEmpty)) + for i, p := range nonEmpty { + if i == len(nonEmpty)-1 { + out = append(out, SlugifyFileName(p)) + continue + } + if s := Slugify(p); s != "" { + out = append(out, s) + } + } + return strings.Join(out, "/") +} + // SlugifyFileName slugifies a filename while preserving the extension. // The stem is slugified via Slugify and the extension is lowercased. // Empty stems default to "file". Filenames exceeding 200 characters From de7105be08d3338949e33ecbd37e251469363564 Mon Sep 17 00:00:00 2001 From: snehal ahire Date: Wed, 24 Jun 2026 15:27:12 +0530 Subject: [PATCH 2/3] refactored code --- internal/pkg/textutil/slugify.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/internal/pkg/textutil/slugify.go b/internal/pkg/textutil/slugify.go index 0a2901e..5ba1790 100644 --- a/internal/pkg/textutil/slugify.go +++ b/internal/pkg/textutil/slugify.go @@ -32,35 +32,36 @@ func SlugifyFilePath(path string) string { if path == "" { return "" } + if i := strings.Index(path, "://"); i > 0 { rest := path[i+3:] if j := strings.Index(rest, "/"); j >= 0 { path = rest[j:] } else { - path = "" - } - } - path = strings.ReplaceAll(path, "\\", "/") - parts := strings.Split(path, "/") - nonEmpty := parts[:0] - for _, p := range parts { - if p != "" { - nonEmpty = append(nonEmpty, p) + return "" } } - if len(nonEmpty) == 0 { + + parts := strings.FieldsFunc(path, func(r rune) bool { + return r == '/' || r == '\\' + }) + + if len(parts) == 0 { return "" } - out := make([]string, 0, len(nonEmpty)) - for i, p := range nonEmpty { - if i == len(nonEmpty)-1 { + + out := make([]string, 0, len(parts)) + for i, p := range parts { + if i == len(parts)-1 { out = append(out, SlugifyFileName(p)) continue } + if s := Slugify(p); s != "" { out = append(out, s) } } + return strings.Join(out, "/") } From cc751c2a272bba4c874b083168c364848325a1e3 Mon Sep 17 00:00:00 2001 From: snehal ahire Date: Wed, 24 Jun 2026 20:38:22 +0530 Subject: [PATCH 3/3] REFACTOR: Address PR review feedback - Extract URL scheme stripping in SlugifyFilePath into a stripURLScheme helper. - Hoist the slugified file name into a local variable in the file task. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/pkg/pipeline/task/file/file.go | 3 ++- internal/pkg/textutil/slugify.go | 26 ++++++++++++++++--------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 660395f..5b92c7c 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -144,8 +144,9 @@ func (f *file) readFile(output chan<- *record.Record) error { } // Create a default record with context + fileName := textutil.SlugifyFileName(filepath.Base(path)) rc := &record.Record{Context: ctx} - rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path))) + rc.SetContextValue(string(task.CtxKeyFileNameWrite), fileName) rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFilePath(path)) // let's write content to output channel diff --git a/internal/pkg/textutil/slugify.go b/internal/pkg/textutil/slugify.go index 5ba1790..3b69c21 100644 --- a/internal/pkg/textutil/slugify.go +++ b/internal/pkg/textutil/slugify.go @@ -29,19 +29,11 @@ func Slugify(name string) string { // interior segments that slugify to "") are dropped. // e.g. "s3://my-bucket/ReportType=A/data 1.CSV" -> "reporttype_a/data_1.csv" func SlugifyFilePath(path string) string { + path = stripURLScheme(path) if path == "" { return "" } - if i := strings.Index(path, "://"); i > 0 { - rest := path[i+3:] - if j := strings.Index(rest, "/"); j >= 0 { - path = rest[j:] - } else { - return "" - } - } - parts := strings.FieldsFunc(path, func(r rune) bool { return r == '/' || r == '\\' }) @@ -65,6 +57,22 @@ func SlugifyFilePath(path string) string { return strings.Join(out, "/") } +// stripURLScheme strips a leading "://" prefix (e.g. "s3://my-bucket") +// from path. If a scheme is present but no path follows the host +// (e.g. "s3://bucket-only"), an empty string is returned. When no scheme +// is present, path is returned unchanged. +func stripURLScheme(path string) string { + i := strings.Index(path, "://") + if i <= 0 { + return path + } + rest := path[i+3:] + if j := strings.Index(rest, "/"); j >= 0 { + return rest[j:] + } + return "" +} + // SlugifyFileName slugifies a filename while preserving the extension. // The stem is slugified via Slugify and the extension is lowercased. // Empty stems default to "file". Filenames exceeding 200 characters