From 090893dbe4f3dcf028720b99611659e6ae9e9668 Mon Sep 17 00:00:00 2001 From: Adrij Shikhar Date: Tue, 2 Jun 2026 23:54:43 +0530 Subject: [PATCH] Add skip-empty-xacts option Logical decoding emits a BEGIN/COMMIT pair even for transactions that contain no decodable changes (DDL, VACUUM FULL, REFRESH MATERIALIZED VIEW) or whose changes were all filtered out (filter-tables, add-tables, filter-msg-prefixes). Consumers are flooded with empty changesets; see issue #106 for reports of thousands of them. Add a skip-empty-xacts option (default false, preserving the current behavior). When enabled, the BEGIN output is deferred until the first change that survives filtering is emitted and the COMMIT output is suppressed if no change was emitted. Transactional messages count as changes; non-transactional messages are not affected. On PostgreSQL 15 and later, skipped transactions are reported to the progress machinery (skipped_xact) so the walsender keeps sending keepalives during long runs of skipped transactions. This is the same approach used by test_decoding. --- Makefile | 4 +- README.md | 1 + expected/skip_empty_xacts.out | 149 ++++++++++++++++++++++++++++++++++ sql/skip_empty_xacts.sql | 55 +++++++++++++ wal2json.c | 97 ++++++++++++++++++---- 5 files changed, 290 insertions(+), 16 deletions(-) create mode 100644 expected/skip_empty_xacts.out create mode 100644 sql/skip_empty_xacts.sql diff --git a/Makefile b/Makefile index c0effef..f42020f 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ filtertable selecttable include_timestamp include_lsn include_xids \ include_domain_data_type truncate type_oid actions position default \ - pk rename_column numeric_data_types_as_string + pk rename_column numeric_data_types_as_string skip_empty_xacts PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) @@ -18,6 +18,8 @@ endif # truncate API is available in 11+ ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10)) REGRESS := $(filter-out truncate, $(REGRESS)) +# skip_empty_xacts uses a primary key on a partitioned table (11+) +REGRESS := $(filter-out skip_empty_xacts, $(REGRESS)) endif # actions API is available in 11+ diff --git a/README.md b/README.md index 1517e8f..e86d504 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,7 @@ Parameters * `numeric-data-types-as-string`: use string for numeric data types. JSON specification does not recognize `Infinity` and `NaN` as valid numeric values. There might be [potential interoperability problems](https://datatracker.ietf.org/doc/html/rfc7159#section-6) for double precision numbers. Default is _false_. * `pretty-print`: add spaces and indentation to JSON structures. Default is _false_. * `write-in-chunks`: write after every change instead of every changeset. Only used when `format-version` is `1`. Default is _false_. +* `skip-empty-xacts`: don't include empty transactions. Default is false. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_. * `include-transaction`: emit records denoting the start and end of each transaction. Default is _true_. * `include-unchanged-toast` (deprecated): Don't use it. It is deprecated. diff --git a/expected/skip_empty_xacts.out b/expected/skip_empty_xacts.out new file mode 100644 index 0000000..7fd1977 --- /dev/null +++ b/expected/skip_empty_xacts.out @@ -0,0 +1,149 @@ +-- Test skip-empty-xacts option (issue #106) +\set VERBOSITY terse +CREATE TABLE w2j_kept (a integer primary key); +CREATE TABLE w2j_filtered (b integer primary key); +CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t); +CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one'); +CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +-- workload: only the first INSERT survives add-tables filtering +INSERT INTO w2j_kept (a) VALUES (1); +INSERT INTO w2j_filtered (b) VALUES (1); +INSERT INTO w2j_part (a, t) VALUES (1, 'one'); +CREATE TABLE w2j_ddl (c integer); +DROP TABLE w2j_ddl; +TRUNCATE w2j_filtered; +-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept'); + data +-------------------------------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]} + {"change":[]} + {"change":[]} + {"change":[]} + {"change":[]} + {"change":[]} +(6 rows) + +-- format v1: with skip-empty-xacts, empty transactions are gone +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]} +(1 row) + +-- format v1: skip-empty-xacts with write-in-chunks +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1'); + data +------------------------------------------------------------------------------------------------------------------------- + {"change":[ + {"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]} + ]} +(3 rows) + +-- format v2: without skip-empty-xacts +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept'); + data +--------------------------------------------------------------------------------------------------------- + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} +(13 rows) + +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------------------------------- + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]} + {"action":"C"} +(3 rows) + +-- messages: transactional message marks the transaction as non-empty; +-- non-transactional messages are unaffected; a transaction whose only +-- message is prefix-filtered is empty +SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message'); + ?column? +---------- + 1 +(1 row) + +-- format v1: filtered transactional message leaves an empty transaction without the option +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered'); + data +----------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]} + {"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]} + {"change":[]} +(3 rows) + +-- format v1: with skip-empty-xacts the filtered-message transaction disappears +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + data +----------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]} + {"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]} +(2 rows) + +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------ + {"action":"B"} + {"action":"M","transactional":true,"prefix":"wal2json","content":"kept message"} + {"action":"C"} + {"action":"M","transactional":false,"prefix":"wal2json","content":"non-transactional message"} +(4 rows) + +-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty +-- transactions (issue #106); transaction counts vary across versions so +-- assert on counts, not raw output +VACUUM FULL w2j_kept; +REFRESH MATERIALIZED VIEW w2j_mv; +SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}'; + has_empty_xacts +----------------- + t +(1 row) + +SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + remaining +----------- + 0 +(1 row) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + +DROP MATERIALIZED VIEW w2j_mv; +DROP TABLE w2j_kept, w2j_filtered, w2j_part; diff --git a/sql/skip_empty_xacts.sql b/sql/skip_empty_xacts.sql new file mode 100644 index 0000000..6f4c26e --- /dev/null +++ b/sql/skip_empty_xacts.sql @@ -0,0 +1,55 @@ +-- Test skip-empty-xacts option (issue #106) +\set VERBOSITY terse + +CREATE TABLE w2j_kept (a integer primary key); +CREATE TABLE w2j_filtered (b integer primary key); +CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t); +CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one'); +CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +-- workload: only the first INSERT survives add-tables filtering +INSERT INTO w2j_kept (a) VALUES (1); +INSERT INTO w2j_filtered (b) VALUES (1); +INSERT INTO w2j_part (a, t) VALUES (1, 'one'); +CREATE TABLE w2j_ddl (c integer); +DROP TABLE w2j_ddl; +TRUNCATE w2j_filtered; + +-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept'); +-- format v1: with skip-empty-xacts, empty transactions are gone +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); +-- format v1: skip-empty-xacts with write-in-chunks +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1'); +-- format v2: without skip-empty-xacts +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept'); +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + +-- messages: transactional message marks the transaction as non-empty; +-- non-transactional messages are unaffected; a transaction whose only +-- message is prefix-filtered is empty +SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message'); +SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message'); +SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message'); + +-- format v1: filtered transactional message leaves an empty transaction without the option +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered'); +-- format v1: with skip-empty-xacts the filtered-message transaction disappears +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + +-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty +-- transactions (issue #106); transaction counts vary across versions so +-- assert on counts, not raw output +VACUUM FULL w2j_kept; +REFRESH MATERIALIZED VIEW w2j_mv; +SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}'; +SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); +DROP MATERIALIZED VIEW w2j_mv; +DROP TABLE w2j_kept, w2j_filtered, w2j_part; diff --git a/wal2json.c b/wal2json.c index 50e6503..77837e1 100644 --- a/wal2json.c +++ b/wal2json.c @@ -80,6 +80,8 @@ typedef struct bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? (v1) */ bool numeric_data_types_as_string; /* use strings for numeric data types */ + bool skip_empty_xacts; /* skip empty transactions */ + bool xact_wrote_changes; /* has current transaction written any change? */ JsonAction actions; /* output only these actions */ @@ -126,6 +128,8 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions * static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_begin(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, @@ -163,7 +167,7 @@ static bool pg_add_by_table(List *add_tables, char *schemaname, char *tablename) /* version 1 */ static void pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn); + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change_v1(LogicalDecodingContext *ctx, @@ -183,7 +187,7 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx, /* version 2 */ static void pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn); + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_write_value(LogicalDecodingContext *ctx, Datum value, bool isnull, Oid typid); @@ -285,6 +289,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->numeric_data_types_as_string = false; data->pretty_print = false; data->write_in_chunks = false; + data->skip_empty_xacts = false; + data->xact_wrote_changes = false; data->include_lsn = false; data->include_not_null = false; data->include_default = false; @@ -548,6 +554,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "skip-empty-xacts") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "skip-empty-xacts argument is null"); + data->skip_empty_xacts = true; + } + else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "include-lsn") == 0) { if (elem->arg == NULL) @@ -831,23 +850,40 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { JsonDecodingData *data = ctx->output_plugin_private; + data->xact_wrote_changes = false; + + /* + * If we are skipping empty transactions, defer the BEGIN output until + * the first change for this transaction is emitted. + */ + if (data->skip_empty_xacts) + return; + + pg_output_begin(ctx, txn, true); +} + +static void +pg_output_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) +{ + JsonDecodingData *data = ctx->output_plugin_private; + if (data->format_version == 2) - pg_decode_begin_txn_v2(ctx, txn); + pg_decode_begin_txn_v2(ctx, txn, last_write); else if (data->format_version == 1) - pg_decode_begin_txn_v1(ctx, txn); + pg_decode_begin_txn_v1(ctx, txn, last_write); else elog(ERROR, "format version %d is not supported", data->format_version); } static void -pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) { JsonDecodingData *data = ctx->output_plugin_private; data->nr_changes = 0; /* Transaction starts */ - OutputPluginPrepareWrite(ctx, true); + OutputPluginPrepareWrite(ctx, last_write); appendStringInfo(ctx->out, "{%s", data->nl); @@ -882,11 +918,11 @@ pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) appendStringInfo(ctx->out, "%s\"change\":%s[", data->ht, data->sp); if (data->write_in_chunks) - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } static void -pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) { JsonDecodingData *data = ctx->output_plugin_private; @@ -894,7 +930,7 @@ pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) if (!data->include_transaction) return; - OutputPluginPrepareWrite(ctx, true); + OutputPluginPrepareWrite(ctx, last_write); appendStringInfoString(ctx->out, "{\"action\":\"B\""); if (data->include_xids) appendStringInfo(ctx->out, ",\"xid\":%u", txn->xid); @@ -927,7 +963,7 @@ pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) } appendStringInfoChar(ctx->out, '}'); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } /* COMMIT callback */ @@ -936,6 +972,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { JsonDecodingData *data = ctx->output_plugin_private; + bool skipped_xact = data->skip_empty_xacts && !data->xact_wrote_changes; /* * Some older minor versions from back branches (10 to 14) calls @@ -948,9 +985,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * logical decoding. */ #if PG_VERSION_NUM >= 160000 - OutputPluginUpdateProgress(ctx, false); /* XXX change 2nd param when skipped empty transaction is supported */ + OutputPluginUpdateProgress(ctx, skipped_xact); #elif PG_VERSION_NUM >= 150000 && PG_VERSION_NUM < 160000 - update_replication_progress(ctx, false); /* XXX change 2nd param when skipped empty transaction is supported */ + update_replication_progress(ctx, skipped_xact); #elif PG_VERSION_NUM >= 140004 && PG_VERSION_NUM < 150000 update_replication_progress(ctx); #elif PG_VERSION_NUM >= 130008 && PG_VERSION_NUM < 140000 @@ -973,6 +1010,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginUpdateProgress(ctx); #endif + /* don't output an empty COMMIT for a skipped empty transaction */ + if (skipped_xact) + return; + elog(DEBUG2, "my change counter: " UINT64_FORMAT " ; # of changes: " UINT64_FORMAT " ; # of changes in memory: " UINT64_FORMAT, data->nr_changes, txn->nentries, txn->nentries_mem); elog(DEBUG2, "# of subxacts: %d", txn->nsubtxns); @@ -1753,9 +1794,6 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, schemaname = get_namespace_name(class_form->relnamespace); tablename = NameStr(class_form->relname); - if (data->write_in_chunks) - OutputPluginPrepareWrite(ctx, true); - /* Make sure rd_replidindex is set */ RelationGetIndexList(relation); @@ -1836,6 +1874,14 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + + if (data->write_in_chunks) + OutputPluginPrepareWrite(ctx, true); + /* Change counter */ data->nr_changes++; @@ -2583,6 +2629,11 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + pg_decode_write_change(ctx, txn, relation, change); MemoryContextSwitchTo(old); @@ -2650,6 +2701,17 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } + /* + * output BEGIN if we haven't yet. Don't do it for non-transactional + * messages because they are not part of a transaction. + */ + if (transactional) + { + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + } + if (data->format_version == 2) pg_decode_message_v2(ctx, txn, lsn, transactional, prefix, content_size, content); else if (data->format_version == 1) @@ -2993,6 +3055,11 @@ static void pg_decode_truncate_v2(LogicalDecodingContext *ctx, continue; } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); appendStringInfoChar(ctx->out, '{'); appendStringInfoString(ctx->out, "\"action\":\"T\"");