(4 rows)
select * from pgq_node.create_node('aqueue', 'root', 'node1', 'node1_worker', null, null, null);
- ret_code | ret_note
-----------+-------------------------------------------------------------
- 200 | Node "node1" initialized for queue "node1" with type "root"
+ ret_code | ret_note
+----------+--------------------------------------------------------------
+ 200 | Node "node1" initialized for queue "aqueue" with type "root"
(1 row)
select * from pgq_node.register_subscriber('aqueue', 'node2', 'node2_worker', null);
(1 row)
select * from pgq_node.set_subscriber_watermark('aqueue', 'node2', 3);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+ ret_code | ret_note
+----------+---------------------------
+ 200 | .node2.watermark set to 3
(1 row)
select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
select * from pgq_node.get_node_info('aqueue');
ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 200 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 1
+ 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 1
(1 row)
select * from pgq_node.get_subscriber_info('aqueue');
(1 row)
select * from pgq_node.create_node('bqueue', 'branch', 'node2', 'node2_worker', 'node1', 1, null);
- ret_code | ret_note
-----------+---------------------------------------------------------------
- 200 | Node "node2" initialized for queue "node2" with type "branch"
+ ret_code | ret_note
+----------+----------------------------------------------------------------
+ 200 | Node "node2" initialized for queue "bqueue" with type "branch"
(1 row)
select * from pgq_node.register_consumer('bqueue', 'random_consumer', 'node1', 1);
select * from pgq_node.get_node_info('aqueue');
ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 200 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 1
+ 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 1
(1 row)
select * from pgq_node.get_node_info('bqueue');
ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 200 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
+ 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
(1 row)
select * from pgq_node.get_node_info('cqueue');
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer');
ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 200 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
+ 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
(1 row)
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 200 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
+ 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
(1 row)
select * from pgq_node.set_consumer_completed('bqueue', 'random_consumer2', 2);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+ ret_code | ret_note
+----------+----------------------------------------------
+ 100 | Consumer random_consumer2 compleded tick = 2
(1 row)
select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true);
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 200 | Ok | branch | node2 | 2 | node3 | dbname=node3 | t | f
+ 100 | Ok | branch | node2 | 2 | node3 | dbname=node3 | t | f
(1 row)
select * from pgq_node.unregister_consumer('bqueue', 'random_consumer2');
select * from pgq_node.get_node_info('bqueue');
ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 200 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
+ 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
(1 row)
set session_replication_role = 'replica';