diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index 93d1f9c..769aa57 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -20,7 +20,10 @@ The file task operates in two modes depending on whether an input channel is pro The task automatically determines its mode based on the presence of input/output channels. -In read mode, the sanitized base filename is stored in the record context under the key `CATERPILLAR_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +In read mode, two values are stored in each record's context: + +- `CATERPILLAR_FILE_NAME_WRITE` — the sanitized base filename. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`). +- `CATERPILLAR_FILE_PATH_WRITE` — the sanitized full source path with directory hierarchy preserved. Each segment is slugified the same way; the final segment keeps its extension; URL schemes such as `s3://bucket/` are stripped (e.g. `s3://my-bucket/ReportType=A/Folder 1/data.CSV` → `reporttype_a/folder_1/data.csv`). Reference it in the destination of a downstream write task to avoid collisions when reading nested directories with a recursive glob. ## Configuration Fields @@ -146,6 +149,26 @@ tasks: path: output/data_{{ macro "timestamp" }}.txt ``` +### Preserving source hierarchy when reading nested directories: + +When reading from a recursive glob, two files in different subdirectories can share the same base name (e.g. `reportType=X/subA/data.tsv` and `reportType=X/subB/data.tsv`). Referencing `CATERPILLAR_FILE_NAME_WRITE` alone in the destination would collide; `CATERPILLAR_FILE_PATH_WRITE` preserves the source folder hierarchy so the writes stay distinct. 
+ +```yaml +tasks: + - name: read_nested + type: file + path: s3://source-bucket/reportType=X/**/*.tsv + + - name: write_mirrored + type: file + path: s3://dest-bucket/ds={{ macro "date" }}/{{ context "CATERPILLAR_FILE_PATH_WRITE" }} + region: us-east-1 +``` + +With the inputs above, the writes land at: +- `s3://dest-bucket/ds=.../reporttype_x/suba/data.tsv` +- `s3://dest-bucket/ds=.../reporttype_x/subb/data.tsv` + ## Sample Pipelines - `test/pipelines/file.yaml` - Basic file operations diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 633e364..5b92c7c 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -144,8 +144,10 @@ func (f *file) readFile(output chan<- *record.Record) error { } // Create a default record with context + fileName := textutil.SlugifyFileName(filepath.Base(path)) rc := &record.Record{Context: ctx} - rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path))) + rc.SetContextValue(string(task.CtxKeyFileNameWrite), fileName) + rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFilePath(path)) // let's write content to output channel f.SendData(rc.Context, content, output) diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 749c36b..776a067 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -25,6 +25,7 @@ type contextKeyFile string const ( CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE" + CtxKeyFilePathWrite contextKeyFile = "CATERPILLAR_FILE_PATH_WRITE" CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" ) diff --git a/internal/pkg/textutil/slugify.go b/internal/pkg/textutil/slugify.go index 2203647..3b69c21 100644 --- a/internal/pkg/textutil/slugify.go +++ b/internal/pkg/textutil/slugify.go @@ -19,6 +19,60 @@ func Slugify(name string) string { return 
strings.ToLower(result) } +// SlugifyFilePath slugifies each segment of a path while preserving the +// "/" separator between segments. A leading "scheme://host/" prefix +// (e.g. "s3://my-bucket/") is stripped so only the path within the source +// location remains; what remains is split on "/", interior segments are +// slugified via Slugify, and the final segment is slugified via +// SlugifyFileName so its extension is preserved. Backslashes are +// normalized to forward slashes, and empty segments (leading "/", "//", +// interior segments that slugify to "") are dropped. +// e.g. "s3://my-bucket/ReportType=A/data 1.CSV" -> "reporttype_a/data_1.csv" +func SlugifyFilePath(path string) string { + path = stripURLScheme(path) + if path == "" { + return "" + } + + parts := strings.FieldsFunc(path, func(r rune) bool { + return r == '/' || r == '\\' + }) + + if len(parts) == 0 { + return "" + } + + out := make([]string, 0, len(parts)) + for i, p := range parts { + if i == len(parts)-1 { + out = append(out, SlugifyFileName(p)) + continue + } + + if s := Slugify(p); s != "" { + out = append(out, s) + } + } + + return strings.Join(out, "/") +} + +// stripURLScheme strips a leading "scheme://host" prefix (e.g. "s3://my-bucket") +// from path. If a scheme is present but no path follows the host +// (e.g. "s3://bucket-only"), an empty string is returned. When no scheme +// is present, path is returned unchanged. +func stripURLScheme(path string) string { + i := strings.Index(path, "://") + if i <= 0 { + return path + } + rest := path[i+3:] + if j := strings.Index(rest, "/"); j >= 0 { + return rest[j:] + } + return "" +} + // SlugifyFileName slugifies a filename while preserving the extension. // The stem is slugified via Slugify and the extension is lowercased. // Empty stems default to "file". 
Filenames exceeding 200 characters diff --git a/test/pipelines/file_path_write_test.yaml b/test/pipelines/file_path_write_test.yaml new file mode 100644 index 0000000..2c9c7f8 --- /dev/null +++ b/test/pipelines/file_path_write_test.yaml @@ -0,0 +1,8 @@ +tasks: + - name: read_nested_json + type: file + path: test/pipelines/**/*.json + + - name: write_mirrored + type: file + path: /tmp/caterpillar/file_path_write_test/{{ context "CATERPILLAR_FILE_PATH_WRITE" }}