feat: Single-pass WAL streaming for LOG_BASED replication#772
feat: Single-pass WAL streaming for LOG_BASED replication#772bdewilde wants to merge 33 commits into
Conversation
|
Thanks @bdewilde! There are some typing errors. I might take a longer look later in the week. |
Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I wasn't ever able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me. |
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
|
Hi again @edgarrmondragon 🙂 Just swinging by to check on this. Is there anything I can do to nudge this forward? I'm happy to make changes / go back to the drawing board, I just need some direction from you, since I'm not deeply familiar with the SDK. |
edgarrmondragon
left a comment
There was a problem hiding this comment.
Thanks @bdewilde and sorry for the delay!
I started reviewing and managed to take a look at 2 of the files. I'll continue tomorrow, but in the meantime, some questions, nits and suggestions.
| } | ||
|
|
||
|
|
||
| # TODO: should this be a shared fixture/function in conftest? |
There was a problem hiding this comment.
Yeah, we could have an instance of this as a fixture if the dummy config is also the same.
…/tap-postgres into single-pass-wal-streaming
|
@edgarrmondragon Revisiting this with fresh eyes today, a couple notable questions came to mind:
|
feat: single-pass WAL streaming for LOG_BASED replication (MeltanoLabs#772)
|
I understand your main motivation with this PR is speed, however in some scenarios this should also improve accuracy, i've stumbled upon a scenario in which the current implementation will advance the WAL because of table A, and then the table B already lost the reference because the replication slot advanced past the updates we needed for table B. I'll test the PR out |
|
Hi @sicarul , just following up! Did this set of changes work for you, from both a perf and accuracy perspective? :) @edgarrmondragon , to address my second question above, I just pushed changes to prevent (afaict) a silent data-loss risk in how the single-pass WAL reader advances replication state on exit. Previously, both idle and timeout exit paths advanced every stream's bookmark and the slot to the current WAL tip. That's correct and desirable behavior on an idle exit -- the backlog has been drained! -- but on a |
|
Yes it's working fine for us so far |
problem
PostgresLogBasedStream.get_records()opens its ownLogicalReplicationConnectionper selected stream. With N LOG_BASED streams the tap runs N sequential WAL scans -- each rereads the same segments, with add-tables discarding most records server-side. End-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run-time.changes
A new
SingleConnectionWALReaderopens one logical replication connection withadd-tablescovering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's newemit_record()method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit._wal_helpers.py(FQN/escaping/parsing helpers) andwal_reader.py(the reader and read loop).client.pygainsemit_recordmethod and a config-flag branch inget_recordstap.pyadds thelog_based_single_connectionconfig setting (default False!) and_sync_log_based_streams_sharedorchestrationtests/test_wal_helpers.py,tests/test_wal_reader.py,tests/test_consume.pyFull disclosure, I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.
This is a very belated follow-up to PR #667 and Issue #587.
constraints
Tap.sync_allis@typing.final, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was trigger at the first LOG_BASED stream'sget_records()call, gated by a_shared_wal_run_completedflag on the tap so siblings become no-ops._sync_log_based_streams_sharedpre-writes every stream's schema before the reader runs. Since the SDK'sStream.sync()later calls_write_schema_message()again, the override onPostgresLogBasedStreamis idempotent. (Without that flag every SCHEMA would be emitted twice, which is not great.)min(start_lsn)across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past._write_record_message()and_increment_stream_state()-- for this to work. I consolidated them in one place --emit_record()-- so SDK renames hit one method. Not sure how stable the API is here...questions
_write_schema_messageidempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the@finalconstraint onsync_all. Is there another / cleaner approach?emit_record()uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?get_records()as trigger: The "first stream's get_records fires the shared reader" pattern is a not-great workaround forsync_allbeing final. Is it okay as documented, or is there a cleaner SDK hook?replication_max_run_seconds/replication_idle_exit_secondsnow bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use caess. For example, does anything downstream assume per-stream bounds?