Fix for 0000289: Inconsistent backend state
author    Muhammad Usama <m.usama@gmail.com>
          Mon, 10 Apr 2017 19:53:59 +0000 (00:53 +0500)
committer Muhammad Usama <m.usama@gmail.com>
          Mon, 10 Apr 2017 19:53:59 +0000 (00:53 +0500)
Pgpool-II syncs the backend node states at startup, which works fine in almost
all cases, except when the watchdog cluster becomes partitioned (because of
some network problem) and, after recovering from the partition, Pgpool-II
nodes that are already up and serving join back the cluster. At that point the
backend node status can become out-of-sync among the Pgpool-II nodes, if the
backend node status on the newly joined Pgpool-II differs from the status on
the existing watchdog cluster nodes.

Now with this commit, every time a Pgpool-II node joins the watchdog cluster
as a standby, either at startup or after recovering from some problem, it
realigns its backend node statuses with the cluster leader by fetching the
statuses from the master/coordinator and updating the local status of each
backend.

The patch also borrows the smart restart logic for Pgpool-II children from
the failover() function, to minimise the chance of session disconnection when
the backend node statuses are updated from the master/coordinator Pgpool-II.
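
The core mechanism is a small signal-multiplexing scheme: previously the main
process could only interpret SIGUSR1 as a failover request, so the patch adds
an array of per-reason flags in shared memory that a sender sets before
raising the signal. Below is a minimal self-contained sketch of that pattern
(simplified names and storage; in the actual patch the User1SignalSlot lives
in Pgpool-II shared memory and the draining is done by
sigusr1_interupt_processor()):

    #include <signal.h>
    #include <stdbool.h>
    #include <sys/types.h>

    typedef enum
    {
        SIG_FAILOVER_INTERRUPT,     /* signal main to start failover */
        SIG_WATCHDOG_STATE_CHANGED, /* local watchdog node state changed */
        MAX_INTERUPTS               /* must be last */
    } User1SignalReason;

    typedef struct
    {
        volatile sig_atomic_t signalFlags[MAX_INTERUPTS];
    } User1SignalSlot;

    static User1SignalSlot slot_storage;            /* real slot is in shared memory */
    static User1SignalSlot *slot = &slot_storage;
    static volatile sig_atomic_t sigusr1_request;

    /* async-signal-safe handler: just record that SIGUSR1 arrived */
    static void sigusr1_handler(int sig)
    {
        (void) sig;
        sigusr1_request = 1;
    }

    /* sender side: record the reason first, then signal the parent */
    static void signal_parent_with_reason(pid_t parent, User1SignalReason reason)
    {
        slot->signalFlags[reason] = true;
        kill(parent, SIGUSR1);
    }

    /* main-loop side: drain every pending reason outside the handler */
    static void process_sigusr1(void)
    {
        if (slot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
        {
            slot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
            /* e.g. sync the backend status from the master watchdog node */
        }
        if (slot->signalFlags[SIG_FAILOVER_INTERRUPT])
        {
            slot->signalFlags[SIG_FAILOVER_INTERRUPT] = false;
            /* e.g. run failover() */
        }
    }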

src/include/pool.h
src/include/watchdog/watchdog.h
src/include/watchdog/wd_ipc_commands.h
src/include/watchdog/wd_ipc_defines.h
src/include/watchdog/wd_json_data.h
src/main/pgpool_main.c
src/watchdog/watchdog.c
src/watchdog/wd_commands.c
src/watchdog/wd_json_data.c

diff --git a/src/include/pool.h b/src/include/pool.h
index 3feef1f3264ff51b676dccf4bc9a1e05e75bfc8e..98bb00cd4109334779e7542d3c1b3c3ac7355845 100644
@@ -514,6 +514,7 @@ extern char remote_port[];  /* client port */
 /*
  * public functions
  */
+extern void register_watchdog_state_change_interupt(void);
 extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
 extern char *get_config_file_name(void);
 extern char *get_hba_file_name(void);
diff --git a/src/include/watchdog/watchdog.h b/src/include/watchdog/watchdog.h
index f8dda212a4a29d7230d5debec3dd15c04d61c5c3..636042173d34deafcf667d3caa57e654cea464fd 100644
@@ -27,6 +27,7 @@
 #define WATCHDOG_H
 
 #include <sys/time.h>
+#include "pool_config.h"
 
 #define WD_TIME_INIT(tv)      ((tv).tv_sec = (tv).tv_usec = 0)
 #define WD_TIME_ISSET(tv)     ((tv).tv_sec || (tv).tv_usec)
diff --git a/src/include/watchdog/wd_ipc_commands.h b/src/include/watchdog/wd_ipc_commands.h
index bc5bd16d306e458983b340749b35776b83c9f96b..723f9e96afe774e461648d62d4c15e8bf6e793a5 100644
@@ -45,6 +45,19 @@ typedef struct WDIPCCmdResult
        char*   data;
 }WDIPCCmdResult;
 
+typedef struct WDGenericData
+{
+       WDValueDataType valueType;
+       union data
+       {
+               char    *stringVal;
+               int             intVal;
+               bool    boolVal;
+               long    longVal;
+       }data;
+}WDGenericData;
+
+
 extern void wd_ipc_initialize_data(void);
 extern char* get_watchdog_ipc_address(void);
 extern unsigned int* get_ipc_shared_key(void);
@@ -62,6 +75,8 @@ extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int coun
 extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned int *wd_failover_id);
 
 extern WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void);
+extern WDGenericData *get_wd_runtime_variable_value(char *varName);
+extern WD_STATES get_watchdog_local_node_state(void);
 
 extern char* wd_get_watchdog_nodes(int nodeID);
 
diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h
index b058aebdfc59c964dbb2d354c964e928d846f414..e8f5a44179a6931271809e69f9f8be5faa007897 100644
@@ -61,6 +61,13 @@ typedef enum WDFailoverCMDResults
        FAILOVER_RES_TIMEOUT
 }WDFailoverCMDResults;
 
+typedef enum WDValueDataType
+{
+       VALUE_DATA_TYPE_INT = 1,
+       VALUE_DATA_TYPE_STRING,
+       VALUE_DATA_TYPE_BOOL,
+       VALUE_DATA_TYPE_LONG
+}WDValueDataType;
 
 /* IPC MESSAGES TYPES */
 #define WD_REGISTER_FOR_NOTIFICATION           '0'
@@ -77,6 +84,7 @@ typedef enum WDFailoverCMDResults
 #define WD_IPC_ONLINE_RECOVERY_COMMAND         'r'
 #define WD_FAILOVER_LOCKING_REQUEST                    's'
 #define WD_GET_MASTER_DATA_REQUEST                     'd'
+#define WD_GET_RUNTIME_VARIABLE_VALUE          'v'
 
 #define WD_FUNCTION_START_RECOVERY             "START_RECOVERY"
 #define WD_FUNCTION_END_RECOVERY               "END_RECOVERY"
@@ -85,7 +93,10 @@ typedef enum WDFailoverCMDResults
 #define WD_FUNCTION_PROMOTE_REQUEST            "PROMOTE_BACKEND_REQUEST"
 
 #define WD_DATE_REQ_PG_BACKEND_DATA            "BackendStatus"
-
+#define WD_JSON_KEY_DATA_REQ_TYPE              "DataRequestType"
+#define WD_JSON_KEY_VARIABLE_NAME              "VarName"
+#define WD_JSON_KEY_VALUE_DATA_TYPE            "ValueDataType"
+#define WD_JSON_KEY_VALUE_DATA                 "ValueData"
 
 #define WD_REQ_FAILOVER_START                  "FAILOVER_START"
 #define WD_REQ_FAILOVER_END                            "FAILOVER_FINISH"
@@ -107,6 +118,12 @@ typedef enum WDFailoverCMDResults
                                                                                                 * to authenticate the internal pgpool-II processes
                                                                                                 */
 
+/* Watchdog runtime variable names */
+
+#define WD_RUNTIME_VAR_WD_STATE                        "WDState"
+#define WD_RUNTIME_VAR_QUORUM_STATE            "QuorumState"
+#define WD_RUNTIME_VAR_ESCALATION_STATE        "Escalated"
+
 /* Use to inform node new node status by lifecheck */
 #define WD_LIFECHECK_NODE_STATUS_DEAD  1
 #define WD_LIFECHECK_NODE_STATUS_ALIVE 2
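
Taken together, the new 'v' IPC command and these JSON keys define a small
request/response protocol. An illustrative exchange (values are examples; the
auth-key pair is added to the request only when wd_authkey is configured):

    request body:  {"VarName": "WDState"}
    reply body:    {"ValueDataType": 1, "ValueData": <integer WD_STATES code>}

Here ValueDataType 1 is VALUE_DATA_TYPE_INT from the enum above, so the
reply's ValueData is to be read as an integer.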
diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h
index 1b91ef91cfdbcf270457b4e3f90d26acc7dbd86e..780f1d6563e49d0f5370f70d7a6cceccfec5193b 100644
@@ -77,6 +77,8 @@ extern char* get_wd_simple_message_json(char* message);
 extern WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len);
 extern char* get_backend_node_status_json(WatchdogNode* wdNode);
 
+extern char* get_simple_request_json(char *key, char* value, unsigned int sharedKey, char* authKey);
+
 extern bool parse_data_request_json(char* json_data, int data_len, char** request_type);
 extern char* get_data_request_json(char* request_type, unsigned int sharedKey, char* authKey);
 
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 2f7620bafd7ec2743680d0393cbe185c1de62bf5..5983c1d44af7ccace54dff81013bf844e7cbfcb3 100644
 
 #include "watchdog/watchdog.h"
 
+/*
+ * Reasons for signalling a pgpool-II main process
+ */
+typedef enum
+{
+       SIG_FAILOVER_INTERRUPT,         /* signal main to start failover */
+       SIG_WATCHDOG_STATE_CHANGED,     /* notify main about local watchdog node state changed */
+       MAX_INTERUPTS                           /* Must be last! */
+} User1SignalReason;
+
+
+typedef struct User1SignalSlot
+{
+       sig_atomic_t    signalFlags[MAX_INTERUPTS];
+}User1SignalSlot;
 /*
  * Process pending signal actions.
  */
                        wakeup_children(); \
                        wakeup_request = 0; \
                } \
-               if (failover_request) \
+               if (sigusr1_request) \
                { \
-                       failover(); \
-                       failover_request = 0; \
+                       sigusr1_interupt_processor(); \
+                       sigusr1_request = 0; \
                } \
                if (sigchld_request) \
                { \
 
 static int process_backend_health_check_failure(int health_check_node_id, int retrycnt);
 static bool do_health_check(bool use_template_db, volatile int *health_check_node_id);
+static void signal_user1_to_parent_with_reason(User1SignalReason reason);
 
 static void FileUnlink(int code, Datum path);
 static pid_t pcp_fork_a_child(int unix_fd, int inet_fd, char *pcp_conf_file);
@@ -107,6 +123,7 @@ static int create_unix_domain_socket(struct sockaddr_un un_addr_tmp);
 static int create_inet_domain_socket(const char *hostname, const int port);
 static int *create_inet_domain_sockets(const char *hostname, const int port);
 static void failover(void);
+static bool check_all_backend_down(void);
 static void reaper(void);
 static void wakeup_children(void);
 static void reload_config(void);
@@ -117,7 +134,8 @@ static pid_t fork_follow_child(int old_master, int new_primary, int old_primary)
 static int read_status_file(bool discard_status);
 static RETSIGTYPE exit_handler(int sig);
 static RETSIGTYPE reap_handler(int sig);
-static RETSIGTYPE failover_handler(int sig);
+static RETSIGTYPE sigusr1_handler(int sig);
+static void sigusr1_interupt_processor(void);
 static RETSIGTYPE reload_config_handler(int sig);
 static RETSIGTYPE health_check_timer_handler(int sig);
 static RETSIGTYPE wakeup_handler(int sig);
@@ -131,11 +149,12 @@ static int find_primary_node_repeatedly(void);
 static void terminate_all_childrens();
 static void system_will_go_down(int code, Datum arg);
 static char* process_name_from_pid(pid_t pid);
-static void initialize_backend_status_from_watchdog(void);
+static void sync_backend_from_watchdog(void);
 
 static struct sockaddr_un un_addr;             /* unix domain socket path */
 static struct sockaddr_un pcp_un_addr;  /* unix domain socket path for PCP */
 ProcessInfo *process_info = NULL;              /* Per child info table on shmem */
+volatile User1SignalSlot       *user1SignalSlot = NULL;/* User 1 signal slot on shmem */
 struct timeval random_start_time;
 
 /*
@@ -159,12 +178,12 @@ extern char conf_file[POOLMAXPATHLEN+1];
 extern char hba_file[POOLMAXPATHLEN+1];
 
 static int exiting = 0;                /* non 0 if I'm exiting */
-static int switching = 0;              /* non 0 if I'm fail overing or degenerating */
+static int switching = 0;              /* non 0 if I'm failing over or degenerating */
 
 POOL_REQUEST_INFO *Req_info;           /* request info area in shared memory */
 volatile sig_atomic_t *InRecovery; /* non 0 if recovery is started */
 volatile sig_atomic_t reload_config_request = 0;
-static volatile sig_atomic_t failover_request = 0;
+static volatile sig_atomic_t sigusr1_request = 0;
 static volatile sig_atomic_t sigchld_request = 0;
 static volatile sig_atomic_t wakeup_request = 0;
 
@@ -250,11 +269,11 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
                 */
                pool_signal(SIGUSR2, wakeup_handler);
                pool_signal(SIGCHLD, reap_handler);
-               pool_signal(SIGUSR1, failover_handler);
+               pool_signal(SIGUSR1, sigusr1_handler);
 
                /*
                 * okay as we need to wait until watchdog is in stable state
-                * so only wait for SIGUSR2, SIGCHLD, and signals those are
+                * so only wait for SIGUSR1, SIGCHLD, and signals that are
                 * necessary to make sure we respond to user requests of shutdown
                 * if it arrives while we are in waiting state.
                 *
@@ -265,7 +284,7 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
                 * once our backend status will be synchronized across the cluster
                 */
                sigfillset(&mask);
-               sigdelset(&mask, SIGUSR2);
+               sigdelset(&mask, SIGUSR1);
                sigdelset(&mask, SIGCHLD);
                sigdelset(&mask, SIGTERM);
                sigdelset(&mask, SIGINT);
@@ -273,7 +292,7 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
                watchdog_pid = initialize_watchdog();
                ereport (LOG,
                                 (errmsg("waiting for watchdog to initialize")));
-               while (wakeup_request == 0 && sigchld_request == 0)
+               while (sigusr1_request == 0 && sigchld_request == 0)
                {
                        sigsuspend(&mask);
                }
@@ -292,8 +311,11 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
                 */
                wd_lifecheck_pid = initialize_watchdog_lifecheck();
 
-               /* load the backend node status from watchdog cluster */
-               initialize_backend_status_from_watchdog();
+               if (sigusr1_request)
+               {
+                       sigusr1_interupt_processor();
+                       sigusr1_request = 0;
+               }
        }
 
        fds[0] = create_unix_domain_socket(un_addr);
@@ -347,7 +369,7 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
        pool_signal(SIGINT, exit_handler);
        pool_signal(SIGQUIT, exit_handler);
        pool_signal(SIGCHLD, reap_handler);
-       pool_signal(SIGUSR1, failover_handler);
+       pool_signal(SIGUSR1, sigusr1_handler);
        pool_signal(SIGUSR2, wakeup_handler);
        pool_signal(SIGHUP, reload_config_handler);
 
@@ -644,13 +666,23 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
                POOL_SETMASK(&oldmask);
                if(failover_in_progress == false)
                {
-                       pool_signal_parent(SIGUSR1);
+                       signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
                }
        }
 
        return true;
 }
 
+void register_watchdog_state_change_interupt(void)
+{
+       signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
+}
+
+static void signal_user1_to_parent_with_reason(User1SignalReason reason)
+{
+       user1SignalSlot->signalFlags[reason] = true;
+       pool_signal_parent(SIGUSR1);
+}
+
 /*
  * fork a child for PCP
  */
@@ -1500,19 +1532,19 @@ static int get_next_master_node(void)
  * handle SIGUSR1
  *
  */
-static RETSIGTYPE failover_handler(int sig)
+static RETSIGTYPE sigusr1_handler(int sig)
 {
        int save_errno = errno;
 
        POOL_SETMASK(&BlockSig);
-       failover_request = 1;
+       sigusr1_request = 1;
 
        write(pipe_fds[1], "\0", 1);
 
 #ifdef NOT_USED
        if(write(pipe_fds[1], "\0", 1) < 0)
         ereport(WARNING,
-                (errmsg("failover_handler: write to pipe failed with error \"%s\"", strerror(errno))));
+                (errmsg("SIGUSR1 handler: write to pipe failed with error \"%s\"", strerror(errno))));
 #endif
 
        POOL_SETMASK(&UnBlockSig);
@@ -1520,6 +1552,60 @@ static RETSIGTYPE failover_handler(int sig)
        errno = save_errno;
 }
 
+static void sigusr1_interupt_processor(void)
+{
+       ereport(DEBUG1,
+                       (errmsg("Pgpool-II parent process received SIGUSR1")));
+
+       if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
+       {
+               ereport(DEBUG1,
+                               (errmsg("Pgpool-II parent process received SIGUSR1 from watchdog")));
+
+               user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
+               if (get_watchdog_local_node_state() == WD_STANDBY)
+               {
+                       ereport(LOG,
+                               (errmsg("we have joined the watchdog cluster as STANDBY node"),
+                                        errdetail("syncing the backend states from the MASTER watchdog node")));
+                       sync_backend_from_watchdog();
+               }
+       }
+       if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
+       {
+               ereport(LOG,
+                               (errmsg("Pgpool-II parent process has received failover request")));
+               user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT] = false;
+               if (processState == INITIALIZING)
+               {
+                       ereport(LOG,
+                                       (errmsg("ignoring the failover request, since we are still starting up")));
+               }
+               else
+               {
+                       failover();
+               }
+       }
+}
+
+/* returns true if all backends are down */
+static bool check_all_backend_down(void)
+{
+       int i;
+       /* Check to see if all backends are down */
+       for (i=0;i<NUM_BACKENDS;i++)
+       {
+               if (BACKEND_INFO(i).backend_status != CON_DOWN &&
+                       BACKEND_INFO(i).backend_status != CON_UNUSED)
+               {
+                       ereport(LOG,
+                                       (errmsg("Node %d is not down (status: %d)",
+                                                       i, BACKEND_INFO(i).backend_status)));
+                       return false;
+               }
+       }
+       return true;
+}
 
 /*
  * backend connection error, failover/failback request, if possible
@@ -1659,18 +1745,7 @@ static void failover(void)
                                         BACKEND_INFO(node_id).backend_port)));
 
                        /* Check to see if all backends are down */
-                       for (i=0;i<NUM_BACKENDS;i++)
-                       {
-                               if (BACKEND_INFO(i).backend_status != CON_DOWN &&
-                                       BACKEND_INFO(i).backend_status != CON_UNUSED)
-                               {
-                                       ereport(LOG,
-                                                       (errmsg("Node %d is not down (status: %d)",
-                                                                       i, BACKEND_INFO(i).backend_status)));
-                                       all_backend_down = false;
-                                       break;
-                               }
-                       }
+                       all_backend_down = check_all_backend_down();
 
                        BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT;        /* unset down status */
                        (void)write_status_file();
@@ -1778,7 +1853,7 @@ static void failover(void)
                if (STREAM && reqkind == NODE_UP_REQUEST && all_backend_down == false)
                {
                        ereport(LOG,
-                                       (errmsg("Do not restart children because we are failbacking node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
+                                       (errmsg("Do not restart children because we are failing back node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
                                         BACKEND_INFO(node_id).backend_hostname,
                                         BACKEND_INFO(node_id).backend_port)));
 
@@ -3026,6 +3101,7 @@ static void initialize_shared_mem_objects(bool clear_memcache_oidmaps)
                process_info[i].connection_info = pool_coninfo(i,0,0);
        }
 
+       user1SignalSlot = pool_shared_memory_create(sizeof(User1SignalSlot));
        /* create fail over/switch over event area */
        Req_info = pool_shared_memory_create(sizeof(POOL_REQUEST_INFO));
 
@@ -3442,72 +3518,240 @@ int pool_frontend_exists(void)
        return -1;
 }
 
-static void initialize_backend_status_from_watchdog(void)
+/*
+ * The function fetches the current status of all configured backend
+ * nodes from the MASTER/COORDINATOR watchdog Pgpool-II node and synchronizes
+ * the local backend states with the cluster-wide status of each node.
+ *
+ * Later in the function, after syncing the backend node status, it also
+ * does a partial or full restart of Pgpool-II children depending upon the
+ * Pgpool-II mode and the type of node status change
+ *
+ */
+static void sync_backend_from_watchdog(void)
 {
-       if (pool_config->use_watchdog)
+       bool primary_changed = false;
+       bool node_status_was_changed_to_down = false;
+       bool node_status_was_changed_to_up = false;
+       bool need_to_restart_children = false;
+       bool partial_restart = false;
+       bool reload_master_node_id = false;
+
+       int down_node_ids[MAX_NUM_BACKENDS];
+       int down_node_ids_index = 0;
+       int i;
+
+       /*
+        * Ask the watchdog to get all the backend states from the Master/Coordinator
+        * Pgpool-II node
+        */
+       WDPGBackendStatus* backendStatus = get_pg_backend_status_from_master_wd_node();
+       if (!backendStatus)
+       {
+               ereport(WARNING,
+                       (errmsg("failed to get the backend status from the master watchdog node"),
+                                errdetail("using the local backend node status")));
+               return;
+       }
+       if (backendStatus->node_count <= 0)
        {
-               WDPGBackendStatus* backendStatus = get_pg_backend_status_from_master_wd_node();
-               if (backendStatus)
+               /*
+                * A -ve node count is returned by the watchdog when the node itself is a
+                * master and in that case we need to use the local backend node status
+                */
+               ereport(LOG,
+                       (errmsg("I am the master watchdog node"),
+                                errdetail("using the local backend node status")));
+               pfree(backendStatus);
+               return;
+       }
+
+       ereport(LOG,
+                       (errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
+
+       ereport(DEBUG1,
+                       (errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
+
+       if (Req_info->primary_node_id != backendStatus->primary_node_id)
+       {
+               /* Do not produce this log message if we are starting up Pgpool-II */
+               if (processState != INITIALIZING)
+                       ereport(LOG,
+                                       (errmsg("primary node:%d on master watchdog node \"%s\" is different from local primary node:%d",
+                                                       backendStatus->primary_node_id,backendStatus->nodeName,Req_info->primary_node_id)));
+
+               Req_info->primary_node_id = backendStatus->primary_node_id;
+               primary_changed = true;
+       }
+       /* update the local backend status */
+       for (i = 0; i < backendStatus->node_count; i++)
+       {
+               if (backendStatus->backend_status[i] == CON_DOWN)
                {
-                       if (backendStatus->node_count <= 0)
+                       if (BACKEND_INFO(i).backend_status != CON_DOWN)
                        {
-                               /*
-                                * -ve node count is returned by watchdog when the node itself is a master
-                                * and in that case we need to use the loacl backend node status
-                                */
+                               BACKEND_INFO(i).backend_status = CON_DOWN;
+                               my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
+                               reload_master_node_id = true;
+                               node_status_was_changed_to_down = true;
                                ereport(LOG,
-                                               (errmsg("I am the master watchdog node"),
-                                                errdetail("using the local backend node status")));
+                                               (errmsg("backend:%d is set to down status", i),
+                                                errdetail("backend:%d is DOWN on cluster master \"%s\"",i,backendStatus->nodeName)));
+                               down_node_ids[down_node_ids_index++] = i;
                        }
-                       else
+               }
+               else if (backendStatus->backend_status[i] == CON_CONNECT_WAIT ||
+                                backendStatus->backend_status[i] == CON_UP)
+               {
+                       if (BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
                        {
-                               int i;
-                               bool reload_maste_node_id = false;
-                               ereport(LOG,
-                                               (errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
+                               if (BACKEND_INFO(i).backend_status == CON_DOWN)
+                                       node_status_was_changed_to_up = true;
+
+                               BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
+                               my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
+                               reload_master_node_id = true;
+
                                ereport(LOG,
-                                               (errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
+                                       (errmsg("backend:%d is set to UP status", i),
+                                                errdetail("backend:%d is UP on cluster master \"%s\"",i,backendStatus->nodeName)));
 
-                               Req_info->primary_node_id = backendStatus->primary_node_id;
+                       }
+               }
+       }
 
-                               for (i = 0; i < backendStatus->node_count; i++)
-                               {
-                                       if (backendStatus->backend_status[i] == CON_DOWN)
-                                       {
-                                               if (BACKEND_INFO(i).backend_status != CON_DOWN)
-                                               {
+       if (reload_master_node_id)
+       {
+               Req_info->master_node_id = get_next_master_node();
+       }
 
-                                                       BACKEND_INFO(i).backend_status = CON_DOWN;
-                                                       my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
-                                                       reload_maste_node_id = true;
-                                                       ereport(LOG,
-                                                                       (errmsg("backend status from \"%s\" backend:%d is set to down status",backendStatus->nodeName, i)));
-                                               }
-                                       }
-                                       else if (backendStatus->backend_status[i] == CON_CONNECT_WAIT ||
-                                                               backendStatus->backend_status[i] == CON_UP)
+       /* We don't need to do anything else if Pgpool-II is starting up */
+       if (processState == INITIALIZING)
+       {
+               pfree(backendStatus);
+               return;
+       }
+
+       /*
+        * Decide if all or a subset of the Pgpool-II children need an immediate
+        * restart, or if we can do that after finishing the current session
+        *
+        * Check if there was no change at all
+        */
+       if (node_status_was_changed_to_up == false &&
+               node_status_was_changed_to_down == false &&
+               primary_changed == false)
+       {
+               ereport(LOG,
+                       (errmsg("backend node status remains the same after the sync from \"%s\"",backendStatus->nodeName)));
+               pfree(backendStatus);
+               return;
+       }
+       if (!STREAM)
+       {
+               /* If we are not in streaming replication mode
+                * restart all child processes
+                */
+               ereport(LOG,
+                       (errmsg("node status was changed after the sync from \"%s\"",backendStatus->nodeName),
+                                errdetail("all children need to be restarted as we are not in streaming replication mode")));
+               need_to_restart_children = true;
+               partial_restart = false;
+       }
+       else if (primary_changed)
+       {
+               /* if the primary node was changed, we should restart all
+                * children
+                */
+               need_to_restart_children = true;
+               partial_restart = false;
+               ereport(LOG,
+                       (errmsg("primary node was changed after the sync from \"%s\"",backendStatus->nodeName),
+                               errdetail("all children need to be restarted")));
+
+       }
+       else
+       {
+               if (node_status_was_changed_to_down == false)
+               {
+                       /* no node was detached, so there is no need to restart
+                        * any child process
+                        */
+                       need_to_restart_children = false;
+                       partial_restart = false;
+                       ereport(LOG,
+                               (errmsg("No backend node was detached because of backend status sync from \"%s\"",backendStatus->nodeName),
+                                        errdetail("no need to restart children")));
+               }
+               else
+               {
+                       ereport(LOG,
+                               (errmsg("%d backend node(s) were detached because of backend status sync from \"%s\"",down_node_ids_index,backendStatus->nodeName),
+                                        errdetail("restarting the children processes")));
+
+                       need_to_restart_children = true;
+                       partial_restart = !check_all_backend_down();
+               }
+       }
+
+       pfree(backendStatus);
+
+       /* Kill children and restart them if needed */
+       if (need_to_restart_children)
+       {
+               for (i=0;i<pool_config->num_init_children;i++)
+               {
+                       bool restart = false;
+
+                       if (partial_restart)
+                       {
+                               int j, k;
+                               for (j=0;j<pool_config->max_pool;j++)
+                               {
+                                       for (k=0;k<NUM_BACKENDS;k++)
                                        {
-                                               if (BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
+                                               int idx;
+                                               ConnectionInfo *con = pool_coninfo(i, j, k);
+                                               for (idx = 0; idx < down_node_ids_index; idx ++)
                                                {
-                                                       BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
-                                                       my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
-                                                       reload_maste_node_id = true;
+                                                       int node_id = down_node_ids[idx];
+                                                       if (con->connected && con->load_balancing_node == node_id)
+                                                       {
+                                                               ereport(LOG,
+                                                                               (errmsg("child process with PID:%d needs restart, because pool %d uses backend %d",
+                                                                                               process_info[i].pid, j, node_id)));
+                                                               restart = true;
+                                                               break;
+                                                       }
+                                                       if (restart)
+                                                               break;
                                                }
                                        }
                                }
+                       }
+                       else
+                       {
+                               restart = true;
+                       }
 
-                               if (reload_maste_node_id)
+                       if (restart)
+                       {
+                               if (process_info[i].pid)
                                {
-                                       Req_info->master_node_id = get_next_master_node();
+                                       kill(process_info[i].pid, SIGQUIT);
+
+                                       process_info[i].pid = fork_a_child(fds, i);
+                                       process_info[i].start_time = time(NULL);
                                }
                        }
-                       pfree(backendStatus);
+                       else
+                               process_info[i].need_to_restart = 1;
                }
-               else
+       }
+       else
+       {
+               /* Set the restart request for each child. Children will exit(1)
+                * whenever it is convenient for them.
+                */
+               for (i=0;i<pool_config->num_init_children;i++)
                {
-                       ereport(WARNING,
-                               (errmsg("failed to get the backend status from the master watchdog node"),
-                                        errdetail("using the local backend node status")));
+                       process_info[i].need_to_restart = 1;
                }
        }
 }
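
For context, need_to_restart is the deferred half of the restart scheme above:
children that cannot be restarted immediately are only flagged, and a flagged
child exits with code 1 at a convenient point (typically when its current
session ends) so the main process forks a replacement. A rough, hypothetical
sketch of the child-side check (the actual check lives in the child process
code, not in this patch, and the names used here are assumptions):

    /* hypothetical: executed by a child process between frontend sessions */
    if (process_info[my_proc_id].need_to_restart)
    {
        process_info[my_proc_id].need_to_restart = 0;
        child_exit(1);  /* exit code 1 asks the main process to re-fork this slot */
    }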
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index 1b6ff32f2f8bc850144ac6b9a7ea2dc7fe917787..f22241bed367df4e962ebe14528b015f2e41bbfc 100644
@@ -142,7 +142,8 @@ packet_types all_packet_types[] = {
        {WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"},
        {WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"},
        {WD_POOL_CONFIG_DATA, "CONFIG DATA"},
-       {WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST"},
+       {WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST FOR MASTER"},
+       {WD_GET_RUNTIME_VARIABLE_VALUE, "GET WD RUNTIME VARIABLE VALUE"},
        {WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
        {WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
        {WD_CLUSTER_SERVICE_MESSAGE, "CLUSTER SERVICE MESSAGE"},
@@ -454,6 +455,7 @@ static WDCommandData* get_wd_IPC_command_from_socket(int sock);
 static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
@@ -489,7 +491,6 @@ static int wd_create_recv_socket(int port);
 static void wd_check_config(void);
 static pid_t watchdog_main(void);
 static pid_t fork_watchdog_child(void);
-static void cluster_in_stable_state(void);
 static bool check_IPC_client_authentication(json_value *rootObj, bool internal_client_only);
 static bool check_and_report_IPC_authentication(WDCommandData* ipcCommand);
 
@@ -1796,6 +1797,8 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
                case WD_GET_MASTER_DATA_REQUEST:
                        return process_IPC_data_request_from_master(ipcCommand);
 
+               case WD_GET_RUNTIME_VARIABLE_VALUE:
+                       return process_IPC_get_runtime_variable_value_request(ipcCommand);
                default:
                        ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,"unknown IPC command type");
                        break;
@@ -1804,6 +1807,70 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
 }
 
 
+static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand)
+{
+       /* get the runtime variable value requested over IPC */
+       JsonNode* jNode = NULL;
+       char* requestVarName = NULL;
+
+       if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
+               return IPC_CMD_ERROR;
+
+       json_value *root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+       /* The root node must be object */
+       if (root == NULL || root->type != json_object)
+       {
+               json_value_free(root);
+               ereport(NOTICE,
+                       (errmsg("failed to process get local variable IPC command"),
+                                errdetail("unable to parse json data")));
+               return IPC_CMD_ERROR;
+       }
+
+       requestVarName = json_get_string_value_for_key(root, WD_JSON_KEY_VARIABLE_NAME);
+
+       if (requestVarName == NULL)
+       {
+               json_value_free(root);
+               ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+                                                                                                          "requested variable name is null");
+               return IPC_CMD_ERROR;
+       }
+
+       jNode = jw_create_with_object(true);
+
+       if (strcasecmp(WD_RUNTIME_VAR_WD_STATE, requestVarName) == 0)
+       {
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.localNode->state);
+       }
+       else if (strcasecmp(WD_RUNTIME_VAR_QUORUM_STATE, requestVarName) == 0)
+       {
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.quorum_status);
+       }
+       else if (strcasecmp(WD_RUNTIME_VAR_ESCALATION_STATE, requestVarName) == 0)
+       {
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_BOOL);
+               jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.escalated);
+       }
+       else
+       {
+               json_value_free(root);
+               jw_destroy(jNode);
+               ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+                                                                                                          "unknown variable requested");
+               return IPC_CMD_ERROR;
+       }
+
+       jw_finish_document(jNode);
+       json_value_free(root);
+       write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK,
+                                                                                        jw_get_json_string(jNode), jw_get_json_length(jNode) +1);
+       jw_destroy(jNode);
+       return IPC_CMD_COMPLETE;
+}
+
 static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand)
 {
        /* get the json for node list */
@@ -2496,7 +2563,7 @@ static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCo
                syncRequestType = json_get_string_value_for_key(root, "SyncRequestType");
                json_get_int_value_for_key(root, "FailoverLockID", &failoverLockID);
                json_get_int_value_for_key(root, "WDFailoverID", (int*)&failoverID);
-               if (syncRequestType == false)
+               if (syncRequestType == NULL)
                {
                        ereport(LOG,
                                        (errmsg("unable to process failover command lock request from %s",
@@ -4704,16 +4771,6 @@ static bool wd_commands_packet_processor(WD_EVENTS event, WatchdogNode* wdNode,
 }
 
 
-static void cluster_in_stable_state(void)
-{
-       if (g_cluster.clusterInitialized == false)
-       {
-               g_cluster.clusterInitialized = true;
-               /* Inform the parent */
-               kill(getppid(), SIGUSR2);
-       }
-}
-
 static void update_interface_status(void)
 {
        struct ifaddrs *ifAddrStruct=NULL;
@@ -5350,7 +5407,7 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
                                                         errdetail("our declare coordinator message is accepted by all nodes")));
 
                                        g_cluster.masterNode = g_cluster.localNode;
-                                       cluster_in_stable_state();
+                                       register_watchdog_state_change_interupt();
 
                                        /*
                                         * Check if the quorum is present then start the escalation process
@@ -6025,7 +6082,7 @@ static int watchdog_state_machine_standby(WD_EVENTS event, WatchdogNode* wdNode,
                                if (clusterCommand->commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
                                        clusterCommand->commandStatus == COMMAND_FINISHED_TIMEOUT)
                                {
-                                       cluster_in_stable_state();
+                                       register_watchdog_state_change_interupt();
 
                                        ereport(LOG,
                                                (errmsg("successfully joined the watchdog cluster as standby node"),
@@ -6220,7 +6277,7 @@ static int get_mimimum_nodes_required_for_quorum(void)
 
 
 /*
- * sets the state of local watchdog node, and fires an state change event
+ * sets the state of local watchdog node, and fires a state change event
  * if the new and old state differes
  */
 static int set_state(WD_STATES newState)
@@ -6819,6 +6876,7 @@ static bool check_and_report_IPC_authentication(WDCommandData* ipcCommand)
                case WD_NODE_STATUS_CHANGE_COMMAND:
                case WD_REGISTER_FOR_NOTIFICATION:
                case WD_GET_NODES_LIST_COMMAND:
+               case WD_GET_RUNTIME_VARIABLE_VALUE:
                        internal_client_only = false;
                        break;
 
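The hunks above follow the standard recipe for adding a watchdog IPC command;
a condensed sketch of the three touch points, using the names from this patch:

    /* 1. define the command byte (wd_ipc_defines.h) */
    #define WD_GET_RUNTIME_VARIABLE_VALUE          'v'

    /* 2. dispatch it in process_IPC_command() (watchdog.c) */
    case WD_GET_RUNTIME_VARIABLE_VALUE:
        return process_IPC_get_runtime_variable_value_request(ipcCommand);

    /* 3. allow external IPC clients in check_and_report_IPC_authentication() */
    case WD_GET_RUNTIME_VARIABLE_VALUE:
        internal_client_only = false;
        break;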
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index 08d5fe9aab93d442d7f7917f987b5c54d468509e..42c7770519311832f665f01e5ba63b1fe0fafd68 100644
@@ -76,7 +76,6 @@ unsigned int *ipc_shared_key = NULL;   /* key lives in shared memory
                                                                                * used to identify the ipc internal
                                                                                * clients
                                                                                */
-
 void wd_ipc_initialize_data(void)
 {
        if (watchdog_ipc_address == NULL)
@@ -112,6 +111,30 @@ void wd_ipc_initialize_data(void)
        }
 }
 
+WD_STATES get_watchdog_local_node_state(void)
+{
+       WD_STATES ret = WD_DEAD;
+       WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_WD_STATE);
+       if (state == NULL)
+       {
+               ereport(LOG,
+                               (errmsg("failed to get current state of local watchdog node"),
+                                errdetail("get runtime variable value from watchdog returned no data")));
+               return WD_DEAD;
+       }
+       if (state->valueType != VALUE_DATA_TYPE_INT)
+       {
+               ereport(LOG,
+                               (errmsg("failed to get current state of local watchdog node"),
+                                errdetail("get runtime variable value from watchdog returned invalid value type")));
+               pfree(state);
+               return WD_DEAD;
+       }
+       ret = (WD_STATES)state->data.intVal;
+       pfree(state);
+       return ret;
+}
+
 char* get_watchdog_ipc_address(void)
 {
        return watchdog_ipc_address;
@@ -294,6 +317,157 @@ issue_command_to_watchdog(char type, int timeout_sec, char* data, int data_len,
        return result;
 }
 
+/*
+ * The function gets the runtime value of a watchdog variable using the
+ * watchdog IPC
+ */
+WDGenericData *get_wd_runtime_variable_value(char *varName)
+{
+       unsigned int *shared_key = get_ipc_shared_key();
+       char *data = get_simple_request_json(WD_JSON_KEY_VARIABLE_NAME,varName,
+                                                                          shared_key?*shared_key:0,pool_config->wd_authkey);
+
+       WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_RUNTIME_VARIABLE_VALUE,
+                                                                                                          WD_DEFAULT_IPC_COMMAND_TIMEOUT,
+                                                                                                          data, strlen(data), true);
+       pfree(data);
+
+       if (result == NULL)
+       {
+               ereport(WARNING,
+                       (errmsg("get runtime variable value from watchdog failed"),
+                                errdetail("issue command to watchdog returned NULL")));
+               return NULL;
+       }
+       if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
+       {
+               ereport(WARNING,
+                               (errmsg("get runtime variable value from watchdog failed"),
+                                errdetail("watchdog cluster is not in stable state"),
+                                       errhint("try again when the cluster is fully initialized")));
+               FreeCmdResult(result);
+               return NULL;
+       }
+       else if (result->type == WD_IPC_CMD_TIMEOUT)
+       {
+               ereport(WARNING,
+                               (errmsg("get runtime variable value from watchdog failed"),
+                                errdetail("ipc command timeout")));
+               FreeCmdResult(result);
+               return NULL;
+       }
+       else if (result->type == WD_IPC_CMD_RESULT_OK)
+       {
+               json_value *root = NULL;
+               WDGenericData *genData = NULL;
+               WDValueDataType dataType;
+
+               root = json_parse(result->data, result->length);
+               /* The root node must be object */
+               if (root == NULL || root->type != json_object)
+               {
+                       FreeCmdResult(result);
+                       return NULL;
+               }
+
+               if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA_TYPE, (int*)&dataType))
+               {
+                       FreeCmdResult(result);
+                       json_value_free(root);
+                       return NULL;
+               }
+
+               switch (dataType) {
+                       case VALUE_DATA_TYPE_INT:
+                       {
+                               int intVal;
+                               if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &intVal))
+                               {
+                                       ereport(WARNING,
+                                               (errmsg("get runtime variable value from watchdog failed"),
+                                                        errdetail("unable to get INT value from JSON data returned by watchdog")));
+                               }
+                               else
+                               {
+                                       genData = palloc(sizeof(WDGenericData));
+                                       genData->valueType = dataType;
+                                       genData->data.intVal = intVal;
+                               }
+                       }
+                               break;
+
+                       case VALUE_DATA_TYPE_LONG:
+                       {
+                               long longVal;
+                               if (json_get_long_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &longVal))
+                               {
+                                       ereport(WARNING,
+                                               (errmsg("get runtime variable value from watchdog failed"),
+                                                        errdetail("unable to get LONG value from JSON data returned by watchdog")));
+                               }
+                               else
+                               {
+                                       genData = palloc(sizeof(WDGenericData));
+                                       genData->valueType = dataType;
+                                       genData->data.longVal = longVal;
+                               }
+                       }
+                               break;
+
+                       case VALUE_DATA_TYPE_BOOL:
+                       {
+                               bool boolVal;
+                               if (json_get_bool_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &boolVal))
+                               {
+                                       ereport(WARNING,
+                                               (errmsg("get runtime variable value from watchdog failed"),
+                                                        errdetail("unable to get BOOL value from JSON data returned by watchdog")));
+                               }
+                               else
+                               {
+                                       genData = palloc(sizeof(WDGenericData));
+                                       genData->valueType = dataType;
+                                       genData->data.boolVal = boolVal;
+                               }
+                       }
+                               break;
+
+                       case VALUE_DATA_TYPE_STRING:
+                       {
+                               char *ptr = json_get_string_value_for_key(root, WD_JSON_KEY_VALUE_DATA);
+                               if (ptr == NULL)
+                               {
+                                       ereport(WARNING,
+                                               (errmsg("get runtime variable value from watchdog failed"),
+                                                        errdetail("unable to get STRING value from JSON data returned by watchdog")));
+                               }
+                               else
+                               {
+                                       genData = palloc(sizeof(WDGenericData));
+                                       genData->valueType = dataType;
+                                       genData->data.stringVal = pstrdup(ptr);
+                               }
+                       }
+                               break;
+
+                       default:
+                               ereport(WARNING,
+                                               (errmsg("get runtime variable value from watchdog failed, unknown value data type")));
+                               break;
+               }
+
+               json_value_free(root);
+               FreeCmdResult(result);
+               return genData;
+       }
+
+       ereport(WARNING,
+                       (errmsg("get runtime variable value from watchdog failed")));
+       FreeCmdResult(result);
+       return NULL;
+
+}
+
 /*
  * function gets the PG backend status of all attached nodes from
  * the master watchdog node.
diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c
index 6fedd4da15932cd9f60679519ab630842bdc8ead..ef3e7ec03272f4bdfc929180800429395707c028 100644
@@ -32,8 +32,6 @@
 #include "watchdog/wd_ipc_defines.h"
 #include "pool.h"
 
-#define WD_JSON_KEY_DATA_REQ_TYPE      "DataRequestType"
-
 
 POOL_CONFIG* get_pool_config_from_json(char* json_data, int data_len)
 {
@@ -227,7 +225,10 @@ char* get_pool_config_json(void)
        return json_str;
 }
 
-char* get_data_request_json(char* request_type, unsigned int sharedKey, char* authKey)
+/* The function returns a simple JSON string that contains
+ * only one KEY,VALUE pair, along with the authkey key-value if provided
+ */
+char* get_simple_request_json(char *key, char* value, unsigned int sharedKey, char* authKey)
 {
        char* json_str;
 
@@ -238,13 +239,18 @@ char* get_data_request_json(char* request_type, unsigned int sharedKey, char* au
        if (authKey != NULL && strlen(authKey) > 0)
                jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /*  put the auth key*/
 
-       jw_put_string(jNode, WD_JSON_KEY_DATA_REQ_TYPE, request_type);
+       jw_put_string(jNode, key, value);
        jw_finish_document(jNode);
        json_str = pstrdup(jw_get_json_string(jNode));
        jw_destroy(jNode);
        return json_str;
 }
 
+char* get_data_request_json(char* request_type, unsigned int sharedKey, char* authKey)
+{
+       return get_simple_request_json(WD_JSON_KEY_DATA_REQ_TYPE, request_type, sharedKey, authKey);
+}
+
 bool parse_data_request_json(char* json_data, int data_len, char** request_type)
 {
        json_value *root;
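
With get_data_request_json() reduced to a thin wrapper, both entry points
build their payload through the same code path. A hedged usage sketch
(shared_key and auth_key stand for the values the callers in wd_commands.c
already pass in):

    /* these two calls produce identical JSON documents */
    char *a = get_data_request_json(WD_DATE_REQ_PG_BACKEND_DATA,
                                    shared_key, auth_key);
    char *b = get_simple_request_json(WD_JSON_KEY_DATA_REQ_TYPE,
                                      WD_DATE_REQ_PG_BACKEND_DATA,
                                      shared_key, auth_key);
    /* both strings are palloc'd and should be released with pfree() */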