diff --git a/Makefile b/Makefile index c0effef..f42020f 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ filtertable selecttable include_timestamp include_lsn include_xids \ include_domain_data_type truncate type_oid actions position default \ - pk rename_column numeric_data_types_as_string + pk rename_column numeric_data_types_as_string skip_empty_xacts PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) @@ -18,6 +18,8 @@ endif # truncate API is available in 11+ ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10)) REGRESS := $(filter-out truncate, $(REGRESS)) +# skip_empty_xacts uses a primary key on a partitioned table (11+) +REGRESS := $(filter-out skip_empty_xacts, $(REGRESS)) endif # actions API is available in 11+ diff --git a/README.md b/README.md index 1517e8f..e86d504 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,7 @@ Parameters * `numeric-data-types-as-string`: use string for numeric data types. JSON specification does not recognize `Infinity` and `NaN` as valid numeric values. There might be [potential interoperability problems](https://datatracker.ietf.org/doc/html/rfc7159#section-6) for double precision numbers. Default is _false_. * `pretty-print`: add spaces and indentation to JSON structures. Default is _false_. * `write-in-chunks`: write after every change instead of every changeset. Only used when `format-version` is `1`. Default is _false_. +* `skip-empty-xacts`: don't include empty transactions. Default is false. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_. * `include-transaction`: emit records denoting the start and end of each transaction. Default is _true_. * `include-unchanged-toast` (deprecated): Don't use it. It is deprecated. diff --git a/expected/skip_empty_xacts.out b/expected/skip_empty_xacts.out new file mode 100644 index 0000000..7fd1977 --- /dev/null +++ b/expected/skip_empty_xacts.out @@ -0,0 +1,149 @@ +-- Test skip-empty-xacts option (issue #106) +\set VERBOSITY terse +CREATE TABLE w2j_kept (a integer primary key); +CREATE TABLE w2j_filtered (b integer primary key); +CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t); +CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one'); +CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +-- workload: only the first INSERT survives add-tables filtering +INSERT INTO w2j_kept (a) VALUES (1); +INSERT INTO w2j_filtered (b) VALUES (1); +INSERT INTO w2j_part (a, t) VALUES (1, 'one'); +CREATE TABLE w2j_ddl (c integer); +DROP TABLE w2j_ddl; +TRUNCATE w2j_filtered; +-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept'); + data +-------------------------------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]} + {"change":[]} + {"change":[]} + {"change":[]} + {"change":[]} + {"change":[]} +(6 rows) + +-- format v1: with skip-empty-xacts, empty transactions are gone +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]} +(1 row) + +-- format v1: skip-empty-xacts with write-in-chunks +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1'); + data +------------------------------------------------------------------------------------------------------------------------- + {"change":[ + {"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]} + ]} +(3 rows) + +-- format v2: without skip-empty-xacts +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept'); + data +--------------------------------------------------------------------------------------------------------- + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} +(13 rows) + +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------------------------------- + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]} + {"action":"C"} +(3 rows) + +-- messages: transactional message marks the transaction as non-empty; +-- non-transactional messages are unaffected; a transaction whose only +-- message is prefix-filtered is empty +SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message'); + ?column? +---------- + 1 +(1 row) + +-- format v1: filtered transactional message leaves an empty transaction without the option +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered'); + data +----------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]} + {"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]} + {"change":[]} +(3 rows) + +-- format v1: with skip-empty-xacts the filtered-message transaction disappears +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + data +----------------------------------------------------------------------------------------------------------------- + {"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]} + {"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]} +(2 rows) + +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------ + {"action":"B"} + {"action":"M","transactional":true,"prefix":"wal2json","content":"kept message"} + {"action":"C"} + {"action":"M","transactional":false,"prefix":"wal2json","content":"non-transactional message"} +(4 rows) + +-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty +-- transactions (issue #106); transaction counts vary across versions so +-- assert on counts, not raw output +VACUUM FULL w2j_kept; +REFRESH MATERIALIZED VIEW w2j_mv; +SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}'; + has_empty_xacts +----------------- + t +(1 row) + +SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + remaining +----------- + 0 +(1 row) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + +DROP MATERIALIZED VIEW w2j_mv; +DROP TABLE w2j_kept, w2j_filtered, w2j_part; diff --git a/sql/skip_empty_xacts.sql b/sql/skip_empty_xacts.sql new file mode 100644 index 0000000..6f4c26e --- /dev/null +++ b/sql/skip_empty_xacts.sql @@ -0,0 +1,55 @@ +-- Test skip-empty-xacts option (issue #106) +\set VERBOSITY terse + +CREATE TABLE w2j_kept (a integer primary key); +CREATE TABLE w2j_filtered (b integer primary key); +CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t); +CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one'); +CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +-- workload: only the first INSERT survives add-tables filtering +INSERT INTO w2j_kept (a) VALUES (1); +INSERT INTO w2j_filtered (b) VALUES (1); +INSERT INTO w2j_part (a, t) VALUES (1, 'one'); +CREATE TABLE w2j_ddl (c integer); +DROP TABLE w2j_ddl; +TRUNCATE w2j_filtered; + +-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept'); +-- format v1: with skip-empty-xacts, empty transactions are gone +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); +-- format v1: skip-empty-xacts with write-in-chunks +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1'); +-- format v2: without skip-empty-xacts +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept'); +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + +-- messages: transactional message marks the transaction as non-empty; +-- non-transactional messages are unaffected; a transaction whose only +-- message is prefix-filtered is empty +SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message'); +SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message'); +SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message'); + +-- format v1: filtered transactional message leaves an empty transaction without the option +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered'); +-- format v1: with skip-empty-xacts the filtered-message transaction disappears +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); +-- format v2: with skip-empty-xacts +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1'); + +-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty +-- transactions (issue #106); transaction counts vary across versions so +-- assert on counts, not raw output +VACUUM FULL w2j_kept; +REFRESH MATERIALIZED VIEW w2j_mv; +SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}'; +SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); +DROP MATERIALIZED VIEW w2j_mv; +DROP TABLE w2j_kept, w2j_filtered, w2j_part; diff --git a/wal2json.c b/wal2json.c index 50e6503..77837e1 100644 --- a/wal2json.c +++ b/wal2json.c @@ -80,6 +80,8 @@ typedef struct bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? (v1) */ bool numeric_data_types_as_string; /* use strings for numeric data types */ + bool skip_empty_xacts; /* skip empty transactions */ + bool xact_wrote_changes; /* has current transaction written any change? */ JsonAction actions; /* output only these actions */ @@ -126,6 +128,8 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions * static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_begin(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, @@ -163,7 +167,7 @@ static bool pg_add_by_table(List *add_tables, char *schemaname, char *tablename) /* version 1 */ static void pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn); + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change_v1(LogicalDecodingContext *ctx, @@ -183,7 +187,7 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx, /* version 2 */ static void pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn); + ReorderBufferTXN *txn, bool last_write); static void pg_decode_commit_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_write_value(LogicalDecodingContext *ctx, Datum value, bool isnull, Oid typid); @@ -285,6 +289,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->numeric_data_types_as_string = false; data->pretty_print = false; data->write_in_chunks = false; + data->skip_empty_xacts = false; + data->xact_wrote_changes = false; data->include_lsn = false; data->include_not_null = false; data->include_default = false; @@ -548,6 +554,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "skip-empty-xacts") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "skip-empty-xacts argument is null"); + data->skip_empty_xacts = true; + } + else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "include-lsn") == 0) { if (elem->arg == NULL) @@ -831,23 +850,40 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { JsonDecodingData *data = ctx->output_plugin_private; + data->xact_wrote_changes = false; + + /* + * If we are skipping empty transactions, defer the BEGIN output until + * the first change for this transaction is emitted. + */ + if (data->skip_empty_xacts) + return; + + pg_output_begin(ctx, txn, true); +} + +static void +pg_output_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) +{ + JsonDecodingData *data = ctx->output_plugin_private; + if (data->format_version == 2) - pg_decode_begin_txn_v2(ctx, txn); + pg_decode_begin_txn_v2(ctx, txn, last_write); else if (data->format_version == 1) - pg_decode_begin_txn_v1(ctx, txn); + pg_decode_begin_txn_v1(ctx, txn, last_write); else elog(ERROR, "format version %d is not supported", data->format_version); } static void -pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) { JsonDecodingData *data = ctx->output_plugin_private; data->nr_changes = 0; /* Transaction starts */ - OutputPluginPrepareWrite(ctx, true); + OutputPluginPrepareWrite(ctx, last_write); appendStringInfo(ctx->out, "{%s", data->nl); @@ -882,11 +918,11 @@ pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) appendStringInfo(ctx->out, "%s\"change\":%s[", data->ht, data->sp); if (data->write_in_chunks) - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } static void -pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool last_write) { JsonDecodingData *data = ctx->output_plugin_private; @@ -894,7 +930,7 @@ pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) if (!data->include_transaction) return; - OutputPluginPrepareWrite(ctx, true); + OutputPluginPrepareWrite(ctx, last_write); appendStringInfoString(ctx->out, "{\"action\":\"B\""); if (data->include_xids) appendStringInfo(ctx->out, ",\"xid\":%u", txn->xid); @@ -927,7 +963,7 @@ pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) } appendStringInfoChar(ctx->out, '}'); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } /* COMMIT callback */ @@ -936,6 +972,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { JsonDecodingData *data = ctx->output_plugin_private; + bool skipped_xact = data->skip_empty_xacts && !data->xact_wrote_changes; /* * Some older minor versions from back branches (10 to 14) calls @@ -948,9 +985,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * logical decoding. */ #if PG_VERSION_NUM >= 160000 - OutputPluginUpdateProgress(ctx, false); /* XXX change 2nd param when skipped empty transaction is supported */ + OutputPluginUpdateProgress(ctx, skipped_xact); #elif PG_VERSION_NUM >= 150000 && PG_VERSION_NUM < 160000 - update_replication_progress(ctx, false); /* XXX change 2nd param when skipped empty transaction is supported */ + update_replication_progress(ctx, skipped_xact); #elif PG_VERSION_NUM >= 140004 && PG_VERSION_NUM < 150000 update_replication_progress(ctx); #elif PG_VERSION_NUM >= 130008 && PG_VERSION_NUM < 140000 @@ -973,6 +1010,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginUpdateProgress(ctx); #endif + /* don't output an empty COMMIT for a skipped empty transaction */ + if (skipped_xact) + return; + elog(DEBUG2, "my change counter: " UINT64_FORMAT " ; # of changes: " UINT64_FORMAT " ; # of changes in memory: " UINT64_FORMAT, data->nr_changes, txn->nentries, txn->nentries_mem); elog(DEBUG2, "# of subxacts: %d", txn->nsubtxns); @@ -1753,9 +1794,6 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, schemaname = get_namespace_name(class_form->relnamespace); tablename = NameStr(class_form->relname); - if (data->write_in_chunks) - OutputPluginPrepareWrite(ctx, true); - /* Make sure rd_replidindex is set */ RelationGetIndexList(relation); @@ -1836,6 +1874,14 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + + if (data->write_in_chunks) + OutputPluginPrepareWrite(ctx, true); + /* Change counter */ data->nr_changes++; @@ -2583,6 +2629,11 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + pg_decode_write_change(ctx, txn, relation, change); MemoryContextSwitchTo(old); @@ -2650,6 +2701,17 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } + /* + * output BEGIN if we haven't yet. Don't do it for non-transactional + * messages because they are not part of a transaction. + */ + if (transactional) + { + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + } + if (data->format_version == 2) pg_decode_message_v2(ctx, txn, lsn, transactional, prefix, content_size, content); else if (data->format_version == 1) @@ -2993,6 +3055,11 @@ static void pg_decode_truncate_v2(LogicalDecodingContext *ctx, continue; } + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + pg_output_begin(ctx, txn, false); + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); appendStringInfoChar(ctx->out, '{'); appendStringInfoString(ctx->out, "\"action\":\"T\"");