52.1. Примеры логического декодирования #
Следующий пример демонстрирует управление логическим декодированием на уровне SQL.
Прежде чем вы сможете использовать логическое декодирование, вы должны установить в wal_level значение logical, а в max_replication_slots значение, не меньшее 1. После этого вы должны подключиться к целевой базе данных (в следующем примере, это postgres) как суперпользователь.
postgres=# -- Создать слот с именем 'regression_slot', использующий модуль вывода 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
slot_name | lsn
-----------------+-----------
regression_slot | 0/16B1970
(1 row)
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+-----------------
regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
(1 row)
postgres=# -- Пока никакие изменения не видны
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
postgres=# -- DDL не реплицируется, поэтому видна только транзакция
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+--------------
0/BA2DA58 | 10297 | BEGIN 10297
0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)
postgres=# -- Когда изменения прочитаны, они считаются обработанными и уже не выдаются
postgres=# -- в последующем вызове:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('1');
postgres=*# INSERT INTO data(data) VALUES('2');
postgres=*# COMMIT;
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A688 | 10298 | BEGIN 10298
0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)
postgres=# INSERT INTO data(data) VALUES('3');
postgres=# -- Также можно заглянуть вперёд в потоке изменений, не считывая эти изменения
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
postgres=# -- Следующий вызов pg_logical_slot_peek_changes() снова возвращает те же изменения
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
postgres=# -- Модулю вывода можно передать параметры, влияющие на форматирование
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)
postgres=# -- Не забудьте удалить слот, который вам больше не нужен, чтобы он
postgres=# -- не потреблял ресурсы сервера:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)Следующие примеры показывают, как можно управлять логическим декодированием средствами протокола потоковой репликации, используя программу pg_recvlogical, включённую в дистрибутив Postgres Pro. Для этого нужно, чтобы конфигурация аутентификации клиентов допускала подключения для репликации (см. Подраздел 26.2.5.1) и чтобы значение max_wal_senders было достаточно большим и позволило установить дополнительное подключение. Второй пример демонстрирует потоковую передачу двухфазных транзакций. Прежде чем использовать двухфазные команды, для параметра max_prepared_transactions нужно установить значение не меньше 1.
Пример 1: $ pg_recvlogical -d postgres --slot=test --create-slot $ pg_recvlogical -d postgres --slot=test --start -f - Control+Z $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');" $ fg BEGIN 693 table public.data: INSERT: id[integer]:4 data[text]:'4' COMMIT 693 Control+C $ pg_recvlogical -d postgres --slot=test --drop-slot Пример 2: $ pg_recvlogical -d postgres --slot=test --create-slot --two-phase $ pg_recvlogical -d postgres --slot=test --start -f - Control+Z $ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';" $ fg BEGIN 694 table public.data: INSERT: id[integer]:5 data[text]:'5' PREPARE TRANSACTION 'test', txid 694 Control+Z $ psql -d postgres -c "COMMIT PREPARED 'test';" $ fg COMMIT PREPARED 'test', txid 694 Control+C $ pg_recvlogical -d postgres --slot=test --drop-slot
В следующем примере показан интерфейс SQL, который можно использовать для декодирования подготовленных транзакций. Перед тем, как использовать команды двухфазной фиксации, вы должны установить значение max_prepared_transactions равным как минимум 1. Вы также должны при создании слота передать в параметре two_phase функции pg_create_logical_replication_slot значение true. Обратите внимание, вся транзакция будет транслироваться после фиксации, если она ещё не была декодирована.
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('5');
postgres=*# PREPARE TRANSACTION 'test_prepared1';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
(3 rows)
postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+--------------------------------------------
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row)
postgres=#-- вы также можете откатить подготовленную транзакцию
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('6');
postgres=*# PREPARE TRANSACTION 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/168A180 | 530 | BEGIN 530
0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
(3 rows)
postgres=# ROLLBACK PREPARED 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+----------------------------------------------
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
(1 row)