(1 row)
select * from pgq_node.get_consumer_info('aqueue');
- consumer_name | provider_node | last_tick_id | paused | uptodate
----------------+---------------+--------------+--------+----------
- node1_worker | node1 | 1 | f | f
+ consumer_name | provider_node | last_tick_id | paused | uptodate | cur_error
+---------------+---------------+--------------+--------+----------+-----------
+ node1_worker | node1 | 1 | f | f |
(1 row)
select * from pgq_node.unregister_subscriber('aqueue', 'node3');
(1 row)
select * from pgq_node.local_state;
- queue_name | consumer_name | provider_node | last_tick_id | paused | uptodate
-------------+------------------+---------------+--------------+--------+----------
- aqueue | node1_worker | node1 | 1 | f | f
- bqueue | node2_worker | node1 | 1 | f | f
- bqueue | random_consumer | node1 | 1 | f | f
- bqueue | random_consumer2 | node1 | 1 | f | f
+ queue_name | consumer_name | provider_node | last_tick_id | cur_error | paused | uptodate
+------------+------------------+---------------+--------------+-----------+--------+----------
+ aqueue | node1_worker | node1 | 1 | | f | f
+ bqueue | node2_worker | node1 | 1 | | f | f
+ bqueue | random_consumer | node1 | 1 | | f | f
+ bqueue | random_consumer2 | node1 | 1 | | f | f
(4 rows)
select * from pgq_node.node_info;
select * from pgq_node.is_root_node('cqueue');
ERROR: queue does not exist: cqueue
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer');
- ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
-----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f |
(1 row)
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
- ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
-----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f |
+(1 row)
+
+select * from pgq_node.set_consumer_error('bqueue', 'random_consumer2', 'failure');
+ ret_code | ret_note
+----------+-------------------------------------------
+ 100 | Consumer random_consumer2 error = failure
+(1 row)
+
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f | failure
(1 row)
select * from pgq_node.set_consumer_completed('bqueue', 'random_consumer2', 2);
100 | Consumer random_consumer2 compleded tick = 2
(1 row)
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 100 | Ok | branch | node2 | 2 | node1 | dbname=node1 | f | f |
+(1 row)
+
select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true);
ret_code | ret_note
----------+--------------------------------------------
(1 row)
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
- ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
-----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 100 | Ok | branch | node2 | 2 | node3 | dbname=node3 | t | f
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 100 | Ok | branch | node2 | 2 | node3 | dbname=node3 | t | f |
(1 row)
select * from pgq_node.unregister_consumer('bqueue', 'random_consumer2');
(1 row)
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
- ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate
-----------+-------------------------------------------+-----------+-----------+----------------+---------------+-------------------+--------+----------
- 404 | Unknown consumer: bqueue/random_consumer2 | | | | | | |
+ ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error
+----------+-------------------------------------------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------
+ 404 | Unknown consumer: bqueue/random_consumer2 | | | | | | | |
(1 row)
select * from pgq_node.get_node_info('bqueue');