feat(table): Add idle-based file flushing and a reaper RollingDataWriter for streaming pipelines #900
abhirathod95 wants to merge 4 commits into apache:main from
Conversation
```go
// closeAndWait cancels the goroutine, waits for it to
// drain, and removes the writer from the map.
_ = writer.closeAndWait()
```
Error silently discarded. Final flush failure means data loss with no signal. At minimum log it; ideally send to an error channel the caller can inspect.
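A minimal sketch of the error-channel approach; `f.errCh` and the `partition` variable are hypothetical names, not part of this PR:

```go
// Hypothetical plumbing: surface the final-flush error instead of discarding it.
if err := writer.closeAndWait(); err != nil {
	select {
	case f.errCh <- fmt.Errorf("flush on reap of partition %q: %w", partition, err):
	default:
		// Error channel full or unread; at minimum leave a trace.
		log.Printf("rolling writer close failed for %q: %v", partition, err)
	}
}
```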
```go
ctx := context.Background()
outputCh := make(chan iceberg.DataFile, 10)

writer, err := factory.getOrCreateRollingDataWriter(ctx, nil, "part=a", nil, outputCh)
```
reapIdleWriters calls closeAndWait() without holding w.mu. Meanwhile, getOrCreateRollingDataWriter holds w.mu, loads the writer from the map, and returns it.
Between cancel() and writers.Delete() inside closeAndWait(), the main pipeline can get the cancelled writer:

```
Reaper:  cancel() → writers.Delete() → wg.Wait()
                  ↑ window
Main:    mu.Lock() → writers.Load() → returns cancelled writer → Add() → ctx.Done() → error
```

Add() returns context.Canceled, so the main write pipeline fails with an unexpected error. Data for a "just-reaped" partition breaks the whole pipeline instead of creating a new writer.
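One way to close the window, sketched with assumed names (f.mu, w.wg, w.isIdle are not from the PR): hold the factory lock across the cancel+delete pair so the main pipeline can never load a cancelled writer that is still in the map, and only wait for drains after releasing the lock.

```go
func (f *writerFactory) reapIdleWriters() {
	f.mu.Lock()
	var reaped []*RollingDataWriter
	f.writers.Range(func(key, value any) bool {
		w := value.(*RollingDataWriter)
		if w.isIdle() { // hypothetical idleness check
			w.cancel()            // writer stops accepting records...
			f.writers.Delete(key) // ...and is out of the map before mu is released
			reaped = append(reaped, w)
		}
		return true
	})
	f.mu.Unlock()
	// Wait for drains outside the lock so new writers aren't blocked.
	for _, w := range reaped {
		w.wg.Wait()
	}
}
```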
```go
func (r *RollingDataWriter) close() {
	r.cancel()
	close(r.recordCh)
}
```
Avoids the send-on-closed panic, but the drain loop's default case means any record that enters the buffer after the drain started is silently dropped and never Released.
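A drain that cannot drop records, assuming close(r.recordCh) happens before the drain starts (so the range terminates); r.write is a placeholder for the actual flush path:

```go
// Ranging over the closed channel consumes every buffered record exactly
// once; there is no `default` branch that can skip a late arrival.
for rec := range r.recordCh {
	err := r.write(rec)
	rec.Release() // every buffered record is Released exactly once
	if err != nil {
		// surface the error instead of dropping it (see the error-channel note above)
	}
}
```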
Where does the recordCh get closed now? We don't want to leak the resource and leave the channel open.
```go
if !idleTimer.Stop() {
	select {
	case <-idleTimerC:
	default:
	}
}
```
Add a comment here about how Stop returns false if the timer has already expired (or been stopped), and how this select consumes the pending tick on the channel; that's why we don't need a separate drain on the idle-timeout path.
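Something like (wording is only a suggestion):

```go
// Stop returns false if the timer has already expired or been stopped. If it
// expired, its tick may still be sitting in idleTimerC, so consume it here
// (non-blocking) to avoid a spurious idle flush on a later loop iteration.
// This is also why the idle-timeout case itself needs no extra drain.
if !idleTimer.Stop() {
	select {
	case <-idleTimerC:
	default:
	}
}
```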
WriteRecords only rolls files when they hit targetFileSize. When the input is a long-lived channel (e.g. streaming events into time-based partitions like hour=2024010100), low-volume partitions may never reach the target size. Additionally, writers for older time partitions (yesterday's, or five hours ago) will no longer receive any data but will continue to hold resources. This means records can sit unflushed indefinitely and idle writers are never cleaned up. This PR adds two mechanisms to address that:
table/write_records.go
New public API options: an idle timeout (flush a file that has seen no writes) and a reaper timeout (close writers that have gone idle).
Both are threaded through writeRecordConfig → recordWritingArgs.
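For illustration only, a hypothetical call site; the actual option names come from this PR and may differ:

```go
// Hypothetical option names for the two new settings.
err := tbl.WriteRecords(ctx, recordCh,
	table.WithIdleTimeout(30*time.Second),  // flush an open file after 30s without writes
	table.WithReaperTimeout(5*time.Minute), // tear down writers idle for 5 minutes
)
```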
table/arrow_utils.go
Added idleTimeout and reaperTimeout fields to recordWritingArgs so they flow from the public API down to newWriterFactory.
table/rolling_data_writer.go
writerFactory changes: starts a background reaper goroutine (reapIdleWriters) that periodically scans the writer map and calls closeAndWait() on writers that have been idle longer than reaperTimeout; the rough shape is sketched below.
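For orientation, field names here are inferred from the review comments, not copied from the PR:

```go
type writerFactory struct {
	mu            sync.Mutex    // guards create/reap races on writers
	writers       sync.Map      // partition path -> *RollingDataWriter
	idleTimeout   time.Duration // flush an open file after this much inactivity
	reaperTimeout time.Duration // close a writer idle for this long
}
```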
RollingDataWriter changes: each writer's goroutine now selects over three channels (see the sketch after this list):
a. recordCh — write record, reset idle timer if file is open
b. idleTimerC — flush current file (identical to size-based roll)
c. ctx.Done() — drain buffered records, flush, exit
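A minimal sketch of that loop under assumed helper names (writeRecord, flushCurrentFile, fileOpen, resetIdleTimer); the real implementation also handles size-based rolling and error propagation:

```go
func (r *RollingDataWriter) run(ctx context.Context) {
	defer r.wg.Done()
	for {
		select {
		case rec, ok := <-r.recordCh:
			if !ok {
				return
			}
			r.writeRecord(rec) // (a) write the record...
			rec.Release()
			if r.fileOpen() {
				r.resetIdleTimer() // ...and reset the idle timer while a file is open
			}
		case <-r.idleTimerC:
			r.flushCurrentFile() // (b) idle flush, identical to a size-based roll
		case <-ctx.Done():
			// (c) drain buffered records, flush, and exit. The range terminates
			// because close() closes recordCh right after cancelling ctx.
			for rec := range r.recordCh {
				r.writeRecord(rec)
				rec.Release()
			}
			r.flushCurrentFile()
			return
		}
	}
}
```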