#include "watchdog/watchdog.h"
+/*
+ * Reasons for signalling a pgpool-II main process
+ */
+typedef enum
+{
+ SIG_FAILOVER_INTERRUPT, /* signal main to start failover */
+ SIG_WATCHDOG_STATE_CHANGED, /* notify main that the local watchdog node state has changed */
+ MAX_INTERUPTS /* Must be last! */
+} User1SignalReason;
+
+
+typedef struct User1SignalSlot
+{
+ sig_atomic_t signalFlags[MAX_INTERUPTS];
+}User1SignalSlot;
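For reference, a minimal sketch of how this slot is meant to be used, mirroring the functions added later in this patch (signal_user1_to_parent_with_reason and sigusr1_interupt_processor): the sender marks a reason in the shared slot and raises SIGUSR1, while the parent's handler only records that SIGUSR1 arrived; the flags are consumed later, outside signal context.

/* sender side: flag the reason, then signal the pgpool-II main process */
user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT] = true;
pool_signal_parent(SIGUSR1);

/* parent side, executed from the main loop (not from the signal handler) */
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
    user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT] = false;
    failover();
}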
/*
* Process pending signal actions.
*/
wakeup_children(); \
wakeup_request = 0; \
} \
- if (failover_request) \
+ if (sigusr1_request) \
{ \
- failover(); \
- failover_request = 0; \
+ sigusr1_interupt_processor(); \
+ sigusr1_request = 0; \
} \
if (sigchld_request) \
{ \
static int process_backend_health_check_failure(int health_check_node_id, int retrycnt);
static bool do_health_check(bool use_template_db, volatile int *health_check_node_id);
+static void signal_user1_to_parent_with_reason(User1SignalReason reason);
static void FileUnlink(int code, Datum path);
static pid_t pcp_fork_a_child(int unix_fd, int inet_fd, char *pcp_conf_file);
static int create_inet_domain_socket(const char *hostname, const int port);
static int *create_inet_domain_sockets(const char *hostname, const int port);
static void failover(void);
+static bool check_all_backend_down(void);
static void reaper(void);
static void wakeup_children(void);
static void reload_config(void);
static int read_status_file(bool discard_status);
static RETSIGTYPE exit_handler(int sig);
static RETSIGTYPE reap_handler(int sig);
-static RETSIGTYPE failover_handler(int sig);
+static RETSIGTYPE sigusr1_handler(int sig);
+static void sigusr1_interupt_processor(void);
static RETSIGTYPE reload_config_handler(int sig);
static RETSIGTYPE health_check_timer_handler(int sig);
static RETSIGTYPE wakeup_handler(int sig);
static void terminate_all_childrens();
static void system_will_go_down(int code, Datum arg);
static char* process_name_from_pid(pid_t pid);
-static void initialize_backend_status_from_watchdog(void);
+static void sync_backend_from_watchdog(void);
static struct sockaddr_un un_addr; /* unix domain socket path */
static struct sockaddr_un pcp_un_addr; /* unix domain socket path for PCP */
ProcessInfo *process_info = NULL; /* Per child info table on shmem */
+volatile User1SignalSlot *user1SignalSlot = NULL;/* User 1 signal slot on shmem */
struct timeval random_start_time;
/*
extern char hba_file[POOLMAXPATHLEN+1];
static int exiting = 0; /* non 0 if I'm exiting */
-static int switching = 0; /* non 0 if I'm fail overing or degenerating */
+static int switching = 0; /* non 0 if I'm failing over or degenerating */
POOL_REQUEST_INFO *Req_info; /* request info area in shared memory */
volatile sig_atomic_t *InRecovery; /* non 0 if recovery is started */
volatile sig_atomic_t reload_config_request = 0;
-static volatile sig_atomic_t failover_request = 0;
+static volatile sig_atomic_t sigusr1_request = 0;
static volatile sig_atomic_t sigchld_request = 0;
static volatile sig_atomic_t wakeup_request = 0;
*/
pool_signal(SIGUSR2, wakeup_handler);
pool_signal(SIGCHLD, reap_handler);
- pool_signal(SIGUSR1, failover_handler);
+ pool_signal(SIGUSR1, sigusr1_handler);
/*
* okay as we need to wait until watchdog is in stable state
- * so only wait for SIGUSR2, SIGCHLD, and signals those are
+ * so only wait for SIGUSR1, SIGCHLD, and the signals that are
* necessary to make sure we respond to user requests of shutdown
* if it arrives while we are in waiting state.
*
* once our backend status will be synchronized across the cluster
*/
sigfillset(&mask);
- sigdelset(&mask, SIGUSR2);
+ sigdelset(&mask, SIGUSR1);
sigdelset(&mask, SIGCHLD);
sigdelset(&mask, SIGTERM);
sigdelset(&mask, SIGINT);
watchdog_pid = initialize_watchdog();
ereport (LOG,
(errmsg("waiting for watchdog to initialize")));
- while (wakeup_request == 0 && sigchld_request == 0)
+ while (sigusr1_request == 0 && sigchld_request == 0)
{
sigsuspend(&mask);
}
*/
wd_lifecheck_pid = initialize_watchdog_lifecheck();
- /* load the backend node status from watchdog cluster */
- initialize_backend_status_from_watchdog();
+ if (sigusr1_request)
+ {
+ sigusr1_interupt_processor();
+ sigusr1_request = 0;
+ }
}
fds[0] = create_unix_domain_socket(un_addr);
pool_signal(SIGINT, exit_handler);
pool_signal(SIGQUIT, exit_handler);
pool_signal(SIGCHLD, reap_handler);
- pool_signal(SIGUSR1, failover_handler);
+ pool_signal(SIGUSR1, sigusr1_handler);
pool_signal(SIGUSR2, wakeup_handler);
pool_signal(SIGHUP, reload_config_handler);
POOL_SETMASK(&oldmask);
if(failover_in_progress == false)
{
- pool_signal_parent(SIGUSR1);
+ signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
}
}
return true;
}
+void register_watchdog_state_change_interupt(void)
+{
+ signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
+}
+static void signal_user1_to_parent_with_reason(User1SignalReason reason)
+{
+ user1SignalSlot->signalFlags[reason] = true;
+ pool_signal_parent(SIGUSR1);
+}
+
/*
* fork a child for PCP
*/
* handle SIGUSR1
*
*/
-static RETSIGTYPE failover_handler(int sig)
+static RETSIGTYPE sigusr1_handler(int sig)
{
int save_errno = errno;
POOL_SETMASK(&BlockSig);
- failover_request = 1;
+ sigusr1_request = 1;
write(pipe_fds[1], "\0", 1);
#ifdef NOT_USED
if(write(pipe_fds[1], "\0", 1) < 0)
ereport(WARNING,
- (errmsg("failover_handler: write to pipe failed with error \"%s\"", strerror(errno))));
+ (errmsg("SIGUSR1 handler: write to pipe failed with error \"%s\"", strerror(errno))));
#endif
POOL_SETMASK(&UnBlockSig);
errno = save_errno;
}
+static void sigusr1_interupt_processor(void)
+{
+ ereport(DEBUG1,
+ (errmsg("Pgpool-II parent process received SIGUSR1")));
+
+ if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
+ {
+ ereport(DEBUG1,
+ (errmsg("Pgpool-II parent process received SIGUSR1 from watchdog")));
+
+ user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
+ if (get_watchdog_local_node_state() == WD_STANDBY)
+ {
+ ereport(LOG,
+ (errmsg("we have joined the watchdog cluster as STANDBY node"),
+ errdetail("syncing the backend states from the MASTER watchdog node")));
+ sync_backend_from_watchdog();
+ }
+ }
+ if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
+ {
+ ereport(LOG,
+ (errmsg("Pgpool-II parent process has received failover request")));
+ user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT] = false;
+ if (processState == INITIALIZING)
+ {
+ ereport(LOG,
+ (errmsg("ignoring the failover request, since we are still starting up")));
+ }
+ else
+ {
+ failover();
+ }
+ }
+}
+
+/* returns true if all backends are down */
+static bool check_all_backend_down(void)
+{
+ int i;
+ /* Check to see if all backends are down */
+ for (i=0;i<NUM_BACKENDS;i++)
+ {
+ if (BACKEND_INFO(i).backend_status != CON_DOWN &&
+ BACKEND_INFO(i).backend_status != CON_UNUSED)
+ {
+ ereport(LOG,
+ (errmsg("Node %d is not down (status: %d)",
+ i, BACKEND_INFO(i).backend_status)));
+ return false;
+ }
+ }
+ return true;
+}
/*
* backend connection error, failover/failback request, if possible
BACKEND_INFO(node_id).backend_port)));
/* Check to see if all backends are down */
- for (i=0;i<NUM_BACKENDS;i++)
- {
- if (BACKEND_INFO(i).backend_status != CON_DOWN &&
- BACKEND_INFO(i).backend_status != CON_UNUSED)
- {
- ereport(LOG,
- (errmsg("Node %d is not down (status: %d)",
- i, BACKEND_INFO(i).backend_status)));
- all_backend_down = false;
- break;
- }
- }
+ all_backend_down = check_all_backend_down();
BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT; /* unset down status */
(void)write_status_file();
if (STREAM && reqkind == NODE_UP_REQUEST && all_backend_down == false)
{
ereport(LOG,
- (errmsg("Do not restart children because we are failbacking node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
+ (errmsg("Do not restart children because we are failing back node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
process_info[i].connection_info = pool_coninfo(i,0,0);
}
+ user1SignalSlot = pool_shared_memory_create(sizeof(User1SignalSlot));
/* create fail over/switch over event area */
Req_info = pool_shared_memory_create(sizeof(POOL_REQUEST_INFO));
return -1;
}
-static void initialize_backend_status_from_watchdog(void)
+/*
+ * The function fetches the current status of all configured backend
+ * nodes from the MASTER/COORDINATOR watchdog Pgpool-II node and synchronizes
+ * the local backend states with the cluster-wide status of each node.
+ *
+ * Later in the function, after syncing the backend node status, it performs
+ * a partial or full restart of Pgpool-II children depending upon the
+ * Pgpool-II mode and the type of node status change.
+ *
+ */
+static void sync_backend_from_watchdog(void)
{
- if (pool_config->use_watchdog)
+ bool primary_changed = false;
+ bool node_status_was_changed_to_down = false;
+ bool node_status_was_changed_to_up = false;
+ bool need_to_restart_children = false;
+ bool partial_restart = false;
+ bool reload_master_node_id = false;
+
+ int down_node_ids[MAX_NUM_BACKENDS];
+ int down_node_ids_index = 0;
+ int i;
+
+ /*
+ * Ask the watchdog to get all the backend states from the Master/Coordinator
+ * Pgpool-II node
+ */
+ WDPGBackendStatus* backendStatus = get_pg_backend_status_from_master_wd_node();
+ if (!backendStatus)
+ {
+ ereport(WARNING,
+ (errmsg("failed to get the backend status from the master watchdog node"),
+ errdetail("using the local backend node status")));
+ return;
+ }
+ if (backendStatus->node_count <= 0)
{
- WDPGBackendStatus* backendStatus = get_pg_backend_status_from_master_wd_node();
- if (backendStatus)
+ /*
+ * A negative node count is returned by the watchdog when this node itself
+ * is the master, and in that case we need to use the local backend node status
+ */
+ ereport(LOG,
+ (errmsg("I am the master watchdog node"),
+ errdetail("using the local backend node status")));
+ pfree(backendStatus);
+ return;
+ }
+
+ ereport(LOG,
+ (errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
+
+ ereport(DEBUG1,
+ (errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
+
+ if (Req_info->primary_node_id != backendStatus->primary_node_id)
+ {
+ /* Do not produce this log message if Pgpool-II is starting up */
+ if (processState != INITIALIZING)
+ ereport(LOG,
+ (errmsg("primary node:%d on master watchdog node \"%s\" is different from local primary node:%d",
+ backendStatus->primary_node_id,backendStatus->nodeName,Req_info->primary_node_id)));
+
+ Req_info->primary_node_id = backendStatus->primary_node_id;
+ primary_changed = true;
+ }
+ /* update the local backend status*/
+ for (i = 0; i < backendStatus->node_count; i++)
+ {
+ if (backendStatus->backend_status[i] == CON_DOWN)
{
- if (backendStatus->node_count <= 0)
+ if (BACKEND_INFO(i).backend_status != CON_DOWN)
{
- /*
- * -ve node count is returned by watchdog when the node itself is a master
- * and in that case we need to use the loacl backend node status
- */
+ BACKEND_INFO(i).backend_status = CON_DOWN;
+ my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
+ reload_master_node_id = true;
+ node_status_was_changed_to_down = true;
ereport(LOG,
- (errmsg("I am the master watchdog node"),
- errdetail("using the local backend node status")));
+ (errmsg("backend:%d is set to down status", i),
+ errdetail("backend:%d is DOWN on cluster master \"%s\"",i,backendStatus->nodeName)));
+ down_node_ids[down_node_ids_index++] = i;
}
- else
+ }
+ else if (backendStatus->backend_status[i] == CON_CONNECT_WAIT ||
+ backendStatus->backend_status[i] == CON_UP)
+ {
+ if (BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
{
- int i;
- bool reload_maste_node_id = false;
- ereport(LOG,
- (errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
+ if (BACKEND_INFO(i).backend_status == CON_DOWN)
+ node_status_was_changed_to_up = true;
+
+ BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
+ my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
+ reload_master_node_id = true;
+
ereport(LOG,
- (errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
+ (errmsg("backend:%d is set to UP status", i),
+ errdetail("backend:%d is UP on cluster master \"%s\"",i,backendStatus->nodeName)));
- Req_info->primary_node_id = backendStatus->primary_node_id;
+ }
+ }
+ }
+ pfree(backendStatus);
- for (i = 0; i < backendStatus->node_count; i++)
- {
- if (backendStatus->backend_status[i] == CON_DOWN)
- {
- if (BACKEND_INFO(i).backend_status != CON_DOWN)
- {
+ if (reload_master_node_id)
+ {
+ Req_info->master_node_id = get_next_master_node();
+ }
- BACKEND_INFO(i).backend_status = CON_DOWN;
- my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
- reload_maste_node_id = true;
- ereport(LOG,
- (errmsg("backend status from \"%s\" backend:%d is set to down status",backendStatus->nodeName, i)));
- }
- }
- else if (backendStatus->backend_status[i] == CON_CONNECT_WAIT ||
- backendStatus->backend_status[i] == CON_UP)
+ /* We don't need to do anything else if Pgpool-II is starting up */
+ if (processState == INITIALIZING)
+ return;
+
+ /*
+ * Decide if all or a subset of the Pgpool-II children need an immediate
+ * restart, or whether that can wait until the current sessions finish
+ *
+ * Check if there was no change at all
+ */
+ if (node_status_was_changed_to_up == false &&
+ node_status_was_changed_to_down == false &&
+ primary_changed == false)
+ {
+ ereport(LOG,
+ (errmsg("backend nodes status remains same after the sync from \"%s\"",backendStatus->nodeName)));
+ return;
+ }
+ if (!STREAM)
+ {
+ /* If we are not in streaming replication mode,
+ * restart all child processes
+ */
+ ereport(LOG,
+ (errmsg("node status was changed after the sync from \"%s\"",backendStatus->nodeName),
+ errdetail("all children need to be restarted as we are not in streaming replication mode")));
+ need_to_restart_children = true;
+ partial_restart = false;
+ }
+ else if (primary_changed)
+ {
+ /* If the primary node was changed, we should restart all
+ * children
+ */
+ need_to_restart_children = true;
+ partial_restart = false;
+ ereport(LOG,
+ (errmsg("primary node was changed after the sync from \"%s\"",backendStatus->nodeName),
+ errdetail("all children need to be restarted")));
+
+ }
+ else
+ {
+ if (node_status_was_changed_to_down == false)
+ {
+ /* No node was detached, so there is no need to restart
+ * any child process
+ */
+ need_to_restart_children = false;
+ partial_restart = false;
+ ereport(LOG,
+ (errmsg("No backend node was detached because of backend status sync from \"%s\"",backendStatus->nodeName),
+ errdetail("no need to restart children")));
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("%d backend node(s) were detached because of backend status sync from \"%s\"",down_node_ids_index,backendStatus->nodeName),
+ errdetail("restarting the children processes")));
+
+ need_to_restart_children = true;
+ partial_restart = !check_all_backend_down();
+ }
+ }
+
+ /* Kill children and restart them if needed */
+ if (need_to_restart_children)
+ {
+ for (i=0;i<pool_config->num_init_children;i++)
+ {
+ bool restart = false;
+
+ if (partial_restart)
+ {
+ int j, k;
+ for (j=0;j<pool_config->max_pool;j++)
+ {
+ for (k=0;k<NUM_BACKENDS;k++)
{
- if (BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
+ int idx;
+ ConnectionInfo *con = pool_coninfo(i, j, k);
+ for (idx = 0; idx < down_node_ids_index; idx ++)
{
- BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
- my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
- reload_maste_node_id = true;
+ int node_id = down_node_ids[idx];
+ if (con->connected && con->load_balancing_node == node_id)
+ {
+ ereport(LOG,
+ (errmsg("child process with PID:%d needs restart, because pool %d uses backend %d",
+ process_info[i].pid, j, node_id)));
+ restart = true;
+ break;
+ }
+ if (restart)
+ break;
}
}
}
+ }
+ else
+ {
+ restart = true;
+ }
- if (reload_maste_node_id)
+ if (restart)
+ {
+ if (process_info[i].pid)
{
- Req_info->master_node_id = get_next_master_node();
+ kill(process_info[i].pid, SIGQUIT);
+
+ process_info[i].pid = fork_a_child(fds, i);
+ process_info[i].start_time = time(NULL);
}
}
- pfree(backendStatus);
+ else
+ process_info[i].need_to_restart = 1;
}
- else
+ }
+
+ else
+ {
+ /* Set the restart request for each child. Children will exit(1)
+ * whenever convenient.
+ */
+ for (i=0;i<pool_config->num_init_children;i++)
{
- ereport(WARNING,
- (errmsg("failed to get the backend status from the master watchdog node"),
- errdetail("using the local backend node status")));
+ process_info[i].need_to_restart = 1;
}
}
}
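The restart decision at the end of sync_backend_from_watchdog can be condensed as follows. This is only an illustrative restatement of the branching above; the helper name decide_restart is hypothetical, and it assumes the "no change at all" case has already returned early.

/* hypothetical helper, equivalent to the branching in sync_backend_from_watchdog */
static void decide_restart(bool stream_mode, bool primary_changed,
                           bool any_node_detached, bool all_nodes_down,
                           bool *restart_children, bool *partial)
{
    if (!stream_mode || primary_changed)
    {
        *restart_children = true;   /* full restart of all children */
        *partial = false;
    }
    else if (!any_node_detached)
    {
        *restart_children = false;  /* nothing detached: defer, children restart when convenient */
        *partial = false;
    }
    else
    {
        *restart_children = true;
        *partial = !all_nodes_down; /* restart only children using a detached node */
    }
}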
{WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"},
{WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"},
{WD_POOL_CONFIG_DATA, "CONFIG DATA"},
- {WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST"},
+ {WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST FOR MASTER"},
+ {WD_GET_RUNTIME_VARIABLE_VALUE, "GET WD RUNTIME VARIABLE VALUE"},
{WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
{WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
{WD_CLUSTER_SERVICE_MESSAGE, "CLUSTER SERVICE MESSAGE"},
static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
static void wd_check_config(void);
static pid_t watchdog_main(void);
static pid_t fork_watchdog_child(void);
-static void cluster_in_stable_state(void);
static bool check_IPC_client_authentication(json_value *rootObj, bool internal_client_only);
static bool check_and_report_IPC_authentication(WDCommandData* ipcCommand);
case WD_GET_MASTER_DATA_REQUEST:
return process_IPC_data_request_from_master(ipcCommand);
+ case WD_GET_RUNTIME_VARIABLE_VALUE:
+ return process_IPC_get_runtime_variable_value_request(ipcCommand);
default:
ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,"unknown IPC command type");
break;
}
+static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand)
+{
+ /* parse the json to get the name of the requested runtime variable */
+ JsonNode* jNode = NULL;
+ char* requestVarName = NULL;
+
+ if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
+ return IPC_CMD_ERROR;
+
+ json_value *root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+ /* The root node must be object */
+ if (root == NULL || root->type != json_object)
+ {
+ json_value_free(root);
+ ereport(NOTICE,
+ (errmsg("failed to process get local variable IPC command"),
+ errdetail("unable to parse json data")));
+ return IPC_CMD_ERROR;
+ }
+
+ requestVarName = json_get_string_value_for_key(root, WD_JSON_KEY_VARIABLE_NAME);
+
+ if (requestVarName == NULL)
+ {
+ json_value_free(root);
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+ "requested variable name is null");
+ return IPC_CMD_ERROR;
+ }
+
+ jNode = jw_create_with_object(true);
+
+ if (strcasecmp(WD_RUNTIME_VAR_WD_STATE, requestVarName) == 0)
+ {
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.localNode->state);
+ }
+ else if (strcasecmp(WD_RUNTIME_VAR_QUORUM_STATE, requestVarName) == 0)
+ {
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.quorum_status);
+ }
+ else if (strcasecmp(WD_RUNTIME_VAR_ESCALATION_STATE, requestVarName) == 0)
+ {
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_BOOL);
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.escalated);
+ }
+ else
+ {
+ json_value_free(root);
+ jw_destroy(jNode);
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+ "unknown variable requested");
+ return IPC_CMD_ERROR;
+ }
+
+ jw_finish_document(jNode);
+ json_value_free(root);
+ write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK,
+ jw_get_json_string(jNode), jw_get_json_length(jNode) +1);
+ jw_destroy(jNode);
+ return IPC_CMD_COMPLETE;
+}
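For illustration, the IPC exchange handled above is a small JSON object in each direction. The literal key strings are whatever the WD_JSON_KEY_* and WD_RUNTIME_VAR_* macros expand to, and the numeric values are made up; everything shown below is a placeholder.

/*
 * Request built by get_wd_runtime_variable_value() (placeholder key/value names):
 *   {"VarName": "WDStateVariable"}
 *
 * Reply built above with jw_put_int() for an INT variable such as the
 * watchdog state (a data-type tag plus the raw value; numbers illustrative):
 *   {"ValueDataType": 1, "ValueData": 4}
 */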
+
static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand)
{
/* get the json for node list */
syncRequestType = json_get_string_value_for_key(root, "SyncRequestType");
json_get_int_value_for_key(root, "FailoverLockID", &failoverLockID);
json_get_int_value_for_key(root, "WDFailoverID", (int*)&failoverID);
- if (syncRequestType == false)
+ if (syncRequestType == NULL)
{
ereport(LOG,
(errmsg("unable to process failover command lock request from %s",
}
-static void cluster_in_stable_state(void)
-{
- if (g_cluster.clusterInitialized == false)
- {
- g_cluster.clusterInitialized = true;
- /* Inform the parent */
- kill(getppid(), SIGUSR2);
- }
-}
-
static void update_interface_status(void)
{
struct ifaddrs *ifAddrStruct=NULL;
errdetail("our declare coordinator message is accepted by all nodes")));
g_cluster.masterNode = g_cluster.localNode;
- cluster_in_stable_state();
+ register_watchdog_state_change_interupt();
/*
* Check if the quorum is present then start the escalation process
if (clusterCommand->commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
clusterCommand->commandStatus == COMMAND_FINISHED_TIMEOUT)
{
- cluster_in_stable_state();
+ register_watchdog_state_change_interupt();
ereport(LOG,
(errmsg("successfully joined the watchdog cluster as standby node"),
/*
- * sets the state of local watchdog node, and fires an state change event
+ * sets the state of local watchdog node, and fires a state change event
* if the new and old states differ
*/
static int set_state(WD_STATES newState)
case WD_NODE_STATUS_CHANGE_COMMAND:
case WD_REGISTER_FOR_NOTIFICATION:
case WD_GET_NODES_LIST_COMMAND:
+ case WD_GET_RUNTIME_VARIABLE_VALUE:
internal_client_only = false;
break;
* used to identify the ipc internal
* clients
*/
-
void wd_ipc_initialize_data(void)
{
if (watchdog_ipc_address == NULL)
}
}
+WD_STATES get_watchdog_local_node_state(void)
+{
+ WD_STATES ret = WD_DEAD;
+ WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_WD_STATE);
+ if (state == NULL)
+ {
+ ereport(LOG,
+ (errmsg("failed to get current state of local watchdog node"),
+ errdetail("get runtime variable value from watchdog returned no data")));
+ return WD_DEAD;
+ }
+ if (state->valueType != VALUE_DATA_TYPE_INT)
+ {
+ ereport(LOG,
+ (errmsg("failed to get current state of local watchdog node"),
+ errdetail("get runtime variable value from watchdog returned invalid value type")));
+ pfree(state);
+ return WD_DEAD;
+ }
+ ret = (WD_STATES)state->data.intVal;
+ pfree(state);
+ return ret;
+}
+
char* get_watchdog_ipc_address(void)
{
return watchdog_ipc_address;
return result;
}
+/*
+ * The function gets the runtime value of a watchdog variable using the
+ * watchdog IPC
+ */
+WDGenericData *get_wd_runtime_variable_value(char *varName)
+{
+ unsigned int *shared_key = get_ipc_shared_key();
+ char *data = get_simple_request_json(WD_JSON_KEY_VARIABLE_NAME,varName,
+ shared_key?*shared_key:0,pool_config->wd_authkey);
+
+ WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_RUNTIME_VARIABLE_VALUE,
+ WD_DEFAULT_IPC_COMMAND_TIMEOUT,
+ data, strlen(data), true);
+ pfree(data);
+
+ if (result == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("issue command to watchdog returned NULL")));
+ return NULL;
+ }
+ if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("watchdog cluster is not in stable state"),
+ errhint("try again when the cluster is fully initialized")));
+ FreeCmdResult(result);
+ return NULL;
+ }
+ else if (result->type == WD_IPC_CMD_TIMEOUT)
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("ipc command timeout")));
+ FreeCmdResult(result);
+ return NULL;
+ }
+ else if (result->type == WD_IPC_CMD_RESULT_OK)
+ {
+ json_value *root = NULL;
+ WDGenericData *genData = NULL;
+ WDValueDataType dataType;
+
+ root = json_parse(result->data, result->length);
+ /* The root node must be object */
+ if (root == NULL || root->type != json_object)
+ {
+ FreeCmdResult(result);
+ return NULL;
+ }
+
+ if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA_TYPE, (int*)&dataType))
+ {
+ FreeCmdResult(result);
+ json_value_free(root);
+ return NULL;
+ }
+
+ switch (dataType) {
+ case VALUE_DATA_TYPE_INT:
+ {
+ int intVal;
+ if (json_get_int_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &intVal))
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("unable to get INT value from JSON data returned by watchdog")));
+ }
+ else
+ {
+ genData = palloc(sizeof(WDGenericData));
+ genData->valueType = dataType;
+ genData->data.intVal = intVal;
+ }
+ }
+ break;
+
+ case VALUE_DATA_TYPE_LONG:
+ {
+ long longVal;
+ if (json_get_long_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &longVal))
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("unable to get LONG value from JSON data returned by watchdog")));
+ }
+ else
+ {
+ genData = palloc(sizeof(WDGenericData));
+ genData->valueType = dataType;
+ genData->data.longVal = longVal;
+ }
+ }
+ break;
+
+ case VALUE_DATA_TYPE_BOOL:
+ {
+ bool boolVal;
+ if (json_get_bool_value_for_key(root, WD_JSON_KEY_VALUE_DATA, &boolVal))
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("unable to get BOOL value from JSON data returned by watchdog")));
+ }
+ else
+ {
+ genData = palloc(sizeof(WDGenericData));
+ genData->valueType = dataType;
+ genData->data.boolVal = boolVal;
+ }
+ }
+ break;
+
+ case VALUE_DATA_TYPE_STRING:
+ {
+ char *ptr = json_get_string_value_for_key(root, WD_JSON_KEY_VALUE_DATA);
+ if (ptr == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed"),
+ errdetail("unable to get STRING value from JSON data returned by watchdog")));
+ }
+ else
+ {
+ genData = palloc(sizeof(WDGenericData));
+ genData->valueType = dataType;
+ genData->data.stringVal = pstrdup(ptr);
+ }
+ }
+ break;
+
+ default:
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed, unknown value data type")));
+ break;
+ }
+
+ json_value_free(root);
+ FreeCmdResult(result);
+ return genData;
+ }
+
+ ereport(WARNING,
+ (errmsg("get runtime variable value from watchdog failed")));
+ FreeCmdResult(result);
+ return NULL;
+
+}
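A minimal usage sketch for the new accessor, assuming the caller runs where palloc/pfree memory management applies; it reads the quorum status, which the IPC handler above serves as an INT value:

/* illustrative caller: fetch the current quorum status from the local watchdog */
WDGenericData *value = get_wd_runtime_variable_value(WD_RUNTIME_VAR_QUORUM_STATE);
if (value)
{
    if (value->valueType == VALUE_DATA_TYPE_INT)
        ereport(LOG,
                (errmsg("watchdog quorum status is %d", value->data.intVal)));
    pfree(value);
}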
+
/*
* function gets the PG backend status of all attached nodes from
* the master watchdog node.