Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids \
include_domain_data_type truncate type_oid actions position default \
pk rename_column numeric_data_types_as_string
pk rename_column numeric_data_types_as_string skip_empty_xacts

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand All @@ -18,6 +18,8 @@ endif
# truncate API is available in 11+
ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10))
REGRESS := $(filter-out truncate, $(REGRESS))
# skip_empty_xacts uses a primary key on a partitioned table (11+)
REGRESS := $(filter-out skip_empty_xacts, $(REGRESS))
endif

# actions API is available in 11+
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Parameters
* `numeric-data-types-as-string`: use string for numeric data types. JSON specification does not recognize `Infinity` and `NaN` as valid numeric values. There might be [potential interoperability problems](https://datatracker.ietf.org/doc/html/rfc7159#section-6) for double precision numbers. Default is _false_.
* `pretty-print`: add spaces and indentation to JSON structures. Default is _false_.
* `write-in-chunks`: write after every change instead of every changeset. Only used when `format-version` is `1`. Default is _false_.
* `skip-empty-xacts`: don't include empty transactions. Default is false.
* `include-lsn`: add _nextlsn_ to each changeset. Default is _false_.
* `include-transaction`: emit records denoting the start and end of each transaction. Default is _true_.
* `include-unchanged-toast` (deprecated): Don't use it. It is deprecated.
Expand Down
149 changes: 149 additions & 0 deletions expected/skip_empty_xacts.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
-- Test skip-empty-xacts option (issue #106)
\set VERBOSITY terse
CREATE TABLE w2j_kept (a integer primary key);
CREATE TABLE w2j_filtered (b integer primary key);
CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t);
CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one');
CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

-- workload: only the first INSERT survives add-tables filtering
INSERT INTO w2j_kept (a) VALUES (1);
INSERT INTO w2j_filtered (b) VALUES (1);
INSERT INTO w2j_part (a, t) VALUES (1, 'one');
CREATE TABLE w2j_ddl (c integer);
DROP TABLE w2j_ddl;
TRUNCATE w2j_filtered;
-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept');
data
--------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]}
{"change":[]}
{"change":[]}
{"change":[]}
{"change":[]}
{"change":[]}
(6 rows)

-- format v1: with skip-empty-xacts, empty transactions are gone
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]}
(1 row)

-- format v1: skip-empty-xacts with write-in-chunks
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1');
data
-------------------------------------------------------------------------------------------------------------------------
{"change":[
{"kind":"insert","schema":"public","table":"w2j_kept","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}
]}
(3 rows)

-- format v2: without skip-empty-xacts
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept');
data
---------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(13 rows)

-- format v2: with skip-empty-xacts
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_kept","columns":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
(3 rows)

-- messages: transactional message marks the transaction as non-empty;
-- non-transactional messages are unaffected; a transaction whose only
-- message is prefix-filtered is empty
SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message');
?column?
----------
1
(1 row)

SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message');
?column?
----------
1
(1 row)

SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message');
?column?
----------
1
(1 row)

-- format v1: filtered transactional message leaves an empty transaction without the option
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered');
data
-----------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]}
{"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]}
{"change":[]}
(3 rows)

-- format v1: with skip-empty-xacts the filtered-message transaction disappears
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"message","transactional":true,"prefix":"wal2json","content":"kept message"}]}
{"change":[{"kind":"message","transactional":false,"prefix":"wal2json","content":"non-transactional message"}]}
(2 rows)

-- format v2: with skip-empty-xacts
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1');
data
------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"M","transactional":true,"prefix":"wal2json","content":"kept message"}
{"action":"C"}
{"action":"M","transactional":false,"prefix":"wal2json","content":"non-transactional message"}
(4 rows)

-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty
-- transactions (issue #106); transaction counts vary across versions so
-- assert on counts, not raw output
VACUUM FULL w2j_kept;
REFRESH MATERIALIZED VIEW w2j_mv;
SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}';
has_empty_xacts
-----------------
t
(1 row)

SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');
remaining
-----------
0
(1 row)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

DROP MATERIALIZED VIEW w2j_mv;
DROP TABLE w2j_kept, w2j_filtered, w2j_part;
55 changes: 55 additions & 0 deletions sql/skip_empty_xacts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- Test skip-empty-xacts option (issue #106)
\set VERBOSITY terse

CREATE TABLE w2j_kept (a integer primary key);
CREATE TABLE w2j_filtered (b integer primary key);
CREATE TABLE w2j_part (a integer, t text, PRIMARY KEY (a, t)) PARTITION BY LIST (t);
CREATE TABLE w2j_part_one PARTITION OF w2j_part FOR VALUES IN ('one');
CREATE MATERIALIZED VIEW w2j_mv AS SELECT count(*) AS n FROM w2j_kept;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

-- workload: only the first INSERT survives add-tables filtering
INSERT INTO w2j_kept (a) VALUES (1);
INSERT INTO w2j_filtered (b) VALUES (1);
INSERT INTO w2j_part (a, t) VALUES (1, 'one');
CREATE TABLE w2j_ddl (c integer);
DROP TABLE w2j_ddl;
TRUNCATE w2j_filtered;

-- format v1: without skip-empty-xacts, filtered/DDL transactions produce empty changesets
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept');
-- format v1: with skip-empty-xacts, empty transactions are gone
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');
-- format v1: skip-empty-xacts with write-in-chunks
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1', 'write-in-chunks', '1');
-- format v2: without skip-empty-xacts
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept');
-- format v2: with skip-empty-xacts
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');

-- messages: transactional message marks the transaction as non-empty;
-- non-transactional messages are unaffected; a transaction whose only
-- message is prefix-filtered is empty
SELECT 1 FROM pg_logical_emit_message(true, 'wal2json', 'kept message');
SELECT 1 FROM pg_logical_emit_message(false, 'wal2json', 'non-transactional message');
SELECT 1 FROM pg_logical_emit_message(true, 'filtered', 'filtered message');

-- format v1: filtered transactional message leaves an empty transaction without the option
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered');
-- format v1: with skip-empty-xacts the filtered-message transaction disappears
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1');
-- format v2: with skip-empty-xacts
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-msg-prefixes', 'filtered', 'skip-empty-xacts', '1');

-- VACUUM FULL and REFRESH MATERIALIZED VIEW flood the slot with empty
-- transactions (issue #106); transaction counts vary across versions so
-- assert on counts, not raw output
VACUUM FULL w2j_kept;
REFRESH MATERIALIZED VIEW w2j_mv;
SELECT count(*) > 0 AS has_empty_xacts FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept') WHERE data = '{"change":[]}';
SELECT count(*) AS remaining FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'add-tables', 'public.w2j_kept', 'skip-empty-xacts', '1');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
DROP MATERIALIZED VIEW w2j_mv;
DROP TABLE w2j_kept, w2j_filtered, w2j_part;
Loading