Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/pkg/pipeline/task/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ The file task operates in two modes depending on whether an input channel is pro

The task automatically determines its mode based on the presence of input/output channels.

In read mode, the sanitized base filename is stored in the record context under the key `CATERPILLAR_FILE_NAME_WRITE`. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`).
In read mode, two values are stored in each record's context:

- `CATERPILLAR_FILE_NAME_WRITE` — the sanitized base filename. The stem is lowercased with non-alphanumeric characters replaced by underscores, while the extension is preserved and lowercased (e.g. `"Report 1.CSV"` → `"report_1.csv"`).
- `CATERPILLAR_FILE_PATH_WRITE` — the sanitized full source path with directory hierarchy preserved. Each segment is slugified the same way; the final segment keeps its extension; URL schemes such as `s3://bucket/` are stripped (e.g. `s3://my-bucket/ReportType=A/Folder 1/data.CSV` → `reporttype_a/folder_1/data.csv`). Reference it in the destination of a downstream write task to avoid collisions when reading nested directories with a recursive glob.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any use case/example where we would be leveraging this sluggified file path?


## Configuration Fields

Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/pipeline/task/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (f *file) readFile(output chan<- *record.Record) error {
}

// Create a default record with context
fileName := textutil.SlugifyFileName(filepath.Base(path))
rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(filepath.Base(path)))
rc.SetContextValue(string(task.CtxKeyFileNameWrite), fileName)
rc.SetContextValue(string(task.CtxKeyFilePathWrite), textutil.SlugifyFilePath(path))

// let's write content to output channel
f.SendData(rc.Context, content, output)
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type contextKeyFile string

const (
CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE"
CtxKeyFilePathWrite contextKeyFile = "CATERPILLAR_FILE_PATH_WRITE"
CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE"
)

Expand Down
54 changes: 54 additions & 0 deletions internal/pkg/textutil/slugify.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,60 @@ func Slugify(name string) string {
return strings.ToLower(result)
}

// SlugifyFilePath slugifies each segment of a path while preserving the
// "/" separator between segments. A leading "<scheme>://<host>/" prefix
// (e.g. "s3://my-bucket/") is stripped so only the path within the source
// location remains; what remains is split on "/", interior segments are
// slugified via Slugify, and the final segment is slugified via
// SlugifyFileName so its extension is preserved. Backslashes are
// normalized to forward slashes, and empty segments (leading "/", "//",
// interior segments that slugify to "") are dropped.
// e.g. "s3://my-bucket/ReportType=A/data 1.CSV" -> "reporttype_a/data_1.csv"
func SlugifyFilePath(path string) string {
path = stripURLScheme(path)
if path == "" {
return ""
}

parts := strings.FieldsFunc(path, func(r rune) bool {
return r == '/' || r == '\\'
})

if len(parts) == 0 {
return ""
}

out := make([]string, 0, len(parts))
for i, p := range parts {
if i == len(parts)-1 {
out = append(out, SlugifyFileName(p))
continue
}

if s := Slugify(p); s != "" {
out = append(out, s)
}
}

return strings.Join(out, "/")
}

// stripURLScheme strips a leading "<scheme>://<host>" prefix (e.g. "s3://my-bucket")
// from path. If a scheme is present but no path follows the host
// (e.g. "s3://bucket-only"), an empty string is returned. When no scheme
// is present, path is returned unchanged.
func stripURLScheme(path string) string {
i := strings.Index(path, "://")
if i <= 0 {
return path
}
rest := path[i+3:]
if j := strings.Index(rest, "/"); j >= 0 {
return rest[j:]
}
return ""
}

// SlugifyFileName slugifies a filename while preserving the extension.
// The stem is slugified via Slugify and the extension is lowercased.
// Empty stems default to "file". Filenames exceeding 200 characters
Expand Down
Loading