feat(table): Add idle-based file flushing and a reaper RollingDataWriter for streaming pipelines#900

Open

abhirathod95 wants to merge 4 commits into apache:main from abhirathod95:fanout-writer-time-flush

Conversation


@abhirathod95 (Contributor) commented Apr 14, 2026

WriteRecords only rolls files when they hit targetFileSize. When the input is a long-lived channel (e.g. streaming events into time-based partitions like hour=2024010100), low-volume partitions may never reach the target size. Additionally, writers for older time partitions (yesterday's, or five hours ago) stop receiving data but keep holding resources. This means:

  • Buffered data stays invisible to readers until the file is eventually closed
  • Open file writers and goroutines accumulate for stale partitions
  • Buffered data is lost on crash

table/write_records.go

New public API options:

  • WithIdleTimeout(d) — flush the current data file when no records arrive for d. After flush, the writer stays alive and opens a new file on the next record.
  • WithReaperTimeout(d) — tear down writer goroutines that have been idle for d, freeing their resources. Independent of the idle flush. Defaults to 10x the idle timeout if omitted.

Both are threaded through writeRecordConfig → recordWritingArgs.
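
A minimal usage sketch, assuming a WriteRecords entry point that accepts these options (the call shape is illustrative; only the two options are from this change):

func writeStream(ctx context.Context, tbl *table.Table, recordCh <-chan arrow.Record) error {
	return tbl.WriteRecords(ctx, recordCh,
		table.WithIdleTimeout(30*time.Second),  // flush open files after 30s without records
		table.WithReaperTimeout(5*time.Minute), // reap writers idle for 5m; omit to default to 10x idle
	)
}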

table/arrow_utils.go

Added idleTimeout and reaperTimeout fields to recordWritingArgs so they flow from the public API down to newWriterFactory.
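
In sketch form (existing fields elided; the comments reflect the semantics described above):

type recordWritingArgs struct {
	// ...existing fields elided...

	idleTimeout   time.Duration // 0 disables idle flushing
	reaperTimeout time.Duration // negative disables the reaper; 0 defaults to 10x idleTimeout
}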

table/rolling_data_writer.go

writerFactory changes:

  • New fields: idleTimeout, reaperTimeout, reaperCancel, reaperDone
  • newWriterFactory starts a reapIdleWriters goroutine when idleTimeout > 0 && reaperTimeout >= 0. A negative reaper timeout disables it.
  • reapIdleWriters ticks at reaperTimeout, scans all writers, and tears down any whose time since lastWriteTime exceeds the threshold (see the sketch after this list).
  • closeAll stops the reaper and waits for it before closing writers.
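
Roughly, the reaper loop has this shape (a sketch: names follow the description above, and it assumes writers is a sync.Map and lastWriteTime holds UnixNano):

func (w *writerFactory) reapIdleWriters(ctx context.Context) {
	defer close(w.reaperDone)

	ticker := time.NewTicker(w.reaperTimeout)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done(): // closeAll fired reaperCancel
			return
		case now := <-ticker.C:
			w.writers.Range(func(key, value any) bool {
				writer := value.(*RollingDataWriter)
				last := time.Unix(0, writer.lastWriteTime.Load())
				if now.Sub(last) >= w.reaperTimeout {
					// closeAndWait cancels the writer goroutine, waits for
					// it to drain, and removes the writer from the map.
					_ = writer.closeAndWait()
				}
				return true
			})
		}
	}
}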

RollingDataWriter changes:

  • New field: lastWriteTime atomic.Int64 — updated on every record write.
  • stream() rewritten from for range recordCh to an explicit select loop with three cases (sketched after this list):
    a. recordCh — write record, reset idle timer if file is open
    b. idleTimerC — flush current file (identical to size-based roll)
    c. ctx.Done() — drain buffered records, flush, exit
  • close() calls cancel() only (not close(recordCh)) to avoid send-on-closed-channel panics when the reaper races with Add.
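
Taken together, stream() now has roughly this shape (a sketch; writeRecord and flushCurrentFile stand in for the real helpers, and the idle timer only matters while a file is open):

func (r *RollingDataWriter) stream() {
	idleTimer := time.NewTimer(r.idleTimeout)
	defer idleTimer.Stop()

	for {
		select {
		case rec := <-r.recordCh:
			r.lastWriteTime.Store(time.Now().UnixNano())
			r.writeRecord(rec) // may still roll on targetFileSize
			// Re-arm the idle timer, consuming a stale tick if it
			// fired while we were writing.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}
			idleTimer.Reset(r.idleTimeout)

		case <-idleTimer.C:
			r.flushCurrentFile() // identical to a size-based roll

		case <-r.ctx.Done():
			// Drain records already buffered, flush, and exit.
			for {
				select {
				case rec := <-r.recordCh:
					r.writeRecord(rec)
				default:
					r.flushCurrentFile()
					return
				}
			}
		}
	}
}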

@abhirathod95 requested a review from zeroshade as a code owner on April 14, 2026 at 20:30
@abhirathod95 changed the title from "Adds idle-based file flushing and a reaper RollingDataWriter for streaming pipelines" to "feat(table): Add idle-based file flushing and a reaper RollingDataWriter for streaming pipelines" on Apr 14, 2026

@laskoviymishka (Contributor) left a comment


Overall LGTM. There is one potential race condition; once fixed, good to merge.


// closeAndWait cancels the goroutine, waits for it to
// drain, and removes the writer from the map.
_ = writer.closeAndWait()
Contributor


Error silently discarded. Final flush failure means data loss with no signal. At minimum log it; ideally send to an error channel the caller can inspect.
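
For instance (errCh and the log fallback are hypothetical, not part of this PR):

if err := writer.closeAndWait(); err != nil {
	select {
	case w.errCh <- fmt.Errorf("reaper: final flush for partition %v failed: %w", key, err):
	default:
		// Never block the reaper if nobody is draining errCh.
		log.Printf("reaper: final flush for partition %v failed: %v", key, err)
	}
}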

ctx := context.Background()
outputCh := make(chan iceberg.DataFile, 10)

writer, err := factory.getOrCreateRollingDataWriter(ctx, nil, "part=a", nil, outputCh)
Contributor


reapIdleWriters calls closeAndWait() without holding w.mu. Meanwhile, getOrCreateRollingDataWriter holds w.mu, loads the writer from the map, and returns it.

Between cancel() and writers.Delete() inside closeAndWait(), the main pipeline can get the cancelled writer:

  Reaper:    cancel()  →  writers.Delete()  →  wg.Wait()
                    ↑ window
  Main:      mu.Lock() → writers.Load() → returns cancelled writer → Add() → ctx.Done() → error

Add() returns context.Canceled, so the main write pipeline fails with an unexpected error: a record for a just-reaped partition breaks the whole pipeline instead of simply creating a new writer.
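
One way to close the window (a sketch, not code from this PR): make cancel and map removal atomic under the factory mutex, and only wait outside the lock:

func (w *writerFactory) reap(key any, writer *RollingDataWriter) {
	// Holding w.mu makes cancel + Delete atomic with respect to the
	// Load in getOrCreateRollingDataWriter, so the main pipeline can
	// never observe a cancelled writer that is still in the map.
	w.mu.Lock()
	writer.cancel()
	w.writers.Delete(key)
	w.mu.Unlock()

	// Wait outside the lock so a slow final flush cannot block the
	// creation of new writers.
	writer.wg.Wait()
}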


func (r *RollingDataWriter) close() {
	r.cancel()
	close(r.recordCh)
Contributor


Avoids the send-on-closed panic, but the drain loop's default case means any record that enters the buffer after the drain started is silently dropped and never Released.

Member


Where does the recordCh get closed now? We don't want to leak the resource and leave the channel open.

Comment on lines +417 to +421
if !idleTimer.Stop() {
	select {
	case <-idleTimerC:
	default:
	}
Member


Add a comment here about how Stop returns false if the timer has already expired, and how this select consumes the pending tick from the channel; hence why we don't need to drain records on the idle timeout.
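
For reference, the idiom with that comment spelled out might read (sketch):

if !idleTimer.Stop() {
	// Stop returned false: the timer already expired and its tick may
	// be sitting in idleTimerC. Consume it non-blocking so the coming
	// Reset can't be paired with a stale expiry. Because the stale tick
	// is swallowed here rather than surfacing in the idle-timeout case,
	// there are no records to drain on an idle timeout.
	select {
	case <-idleTimerC:
	default:
	}
}
idleTimer.Reset(r.idleTimeout)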


