Skip to content

feat: Single-pass WAL streaming for LOG_BASED replication#772

Open
bdewilde wants to merge 33 commits into
MeltanoLabs:mainfrom
bdewilde:single-pass-wal-streaming
Open

feat: Single-pass WAL streaming for LOG_BASED replication#772
bdewilde wants to merge 33 commits into
MeltanoLabs:mainfrom
bdewilde:single-pass-wal-streaming

Conversation

@bdewilde

@bdewilde bdewilde commented Apr 27, 2026

Copy link
Copy Markdown

problem

PostgresLogBasedStream.get_records() opens its own LogicalReplicationConnection per selected stream. With N LOG_BASED streams the tap runs N sequential WAL scans -- each rereads the same segments, with add-tables discarding most records server-side. End-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run-time.

changes

A new SingleConnectionWALReader opens one logical replication connection with add-tables covering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's new emit_record() method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit.

  • new modules: _wal_helpers.py (FQN/escaping/parsing helpers) and wal_reader.py (the reader and read loop).
  • client.py gains emit_record method and a config-flag branch in get_records
  • tap.py adds the log_based_single_connection config setting (default False!) and _sync_log_based_streams_shared orchestration
  • new tests: tests/test_wal_helpers.py, tests/test_wal_reader.py, tests/test_consume.py

Full disclosure, I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.

This is a very belated follow-up to PR #667 and Issue #587.

constraints

  • Tap.sync_all is @typing.final, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was trigger at the first LOG_BASED stream's get_records() call, gated by a _shared_wal_run_completed flag on the tap so siblings become no-ops.
  • SCHEMA-before-RECORD across streams: _sync_log_based_streams_shared pre-writes every stream's schema before the reader runs. Since the SDK's Stream.sync() later calls _write_schema_message() again, the override on PostgresLogBasedStream is idempotent. (Without that flag every SCHEMA would be emitted twice, which is not great.)
  • Per-stream LSN filter: Replication opens at min(start_lsn) across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past.
  • I had to dip into private SDK calls -- _write_record_message() and _increment_stream_state() -- for this to work. I consolidated them in one place -- emit_record() -- so SDK renames hit one method. Not sure how stable the API is here...

questions

  1. _write_schema_message idempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the @final constraint on sync_all. Is there another / cleaner approach?
  2. emit_record() uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?
  3. get_records() as trigger: The "first stream's get_records fires the shared reader" pattern is a not-great workaround for sync_all being final. Is it okay as documented, or is there a cleaner SDK hook?
  4. replication_max_run_seconds / replication_idle_exit_seconds now bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use caess. For example, does anything downstream assume per-stream bounds?

@edgarrmondragon

Copy link
Copy Markdown
Member

Thanks @bdewilde!

There are some typing errors. I might take a longer look later in the week.

@edgarrmondragon edgarrmondragon changed the title Single-pass WAL streaming for LOG_BASED replication? feat: Single-pass WAL streaming for LOG_BASED replication May 4, 2026
@edgarrmondragon edgarrmondragon self-assigned this May 4, 2026
@edgarrmondragon edgarrmondragon added the enhancement New feature or request label May 4, 2026
@bdewilde

bdewilde commented May 4, 2026

Copy link
Copy Markdown
Author

Thanks @bdewilde!

There are some typing errors. I might take a longer look later in the week.

Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I wasn't ever able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me.

@bdewilde

Copy link
Copy Markdown
Author

Hi again @edgarrmondragon 🙂 Just swinging by to check on this. Is there anything I can do to nudge this forward? I'm happy to make changes / go back to the drawing board, I just need some direction from you, since I'm not deeply familiar with the SDK.

@edgarrmondragon edgarrmondragon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bdewilde and sorry for the delay!

I started reviewing and managed to take a look at 2 of the files. I'll continue tomorrow, but in the meantime, some questions, nits and suggestions.

Comment thread tap_postgres/_wal_helpers.py
Comment thread tap_postgres/_wal_helpers.py
Comment thread tap_postgres/wal_reader.py
Comment thread tap_postgres/wal_reader.py Outdated
Comment thread tap_postgres/wal_reader.py Outdated
Comment thread tap_postgres/client.py Outdated
Comment thread tap_postgres/wal_reader.py Outdated
Comment thread tap_postgres/client.py
Comment thread tap_postgres/client.py Outdated
Comment thread tap_postgres/wal_reader.py
Comment thread tests/test_consume.py
}


# TODO: should this be a shared fixture/function in conftest?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could have an instance of this as a fixture if the dummy config is also the same.

Comment thread tests/test_consume.py Outdated
@bdewilde bdewilde requested a review from edgarrmondragon June 1, 2026 18:43

@edgarrmondragon edgarrmondragon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bdewilde, LGTM!

@bdewilde

bdewilde commented Jun 1, 2026

Copy link
Copy Markdown
Author

@edgarrmondragon Revisiting this with fresh eyes today, a couple notable questions came to mind:

  • What happens when a user adds a new stream to an existing set with log-based replication? In SingleConnectionWALReader.run(), we compute the "global" start LSN across all selected streams -- does a new stream cause the global min to be the very first WAL entry, which would force an iteration over the full WAL? Per-stream bookmarks ensure the replicated data is correct, but a full iteration could be quite slow. Not great! In meltano, iirc a new stream with log-based replication does effectively a full-table replication for its first run only, but I couldn't find any logic to confirm that or dig into it more deeply. Probably you know this off the top of your head... 🙏
  • When exiting out of the run loop, we call SingleConnectionWALReader._advance_slot_and_state_all(), which sets every stream's bookmark to the current WAL tip. I think this is a problem if we exit due to max_run_seconds before the WAL's backlog has been fully read, which would discard the not-yet-emitted messages and then they'd be silently skipped over on the next run. Right? iirc I was imitating a pattern from the original per-stream logic, so maybe this issue already exists, it's just that the blast radius is smaller (per-stream rather than all-streams). Am I understanding this correctly? If so, could we maybe just pass max_lsn_seen out of _run_loop(), and then send that into _advance_slot_and_state_all() as an arg?

sicarul added a commit to pulumi/tap-postgres that referenced this pull request Jun 5, 2026
feat: single-pass WAL streaming for LOG_BASED replication (MeltanoLabs#772)
@sicarul

sicarul commented Jun 5, 2026

Copy link
Copy Markdown

I understand your main motivation with this PR is speed, however in some scenarios this should also improve accuracy, i've stumbled upon a scenario in which the current implementation will advance the WAL because of table A, and then the table B already lost the reference because the replication slot advanced past the updates we needed for table B. I'll test the PR out

@bdewilde

Copy link
Copy Markdown
Author

Hi @sicarul , just following up! Did this set of changes work for you, from both a perf and accuracy perspective? :)

@edgarrmondragon , to address my second question above, I just pushed changes to prevent (afaict) a silent data-loss risk in how the single-pass WAL reader advances replication state on exit. Previously, both idle and timeout exit paths advanced every stream's bookmark and the slot to the current WAL tip. That's correct and desirable behavior on an idle exit -- the backlog has been drained! -- but on a max_run_seconds timeout with messages still unread, it skips past records between the max LSN actually dispatched and the tip — silently losing those records. Does that make sense, or have I seriously misunderstood this behavior? (Sorry, I'm not an expert on this...) The fix has _run_loop() return (max_lsn_seen, caught_up), and run() picks the safe target to advance to: the tip when "caught up", otherwise max_lsn_seen. This preserves WAL-retention safety while never bookmarking past unread changes. Let me know if you have thoughts on this!

@sicarul

sicarul commented Jun 12, 2026

Copy link
Copy Markdown

Yes it's working fine for us so far

@edgarrmondragon

Copy link
Copy Markdown
Member

Thanks @bdewilde and @sicarul!

I'll take another look at this next week and try to answer your question about losing records when the run timeout is reached (I'm no expert in this niche either 😅)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants