H.6. wal2json: convert WAL changes into JSON via logical decoding
The wal2json module is a Postgres Pro extension for logical decoding that converts database changes from the write-ahead log (WAL) into JSON format. As an output plugin, wal2json has access to the tuples generated by INSERT and UPDATE operations. Depending on the configured replica identity, it can also access the previous row versions for UPDATE and DELETE operations. Changes can be consumed either through the streaming protocol (logical replication slots) or through a specialized SQL API.
Format version 1 generates a JSON object per transaction. This object contains all new and old tuples, with optional inclusion of properties such as transaction timestamps, schema-qualified names, data types, and transaction identifiers.
Format version 2 generates a JSON object per tuple, with optional JSON objects marking transaction start and end. Different tuple properties can also be included.
H.6.1. Installation and Setup
The wal2json extension is provided with Postgres Pro Enterprise as a separate pre-built package wal2json-ent-16 (for the detailed installation instructions, see Chapter 17). Once you have Postgres Pro Enterprise installed, complete the following steps to enable wal2json:
Enable logical decoding by setting wal_level to logical in the postgresql.conf file.
Optionally, you can also edit values of the max_replication_slots and max_wal_senders parameters.
Restart the database server for the changes to take effect.
H.6.2. Options
The wal2json module provides a variety of options for managing logical decoding:
include-xids
Add transaction IDs to each set of changes in the JSON output. The default is off.
include-timestamp
Add timestamp to each set of changes in the JSON output. The default is off.
include-schemas
Add the schema name to each change record in the JSON output. The default is on.
include-types
Add type to each change record in the JSON output. The default is on.
include-typmod
Add type modifiers for columns that have them. The default is on.
include-type-oids
Add type OIDs. The default is off.
include-domain-data-type
Replace a domain name with the underlying data type. The default is off.
include-column-positions
Add the column position (pg_attribute.attnum). The default is off.
include-origin
Add the origin of each change. The default is off.
include-not-null
Add information about whether a column is marked as not null in the columnoptionals field of the JSON output. The default is off.
include-default
Add default expressions to the JSON output. The default is off.
include-pk
Add primary key information, including column names and data types, to the pk field of the JSON output. The default is off.
numeric-data-types-as-string
Convert values of numeric data types to strings in the JSON output. The JSON specification does not support Infinity and NaN as valid numeric values, and double precision numbers might cause interoperability problems. The default is off.
pretty-print
Add spaces and indentation to the JSON output structure. The default is off.
write-in-chunks
Enable writing the JSON output in smaller chunks instead of writing the entire changeset at once. This option is used only when format-version is set to 1. The default is off.
include-lsn
Add the nextlsn field to each changeset. The default is off.
include-transaction
Add records denoting the start and end of each transaction to the JSON output. The default is on.
filter-origins
Exclude changes from the specified origins. The default is empty, which means that no origin will be filtered. The value is a comma-separated list.
filter-tables
Exclude rows from the specified tables. The default is empty, which means that no table will be filtered. The value is a comma-separated list. The tables should be schema-qualified. *.foo means the foo table in all schemas, and bar.* means all tables in the bar schema. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table names are case-sensitive. For example, table "public"."Foo bar" should be specified as public.Foo\ bar.
add-tables
Include rows only from the specified tables. The default is empty, which means that all tables from all schemas are included. It follows the same rules as filter-tables.
filter-msg-prefixes
Exclude messages whose prefix is specified in the option value. The default is empty, which means that no messages will be filtered. The value is a comma-separated list.
add-msg-prefixes
Include only messages whose prefix is specified in the option value. The default is all prefixes. The value is a comma-separated list. wal2json applies filter-msg-prefixes before this option.
format-version
Define which output format to use. The default is 1.
actions
Define which operations will be included in the JSON output. The default is all actions (INSERT, UPDATE, DELETE, and TRUNCATE). However, if you are using format-version = 1, TRUNCATE is not enabled (for backward compatibility).
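The escaping rules for filter-tables and add-tables are easy to get wrong when table names come from application code. The following Python sketch builds a value for either option from (schema, table) pairs. The function names are hypothetical and not part of wal2json, and the sketch escapes only the five characters listed above; how a literal backslash inside a name should be handled is not covered here.

```python
from typing import Iterable, Optional, Tuple

# Characters that the option description lists as special:
# space, single quote, comma, period, asterisk.
SPECIAL_CHARS = " ',.*"


def escape_identifier(name: str) -> str:
    """Prefix every special character in a schema or table name with a backslash."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in name)


def table_pattern(schema: Optional[str], table: Optional[str]) -> str:
    """Build one schema-qualified entry; None stands for the * wildcard."""
    schema_part = "*" if schema is None else escape_identifier(schema)
    table_part = "*" if table is None else escape_identifier(table)
    return schema_part + "." + table_part


def table_list(patterns: Iterable[Tuple[Optional[str], Optional[str]]]) -> str:
    """Join entries into the comma-separated value expected by the option."""
    return ",".join(table_pattern(schema, table) for schema, table in patterns)


value = table_list([("public", "Foo bar"), (None, "foo"), ("bar", None)])
print(value)  # public.Foo\ bar,*.foo,bar.*
```

The resulting string can be passed as the value of filter-tables or add-tables. Names are passed through with their case unchanged because schema and table names are case-sensitive.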
H.6.3. Examples
There are two ways to obtain the changes (JSON objects) from wal2json: using pg_recvlogical or calling SQL functions.
H.6.3.1. pg_recvlogical
This example shows how to obtain JSON objects from wal2json using pg_recvlogical. In addition to the configuration described above, pg_recvlogical requires a replication connection. Starting with Postgres Pro 10, logical replication connections are matched by regular pg_hba.conf entries that specify a database name or a keyword such as all.
To configure a replication connection and database parameters:
Add a replication connection rule to pg_hba.conf:
local mydatabase myuser trust
Optionally, set max_wal_senders in postgresql.conf:
max_wal_senders = 1
Restart the database server if you changed max_wal_senders.
To obtain JSON objects from wal2json:
Open a terminal and connect to the database:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
In another terminal:
$ cat /tmp/example1.sql
CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table1_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;
$ psql -At -f /tmp/example1.sql postgres
CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE
The output in the first terminal might look like this:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
To drop the slot in the first terminal:
Ctrl+C
$ pg_recvlogical -d postgres --slot test_slot --drop-slot
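When pg_recvlogical is launched from a script rather than typed by hand, each wal2json option becomes a separate -o name=value argument. The following Python sketch assembles the streaming command used in this example from a dictionary of options. The function name is hypothetical; only the pg_recvlogical flags already shown in this section are used.

```python
import shlex
from typing import Dict, List


def recvlogical_start_args(dbname: str, slot: str,
                           options: Dict[str, object],
                           outfile: str = "-") -> List[str]:
    """Build the argument list for streaming changes from an existing slot."""
    args = ["pg_recvlogical", "-d", dbname, "--slot", slot, "--start"]
    for name, value in options.items():
        # Every plugin option is passed as its own -o name=value pair.
        args += ["-o", "{}={}".format(name, value)]
    # "-f -" writes the decoded changes to standard output.
    args += ["-f", outfile]
    return args


args = recvlogical_start_args(
    "postgres",
    "test_slot",
    {"pretty-print": 1, "add-msg-prefixes": "wal2json"},
)
print(shlex.join(args))
# pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
```

The list can be handed to subprocess.Popen without going through a shell, which avoids a second layer of quoting for option values that contain backslashes, such as escaped table names.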
H.6.3.2. Calling SQL Functions
The following examples show how to obtain changes from wal2json via SQL.
If format-version is set to 1, the script might look like this:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
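A consumer of format version 1 usually needs to pair the parallel columnnames and columnvalues arrays (or keynames and keyvalues for deletes) into one row per change. The following Python sketch does this for a trimmed copy of the changeset above. The function name and the tuple layout it returns are illustrative assumptions, and only the change kinds that appear in this example are interpreted; any other kind is passed through unchanged.

```python
import json

# Trimmed copy of the format version 1 changeset shown in the example output.
SAMPLE_CHANGESET = """
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
    },
    {
      "kind": "message",
      "transactional": true,
      "prefix": "wal2json",
      "content": "this message will be delivered"
    },
    {
      "kind": "delete",
      "schema": "public",
      "table": "table2_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [1, "2018-03-27 12:05:29.914496"]
      }
    }
  ]
}
"""


def summarize_changeset(text):
    """Return one (kind, target, payload) tuple per change in a v1 changeset."""
    events = []
    for change in json.loads(text)["change"]:
        kind = change["kind"]
        if kind == "message":
            events.append((kind, change["prefix"], change["content"]))
            continue
        # "schema" is absent when include-schemas is turned off.
        parts = (change.get("schema"), change.get("table"))
        target = ".".join(p for p in parts if p)
        if kind == "insert":
            payload = dict(zip(change["columnnames"], change["columnvalues"]))
        elif kind == "delete":
            old = change["oldkeys"]
            payload = dict(zip(old["keynames"], old["keyvalues"]))
        else:
            payload = change  # kinds not shown in this example stay raw
        events.append((kind, target, payload))
    return events


events = summarize_changeset(SAMPLE_CHANGESET)
for event in events:
    print(event)
```

Because each changeset corresponds to one transaction in this format, the returned list can be applied or discarded as a unit.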
If format-version is set to 2, the script might look like this:
$ cat /tmp/example3.sql
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example3.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE
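With format version 2, every line is a separate JSON object, so a consumer that wants whole transactions has to regroup the records between the B and C markers itself. The following Python sketch does this for a trimmed copy of the output above. The function names are hypothetical, and the sketch assumes that the B and C records are present, as they are in this example; a record that arrives outside such a pair, like the non-transactional message, is emitted as a group of its own.

```python
import json

# Trimmed copy of the format version 2 output shown in the example.
SAMPLE_LINES = [
    '{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}',
    '{"action":"B"}',
    '{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"}]}',
    '{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}',
    '{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1}]}',
    '{"action":"C"}',
]


def group_transactions(lines):
    """Yield lists of decoded records, one list per transaction."""
    current = None  # None means no transaction is open
    for line in lines:
        record = json.loads(line)
        action = record["action"]
        if action == "B":
            current = []
        elif action == "C":
            yield current if current is not None else []
            current = None
        elif current is None:
            yield [record]  # record outside any B/C pair
        else:
            current.append(record)


def row_of(record):
    """Collapse the columns (or identity) array into a name-to-value dict."""
    cells = record.get("columns") or record.get("identity") or []
    return {cell["name"]: cell["value"] for cell in cells}


groups = list(group_transactions(SAMPLE_LINES))
for group in groups:
    print([(rec["action"], row_of(rec)) for rec in group])
```

A transaction that is still open when the input ends is not yielded, which keeps partially received transactions from being applied.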