Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/pkg/sftp v1.13.10
github.com/stretchr/testify v1.11.1
github.com/xuri/excelize/v2 v2.10.0
github.com/yamitzky/xlrd-go v0.1.0
golang.org/x/crypto v0.45.0
golang.org/x/net v0.47.0
google.golang.org/protobuf v1.36.9
Expand Down Expand Up @@ -84,7 +85,7 @@ require (
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/text v0.32.0 // indirect
)

require (
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ github.com/xuri/excelize/v2 v2.10.0 h1:8aKsP7JD39iKLc6dH5Tw3dgV3sPRh8uRVXu/fMstf
github.com/xuri/excelize/v2 v2.10.0/go.mod h1:SC5TzhQkaOsTWpANfm+7bJCldzcnU/jrhqkTi/iBHBU=
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 h1:+C0TIdyyYmzadGaL/HBLbf3WdLgC29pgyhTjAT/0nuE=
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
github.com/yamitzky/xlrd-go v0.1.0 h1:WPrLvRMz/ob+ZmEWMmbg/TtrUVh2BTCGGzbqRzsrYBU=
github.com/yamitzky/xlrd-go v0.1.0/go.mod h1:qH3XYtKvWAvhH87qmIDY6YgxAXKAyLD28jpum/PLS7k=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down Expand Up @@ -566,8 +568,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -604,8 +606,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
27 changes: 21 additions & 6 deletions internal/pkg/pipeline/task/converter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Converter Task

The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), EML (Email), Protobuf, and other data format transformations.
The `converter` task converts data between different formats, supporting CSV, HTML, XLSX, XLS, EML (Email), Protobuf, and other data format transformations.

## Function

Expand All @@ -21,7 +21,7 @@ The converter task transforms data between different formats. It receives record
|-------|------|---------|-------------|
| `name` | string | - | Task name for identification |
| `type` | string | `converter` | Must be "converter" |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx, eml, protobuf) |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx, xls, eml, protobuf) |
| `delimiter` | string| \t | Used only in sst converter for spliting key and value|

### CSV Format Options
Expand Down Expand Up @@ -92,7 +92,7 @@ tasks:
### SST Format Options
Convert a single line to the SSTable which could be stored on s3 or via file. It expects a single line as input

### XLSX Format Options
### XLSX / XLS Format Options

| Field | Type | Default | Description |
|-------|------|---------|-------------|
Expand All @@ -102,14 +102,15 @@ Convert a single line to the SSTable which could be stored on s3 or via file. It
| `sanitize_headers` | bool | `false` | If true, normalizes header row values: non-alphanumeric characters are replaced by underscores, leading/trailing underscores are trimmed, and the result is lowercased. Assumes the first unskipped row to be header |
| `sanitize_sheet_names` | bool | `false` | If true, normalizes sheet names: non-alphanumeric characters are replaced by underscores, leading/trailing underscores are trimmed, and the result is lowercased before storing in the `xlsx_sheet_name` context key |

**Important:** The XLSX converter emits **one record per sheet**. Each record contains the sheet's data in CSV format, with the sheet name available in the record context under the key `xlsx_sheet_name`.
**Important:** Both converters emit **one record per sheet**. Each record contains the sheet's data in CSV format, with the sheet name available in the record context under the key `xlsx_sheet_name`.

## Supported Formats

The converter supports the following formats:
- **CSV**: Converts CSV data to JSON with column mapping and type conversion
- **HTML**: Converts HTML to JSON representation with element structure
- **XLSX**: Converts Excel files to CSV format. **Note:** Each sheet in the Excel file is emitted as a separate record with the sheet name stored in the context (key: `xlsx_sheet_name`)
- **XLSX**: Converts modern Excel files to CSV format. **Note:** Each sheet is emitted as a separate record with the sheet name stored in the context (key: `xlsx_sheet_name`)
- **XLS**: Converts legacy Excel 97-2003 files (`.xls`, BIFF8) to CSV format. Same options and per-sheet output as XLSX
- **EML**: Converts EML (Email) files to their constituent parts (HTML body, Text body, Attachments)
- **Protobuf**: Decodes binary protobuf messages to JSON using a compiled FileDescriptorSet

Expand Down Expand Up @@ -166,6 +167,20 @@ tasks:
only_data: true
```

### Legacy Excel 97-2003 (.xls) to CSV:
```yaml
tasks:
- name: read_excel
type: file
path: data.xls
- name: convert_excel
type: converter
format: xls
- name: echo
type: echo
only_data: true
```

### Excel to CSV conversion (specific sheets):
```yaml
tasks:
Expand Down Expand Up @@ -236,4 +251,4 @@ tasks:
- **Database migration**: Convert data for different database systems
- **Report generation**: Convert data to report-friendly formats
- **Data exchange**: Enable data sharing between different systems
- **ETL workflows**: Transform data as part of extract, transform, load processes
- **ETL workflows**: Transform data as part of extract, transform, load processes
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
`html`: new(html),
`sst`: new(sst),
`xlsx`: new(xlsx),
`xls`: new(xls),
`eml`: new(eml),
`protobuf`: new(protobuf),
}
Expand Down
260 changes: 260 additions & 0 deletions internal/pkg/pipeline/task/converter/xls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package converter

import (
"bytes"
csvEncoder "encoding/csv"
"fmt"
"io"
"math"
"strconv"

"github.com/yamitzky/xlrd-go/xlrd"

"github.com/patterninc/caterpillar/internal/pkg/textutil"
)

type xls struct {
Sheets []string `yaml:"sheets,omitempty" json:"sheets,omitempty"`
SkipRows int `yaml:"skip_rows,omitempty" json:"skip_rows,omitempty"`
SkipRowsBySheet map[string]int `yaml:"skip_rows_by_sheet,omitempty" json:"skip_rows_by_sheet,omitempty"`
SanitizeHeaders bool `yaml:"sanitize_headers,omitempty" json:"sanitize_headers,omitempty"`
SanitizeSheetNames bool `yaml:"sanitize_sheet_names,omitempty" json:"sanitize_sheet_names,omitempty"`
}

func (x *xls) convert(data []byte, _ string) (outputs []converterOutput, err error) {
// recover to avoid crash due to panic.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic while parsing .xls file: %v", r)
}
}()

// Logfile defaults to stdout, so redirect its diagnostics to avoid polluting task output.
reader, err := xlrd.OpenWorkbook(``, &xlrd.OpenWorkbookOptions{
FileContents: data,
Logfile: io.Discard,
})
if err != nil {
return nil, err
}

// Get sheets
sheets := reader.SheetNames()
if len(sheets) == 0 {
return nil, fmt.Errorf("no sheet found in the excel file")
}

if len(x.Sheets) > 0 {
sheets = x.Sheets
}

// Create one output record per sheet
outputs = make([]converterOutput, 0, len(sheets))

for _, sheet := range sheets {
output, err := x.readSheet(reader, sheet)
if err != nil {
return nil, err
}

outputs = append(outputs, output)
}

return outputs, nil
}

func (x *xls) readSheet(reader *xlrd.Book, sheet string) (converterOutput, error) {
rowsToSkip := x.getRowsToSkip(sheet)
// Create buffer for this sheet
var buff bytes.Buffer
writer := csvEncoder.NewWriter(&buff)

// Get all rows from the sheet
// Unlike xlsx, which uses excelise, there is no api to get formatted rows as output,
// so we require custom formatting over the cells based on their format types
rows, err := x.sheetRows(reader, sheet)
if err != nil {
return converterOutput{}, fmt.Errorf("error reading rows from sheet %s: %w", sheet, err)
}

// Write rows to buffer
isHeaderRow := true
for i, cols := range rows {
if i < rowsToSkip {
continue
}

if x.SanitizeHeaders && isHeaderRow {
for j, col := range cols {
cols[j] = textutil.Slugify(col)
}
isHeaderRow = false
}

if err := writer.Write(cols); err != nil {
return converterOutput{}, err
}
}

// Flush the writer
writer.Flush()
if err := writer.Error(); err != nil {
return converterOutput{}, err
}

outputSheetName := sheet
if x.SanitizeSheetNames {
outputSheetName = textutil.Slugify(sheet)
}

return converterOutput{
Data: buff.Bytes(),
Metadata: map[string]string{
sheetName: outputSheetName,
},
}, nil
}

// -------------------------- Everything below are helper functions ----------------------------//

func (x *xls) sheetRows(reader *xlrd.Book, sheet string) ([][]string, error) {
sh, err := reader.SheetByName(sheet)
if err != nil {
return nil, err
}

rows := make([][]string, 0, sh.NRows)
for r := 0; r < sh.NRows; r++ {
cols := make([]string, 0, sh.NCols)
for c := 0; c < sh.NCols; c++ {
cols = append(cols, cellString(reader, sh, r, c))
}
rows = append(rows, trimTrailingEmpty(cols))
}
for len(rows) > 0 && len(rows[len(rows)-1]) == 0 {
rows = rows[:len(rows)-1]
}

return rows, nil
}

func (x *xls) getRowsToSkip(sheet string) int {
rowsToSkip := x.SkipRows
if x.SkipRowsBySheet != nil {
if val, found := x.SkipRowsBySheet[sheet]; found {
rowsToSkip = val
}
}

if rowsToSkip < 0 {
rowsToSkip = 0
}

return rowsToSkip
}

// cellString renders a cell to its CSV text
func cellString(book *xlrd.Book, sheet *xlrd.Sheet, r, c int) string {
switch sheet.RawCellType(r, c) {
case xlrd.XL_CELL_TEXT:
if s, ok := sheet.RawCellValue(r, c).(string); ok {
return s
}
case xlrd.XL_CELL_NUMBER:
f, ok := sheet.RawCellValue(r, c).(float64)
if !ok {
return ``
}
if isDateCell(book, sheet.RawCellXFIndex(r, c)) {
if s, ok := formatDate(f, book.Datemode); ok {
return s
}
}
return formatNumber(f)
case xlrd.XL_CELL_BOOLEAN:
switch v := sheet.RawCellValue(r, c).(type) {
case int:
return boolText(v != 0)
case bool:
return boolText(v)
}
case xlrd.XL_CELL_ERROR:
// xlrd-go stores the raw BIFF error code, so we emit "#ERR<code>".
return fmt.Sprintf(`#ERR%v`, sheet.RawCellValue(r, c))
}
return ``
}

// isDateCell reports whether a numeric cell carries a date/time number format
func isDateCell(book *xlrd.Book, xfIndex int) bool {
if xfIndex < 0 || xfIndex >= len(book.XFList) {
return false
}
formatKey := book.XFList[xfIndex].FormatKey
if isBuiltinDateFormat(formatKey) {
return true
}
if book.FormatMap == nil {
return false
}
format := book.FormatMap[formatKey]
if format == nil || format.FormatString == `` {
return false
}
return xlrd.IsDateFormatString(book, format.FormatString)
}

// isBuiltinDateFormat reports whether a built-in number-format key is a date/time
// format. Ranges match excelize
func isBuiltinDateFormat(key int) bool {
switch {
case key >= 14 && key <= 22,
key >= 27 && key <= 36,
key >= 45 && key <= 47,
key >= 50 && key <= 58,
key >= 71 && key <= 81:
return true
default:
return false
}
}

func formatDate(value float64, datemode int) (string, bool) {
if math.IsNaN(value) || math.IsInf(value, 0) {
return ``, false
}
t, err := xlrd.XldateAsDatetime(value, datemode)
if err != nil {
return ``, false
}
switch {
case value >= 0 && value < 1:
return t.Format(`15:04:05`), true
case value != math.Floor(value):
return t.Format(`2006-01-02 15:04:05`), true
default:
return t.Format(`2006-01-02`), true
}
}

func boolText(b bool) string {
if b {
return `TRUE`
}
return `FALSE`
}

func formatNumber(f float64) string {
if f == math.Trunc(f) && !math.IsInf(f, 0) && !math.IsNaN(f) && math.Abs(f) < 1e18 {
return strconv.FormatInt(int64(f), 10)
}
return strconv.FormatFloat(f, 'f', -1, 64)
}

func trimTrailingEmpty(row []string) []string {
end := len(row)
for end > 0 && row[end-1] == "" {
end--
}
return row[:end]
}
Loading