{
IPC_CMD_COMPLETE,
IPC_CMD_PROCESSING,
- IPC_CMD_ERROR
+ IPC_CMD_ERROR,
+ IPC_CMD_OK,
+ IPC_CMD_TRY_AGAIN
}IPC_CMD_PREOCESS_RES;
#define BEACON_MESSAGE_INTERVAL_SECONDS 10 /* interval between beacon messages */
+#define MAX_SECS_WAIT_FOR_REPLY_FROM_NODE 5 /* time in seconds to wait for the reply from
+ * remote watchdog node
+ */
+
#define WD_NO_MESSAGE 0
#define WD_ACCEPT_MESSAGE 'G'
#define WD_INFO_MESSAGE 'I'
#define WD_JOIN_COORDINATOR_MESSAGE 'J'
-#define WD_INTERLOCKING_REQUEST 'L'
#define WD_IAM_COORDINATOR_MESSAGE 'M'
#define WD_IAM_IN_NW_TROUBLE_MESSAGE 'N'
#define WD_QUORUM_IS_LOST 'Q'
#define WD_REJECT_MESSAGE 'R'
#define WD_STAND_FOR_COORDINATOR_MESSAGE 'S'
-#define WD_INTERUNLOCKING_REQUEST 'U'
-#define WD_REPLICATE_VARIABLE_REQUEST 'V'
+#define WD_REMOTE_FAILOVER_REQUEST 'V'
#define WD_INFORM_I_AM_GOING_DOWN 'X'
#define WD_ASK_FOR_POOL_CONFIG 'Y'
#define WD_POOL_CONFIG_DATA 'Z'
+#define WD_CMD_REPLY_IN_DATA '-'
typedef struct packet_types
{
{WD_ACCEPT_MESSAGE, "ACCEPT"},
{WD_INFO_MESSAGE, "NODE INFO"},
{WD_JOIN_COORDINATOR_MESSAGE, "JOIN COORDINATOR"},
- {WD_INTERLOCKING_REQUEST, "INTERLOCKING REQUEST"},
{WD_IAM_COORDINATOR_MESSAGE, "IAM COORDINATOR"},
{WD_IAM_IN_NW_TROUBLE_MESSAGE, "I AM IN NETWORK TROUBLE"},
{WD_QUORUM_IS_LOST, "QUORUM IS LOST"},
{WD_REJECT_MESSAGE, "REJECT"},
{WD_STAND_FOR_COORDINATOR_MESSAGE, "STAND FOR COORDINATOR"},
-
- {WD_INTERUNLOCKING_REQUEST, "INTERUNLOCKING REQUEST"},
- {WD_REPLICATE_VARIABLE_REQUEST, "REPLICATE VARIABLE REQUEST"},
+ {WD_REMOTE_FAILOVER_REQUEST, "REPLICATE FAILOVER REQUEST"},
+ {WD_IPC_ONLINE_RECOVERY_COMMAND, "ONLINE RECOVERY REQUEST"},
+ {WD_IPC_FAILOVER_COMMAND, "FAILOVER FUNCTION COMMAND"},
{WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"},
{WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"},
{WD_POOL_CONFIG_DATA, "CONFIG DATA"},
{WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST"},
-
+ {WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
+ {WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
{WD_NO_MESSAGE,""}
};
"LOST",
"IN NETWORK TROUBLE",
"SHUTDOWN",
- "ADD MESSAGE SENT"};
+ "ADD MESSAGE SENT"
+};
typedef struct WDPacketData
{
char* result_data;
}WDCommandNodeResult;
-
-typedef struct WDIPCCommandData
+typedef enum WDCommandSource
{
- MemoryContext memoryContext;
- int issueing_sock;
- char type;
- struct timeval issue_time;
- unsigned int internal_command_id;
- int data_len;
- char *data_buf;
-
- unsigned int sendTo_count;
- unsigned int reply_from_count;
- unsigned int timeout_secs;
-
- WDCommandNodeResult* nodeResults;
-}WDIPCCommandData;
+ COMMAND_SOURCE_IPC,
+ COMMAND_SOURCE_LOCAL,
+ COMMAND_SOURCE_REMOTE,
+ COMMAND_SOURCE_INTERNAL
+}WDCommandSource;
typedef struct WDFunctionCommandData
{
COMMAND_IN_PROGRESS,
COMMAND_FINISHED_TIMEOUT,
COMMAND_FINISHED_ALL_REPLIED,
- COMMAND_FINISHED_NODE_REJECTED
+ COMMAND_FINISHED_NODE_REJECTED,
+ COMMAND_FINISHED_SEND_FAILED
}WDCommandStatus;
typedef struct WDCommandData
{
- WDPacketData packet;
+ WDPacketData sourcePacket;
+ WDPacketData commandPacket;
WDCommandNodeResult *nodeResults;
WatchdogNode *sendToNode; /* NULL means send to all */
WDCommandStatus commandStatus;
unsigned int commandTimeoutSecs;
struct timeval commandTime;
unsigned int commandSendToCount;
+ unsigned int commandSendToErrorCount;
unsigned int commandReplyFromCount;
- int commandFinished;
- int partial_sent;
+ WDCommandSource commandSource;
+ int sourceIPCSocket; /* Only valid for COMMAND_SOURCE_IPC */
+ WatchdogNode *sourceWdNode; /* Only valid for COMMAND_SOURCE_REMOTE */
+ char *errorMessage;
+ MemoryContext memoryContext;
+ void (*commandCompleteFunc)(struct WDCommandData* command);
}WDCommandData;
typedef struct WDInterfaceStatus
WatchdogNode* localNode;
WatchdogNode* remoteNodes;
WatchdogNode* masterNode;
- WatchdogNode* lockHolderNode;
InterlockingNode interlockingNode;
int remoteNodeCount;
int aliveNodeCount;
List *ipc_commands;
List *wd_timer_commands;
List *wdInterfaceToMonitor;
+ List *wdCurrentFailovers;
}wd_cluster;
+typedef struct WDFailoverObject /* record kept in the g_cluster.wdCurrentFailovers list */
+{
+ int id; /* NOTE(review): distinct from failoverID; purpose not visible here - confirm */
+ POOL_REQUEST_KIND reqKind; /* request kind; one of the lookup keys of get_failover_object() */
+ int nodesCount; /* presumably the number of entries in nodeList - TODO confirm */
+ unsigned int failoverID; /* key matched by get_failover_object_by_id() */
+ int *nodeList; /* separately allocated; pfree'd by remove_failover_object_by_id() */
+ WatchdogNode* wdRequestingNode; /* presumably the watchdog node that requested the failover - confirm */
+ struct timeval startTime;
+ int state;
+}WDFailoverObject;
+
+
+static void process_remote_failover_command_on_coordinator(WatchdogNode* wdNode, WDPacketData* pkt);
+static WDFailoverObject* get_failover_object(POOL_REQUEST_KIND reqKind, int nodesCount, int *nodeList);
+static WDFailoverObject* get_failover_object_by_id(unsigned int failoverID);
+static bool does_int_array_contains_value(int *intArray, int count, int value);
+static bool remove_failover_object_by_id(unsigned int failoverID);
+
+static int send_command_packet_to_remote_nodes(WDCommandData* ipcCommand, bool source_included);
+static void wd_command_is_complete(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES wd_command_processor_for_node_lost_event(WDCommandData* ipcCommand, WatchdogNode* wdLostNode);
+static bool is_cluster_command_in_progress(void);
+
volatile sig_atomic_t reload_config_signal = 0;
volatile sig_atomic_t sigchld_request = 0;
static bool connect_to_node(WatchdogNode* wdNode);
static bool is_socket_connection_connected(SocketConnection* conn);
+static void service_unreachable_nodes(void);
+
+static void allocate_resultNodes_in_IPCCommand(WDCommandData* ipcCommand);
+static bool is_node_active_and_reachable(WatchdogNode* wdNode);
+static bool is_node_active(WatchdogNode* wdNode);
+static bool is_node_reachable(WatchdogNode* wdNode);
+
static int update_successful_outgoing_cons(fd_set* wmask, int pending_fds_count);
static int prepare_fds(fd_set* rmask, fd_set* wmask, fd_set* emask);
-static WDPacketData* get_addnode_message(void);
-static WDPacketData* get_mynode_info_message(WDPacketData* replyFor);
-static WDPacketData* get_minimum_message(char type, WDPacketData* replyFor);
-
static void set_next_commandID_in_message(WDPacketData* pkt);
static void set_message_commandID(WDPacketData* pkt, unsigned int commandID);
static void set_message_data(WDPacketData* pkt, const char* data, int len);
static WDPacketData* read_packet_of_type(SocketConnection* conn, char ensure_type);
static WDPacketData* read_packet(SocketConnection* conn);
static WDPacketData* get_message_of_type(char type);
+static WDPacketData* get_addnode_message(void);
+static WDPacketData* get_mynode_info_message(WDPacketData* replyFor);
+static WDPacketData* get_minimum_message(char type, WDPacketData* replyFor);
+
static int issue_watchdog_internal_command(WatchdogNode* wdNode, WDPacketData *pkt, int timeout_sec);
-static char get_current_command_resultant_message_type(void);
static void check_for_current_command_timeout(void);
static bool watchdog_internal_command_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt);
static bool service_lost_connections(void);
+static void service_ipc_commands(void);
static void service_internal_command(void);
static unsigned int get_next_commandID(void);
static int update_quorum_status(void);
static int get_mimimum_nodes_required_for_quorum(void);
-static bool write_packet_to_socket(int sock, WDPacketData* pkt);
+static bool write_packet_to_socket(int sock, WDPacketData* pkt, bool ipcPacket);
static int read_sockets(fd_set* rmask,int pending_fds_count);
static void set_timeout(unsigned int sec);
static int wd_create_command_server_socket(void);
static int watchdog_state_machine(WD_EVENTS event, WatchdogNode* wdNode, WDPacketData* pkt);
static int watchdog_state_machine_nw_error(WD_EVENTS event, WatchdogNode* wdNode, WDPacketData* pkt);
-static void cleanUpIPCCommand(WDIPCCommandData* ipcCommand);
-static bool read_ipc_command_and_process(int socket, bool *remove_socket);
+static void cleanUpIPCCommand(WDCommandData* ipcCommand);
+static bool read_ipc_socket_and_process(int socket, bool *remove_socket);
static JsonNode* get_node_list_json(int id);
static bool add_nodeinfo_to_json(JsonNode* jNode, WatchdogNode* node);
static void resign_from_escalated_node(void);
static void start_escalated_node(void);
static void init_wd_packet(WDPacketData* pkt);
+static void wd_packet_shallow_copy(WDPacketData* srcPkt, WDPacketData* dstPkt);
static bool wd_commands_packet_processor(WD_EVENTS event, WatchdogNode* wdNode, WDPacketData* pkt);
-static WDIPCCommandData* get_wd_IPC_command_from_reply(WDPacketData* pkt);
-static WDIPCCommandData* get_wd_IPC_command_from_socket(int sock);
+static WDCommandData* get_wd_IPC_command_from_reply(WDPacketData* pkt);
+static WDCommandData* get_wd_IPC_command_from_socket(int sock);
-static IPC_CMD_PREOCESS_RES process_IPC_command(WDIPCCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDIPCCommandData* IPCCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDIPCCommandData* IPCCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_replicate_variable(WDIPCCommandData* IPCCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandData *IPCCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDIPCCommandData *IPCCommand);
-static IPC_CMD_PREOCESS_RES execute_replicate_command(WDIPCCommandData* ipcCommand);
-static bool write_ipc_command_with_result_data(WDIPCCommandData* IPCCommand, char type, char* data, int len);
+static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand);
+static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand);
-static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketData* pkt);
-static bool node_has_resigned_from_interlocking(WatchdogNode* wdNode, WDPacketData* pkt);
+static bool write_ipc_command_with_result_data(WDCommandData* ipcCommand, char type, char* data, int len);
static void process_wd_func_commands_for_timer_events(void);
static void add_wd_command_for_timer_events(unsigned int expire_secs, bool need_tics, WDFunctionCommandData* wd_func_command);
-static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt, WDIPCCommandData* ipcCommand);
-static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, char* func_name, int node_count, int* node_id_list);
-static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt);
-
-static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacketData* pkt, WDIPCCommandData* ipcCommand);
-static WDFailoverCMDResults node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt);
-static WDFailoverCMDResults node_is_asking_for_failover_start(WatchdogNode* wdNode, WDPacketData* pkt);
-static WDFailoverCMDResults node_is_asking_for_failover_lock_status(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock);
-static WDFailoverCMDResults node_is_asking_for_failover_lock_release(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock);
+static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt, WDCommandData* ipcCommand);
+static void process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, char* func_name, int node_count, int* node_id_list, unsigned int failover_id);
+static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacketData* pkt);
+static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacketData* pkt);
+
+
+static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCommandData* ipcCommand);
+static WDFailoverCMDResults node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID);
+static WDFailoverCMDResults node_is_asking_for_failover_start(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID);
+static WDFailoverCMDResults node_is_asking_for_failover_lock_status(WatchdogNode* wdNode, WDPacketData* pkt,
+ WDFailoverLock failoverLock, unsigned int failoverID);
+static WDFailoverCMDResults node_is_asking_for_failover_lock_release(WatchdogNode* wdNode, WDPacketData* pkt,
+ WDFailoverLock failoverLock, unsigned int failoverID);
static void wd_system_will_go_down(int code, Datum arg);
static void verify_pool_configurations(WatchdogNode* wdNode, POOL_CONFIG* config);
static pid_t fork_watchdog_child(void);
static void cluster_in_stable_state(void);
static bool check_IPC_client_authentication(json_value *rootObj, bool internal_client_only);
-static bool check_and_report_IPC_authentication(WDIPCCommandData* ipcCommand);
+static bool check_and_report_IPC_authentication(WDCommandData* ipcCommand);
static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode);
static void update_interface_status(void);
g_cluster.notify_clients = NULL;
g_cluster.ipc_command_socks = NULL;
g_cluster.wd_timer_commands = NULL;
+ g_cluster.wdCurrentFailovers = NULL;
+ g_cluster.ipc_commands = NULL;
g_cluster.localNode->state = WD_DEAD;
g_cluster.ipc_auth_needed = strlen(pool_config->wd_authkey)?true:false;
if (wdNode->client_socket.sock_state != WD_SOCK_WAITING_FOR_CONNECT && wdNode->client_socket.sock_state != WD_SOCK_CONNECTED &&
wdNode->server_socket.sock_state != WD_SOCK_WAITING_FOR_CONNECT && wdNode->server_socket.sock_state != WD_SOCK_CONNECTED)
{
- wdNode->state = WD_DEAD;
+ if (wdNode->state == WD_SHUTDOWN)
+ continue;
connect_to_node(wdNode);
+ if (wdNode->client_socket.sock_state == WD_SOCK_CONNECTED)
+ {
+ ereport(LOG,
+ (errmsg("connection to the remote node \"%s\" is restored",wdNode->nodeName)));
+ watchdog_state_machine(WD_EVENT_NEW_OUTBOUND_CONNECTION, wdNode, NULL);
+ }
}
}
}
processed_fds += update_successful_outgoing_cons(&wmask,(select_ret - processed_fds));
processed_fds += read_sockets(&rmask,(select_ret - processed_fds));
}
+ if (WD_TIME_DIFF_SEC(ref_time,g_tm_set_time) >= 1)
+ {
+ process_wd_func_commands_for_timer_events();
+ }
if (timeout_event)
+ {
+ g_timeout_sec = 0;
watchdog_state_machine(WD_EVENT_TIMEOUT, NULL, NULL);
- if (WD_TIME_DIFF_SEC(ref_time,g_tm_set_time) >= 1)
- process_wd_func_commands_for_timer_events();
+ }
check_for_current_command_timeout();
-
+
if (service_lost_connections() == true)
+ {
service_internal_command();
+ service_ipc_commands();
+ }
+
+ service_unreachable_nodes();
update_connected_node_count();
}
{
ereport(DEBUG2,
(errmsg("client socket of %s is ready for reading", wdNode->nodeName)));
-
+
WDPacketData* pkt = read_packet(&wdNode->client_socket);
if (pkt)
{
watchdog_state_machine(WD_EVENT_PACKET_RCV, wdNode, pkt);
+ /* since a packet is received reset last sent time */
+ wdNode->last_sent_time.tv_sec = 0;
+ wdNode->last_sent_time.tv_usec = 0;
free_packet(pkt);
}
+ else
+ {
+ ereport(LOG,
+ (errmsg("client socket of %s is closed", wdNode->nodeName)));
+ }
+
count++;
if (count >= pending_fds_count)
return count;
if (pkt)
{
watchdog_state_machine(WD_EVENT_PACKET_RCV, wdNode, pkt);
+ /* since a packet is received reset last sent time */
+ wdNode->last_sent_time.tv_sec = 0;
+ wdNode->last_sent_time.tv_usec = 0;
free_packet(pkt);
}
-
+ else
+ {
+ ereport(LOG,
+ (errmsg("outbound socket of %s is closed", wdNode->nodeName)));
+ }
+
count++;
if (count >= pending_fds_count)
return count;
}
}
}
-
+
foreach(lc, g_cluster.unidentified_socks)
{
SocketConnection *conn = lfirst(lc);
print_watchdog_node_info(tempNode);
authenticated = verify_authhash_for_node(tempNode, authkey);
ereport(DEBUG1,
- (errmsg("ADD NODE MESSAGE from Hostname:\"%s\" PORT:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port)));
+ (errmsg("ADD NODE MESSAGE from hostname:\"%s\" port:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port)));
/* verify this node */
if (authenticated)
{
{
/* reply with node info message */
ereport(NOTICE,
- (errmsg("New node joined the cluster Hostname:\"%s\" PORT:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port)));
+ (errmsg("New node joined the cluster hostname:\"%s\" port:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port)));
watchdog_state_machine(WD_EVENT_PACKET_RCV, wdNode, pkt);
}
else
ereport(NOTICE,
- (errmsg("add node from Hostname:\"%s\" PORT:%d pgpool_port:%d rejected.",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port),
+ (errmsg("add node from hostname:\"%s\" port:%d pgpool_port:%d rejected.",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port),
errdetail("verify the other watchdog node configurations")));
-
}
else
{
ereport(NOTICE,
- (errmsg("authentication failed for add node from Hostname:\"%s\" PORT:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port),
+ (errmsg("authentication failed for add node from hostname:\"%s\" port:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port),
errdetail("make sure wd_authkey configuration is same on all nodes")));
}
if (found == false || authenticated == false)
{
/* reply with reject message, We do not need to go to state processor */
- /* For now, create a empty temp node. TODO*/
+ /* For now, create a empty temp node.*/
WatchdogNode tmpNode;
tmpNode.client_socket = *conn;
tmpNode.client_socket.sock_state = WD_SOCK_CONNECTED;
if (command_sock > 0 && FD_ISSET(command_sock, rmask))
{
bool remove_sock = false;
- read_ipc_command_and_process(command_sock, &remove_sock);
+ read_ipc_socket_and_process(command_sock, &remove_sock);
if (remove_sock)
{
/* Also locate the command if it has this socket */
- WDIPCCommandData* ipcCommand = get_wd_IPC_command_from_socket(command_sock);
+ WDCommandData* ipcCommand = get_wd_IPC_command_from_socket(command_sock);
if (ipcCommand)
{
/* special case we want to remove the socket from
* ipc_command_sock list manually, so mark the issuing socket
* of ipcComman to invalid value
*/
- ipcCommand->issueing_sock = -1;
+ ipcCommand->sourceIPCSocket = -1;
}
close(command_sock);
socks_to_del = lappend_int(socks_to_del,command_sock);
if (notify_sock > 0 && FD_ISSET(notify_sock, rmask))
{
bool remove_sock = false;
- read_ipc_command_and_process(notify_sock, &remove_sock);
+ read_ipc_socket_and_process(notify_sock, &remove_sock);
if (remove_sock)
{
close(notify_sock);
if (read_interface_change_event(g_cluster.network_monitor_sock, &link_event, &deleted))
{
ereport(DEBUG1,
- (errmsg("Network event received"),
+ (errmsg("network event received"),
errdetail("deleted = %s Link change event = %s",
deleted?"YES":"NO",
link_event?"YES":"NO")));
return count;
}
-static bool write_ipc_command_with_result_data(WDIPCCommandData* IPCCommand, char type, char* data, int len)
+static bool write_ipc_command_with_result_data(WDCommandData* ipcCommand, char type, char* data, int len) /*
+ * Send a reply packet of the given type, carrying len bytes of data, to
+ * the IPC socket recorded in ipcCommand->sourceIPCSocket.
+ *
+ * Returns false without writing anything when ipcCommand is NULL, when it
+ * did not originate from IPC (commandSource != COMMAND_SOURCE_IPC) or when
+ * its socket is <= 0; otherwise returns the result of
+ * write_packet_to_socket().
+ */
{
- int send_len = 0;
- if (socket_write(IPCCommand->issueing_sock, &type, 1) < 0)
- return false;
-
- if (len > 0)
+ WDPacketData pkt; /* only data, len and type are filled in below */
+ pkt.data = data;
+ pkt.len = len;
+ pkt.type = type;
+ if (ipcCommand == NULL || ipcCommand->commandSource != COMMAND_SOURCE_IPC || ipcCommand->sourceIPCSocket <= 0)
{
- send_len = htonl(len);
- if (socket_write(IPCCommand->issueing_sock, &send_len, sizeof(int)) < 0)
- return false;
- if (socket_write(IPCCommand->issueing_sock, data, len) < 0)
- return false;
- }
- else if (socket_write(IPCCommand->issueing_sock, &send_len, sizeof(int)) < 0)
+ ereport(DEBUG1,
+ (errmsg("not replying to IPC, Invalid IPC command.")));
return false;
- return true;
+ }
+ return write_packet_to_socket(ipcCommand->sourceIPCSocket, &pkt, true); /* true is passed for the ipcPacket argument */
}
-static bool read_ipc_command_and_process(int sock, bool *remove_socket)
+static WDCommandData* create_command_object(int packet_data_length) /*
+ * Allocate a zero-filled WDCommandData inside a newly created "WDCommand"
+ * memory context (a child of TopMemoryContext) and return it.
+ *
+ * When packet_data_length > 0, a sourcePacket.data buffer of that many
+ * bytes is allocated in the same context; otherwise the pointer stays
+ * NULL from palloc0. The context is stored in the returned object's
+ * memoryContext member.
+ */
+{
+ MemoryContext mCxt, oldCxt;
+ WDCommandData* wdCommand;
+ /* The wd command lives in its own memory context, so everything
+ * allocated for it below belongs to that one context.
+ */
+ mCxt = AllocSetContextCreate(TopMemoryContext,
+ "WDCommand",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+ oldCxt = MemoryContextSwitchTo(mCxt); /* allocations below go into mCxt */
+
+ wdCommand = palloc0(sizeof(WDCommandData));
+ wdCommand->memoryContext = mCxt;
+ if (packet_data_length > 0)
+ wdCommand->sourcePacket.data = palloc(packet_data_length); /* buffer contents are not initialized */
+ wdCommand->commandPacket.type = WD_NO_MESSAGE;
+ wdCommand->sourcePacket.type = WD_NO_MESSAGE;
+ MemoryContextSwitchTo(oldCxt); /* restore the caller's context before returning */
+ return wdCommand;
+}
+
+static bool read_ipc_socket_and_process(int sock, bool *remove_socket)
{
char type;
- IPC_CMD_PREOCESS_RES res;
int data_len,ret;
- WDIPCCommandData* IPCCommand = NULL;
+ WDCommandData* ipcCommand;
+ IPC_CMD_PREOCESS_RES res;
*remove_socket = true;
data_len = ntohl(data_len);
/* see if we have enough information to process this command */
- MemoryContext mCxt, oldCxt;
- mCxt = AllocSetContextCreate(TopMemoryContext,
- "WDIPCCommand",
- ALLOCSET_SMALL_MINSIZE,
- ALLOCSET_SMALL_INITSIZE,
- ALLOCSET_SMALL_MAXSIZE);
- oldCxt = MemoryContextSwitchTo(mCxt);
-
- IPCCommand = palloc0(sizeof(WDIPCCommandData));
-
- IPCCommand->issueing_sock = sock;
- IPCCommand->type = type;
- gettimeofday(&IPCCommand->issue_time, NULL);
+ ipcCommand = create_command_object(data_len);
+ ipcCommand->sourceIPCSocket = sock;
+ ipcCommand->commandSource = COMMAND_SOURCE_IPC;
+ ipcCommand->sourcePacket.type = type;
+ ipcCommand->sourcePacket.len = data_len;
+ gettimeofday(&ipcCommand->commandTime, NULL);
if (data_len > 0)
{
- IPCCommand->data_buf = palloc(data_len);
- if (socket_read(sock, IPCCommand->data_buf , data_len, 0) <= 0)
+ if (socket_read(sock, ipcCommand->sourcePacket.data , data_len, 0) <= 0)
{
ereport(LOG,
(errmsg("error reading IPC from socket"),
errdetail("read from socket failed with error \"%s\"",strerror(errno))));
- MemoryContextDelete(mCxt);
return false;
}
}
- else
- IPCCommand->data_buf = NULL;
-
- IPCCommand->nodeResults = NULL;
- IPCCommand->memoryContext = mCxt;
- IPCCommand->data_len = data_len;
- MemoryContextSwitchTo(oldCxt);
- res = process_IPC_command(IPCCommand);
+ res = process_IPC_command(ipcCommand);
if (res == IPC_CMD_PROCESSING)
{
/*
* The command still needs further processing
* store it in the list
*/
+ MemoryContext oldCxt;
*remove_socket = false;
oldCxt = MemoryContextSwitchTo(TopMemoryContext);
- g_cluster.ipc_commands = lappend(g_cluster.ipc_commands,IPCCommand);
+ g_cluster.ipc_commands = lappend(g_cluster.ipc_commands,ipcCommand);
MemoryContextSwitchTo(oldCxt);
return true;
}
- if (res == IPC_CMD_ERROR)
- ereport(NOTICE,
- (errmsg("error processing IPC from socket")));
-
- /* Delete the ipcCommand structure,
+ else if (res != IPC_CMD_COMPLETE)
+ {
+ char res_type;
+ char *data = NULL;
+ int data_len = 0;
+ switch (res) {
+ case IPC_CMD_TRY_AGAIN:
+ res_type = WD_IPC_CMD_CLUSTER_IN_TRAN;
+ break;
+ case IPC_CMD_ERROR:
+ ereport(NOTICE,
+ (errmsg("error processing IPC from socket")));
+ res_type = WD_IPC_CMD_RESULT_BAD;
+ break;
+ case IPC_CMD_OK:
+ res_type = WD_IPC_CMD_RESULT_OK;
+ break;
+ default:
+ res_type = WD_IPC_CMD_RESULT_BAD;
+ ereport(NOTICE,
+ (errmsg("unexpected IPC processing result")));
+ break;
+ }
+ if (ipcCommand->errorMessage)
+ {
+ data = get_wd_simple_message_json(ipcCommand->errorMessage);
+ data_len = strlen(data) + 1;
+ }
+
+ if (write_ipc_command_with_result_data(ipcCommand, res_type, data, data_len))
+ {
+ ereport(NOTICE,
+ (errmsg("error writing to IPC socket")));
+ }
+ if (data)
+ pfree(data);
+ }
+
+ /* Delete the Command structure,
* it is as simple as to delete the memory context
*/
- MemoryContextDelete(mCxt);
+ MemoryContextDelete(ipcCommand->memoryContext);
return (res != IPC_CMD_ERROR);
}
-static IPC_CMD_PREOCESS_RES process_IPC_command(WDIPCCommandData* ipcCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
{
/* authenticate the client first */
if (check_and_report_IPC_authentication(ipcCommand) == false)
{
+ /* authentication error is already reported to the caller*/
return IPC_CMD_ERROR;
}
- switch(ipcCommand->type)
+ switch(ipcCommand->sourcePacket.type)
{
case WD_NODE_STATUS_CHANGE_COMMAND:
case WD_REGISTER_FOR_NOTIFICATION:
/* Add this socket to the notify socket list*/
- g_cluster.notify_clients = lappend_int(g_cluster.notify_clients, ipcCommand->issueing_sock);
+ g_cluster.notify_clients = lappend_int(g_cluster.notify_clients, ipcCommand->sourceIPCSocket);
/* The command is completed successfully */
return IPC_CMD_COMPLETE;
break;
return process_IPC_nodeList_command(ipcCommand);
break;
- case WD_FUNCTION_COMMAND:
- return process_IPC_replicate_variable(ipcCommand);
+ case WD_IPC_FAILOVER_COMMAND:
+ return process_IPC_failover_command(ipcCommand);
+
+ case WD_IPC_ONLINE_RECOVERY_COMMAND:
+ return process_IPC_online_recovery(ipcCommand);
break;
- case WD_FAILOVER_CMD_SYNC_REQUEST:
- return process_IPC_failover_cmd_synchronise(ipcCommand);
+ case WD_FAILOVER_LOCKING_REQUEST:
+ return process_IPC_failover_locking_cmd(ipcCommand);
case WD_GET_MASTER_DATA_REQUEST:
return process_IPC_data_request_from_master(ipcCommand);
default:
- {
- char* error_json;
- ereport(LOG,
- (errmsg("invalid IPC command type %c",ipcCommand->type)));
-
- error_json = get_wd_simple_error_message_json("unknown IPC command type");
- if (write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_BAD,
- error_json, strlen(error_json) +1))
- {
- ereport(LOG,
- (errmsg("failed to forward error message of process node list command to IPC socket")));
- }
- pfree(error_json);
-
- }
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,"unknown IPC command type");
break;
}
return IPC_CMD_ERROR;
}
-static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDIPCCommandData* IPCCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand)
{
/* get the json for node list */
JsonNode* jNode = NULL;
int NodeID = -1;
- bool ret;
- if (IPCCommand->data_len <= 0 || IPCCommand->data_buf == NULL)
+ if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
return IPC_CMD_ERROR;
- json_value *root = json_parse(IPCCommand->data_buf,IPCCommand->data_len);
+ json_value *root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
/* The root node must be object */
if (root == NULL || root->type != json_object)
{
json_value_free(root);
jNode = get_node_list_json(NodeID);
- ret = write_ipc_command_with_result_data(IPCCommand, WD_IPC_CMD_RESULT_OK,
+ write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK,
jw_get_json_string(jNode), jw_get_json_length(jNode) +1);
jw_destroy(jNode);
- if (ret == false)
- return IPC_CMD_ERROR;
return IPC_CMD_COMPLETE;
}
-static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDIPCCommandData* IPCCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData* ipcCommand)
{
int nodeStatus;
int nodeID;
char *message;
bool ret;
- if (IPCCommand->data_len <= 0 || IPCCommand->data_buf == NULL)
+ if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
return IPC_CMD_ERROR;
- ret = parse_node_status_json(IPCCommand->data_buf, IPCCommand->data_len, &nodeID, &nodeStatus, &message);
+ ret = parse_node_status_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, &nodeID, &nodeStatus, &message);
if (ret == false)
{
return true;
}
-
-
-static IPC_CMD_PREOCESS_RES process_IPC_replicate_variable(WDIPCCommandData* IPCCommand)
+/*
+ * Walk g_cluster.wdCurrentFailovers and return the failover object whose
+ * failoverID equals the given failoverID.
+ * NULL list entries are skipped. Returns NULL when no entry matches.
+ */
+static WDFailoverObject* get_failover_object_by_id(unsigned int failoverID)
{
- char res_type = WD_IPC_CMD_RESULT_BAD;
-
- if (get_local_node_state() == WD_STANDBY ||
- get_local_node_state() == WD_COORDINATOR)
+ ListCell *lc;
+ foreach(lc, g_cluster.wdCurrentFailovers)
{
- IPC_CMD_PREOCESS_RES execute_res = execute_replicate_command(IPCCommand);
-
- if (execute_res == IPC_CMD_COMPLETE)
- {
- res_type = WD_IPC_CMD_RESULT_OK;
- }
- else if (execute_res == IPC_CMD_ERROR)
+ WDFailoverObject* failoverObj = lfirst(lc);
+ if (failoverObj)
{
- res_type = WD_IPC_CMD_RESULT_BAD;
- }
- else /* IPC_CMD_PROCESSING*/
- {
- /*
- * Just return from the function, Do not reply back to requester at the moment
- * as we still need to further process this command
- */
- return execute_res;
+ /* the first entry with a matching ID wins */
+ if (failoverObj->failoverID == failoverID)
+ {
+ return failoverObj;
+ }
}
}
- else /* we are not in any stable state at the moment */
+ return NULL;
+}
+/*
+ * Remove the failover object identified by failoverID from
+ * g_cluster.wdCurrentFailovers and release its memory.
+ * Returns true when an object was found and removed, false otherwise.
+ */
+static bool remove_failover_object_by_id(unsigned int failoverID)
+{
+ WDFailoverObject* failoverObj = get_failover_object_by_id(failoverID);
+ if (failoverObj)
{
- res_type = WD_IPC_CMD_CLUSTER_IN_TRAN;
+ ereport(DEBUG2,
+ (errmsg("removing failover object with ID:%u",failoverID)));
+ g_cluster.wdCurrentFailovers = list_delete_ptr(g_cluster.wdCurrentFailovers,failoverObj);
+ /*
+ * nodeList is only allocated when the failover was registered with a
+ * node count greater than zero, so it can legitimately be NULL here.
+ */
+ if (failoverObj->nodeList)
+ pfree(failoverObj->nodeList);
+ pfree(failoverObj);
+ return true;
}
+ return false;
+}
- if (write_ipc_command_with_result_data(IPCCommand, res_type, NULL, 0))
+/*
+ * Linear search: returns true when value occurs in the first count
+ * elements of intArray, false otherwise (including when count <= 0,
+ * in which case the array is never read).
+ */
+static bool does_int_array_contains_value(int *intArray, int count, int value)
+{
+ int i;
+ for (i=0; i<count; i++)
{
- /*
- * This is the complete lifecycle of command.
- * we are done with it
- */
-
- return IPC_CMD_COMPLETE;
+ if (intArray[i] == value)
+ return true;
}
- return IPC_CMD_ERROR;
+ return false;
}
-static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDIPCCommandData *IPCCommand)
+/*
+ * Search g_cluster.wdCurrentFailovers for a failover object that has the
+ * same request kind, the same number of nodes, and whose stored node IDs
+ * are all present in nodeList.
+ * Returns the first such object, or NULL when none is found.
+ *
+ * NOTE(review): the node comparison is one-directional (every stored ID
+ * must appear in nodeList) and relies on the counts being equal; lists
+ * containing duplicate IDs could be reported as equal without holding the
+ * same set of nodes -- confirm callers never pass duplicates.
+ */
+static WDFailoverObject* get_failover_object(POOL_REQUEST_KIND reqKind, int nodesCount, int *nodeList)
{
- char res_type = WD_IPC_CMD_RESULT_BAD;
- /*
- * if cluster or myself is not in stable state
- * just return cluster in transaction
- */
- ereport(LOG,
- (errmsg("processing master node data request from IPC socket")));
-
- IPCCommand->type = WD_GET_MASTER_DATA_REQUEST;
- if (get_local_node_state() == WD_STANDBY)
+ ListCell *lc;
+ foreach(lc, g_cluster.wdCurrentFailovers)
{
- /* I am a standby node, Just forward the request to coordinator */
-
- WDPacketData wdPacket;
- init_wd_packet(&wdPacket);
- set_message_type(&wdPacket, WD_GET_MASTER_DATA_REQUEST);
- set_next_commandID_in_message(&wdPacket);
- set_message_data(&wdPacket, IPCCommand->data_buf , IPCCommand->data_len);
- /* save the command ID */
- IPCCommand->internal_command_id = wdPacket.command_id;
- if (send_message(g_cluster.masterNode, &wdPacket) <= 0)
- {
- ereport(LOG,
- (errmsg("failed to process master node data request from IPC socket"),
- errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
- /* we have failed to send to any node, return lock failed */
- res_type = WD_IPC_CMD_RESULT_BAD;
- }
- else
+ WDFailoverObject* failoverObj = lfirst(lc);
+ if (failoverObj)
{
- /*
- * we need to wait for the result
- */
- ereport(LOG,
- (errmsg("data request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
- errdetail("waiting for the reply from master node...")));
-
- return IPC_CMD_PROCESSING;
+ /* cheap checks first: request kind and node count must agree */
+ if (failoverObj->reqKind == reqKind && failoverObj->nodesCount == nodesCount)
+ {
+ bool equal = true;
+ int i;
+ /* every node ID stored in the object must be in the requested list */
+ for (i=0; i<nodesCount; i++)
+ {
+ if (does_int_array_contains_value(nodeList,nodesCount,failoverObj->nodeList[i]) == false)
+ {
+ equal = false;
+ break;
+ }
+ }
+ if (equal)
+ return failoverObj;
+ }
}
}
- else if (get_local_node_state() == WD_COORDINATOR)
+ return NULL;
+}
+
+/*
+ * Handle a failover command packet (pkt) received from the remote watchdog
+ * node wdNode.
+ *
+ * When the local node is not the coordinator the sender gets a
+ * WD_ERROR_MESSAGE reply. Otherwise the packet is wrapped in a new command
+ * object marked COMMAND_SOURCE_REMOTE and handed to
+ * process_failover_command_on_coordinator(). A command that is still
+ * IPC_CMD_PROCESSING is appended to g_cluster.ipc_commands; any other
+ * result releases the command object through cleanUpIPCCommand().
+ */
+static void process_remote_failover_command_on_coordinator(WatchdogNode* wdNode, WDPacketData* pkt)
+{
+ if (get_local_node_state() != WD_COORDINATOR)
{
- /* This node is itself a master node, So send the empty result with OK tag */
- res_type = WD_IPC_CMD_RESULT_OK;
+ /* only the coordinator processes failover commands; reject the request */
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
}
- else /* we are not in any stable state at the moment */
+ else
{
- res_type = WD_IPC_CMD_CLUSTER_IN_TRAN;
- }
+ IPC_CMD_PREOCESS_RES res;
+ /* presumably sizes sourcePacket.data for pkt->len bytes -- TODO confirm */
+ WDCommandData* ipcCommand = create_command_object(pkt->len);
+ ipcCommand->sourcePacket.type = pkt->type;
+ ipcCommand->sourcePacket.len = pkt->len;
+ ipcCommand->sourcePacket.command_id = pkt->command_id;
- if (write_ipc_command_with_result_data(IPCCommand, res_type, NULL, 0))
- {
- /*
- * This is the complete lifecycle of command.
- * we are done with it
- */
- return IPC_CMD_COMPLETE;
- }
- return IPC_CMD_ERROR;
+ if (pkt->len > 0)
+ memcpy(ipcCommand->sourcePacket.data, pkt->data, pkt->len);
-}
+ ipcCommand->commandSource = COMMAND_SOURCE_REMOTE;
+ ipcCommand->sourceWdNode = wdNode;
+ gettimeofday(&ipcCommand->commandTime, NULL);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandData *IPCCommand)
-{
- char res_type = WD_IPC_CMD_RESULT_BAD;
- /*
- * if cluster or myself is not in stable state
- * just return cluster in transaction
- */
- ereport(LOG,
- (errmsg("processing sync request from IPC socket")));
+ ereport(LOG,
+ (errmsg("watchdog received the failover command from remote pgpool-II node \"%s\"",wdNode->nodeName)));
- IPCCommand->type = WD_FAILOVER_CMD_SYNC_REQUEST;
- if (get_local_node_state() == WD_STANDBY)
- {
- /* I am a standby node, Just forward the request to coordinator */
-
- WDPacketData wdPacket;
- init_wd_packet(&wdPacket);
- set_message_type(&wdPacket, WD_FAILOVER_CMD_SYNC_REQUEST);
- set_next_commandID_in_message(&wdPacket);
- set_message_data(&wdPacket, IPCCommand->data_buf , IPCCommand->data_len);
- /* save the command ID */
- IPCCommand->internal_command_id = wdPacket.command_id;
- if (send_message(g_cluster.masterNode, &wdPacket) <= 0)
+ res = process_failover_command_on_coordinator(ipcCommand);
+ if (res == IPC_CMD_PROCESSING)
{
+ /* keep the pending command; the list cell is allocated under TopMemoryContext */
+ MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ g_cluster.ipc_commands = lappend(g_cluster.ipc_commands,ipcCommand);
+ MemoryContextSwitchTo(oldCxt);
ereport(LOG,
- (errmsg("failed to process sync request from IPC socket"),
- errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
- /* we have failed to send to any node, return lock failed */
- res_type = WD_IPC_CMD_RESULT_BAD;
+ (errmsg("failover command from remote pgpool-II node \"%s\" is still processing",wdNode->nodeName)));
}
else
{
- /*
- * we need to wait for the result
- */
- ereport(LOG,
- (errmsg("sync request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
- errdetail("waiting for the reply from master node...")));
-
- return IPC_CMD_PROCESSING;
+ /* the command finished (or failed) right away; release it */
+ cleanUpIPCCommand(ipcCommand);
}
}
- else if (get_local_node_state() == WD_COORDINATOR)
+}
+
+/*
+ * Entry point for a failover command that arrived on the IPC socket while
+ * the local node is the coordinator. Logs the reception and delegates to
+ * process_failover_command_on_coordinator(); yields IPC_CMD_ERROR when the
+ * local node is not the coordinator (not expected to happen).
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand)
+{
+ IPC_CMD_PREOCESS_RES result = IPC_CMD_ERROR;
+
+ if (get_local_node_state() == WD_COORDINATOR)
+ {
+ ereport(LOG,
+ (errmsg("watchdog received the failover command on IPC socket")));
+ result = process_failover_command_on_coordinator(ipcCommand);
+ }
+ return result;
+}
+
+
+/*
+ * Build a JSON document holding cmdResult (WD_FAILOVER_RESULT_KEY) and
+ * failoverID (WD_FAILOVER_ID_KEY) and send it back to the originator of
+ * ipcCommand: through the IPC socket for COMMAND_SOURCE_IPC, or as a
+ * WD_CMD_REPLY_IN_DATA message to the source watchdog node for
+ * COMMAND_SOURCE_REMOTE.
+ *
+ * Returns the result of write_ipc_command_with_result_data() for IPC
+ * commands and false in every other case.
+ * NOTE(review): on the remote path the outcome of reply_with_message() is
+ * not captured, so false is returned even if the reply was sent --
+ * confirm whether callers should see the send status.
+ */
+static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDResults cmdResult, unsigned int failoverID)
+{
+ bool ret = false;
+ JsonNode* jNode = jw_create_with_object(true);
+ jw_put_int(jNode, WD_FAILOVER_RESULT_KEY, cmdResult);
+ jw_put_int(jNode, WD_FAILOVER_ID_KEY, failoverID);
+ /* close the JSON object and finish the document */
+ jw_end_element(jNode);
+ jw_finish_document(jNode);
+
+ ereport(DEBUG1,
+ (errmsg("replying to failover command"),
+ errdetail("%.*s",jw_get_json_length(jNode),jw_get_json_string(jNode))));
+
+ if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
{
- /*
- * If I am coordinator, Just process the request locally
- */
- process_failover_command_sync_requests(g_cluster.localNode, NULL, IPCCommand);
- return IPC_CMD_COMPLETE;
+ /* length + 1: one byte more than the JSON length is sent (presumably the terminating NUL) */
+ ret = write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK,
+ jw_get_json_string(jNode), jw_get_json_length(jNode) +1);
}
- else /* we are not in any stable state at the moment */
+ else if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
{
- res_type = WD_IPC_CMD_CLUSTER_IN_TRAN;
+ reply_with_message(ipcCommand->sourceWdNode, WD_CMD_REPLY_IN_DATA,
+ jw_get_json_string(jNode), jw_get_json_length(jNode) + 1,
+ &ipcCommand->sourcePacket);
}
+ jw_destroy(jNode);
+ return ret;
+}
- if (write_ipc_command_with_result_data(IPCCommand, res_type, NULL, 0))
- {
- /*
- * This is the complete lifecycle of command.
- * we are done with it
- */
+/*
+ * Process a failover command on the coordinator node, whether it arrived
+ * on the local IPC socket or from a remote watchdog node.
+ *
+ * The command data is parsed into a function name and a node ID list and
+ * the originator is answered through reply_to_failove_command():
+ *   - FAILOVER_RES_INVALID_FUNCTION when the data cannot be parsed or the
+ *     function name is not a failback/degenerate/promote request,
+ *   - FAILOVER_RES_PROCEED when get_cluster_node_count() is 0,
+ *   - FAILOVER_RES_NOT_ALLOWED for a remote request while
+ *     Req_info->switching is set,
+ *   - FAILOVER_RES_ALREADY_ISSUED when a matching failover object exists.
+ * Otherwise a new failover object is registered in
+ * g_cluster.wdCurrentFailovers and the request is forwarded to the remote
+ * nodes as WD_REMOTE_FAILOVER_REQUEST.
+ *
+ * Returns IPC_CMD_ERROR when the local node is not the coordinator,
+ * IPC_CMD_PROCESSING for a forwarded remote request when
+ * get_cluster_node_count() is not 1, and IPC_CMD_COMPLETE in all
+ * other cases.
+ *
+ * NOTE(review): func_name and node_id_list produced by
+ * parse_wd_node_function_json() are not released in this function --
+ * confirm who owns them.
+ */
+static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand)
+{
+ char* func_name;
+ int node_count = 0;
+ int *node_id_list = NULL;
+ bool ret = false;
+ WDFailoverObject* failoverObj;
+ POOL_REQUEST_KIND reqKind;
+
+ if (get_local_node_state() != WD_COORDINATOR)
+ return IPC_CMD_ERROR; /* should never happen*/
+
+ /*
+ * The coordinator node
+ * Forward this command to all standby nodes.
+ * Ask the caller to proceed with failover
+ * but first check if this failover is already requested
+ * by some other node.
+ */
+
+ ret = parse_wd_node_function_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len,
+ &func_name, &node_id_list, &node_count);
+ if (ret == false)
+ {
+ ereport(LOG,(
+ errmsg("failed to process failover command"),
+ errdetail("unable to parse the command data")));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, 0);
return IPC_CMD_COMPLETE;
}
- return IPC_CMD_ERROR;
-}
-static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketData* pkt)
-{
- /* only coordinator(master) node can process this request */
- if (get_local_node_state() == WD_COORDINATOR)
+ /* map the function name to the request kind used to match failover objects */
+ if (strcasecmp(WD_FUNCTION_FAILBACK_REQUEST, func_name) == 0)
+ reqKind = NODE_UP_REQUEST;
+ else if (strcasecmp(WD_FUNCTION_DEGENERATE_REQUEST, func_name) == 0)
+ reqKind = NODE_DOWN_REQUEST;
+ else if (strcasecmp(WD_FUNCTION_PROMOTE_REQUEST, func_name) == 0)
+ reqKind = PROMOTE_NODE_REQUEST;
+ else
+ {
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, 0);
+ return IPC_CMD_COMPLETE;
+ }
+
+ ereport(LOG,(
+ errmsg("watchdog received failover command [%s]",func_name)));
+
+ if (get_cluster_node_count() == 0)
+ {
+ /*
+ * Since I am the only node in the cluster so nothing
+ * we need to do here
+ */
+ ereport(LOG,(
+ errmsg("I am the only Pgpool-II node in the watchdog cluster"),
+ errdetail("local Pgpool-II node is allowd to execute the failover command [%s]",func_name)));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
+ return IPC_CMD_COMPLETE;
+ }
+
+ if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE && Req_info->switching)
{
- /* check if we already have no lockholder node */
- if (g_cluster.lockHolderNode == NULL || g_cluster.lockHolderNode == wdNode)
+ /*
+ * check if the failover is allowed before doing anything
+ */
+ ereport(LOG,
+ (errmsg("failover command [%s] request from Pgpool-II node \"%s\" is rejected because of switching",
+ func_name,
+ ipcCommand->sourceWdNode->nodeName)));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_NOT_ALLOWED, 0);
+ return IPC_CMD_COMPLETE;
+ }
+
+ /*
+ * check if the same failover is already issued to the main
+ * process
+ */
+ failoverObj = get_failover_object(reqKind, node_count, node_id_list);
+ if (failoverObj)
+ {
+ ereport(LOG,
+ (errmsg("ignoring the failover command [%s] request, similar failover request is already in progress",func_name)));
+
+ /* Same failover is already in progress */
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_ALREADY_ISSUED, 0);
+ return IPC_CMD_COMPLETE;
+ }
+ else
+ {
+ MemoryContext oldCxt;
+ ereport(DEBUG2,
+ (errmsg("no similar failover in progress")));
+ /*
+ * okay now ask all nodes to start failover
+ */
+ wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
+ ipcCommand->commandPacket.type = WD_REMOTE_FAILOVER_REQUEST;
+ set_next_commandID_in_message(&ipcCommand->commandPacket);
+
+ /* the failover object must outlive this call, so allocate it in TopMemoryContext */
+ oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ /* No similar failover is in progress */
+ failoverObj = palloc0(sizeof(WDFailoverObject));
+ failoverObj->reqKind = reqKind;
+ failoverObj->nodesCount = node_count;
+ /* nodeList stays NULL (from palloc0) when no nodes were given */
+ if (node_count > 0)
{
- if (wdNode == g_cluster.localNode)
- {
- g_cluster.lockHolderNode = wdNode;
- /* TODO inform all cluster about the new lock holder */
- return true;
- }
- /* reply the node with success message */
- else if (reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt))
- {
- g_cluster.lockHolderNode = wdNode;
- /* TODO inform all cluster about the new lock holder */
- return true;
- }
+ failoverObj->nodeList = palloc(sizeof(int) * node_count);
+ memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
+ }
+ failoverObj->failoverID = ipcCommand->commandPacket.command_id; /* use command id as failover id */
+ gettimeofday(&failoverObj->startTime, NULL);
+ /* NOTE(review): set to the local node even for remote requests -- confirm intent */
+ failoverObj->wdRequestingNode = g_cluster.localNode;
+ g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
+
+ MemoryContextSwitchTo(oldCxt);
+ /* We may also need to send the Accept message here for remote node */
+
+ ipcCommand->sendToNode = NULL; /* command needs to be sent to all nodes */
+
+ ereport(LOG,
+ (errmsg("forwarding the failover request [%s] to all alive nodes",func_name),
+ errdetail("watchdog cluster currently has %d remote connected nodes",get_cluster_node_count())));
+
+ /* see if there is any node we want to send to */
+ send_command_packet_to_remote_nodes(ipcCommand, false);
+
+ ereport(LOG,
+ (errmsg("failover request [%s] is sent to %d nodes",func_name,ipcCommand->commandSendToCount)));
+
+ /* For a moment just think it is successfully sent to all nodes.*/
+ if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
+ {
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, failoverObj->failoverID);
+ return IPC_CMD_COMPLETE;
}
else
{
- reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
+ process_wd_command_function(ipcCommand->sourceWdNode, &ipcCommand->sourcePacket,
+ func_name, node_count, node_id_list, failoverObj->failoverID);
+ if (get_cluster_node_count() == 1)
+ {
+ /* Since its just 2 nodes cluster, and the only other
+ * node is the one that actually issued the failover
+ * so the command actually completes here
+ */
+ return IPC_CMD_COMPLETE;
+ }
}
}
- else
- {
- reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
- }
- return false;
+
+ /* only reached for a remote request when the cluster node count is not 1 */
+ return IPC_CMD_PROCESSING;
}
-/*
- * process_failover_command_sync_requests()
- * the function is the main processor of all interlocking related requests.
- * it parses the request json and executes the requested intelocking command
- */
-static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacketData* pkt, WDIPCCommandData* ipcCommand)
+/*
+ * Dispatch a failover command received on the IPC socket according to the
+ * local node state:
+ *   - coordinator: process it locally,
+ *   - standby: forward it to g_cluster.masterNode and return
+ *     IPC_CMD_PROCESSING (IPC_CMD_ERROR when the forward fails),
+ *   - any other state: IPC_CMD_ERROR.
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand)
{
-
- WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
- JsonNode* jNode = NULL;
- int failoverLockID = -1;
-
- /* only coordinator(master) node can process this request */
if (get_local_node_state() == WD_COORDINATOR)
{
- char* json_data = NULL;
- int data_len = 0;
- json_value *root = NULL;
- char* syncRequestType = NULL;
-
- /* We need to identify failover command type and sync function */
- if (pkt)
+ return process_IPC_failover_command_on_coordinator(ipcCommand);
+ }
+ else if (get_local_node_state() == WD_STANDBY)
+ {
+ /* I am a standby node, Just forward the request to coordinator */
+
+ wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
+ set_next_commandID_in_message(&ipcCommand->commandPacket);
+
+ ipcCommand->sendToNode = g_cluster.masterNode; /* command is sent to the master node only */
+ if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
{
- json_data = pkt->data;
- data_len = pkt->len;
+ ereport(LOG,
+ (errmsg("failed to process failover request from IPC socket"),
+ errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
+ /* the request could not be forwarded to the master node, report an error */
+ return IPC_CMD_ERROR;
}
else
{
- json_data = ipcCommand->data_buf;
- data_len = ipcCommand->data_len;
+ /*
+ * we need to wait for the result
+ */
+ ereport(LOG,
+ (errmsg("failover request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+ errdetail("waiting for the reply from master node...")));
+ return IPC_CMD_PROCESSING;
}
+ }
+ /* we are not in stable state at the moment */
+ return IPC_CMD_ERROR;
+}
- if (data_len > 0 && json_data)
- {
- root = json_parse(json_data,data_len);
- if (root && root->type == json_object)
- {
- syncRequestType = json_get_string_value_for_key(root, "SyncRequestType");
- json_get_int_value_for_key(root, "FailoverLockID", &failoverLockID);
- }
- else
- {
- ereport(LOG,
- (errmsg("unable to parse json data of interlocking command")));
- }
- }
- if (syncRequestType)
- {
+/*
+ * Handle an online recovery command received on the IPC socket.
+ *
+ * When the local node is a standby or the coordinator, the command is
+ * forwarded to all remote nodes. Returns:
+ *   IPC_CMD_OK         - get_cluster_node_count() is 0, nothing to forward
+ *   IPC_CMD_PROCESSING - forwarded, replies are still pending
+ *   IPC_CMD_ERROR      - the command could not be sent to any node
+ *   IPC_CMD_TRY_AGAIN  - the local node is neither standby nor coordinator
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand)
+{
+ if (get_local_node_state() == WD_STANDBY ||
+ get_local_node_state() == WD_COORDINATOR)
+ {
+ /* save the hassle if I am the only alive node */
+ if (get_cluster_node_count() == 0)
+ return IPC_CMD_OK;
- if (strcasecmp(WD_REQ_FAILOVER_START, syncRequestType) == 0)
- res = node_is_asking_for_failover_start(wdNode, pkt);
+ wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
+ set_next_commandID_in_message(&ipcCommand->commandPacket);
- else if (strcasecmp(WD_REQ_FAILOVER_END, syncRequestType) == 0)
- res = node_is_asking_for_failover_end(wdNode, pkt);
+ ipcCommand->sendToNode = NULL; /* command needs to be sent to all nodes */
+ if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
+ {
+ /* log the request type that actually failed (this is not a failover request) */
+ ereport(LOG,
+ (errmsg("failed to process online recovery request from IPC socket"),
+ errdetail("failed to forward the request to remote nodes")));
+ /* we have failed to send to any node, report an error */
+ return IPC_CMD_ERROR;
+ }
+ return IPC_CMD_PROCESSING;
+ }
+ /* we are not in any stable state at the moment */
+ return IPC_CMD_TRY_AGAIN;
+}
- else if (strcasecmp(WD_REQ_FAILOVER_RELEASE_LOCK, syncRequestType) == 0)
- res = node_is_asking_for_failover_lock_release(wdNode, pkt, failoverLockID);
+/*
+ * Handle a "get data from master" request received on the IPC socket.
+ *
+ * Returns:
+ *   IPC_CMD_PROCESSING - local node is a standby and the request was
+ *                        forwarded to g_cluster.masterNode
+ *   IPC_CMD_ERROR      - local node is a standby and forwarding failed
+ *   IPC_CMD_OK         - local node is the coordinator itself
+ *   IPC_CMD_TRY_AGAIN  - local node is in any other state
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand)
+{
+ /*
+ * Only a standby or the coordinator can serve this request; in any
+ * other state the caller is asked to try again (see the end of the
+ * function).
+ */
+ ereport(LOG,
+ (errmsg("processing master node data request from IPC socket")));
- else if (strcasecmp(WD_REQ_FAILOVER_LOCK_STATUS, syncRequestType) == 0)
- res = node_is_asking_for_failover_lock_status(wdNode, pkt, failoverLockID);
+ if (get_local_node_state() == WD_STANDBY)
+ {
+ /*
+ * set the command id in the IPC packet before forwarding
+ * it on the watchdog socket
+ */
+ wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
+ set_next_commandID_in_message(&ipcCommand->commandPacket);
- else
- res = FAILOVER_RES_ERROR;
+ ipcCommand->sendToNode = g_cluster.masterNode;
+ if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
+ {
+ ereport(LOG,
+ (errmsg("failed to process master node data request from IPC socket"),
+ errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
+ /* the request could not be forwarded to the master node, report an error */
+ return IPC_CMD_ERROR;
}
else
{
+ /*
+ * we need to wait for the result
+ */
ereport(LOG,
- (errmsg("invalid json data"),
- errdetail("unable to find interlocking command type")));
- res = FAILOVER_RES_ERROR;
- }
+ (errmsg("data request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+ errdetail("waiting for the reply from master node...")));
- if (root)
- {
- json_value_free(root);
+ return IPC_CMD_PROCESSING;
}
}
- else
+ else if (get_local_node_state() == WD_COORDINATOR)
{
- /* I am not the coordinator node. So just return an error */
- res = FAILOVER_RES_ERROR;
+ /* This node is itself a master node, So send the empty result with OK tag */
+ return IPC_CMD_OK;
}
- if (res != FAILOVER_RES_ERROR)
- {
- /* create the json result */
- jNode = jw_create_with_object(true);
- /* add the node count */
- jw_put_int(jNode, "FailoverLockID", failoverLockID);
- jw_put_int(jNode, "InterlockingResult", res);
- /* create the packet */
- jw_end_element(jNode);
- jw_finish_document(jNode);
- }
+ /* we are not in any stable state at the moment */
+
+ return IPC_CMD_TRY_AGAIN;
+}
+
+/*
+ * Handle a failover command lock request received on the IPC socket.
+ *
+ * A standby forwards the request to g_cluster.masterNode and returns
+ * IPC_CMD_PROCESSING (IPC_CMD_ERROR when the forward fails); the
+ * coordinator processes it locally through
+ * process_failover_locking_requests_on_cordinator(); in any other state
+ * IPC_CMD_TRY_AGAIN is returned.
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand)
+{
+ /*
+ * Only a standby or the coordinator can serve this request; in any
+ * other state the caller is asked to try again (see the end of the
+ * function).
+ */
+ ereport(LOG,
+ (errmsg("processing failover command lock request from IPC socket")));
- if (wdNode != g_cluster.localNode)
+ if (get_local_node_state() == WD_STANDBY)
{
- if (jNode == NULL)
+ /* I am a standby node, Just forward the request to coordinator */
+ wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
+ set_next_commandID_in_message(&ipcCommand->commandPacket);
+
+ ipcCommand->sendToNode = g_cluster.masterNode;
+ if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
{
- reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ ereport(LOG,
+ (errmsg("failed to process the failover command lock request from IPC socket"),
+ errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
+ /* the request could not be forwarded to the master node, report an error */
+ return IPC_CMD_ERROR;
}
else
{
- reply_with_message(wdNode, WD_DATA_MESSAGE, jw_get_json_string(jNode), jw_get_json_length(jNode) + 1, pkt);
+ /*
+ * wait for the result
+ */
+ ereport(LOG,
+ (errmsg("failover command lock request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+ errdetail("waiting for the reply from master node...")));
+
+ return IPC_CMD_PROCESSING;
}
}
+ else if (get_local_node_state() == WD_COORDINATOR)
+ {
+ /*
+ * If I am coordinator, Just process the request locally
+ */
+ return process_failover_locking_requests_on_cordinator(ipcCommand);
+ }
+
+ /* we are not in any stable state at the moment */
+ return IPC_CMD_TRY_AGAIN;
+}
+
+/*
+ * Handle a failover locking request packet (pkt) received from the remote
+ * watchdog node wdNode.
+ *
+ * When the local node is not the coordinator the sender gets a
+ * WD_ERROR_MESSAGE reply. Otherwise the packet is wrapped in a new command
+ * object marked COMMAND_SOURCE_REMOTE and handed to
+ * process_failover_locking_requests_on_cordinator(). A command that is
+ * still IPC_CMD_PROCESSING is appended to g_cluster.ipc_commands; any
+ * other result releases the command object through cleanUpIPCCommand().
+ */
+static void process_remote_failover_locking_request(WatchdogNode* wdNode, WDPacketData* pkt)
+{
+ if (get_local_node_state() != WD_COORDINATOR)
+ {
+ /* only the coordinator processes locking requests; reject the request */
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ }
else
{
- /* reply on IPC Socket */
- bool ret;
- if (jNode != NULL)
+ IPC_CMD_PREOCESS_RES res;
+ /* presumably sizes sourcePacket.data for pkt->len bytes -- TODO confirm */
+ WDCommandData* ipcCommand = create_command_object(pkt->len);
+ ipcCommand->sourcePacket.type = pkt->type;
+ ipcCommand->sourcePacket.len = pkt->len;
+ ipcCommand->sourcePacket.command_id = pkt->command_id;
+ if (pkt->len > 0)
+ memcpy(ipcCommand->sourcePacket.data, pkt->data, pkt->len);
+
+ ipcCommand->commandSource = COMMAND_SOURCE_REMOTE;
+ ipcCommand->sourceWdNode = wdNode;
+ gettimeofday(&ipcCommand->commandTime, NULL);
+
+ res = process_failover_locking_requests_on_cordinator(ipcCommand);
+ if (res == IPC_CMD_PROCESSING)
{
- ret = write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK,
- jw_get_json_string(jNode), jw_get_json_length(jNode) +1);
- jw_destroy(jNode);
+ /* keep the pending command; the list cell is allocated under TopMemoryContext */
+ MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ g_cluster.ipc_commands = lappend(g_cluster.ipc_commands,ipcCommand);
+ MemoryContextSwitchTo(oldCxt);
}
else
{
- ret =write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_BAD, NULL, 0);
+ /* the command finished (or failed) right away; release it */
+ cleanUpIPCCommand(ipcCommand);
}
+ }
+}
- if (ret == false)
+
+/*
+ * process_failover_locking_requests_on_cordinator()
+ * The main processor of all interlocking related requests on the
+ * coordinator. It parses the request JSON ("SyncRequestType",
+ * "FailoverLockID", "WDFailoverID"), executes the requested interlocking
+ * command on behalf of the requesting node (the local node for IPC
+ * commands, the source watchdog node otherwise) and replies with the
+ * result through reply_to_failove_command().
+ *
+ * Returns IPC_CMD_ERROR when the local node is not the coordinator and
+ * IPC_CMD_COMPLETE in every other case; malformed requests are answered
+ * with FAILOVER_RES_INVALID_FUNCTION. The parsed JSON tree is released
+ * on every path that created it.
+ */
+static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCommandData* ipcCommand)
+{
+ WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
+ json_value* root;
+ int failoverLockID = -1;
+ unsigned int failoverID = 0;
+ char *syncRequestType = NULL;
+ WatchdogNode* wdNode;
+
+ if (get_local_node_state() != WD_COORDINATOR)
+ return IPC_CMD_ERROR;
+
+ if (ipcCommand->sourcePacket.data == NULL || ipcCommand->sourcePacket.len <= 0)
+ {
+ ereport(LOG,
+ (errmsg("failed to process locking request"),
+ errdetail("invalid command packet")));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, failoverID);
+ return IPC_CMD_COMPLETE;
+ }
+ /* parse the json*/
+ root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+ if (root && root->type == json_object)
+ {
+ syncRequestType = json_get_string_value_for_key(root, "SyncRequestType");
+ json_get_int_value_for_key(root, "FailoverLockID", &failoverLockID);
+ json_get_int_value_for_key(root, "WDFailoverID", (int*)&failoverID);
+ /* the request type is mandatory; it is a pointer, so test against NULL */
+ if (syncRequestType == NULL)
{
ereport(LOG,
- (errmsg("failed to write results for failover sync request to IPC socket")));
+ (errmsg("failed to process locking request"),
+ errdetail("invalid command packet")));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, failoverID);
+ /* do not leak the parsed tree on this early exit */
+ json_value_free(root);
+ return IPC_CMD_COMPLETE;
}
}
+ else
+ {
+ ereport(LOG,
+ (errmsg("failed to process locking request"),
+ errdetail("invalid command packet")));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, failoverID);
+ /* root may be a valid tree whose top level is not an object */
+ if (root)
+ json_value_free(root);
+ return IPC_CMD_COMPLETE;
+ }
+
+ /* IPC commands originate from the local node, everything else from a remote one */
+ if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
+ wdNode = g_cluster.localNode;
+ else
+ wdNode = ipcCommand->sourceWdNode;
+
+ if (strcasecmp(WD_REQ_FAILOVER_START, syncRequestType) == 0)
+ res = node_is_asking_for_failover_start(wdNode, &ipcCommand->sourcePacket, failoverID);
+
+ else if (strcasecmp(WD_REQ_FAILOVER_END, syncRequestType) == 0)
+ res = node_is_asking_for_failover_end(wdNode, &ipcCommand->sourcePacket, failoverID);
+
+ else if (strcasecmp(WD_REQ_FAILOVER_RELEASE_LOCK, syncRequestType) == 0)
+ res = node_is_asking_for_failover_lock_release(wdNode, &ipcCommand->sourcePacket, failoverLockID, failoverID);
+
+ else if (strcasecmp(WD_REQ_FAILOVER_LOCK_STATUS, syncRequestType) == 0)
+ res = node_is_asking_for_failover_lock_status(wdNode, &ipcCommand->sourcePacket, failoverLockID, failoverID);
+
+ else
+ {
+ ereport(LOG,
+ (errmsg("failed to process locking request"),
+ errdetail("invalid command packet")));
+ res = FAILOVER_RES_INVALID_FUNCTION;
+ }
+ reply_to_failove_command(ipcCommand, res, failoverID);
+
+ /* the tree is released only after the last use of syncRequestType above */
+ if (root)
+ json_value_free(root);
+
+ return IPC_CMD_COMPLETE;
}
/*
* Only coordinator/master node can execute the interlocking requests.
*/
static WDFailoverCMDResults
-node_is_asking_for_failover_start(WatchdogNode* wdNode, WDPacketData* pkt)
+node_is_asking_for_failover_start(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID)
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
ereport(LOG,
- (errmsg("%s pgpool-II node \"%s\" is requesting to become a lock holder",
+ (errmsg("%s pgpool-II node \"%s\" is requesting to become a lock holder for failover ID: %d",
(g_cluster.localNode == wdNode)? "local":"remote",
- wdNode->nodeName)));
+ wdNode->nodeName, failoverID)));
/* only coordinator(master) node can process this request */
if (get_local_node_state() == WD_COORDINATOR)
{
- /* check if we have no node in interlocking or requesting node is itself
- * a lock holder node
- */
- if (g_cluster.interlockingNode.lockHolderNode == NULL ||
- g_cluster.interlockingNode.lockHolderNode == wdNode)
+ /* only the coordinator node can become a lock holder */
+ if (g_cluster.masterNode == wdNode)
{
int i = 0;
/* lock all command locks */
}
else
{
- /* some other node is holding the lock */
res = FAILOVER_RES_I_AM_NOT_LOCK_HOLDER;
- ereport(LOG,
+ if (g_cluster.interlockingNode.lockHolderNode == NULL)
+ ereport(LOG,
+ (errmsg("request to become a lock holder is denied to %s pgpool-II node \"%s\"",
+ (g_cluster.localNode == wdNode)? "local":"remote",
+ wdNode->nodeName),
+ errdetail("only master/coordinator can become a lock holder")));
+ else
+ ereport(LOG,
(errmsg("lock holder request denied to %s pgpool-II node \"%s\"",
(g_cluster.localNode == wdNode)? "local":"remote",
wdNode->nodeName),
* Only coordinator/master node can execute the interlocking requests.
*/
static WDFailoverCMDResults
-node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt)
+node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID)
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
ereport(LOG,
- (errmsg("%s pgpool-II node \"%s\" is requesting to resign from a lock holder",
+ (errmsg("%s pgpool-II node \"%s\" is requesting to resign from a lock holder for failover ID %d",
(g_cluster.localNode == wdNode)? "local":"remote",
- wdNode->nodeName)));
+ wdNode->nodeName, failoverID)));
if (get_local_node_state() == WD_COORDINATOR)
{
(errmsg("%s pgpool-II node \"%s\" has resigned from the lock holder",
(g_cluster.localNode == wdNode)? "local":"remote",
wdNode->nodeName)));
+ /* This marks the end of failover. Remove the
+ * associated failover object
+ */
+ remove_failover_object_by_id(failoverID);
}
else /* some other node is holding the lock */
{
* Only coordinator/master node can execute the interlocking requests.
*/
static WDFailoverCMDResults
-node_is_asking_for_failover_lock_release(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock)
+node_is_asking_for_failover_lock_release(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock, unsigned int failoverID)
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
ereport(LOG,
- (errmsg("%s pgpool-II node \"%s\" is requesting to release [%s] lock",
+ (errmsg("%s pgpool-II node \"%s\" is requesting to release [%s] lock for failover ID %d",
(g_cluster.localNode == wdNode)? "local":"remote",
wdNode->nodeName,
- wd_failover_lock_name[failoverLock])));
+ wd_failover_lock_name[failoverLock],
+ failoverID)));
if (get_local_node_state() == WD_COORDINATOR)
{
res = FAILOVER_RES_SUCCESS;
ereport(LOG,
- (errmsg("%s pgpool-II node \"%s\" has released the [%s] lock",
+ (errmsg("%s pgpool-II node \"%s\" has released the [%s] lock for failover ID %d",
(g_cluster.localNode == wdNode)? "local":"remote",
wdNode->nodeName,
- wd_failover_lock_name[failoverLock])));
+ wd_failover_lock_name[failoverLock],
+ failoverID)));
}
else
{
* Only coordinator/master node can execute the interlocking requests.
*/
static WDFailoverCMDResults
-node_is_asking_for_failover_lock_status(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock)
+node_is_asking_for_failover_lock_status(WatchdogNode* wdNode, WDPacketData* pkt, WDFailoverLock failoverLock, unsigned int failoverID)
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
ereport(LOG,
- (errmsg("%s pgpool-II node \"%s\" is checking the status of [%s] lock",
+ (errmsg("%s pgpool-II node \"%s\" is checking the status of [%s] lock for failover ID %d",
(g_cluster.localNode == wdNode)? "local":"remote",
wdNode->nodeName,
- wd_failover_lock_name[failoverLock])));
+ wd_failover_lock_name[failoverLock],
+ failoverID)));
if (get_local_node_state() == WD_COORDINATOR)
{
- /* check if the node requesting to start the command is the lock holder */
+ /* check if the lock holder exists */
if (g_cluster.interlockingNode.lockHolderNode)
{
/* make sure the request is of a valid lock */
}
else
{
+ /* There is one special case. Since only the coordinator/master
+ * can become a lock holder, a standby node may ask for the status
+ * of the lock before the master node has even started the failover.
+ * The normal flow would then report that no lock holder exists,
+ * which would make the standby node think that the master node has
+ * already finished the failover.
+ */
+ if (get_failover_object_by_id(failoverID))
+ {
+ ereport(LOG,
+ (errmsg("[%s] lock status check request from %s pgpool-II node \"%s\" for failover ID %d",
+ wd_failover_lock_name[failoverLock],
+ (g_cluster.localNode == wdNode)? "local":"remote",
+ wdNode->nodeName,
+ failoverID),
+ errdetail("but failover is not yet started by master node")));
+ res = FAILOVER_RES_NO_LOCKHOLDER_BUT_WAIT;
+
+ }
+ else
+ {
/* no lock holder exists */
- ereport(LOG,
- (errmsg("[%s] lock status check request denied to %s pgpool-II node \"%s\"",
- wd_failover_lock_name[failoverLock],
- (g_cluster.localNode == wdNode)? "local":"remote",
- wdNode->nodeName),
- errdetail("no lock holder exists")));
- res = FAILOVER_RES_NO_LOCKHOLDER;
+ ereport(LOG,
+ (errmsg("[%s] lock status check request from %s pgpool-II node \"%s\" for failover ID %d",
+ wd_failover_lock_name[failoverLock],
+ (g_cluster.localNode == wdNode)? "local":"remote",
+ wdNode->nodeName,
+ failoverID),
+ errdetail("no lock holder exists")));
+ res = FAILOVER_RES_NO_LOCKHOLDER;
+ }
}
}
else
return res;
}
-static bool node_has_resigned_from_interlocking(WatchdogNode* wdNode, WDPacketData* pkt)
-{
- /* only coordinator(master) node can process this request */
- if (get_local_node_state() == WD_COORDINATOR)
- {
- /* check if we already have no lockholder node */
- if (g_cluster.lockHolderNode == NULL || g_cluster.lockHolderNode == wdNode)
- {
- /* reply the node with success message */
- if (reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt))
- {
- g_cluster.lockHolderNode = NULL;
- /* TODO inform all cluster about the new lock holder */
- return true;
- }
- }
- else
- {
- /* only lock holder can resign itself */
- reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
- }
- }
- else
- reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
- return false;
-}
-
-
static WatchdogNode* parse_node_info_message(WDPacketData* pkt, char **authkey)
{
if (pkt == NULL || (pkt->type != WD_ADD_NODE_MESSAGE && pkt->type != WD_INFO_MESSAGE))
}
ereport(DEBUG1,
- (errmsg("received packet tyep %c while need packet type %c",type,ensure_type)));
-
+ (errmsg("received packet type %c while need packet type %c",type,ensure_type)));
+
if (ensure_type != WD_NO_MESSAGE && ensure_type != type)
{
/* The packet type is not what we want.*/
close_socket_connection(conn);
return NULL;
}
-
+
ret = socket_read(conn->sock, &cmd_id, sizeof(int) ,1);
if (ret != sizeof(int))
{
ereport(DEBUG3,
(errmsg("received packet with command id %d from watchdog node ",cmd_id)));
-
+
ret = socket_read(conn->sock, &len, sizeof(int), 1);
if (ret != sizeof(int))
{
close_socket_connection(conn);
return NULL;
}
-
+
len = ntohl(len);
+ /* report the packet type together with the payload length read from the socket */
+ ereport(DEBUG1,
+ (errmsg("reading packet type %c of length %d",type,len)));
+
pkt = get_empty_packet();
set_message_type(pkt, type);
set_message_commandID(pkt, cmd_id);
return (conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED);
}
+
static bool is_node_reachable(WatchdogNode* wdNode)
{
if (is_socket_connection_connected(&wdNode->client_socket))
return false;
}
+/*
+ * Tell whether a watchdog node is considered active.
+ * A node whose state is WD_DEAD, WD_LOST or WD_SHUTDOWN is inactive;
+ * every other state is reported as active.
+ */
+static bool is_node_active(WatchdogNode* wdNode)
+{
+	return wdNode->state != WD_DEAD &&
+		   wdNode->state != WD_LOST &&
+		   wdNode->state != WD_SHUTDOWN;
+}
+
+/*
+ * Returns true only when the node passes is_node_active() and
+ * is_node_reachable(). Reachability is not evaluated for an
+ * inactive node.
+ */
+static bool is_node_active_and_reachable(WatchdogNode* wdNode)
+{
+	return is_node_active(wdNode) && is_node_reachable(wdNode);
+}
+
static int accept_incomming_connections(fd_set* rmask, int pending_fds_count)
{
int processed_fds = 0;
MemoryContextSwitchTo(oldCxt);
}
}
-
+
if (processed_fds >= pending_fds_count)
return processed_fds;
-
+
if ( FD_ISSET(g_cluster.command_server_sock, rmask) )
{
struct sockaddr addr;
socklen_t addrlen = sizeof(struct sockaddr);
processed_fds++;
-
+
int fd = accept(g_cluster.command_server_sock, &addr, &addrlen);
if (fd < 0)
{
return count;
}
-static bool write_packet_to_socket(int sock, WDPacketData* pkt)
+static bool write_packet_to_socket(int sock, WDPacketData* pkt, bool ipcPacket)
{
int ret = 0;
int command_id, len;
errdetail("%s",strerror(errno))));
return false;
}
- /* COMMAND */
- command_id = htonl(pkt->command_id);
- if (write(sock, &command_id, 4) < 4)
+ if (ipcPacket == false)
{
- ereport(LOG,
- (errmsg("failed to send command id, Socket:%d Type:[%s], Command_ID:%d, data Length:%d",sock,pkt_type?pkt_type->name:"NULL", pkt->command_id,pkt->len),
- errdetail("%s",strerror(errno))));
+ /* IPC packets does not have command ID field*/
+ command_id = htonl(pkt->command_id);
+ if (write(sock, &command_id, 4) < 4)
+ {
+ ereport(LOG,
+ (errmsg("failed to send command id, Socket:%d Type:[%s], Command_ID:%d, data Length:%d",sock,pkt_type?pkt_type->name:"NULL", pkt->command_id,pkt->len),
+ errdetail("%s",strerror(errno))));
- return false;
+ return false;
+ }
}
- /* LENGTH */
+ /* data length */
len = htonl(pkt->len);
if (write(sock, &len, 4) < 4)
{
return true;
}
+/*
+ * Copy the header fields (type, command_id, len) and the data pointer
+ * of srcPkt into dstPkt. The payload itself is not duplicated: after the
+ * call both packets refer to the same data buffer.
+ */
+static void wd_packet_shallow_copy(WDPacketData* srcPkt, WDPacketData* dstPkt)
+{
+	/* header fields */
+	dstPkt->type = srcPkt->type;
+	dstPkt->command_id = srcPkt->command_id;
+
+	/* payload: share the buffer, do not copy it */
+	dstPkt->len = srcPkt->len;
+	dstPkt->data = srcPkt->data;
+}
+
static void init_wd_packet(WDPacketData* pkt)
{
pkt->len = 0;
}
+/*
+ * Find the pending command that the given reply packet belongs to.
+ * Walks g_cluster.ipc_commands, skips entries whose commandSource is not
+ * COMMAND_SOURCE_IPC, and returns the first entry whose command packet
+ * carries the same command_id as pkt. Returns NULL when nothing matches.
+ */
-static WDIPCCommandData* get_wd_IPC_command_from_reply(WDPacketData* pkt)
+static WDCommandData* get_wd_IPC_command_from_reply(WDPacketData* pkt)
{
ListCell *lc;
foreach(lc, g_cluster.ipc_commands)
{
- WDIPCCommandData* ipcCommand = lfirst(lc);
+ WDCommandData* ipcCommand = lfirst(lc);
if (ipcCommand)
{
- if (ipcCommand->internal_command_id == pkt->command_id)
+ if (ipcCommand->commandSource != COMMAND_SOURCE_IPC)
+ continue;
+ if (ipcCommand->commandPacket.command_id == pkt->command_id)
return ipcCommand;
}
}
return NULL;
}
-static WDIPCCommandData* get_wd_IPC_command_from_socket(int sock)
+static WDCommandData* get_wd_IPC_command_from_socket(int sock)
{
ListCell *lc;
foreach(lc, g_cluster.ipc_commands)
{
- WDIPCCommandData* ipcCommand = lfirst(lc);
+ WDCommandData* ipcCommand = lfirst(lc);
if (ipcCommand)
{
- if (ipcCommand->issueing_sock == sock)
+ if (ipcCommand->commandSource != COMMAND_SOURCE_IPC)
+ continue;
+
+ if (ipcCommand->sourceIPCSocket == sock)
return ipcCommand;
}
}
}
-static void cleanUpIPCCommand(WDIPCCommandData* ipcCommand)
+static void cleanUpIPCCommand(WDCommandData* ipcCommand)
{
/*
* close the socket associated with ipcCommand
* and remove it from ipcSocket list
*/
- if (ipcCommand->issueing_sock > 0)
+ if (ipcCommand->commandSource == COMMAND_SOURCE_IPC &&
+ ipcCommand->sourceIPCSocket > 0)
{
- close(ipcCommand->issueing_sock);
- g_cluster.ipc_command_socks = list_delete_int(g_cluster.ipc_command_socks,ipcCommand->issueing_sock);
- ipcCommand->issueing_sock = -1;
+ close(ipcCommand->sourceIPCSocket);
+ g_cluster.ipc_command_socks = list_delete_int(g_cluster.ipc_command_socks,ipcCommand->sourceIPCSocket);
+ ipcCommand->sourceIPCSocket = -1;
}
/* Now remove the ipcCommand instance from the command list */
g_cluster.ipc_commands = list_delete_ptr(g_cluster.ipc_commands,ipcCommand);
pfree(tempNode);
}
break;
-
- case WD_INTERLOCKING_REQUEST:
- node_has_requested_for_interlocking(wdNode, pkt);
- break;
-
- case WD_INTERUNLOCKING_REQUEST:
- node_has_resigned_from_interlocking(wdNode, pkt);
- break;
-
+
case WD_JOIN_COORDINATOR_MESSAGE:
{
/*
case WD_IAM_COORDINATOR_MESSAGE:
{
/*
- * if the message is received from coordinator reply with infor,
+ * if the message is received from coordinator reply with info,
* otherwise reject
*/
if (g_cluster.masterNode != NULL && wdNode != g_cluster.masterNode)
{
if (conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED)
{
- if (write_packet_to_socket(conn->sock, pkt) == true)
+ if (write_packet_to_socket(conn->sock, pkt, false) == true)
return true;
ereport(DEBUG1,
(errmsg("sending packet failed, closing connection")));
+/*
+ * Send a packet to a remote watchdog node.
+ * The node's client_socket connection is tried first; the server_socket
+ * connection is tried only when that attempt fails. Returns the result
+ * of the last attempted send.
+ */
static bool send_message_to_node(WatchdogNode* wdNode, WDPacketData *pkt)
{
- if (send_message_to_connection(&wdNode->client_socket,pkt) == true)
- return true;
- if (send_message_to_connection(&wdNode->server_socket,pkt) == true)
- return true;
- ereport(DEBUG1,
- (errmsg("sending packet to node \"%s\" failed, closing connection", wdNode->nodeName)));
- return false;
+ bool ret;
+ ret = send_message_to_connection(&wdNode->client_socket,pkt);
+ if (ret == false)
+ {
+ ret = send_message_to_connection(&wdNode->server_socket,pkt);
+ }
+ if (ret)
+ {
+ /* we only update the last sent time if reply for packet is expected */
+ switch (pkt->type) {
+ case WD_REMOTE_FAILOVER_REQUEST:
+ case WD_FAILOVER_LOCKING_REQUEST:
+ case WD_IPC_FAILOVER_COMMAND:
+ /* a timestamp that is already set (tv_sec > 0) is left untouched */
+ if (wdNode->last_sent_time.tv_sec <= 0)
+ gettimeofday(&wdNode->last_sent_time, NULL);
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ ereport(DEBUG1,
+ (errmsg("sending packet %c to node \"%s\" failed", pkt->type,wdNode->nodeName)));
+ }
+ return ret;
}
/*
return 1;
return 0;
}
- /* NULL means send to all nodes */
+ /* NULL means send to all reachable nodes */
for (i=0; i< g_cluster.remoteNodeCount; i++)
{
wdNode = &(g_cluster.remoteNodes[i]);
- if (send_message_to_node(wdNode,pkt))
+ if (is_node_reachable(wdNode) && send_message_to_node(wdNode,pkt))
count++;
}
return count;
}
+/*
+ * Re-evaluate one command after the watchdog node wdLostNode was lost.
+ *
+ * Returns IPC_CMD_ERROR when the command was addressed to the lost node
+ * only, IPC_CMD_COMPLETE when dropping the lost node leaves no outstanding
+ * replies, and IPC_CMD_PROCESSING when the command has to keep waiting.
+ * In the first two cases wd_command_is_complete() has already been called
+ * for the command.
+ */
+static IPC_CMD_PREOCESS_RES wd_command_processor_for_node_lost_event(WDCommandData* ipcCommand, WatchdogNode* wdLostNode)
+{
+	WDCommandNodeResult* lostResult = NULL;
+	int idx;
+
+	if (ipcCommand->sendToNode != NULL)
+	{
+		/* The command was sent to a single node; only that node matters */
+		if (ipcCommand->sendToNode != wdLostNode)
+			return IPC_CMD_PROCESSING;
+
+		/* The one and only destination is gone, so the command fails */
+		ipcCommand->commandStatus = COMMAND_FINISHED_SEND_FAILED;
+		wd_command_is_complete(ipcCommand);
+		return IPC_CMD_ERROR;
+	}
+
+	/* The command went to all nodes: locate the result slot of the lost node */
+	for (idx = 0; idx < g_cluster.remoteNodeCount; idx++)
+	{
+		if (ipcCommand->nodeResults[idx].wdNode == wdLostNode)
+		{
+			lostResult = &ipcCommand->nodeResults[idx];
+			break;
+		}
+	}
+
+	/* Nothing to adjust unless a reply from the lost node was still pending */
+	if (lostResult == NULL || lostResult->cmdState != COMMAND_STATE_SENT)
+		return IPC_CMD_PROCESSING;
+
+	ereport(LOG,
+		(errmsg("remote node \"%s\" lost while ipc command was in progress ",wdLostNode->nodeName)));
+
+	/*
+	 * The lost node will never answer: stop counting it as a
+	 * destination and look at the situation after that.
+	 */
+	lostResult->cmdState = COMMAND_STATE_DO_NOT_SEND;
+	ipcCommand->commandSendToCount--;
+	if (ipcCommand->commandSendToCount > ipcCommand->commandReplyFromCount)
+		return IPC_CMD_PROCESSING;
+
+	/* Every remaining destination has already replied: finish the command */
+	ipcCommand->commandStatus = COMMAND_FINISHED_ALL_REPLIED;
+	wd_command_is_complete(ipcCommand);
+	return IPC_CMD_COMPLETE;
+}
+
+
+/*
+ * Report the final outcome of a finished command to its originator.
+ *
+ * When the command has a commandCompleteFunc, that callback is invoked
+ * and nothing else is done. Otherwise a standard reply is built from
+ * commandStatus:
+ *  - COMMAND_SOURCE_IPC: an IPC result (OK, TIMEOUT or BAD) without data
+ *    is written through write_ipc_command_with_result_data(); a failed
+ *    write is logged.
+ *  - COMMAND_SOURCE_REMOTE: the source node gets WD_ACCEPT_MESSAGE when
+ *    the status is COMMAND_FINISHED_ALL_REPLIED and WD_REJECT_MESSAGE
+ *    otherwise.
+ * Commands from any other source get no reply here.
+ */
+static void wd_command_is_complete(WDCommandData* ipcCommand)
+{
+	if (ipcCommand->commandCompleteFunc)
+	{
+		ipcCommand->commandCompleteFunc(ipcCommand);
+		return;
+	}
+
+	/* There is no special function for this command, use the standard reply */
+	if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
+	{
+		char res_type;
+
+		switch (ipcCommand->commandStatus)
+		{
+			case COMMAND_FINISHED_TIMEOUT:
+				res_type = WD_IPC_CMD_TIMEOUT;
+				break;
+			case COMMAND_FINISHED_NODE_REJECTED:
+			case COMMAND_FINISHED_SEND_FAILED:
+				res_type = WD_IPC_CMD_RESULT_BAD;
+				break;
+			case COMMAND_FINISHED_ALL_REPLIED:
+			default:
+				res_type = WD_IPC_CMD_RESULT_OK;
+				break;
+		}
+
+		/* do not lose a failed delivery silently; other callers log it too */
+		if (write_ipc_command_with_result_data(ipcCommand, res_type, NULL, 0) == false)
+			ereport(LOG,
+				(errmsg("failed to forward command result to IPC command socket")));
+	}
+	else if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
+	{
+		char res_type;
+
+		if (ipcCommand->commandStatus == COMMAND_FINISHED_ALL_REPLIED)
+			res_type = WD_ACCEPT_MESSAGE;
+		else
+			res_type = WD_REJECT_MESSAGE;
+
+		reply_with_minimal_message(ipcCommand->sourceWdNode, res_type, &ipcCommand->commandPacket);
+	}
+}
+
+
+/*
+ * Let every pending command in g_cluster.ipc_commands react to the loss
+ * of wdNode, then dispose of the commands that are no longer in progress.
+ *
+ * The work is done in two passes: cleanUpIPCCommand() deletes the command
+ * from g_cluster.ipc_commands, so finished commands are first collected in
+ * a separate list and only cleaned up once the walk over the command list
+ * is over.
+ */
+static void node_lost_while_ipc_command(WatchdogNode* wdNode)
+{
+	List* ipcCommands_to_del = NIL;
+	ListCell *lc;
+
+	foreach(lc, g_cluster.ipc_commands)
+	{
+		WDCommandData* ipcCommand = lfirst(lc);
+		IPC_CMD_PREOCESS_RES res;
+
+		/* tolerate empty list entries like the other command list walkers do */
+		if (ipcCommand == NULL)
+			continue;
+
+		res = wd_command_processor_for_node_lost_event(ipcCommand, wdNode);
+		if (res != IPC_CMD_PROCESSING)
+		{
+			ipcCommands_to_del = lappend(ipcCommands_to_del,ipcCommand);
+		}
+	}
+	/* delete completed commands */
+	foreach(lc, ipcCommands_to_del)
+	{
+		WDCommandData* ipcCommand = lfirst(lc);
+		cleanUpIPCCommand(ipcCommand);
+	}
+}
+
+
+/*
+ * Walk through all pending commands and retry the sends that failed
+ * earlier. A command packet is resent to each node whose result slot is
+ * in the COMMAND_STATE_SEND_ERROR state, provided the node is active and
+ * reachable again.
+ */
+static void service_ipc_commands(void)
+{
+	ListCell *cell;
+
+	foreach(cell, g_cluster.ipc_commands)
+	{
+		WDCommandData* cmd = lfirst(cell);
+		int idx;
+
+		/* only commands with at least one failed send need attention */
+		if (cmd == NULL || cmd->commandSendToErrorCount == 0)
+			continue;
+
+		for (idx = 0; idx < g_cluster.remoteNodeCount; idx++)
+		{
+			WDCommandNodeResult* slot = &cmd->nodeResults[idx];
+
+			if (slot->cmdState != COMMAND_STATE_SEND_ERROR)
+				continue;
+			if (is_node_active_and_reachable(slot->wdNode) == false)
+				continue;
+
+			ereport(LOG,
+				(errmsg("remote node \"%s\" is reachable again, resending the command packet ",slot->wdNode->nodeName)));
+
+			if (send_message_to_node(slot->wdNode, &cmd->commandPacket) != true)
+				continue;
+
+			/* the packet went out: move the node from the error to the sent bucket */
+			slot->cmdState = COMMAND_STATE_SENT;
+			cmd->commandSendToErrorCount--;
+			cmd->commandSendToCount++;
+
+			/* no failed sends left for this command */
+			if (cmd->commandSendToErrorCount == 0)
+				break;
+		}
+	}
+}
+
static void service_internal_command(void)
{
int i;
WDCommandNodeResult* nodeResult = &g_cluster.currentCommand.nodeResults[i];
if (nodeResult->cmdState == COMMAND_STATE_SEND_ERROR)
{
- if (is_node_reachable(nodeResult->wdNode))
+ if (is_node_active_and_reachable(nodeResult->wdNode))
+ {
+ if (send_message_to_node(nodeResult->wdNode, &g_cluster.currentCommand.commandPacket) == true)
+ {
+ nodeResult->cmdState = COMMAND_STATE_SENT;
+ g_cluster.currentCommand.commandSendToCount++;
+ }
+ }
+ }
+ }
+}
+
+/*
+ * Detect lost remote nodes and report them to the state machine.
+ * Only nodes that pass is_node_active() are examined. Such a node raises
+ * WD_EVENT_REMOTE_NODE_LOST when
+ *  - it is neither reachable nor waiting for its client connection to
+ *    complete, or
+ *  - a reply has been awaited from it (last_sent_time is set) for at
+ *    least MAX_SECS_WAIT_FOR_REPLY_FROM_NODE seconds.
+ */
+static void service_unreachable_nodes(void)
+{
+ int i;
+ struct timeval currTime;
+ gettimeofday(&currTime,NULL);
+
+ for (i = 0; i< g_cluster.remoteNodeCount; i++)
+ {
+ WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]);
+
+ if (is_node_active(wdNode) == false)
+ continue;
+
+ if (is_node_reachable(wdNode) || wdNode->client_socket.sock_state == WD_SOCK_WAITING_FOR_CONNECT)
+ {
+ /* check if we are waiting for reply from this node */
+ if (wdNode->last_sent_time.tv_sec > 0)
{
- if (send_message_to_node(nodeResult->wdNode, &g_cluster.currentCommand.packet) == true)
+ if (WD_TIME_DIFF_SEC(currTime,wdNode->last_sent_time) >= MAX_SECS_WAIT_FOR_REPLY_FROM_NODE)
{
- nodeResult->cmdState = COMMAND_STATE_SENT;
- g_cluster.currentCommand.commandSendToCount++;
+ ereport(LOG,
+ (errmsg("remote node \"%s\" is not replying..",wdNode->nodeName),
+ errdetail("marking the node as lost")));
+ /* mark the node as lost */
+ watchdog_state_machine(WD_EVENT_REMOTE_NODE_LOST, wdNode, NULL);
}
}
}
+ else
+ {
+ ereport(LOG,
+ (errmsg("remote node \"%s\" is not reachable",wdNode->nodeName),
+ errdetail("marking the node as lost")));
+ watchdog_state_machine(WD_EVENT_REMOTE_NODE_LOST, wdNode, NULL);
+ }
}
}
int i;
WDCommandNodeResult* nodeResult = NULL;
/* verify the packet is reply for our command */
- if (pkt->command_id != g_cluster.currentCommand.packet.command_id)
+ if (pkt->command_id != g_cluster.currentCommand.commandPacket.command_id)
return false;
if (g_cluster.currentCommand.commandStatus != COMMAND_IN_PROGRESS)
return false;
if (g_cluster.currentCommand.commandReplyFromCount >= g_cluster.currentCommand.commandSendToCount)
{
- g_cluster.currentCommand.commandFinished = true;
if (pkt->type == WD_REJECT_MESSAGE || pkt->type == WD_ERROR_MESSAGE)
g_cluster.currentCommand.commandStatus = COMMAND_FINISHED_NODE_REJECTED;
else
else if (pkt->type == WD_REJECT_MESSAGE || pkt->type == WD_ERROR_MESSAGE)
{
/* Error or reject message by any node imidiately finishes the command */
- g_cluster.currentCommand.commandFinished = true;
g_cluster.currentCommand.commandStatus = COMMAND_FINISHED_NODE_REJECTED;
watchdog_state_machine(WD_EVENT_COMMAND_FINISHED, wdNode, pkt);
}
+/*
+ * Expire the current cluster command once it has been in progress for at
+ * least commandTimeoutSecs seconds, measured from commandTime.
+ * On expiry the command status becomes COMMAND_FINISHED_TIMEOUT and the
+ * state machine receives WD_EVENT_COMMAND_FINISHED with no node and no
+ * packet. Nothing happens when no command is in progress.
+ */
static void check_for_current_command_timeout(void)
{
struct timeval currTime;
- if (g_cluster.currentCommand.commandStatus != COMMAND_IN_PROGRESS ||
- g_cluster.currentCommand.commandFinished != 0)
+ if (g_cluster.currentCommand.commandStatus != COMMAND_IN_PROGRESS)
return;
gettimeofday(&currTime,NULL);
if (WD_TIME_DIFF_SEC(currTime,g_cluster.currentCommand.commandTime) >= g_cluster.currentCommand.commandTimeoutSecs)
{
- g_cluster.currentCommand.commandFinished = true;
g_cluster.currentCommand.commandStatus = COMMAND_FINISHED_TIMEOUT;
watchdog_state_machine(WD_EVENT_COMMAND_FINISHED, NULL, NULL);
}
}
-static char get_current_command_resultant_message_type(void)
-{
- char res = WD_ACCEPT_MESSAGE;
- int i;
- if (g_cluster.currentCommand.commandFinished == 0)
- return WD_NO_MESSAGE;
- if (g_cluster.currentCommand.sendToNode == NULL)
- {
- /* The command was for all nodes */
- for (i = 0; i< g_cluster.remoteNodeCount; i++)
- {
- WDCommandNodeResult* nodeRes = &g_cluster.currentCommand.nodeResults[i];
- if (nodeRes->cmdState != COMMAND_STATE_REPLIED)
- continue;
- if (nodeRes->result_type != WD_ACCEPT_MESSAGE && nodeRes->result_type != WD_INFO_MESSAGE)
- {
- /* failed */
- if (res != WD_ERROR_MESSAGE)
- res = nodeRes->result_type;
- }
- }
- }
- else
- {
- if (g_cluster.currentCommand.commandSendToCount == 0) /* We failed to send to any node */
- return WD_ERROR_MESSAGE;
- if (g_cluster.currentCommand.commandReplyFromCount == 0) /* We got no reply */
- return WD_ERROR_MESSAGE;
- for (i = 0; i< g_cluster.remoteNodeCount; i++)
- {
- WDCommandNodeResult* nodeRes = &g_cluster.currentCommand.nodeResults[i];
- if (nodeRes->wdNode == g_cluster.currentCommand.sendToNode)
- {
- if (nodeRes->cmdState != COMMAND_STATE_REPLIED)
- return WD_ERROR_MESSAGE;
- return nodeRes->result_type;
- }
- }
- return WD_ERROR_MESSAGE;
- }
- return res;
-}
+/*
+ * Reset the current cluster command to the empty state: the status is
+ * set to COMMAND_EMPTY, both the source and the command packet types are
+ * set to WD_NO_MESSAGE, the source packet data is released and both data
+ * pointers are cleared.
+ */
static void clear_current_command(void)
{
g_cluster.currentCommand.commandStatus = COMMAND_EMPTY;
- g_cluster.currentCommand.packet.type = WD_NO_MESSAGE;
- if (g_cluster.currentCommand.packet.data)
- pfree(g_cluster.currentCommand.packet.data);
+ g_cluster.currentCommand.sourcePacket.type = WD_NO_MESSAGE;
+ g_cluster.currentCommand.commandPacket.type = WD_NO_MESSAGE;
+ if (g_cluster.currentCommand.sourcePacket.data)
+ pfree(g_cluster.currentCommand.sourcePacket.data);
+ g_cluster.currentCommand.sourcePacket.data = NULL;
+ /*
+ * NOTE(review): commandPacket.data is only reset here, not freed, while
+ * the internal command issuer allocates it from TopMemoryContext when
+ * the message is saved. Confirm the buffer is released or shared
+ * elsewhere, otherwise it is leaked.
+ */
+ g_cluster.currentCommand.commandPacket.data = NULL;
}
/*
gettimeofday(&g_cluster.currentCommand.commandTime, NULL);
g_cluster.currentCommand.commandTimeoutSecs = timeout_sec;
- g_cluster.currentCommand.packet.type = pkt->type;
- g_cluster.currentCommand.packet.command_id = pkt->command_id;
- g_cluster.currentCommand.packet.len = 0;
- g_cluster.currentCommand.packet.data = NULL;
+ g_cluster.currentCommand.commandPacket.type = pkt->type;
+ g_cluster.currentCommand.commandPacket.command_id = pkt->command_id;
+ g_cluster.currentCommand.commandPacket.len = 0;
+ g_cluster.currentCommand.commandPacket.data = NULL;
g_cluster.currentCommand.sendToNode = wdNode;
g_cluster.currentCommand.commandSendToCount = 0;
{
WDCommandNodeResult* nodeResult = &g_cluster.currentCommand.nodeResults[i];
clear_command_node_result(nodeResult);
- if (nodeResult->wdNode->state == WD_DEAD || nodeResult->wdNode->state == WD_SHUTDOWN)
+ if (is_node_active(nodeResult->wdNode) == false)
{
ereport(DEBUG2,
(errmsg("not sending watchdog internal command packet to DEAD %s",nodeResult->wdNode->nodeName)));
}
if (save_message && pkt->len > 0)
{
- g_cluster.currentCommand.packet.data = MemoryContextAlloc(TopMemoryContext,pkt->len);
- memcpy(g_cluster.currentCommand.packet.data,pkt->data,pkt->len);
- g_cluster.currentCommand.packet.len = pkt->len;
+ g_cluster.currentCommand.commandPacket.data = MemoryContextAlloc(TopMemoryContext,pkt->len);
+ memcpy(g_cluster.currentCommand.commandPacket.data,pkt->data,pkt->len);
+ g_cluster.currentCommand.commandPacket.len = pkt->len;
}
- g_cluster.currentCommand.commandFinished = false;
return g_cluster.currentCommand.commandSendToCount;
}
for (i = 0; i< g_cluster.remoteNodeCount; i++)
{
WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]);
- if (wdNode->state == WD_SHUTDOWN)
+ if (wdNode->state == WD_SHUTDOWN || wdNode->state == WD_DEAD)
continue;
if (is_socket_connection_connected(&wdNode->client_socket) == false)
watchdog_state_machine(WD_EVENT_NEW_OUTBOUND_CONNECTION, wdNode, NULL);
ret = true;
}
-
}
}
}
return ret;
}
-
/*
* The function only considers the node state.
* All node states count towards the cluster participating nodes
WDPacketData *pkt = get_message_of_type(type);
if (pkt)
{
+ /* If the old command is in progress and we are about to issue a new command
+ * finish the old command with timeout
+ */
+ if (is_cluster_command_in_progress())
+ {
+ g_cluster.currentCommand.commandStatus = COMMAND_FINISHED_TIMEOUT;
+ watchdog_state_machine(WD_EVENT_COMMAND_FINISHED, NULL, NULL);
+ }
ret = issue_watchdog_internal_command(wdNode, pkt, timeout_sec);
free_packet(pkt);
}
return g_cluster.localNode->state;
}
-
/*
* returns true if no message is swollowed by the
* processor and no further action is required
*/
static bool wd_commands_packet_processor(WD_EVENTS event, WatchdogNode* wdNode, WDPacketData* pkt)
{
- WDIPCCommandData* ipcCommand;
+ WDCommandData* ipcCommand;
if (event != WD_EVENT_PACKET_RCV)
return false;
if (pkt == NULL)
return false;
-
- if (pkt->type == WD_FAILOVER_CMD_SYNC_REQUEST)
+
+ if (pkt->type == WD_FAILOVER_LOCKING_REQUEST)
{
- process_failover_command_sync_requests(wdNode, pkt, NULL);
+ process_remote_failover_locking_request(wdNode, pkt);
return true;
}
-
- if (pkt->type == WD_REPLICATE_VARIABLE_REQUEST)
+
+ if (pkt->type == WD_IPC_FAILOVER_COMMAND)
{
- process_pgpool_replicate_command(wdNode, pkt);
+ process_remote_failover_command_on_coordinator(wdNode, pkt);
return true;
}
-
- if (pkt->type == WD_INTERLOCKING_REQUEST)
+
+ if (pkt->type == WD_REMOTE_FAILOVER_REQUEST)
{
- node_has_requested_for_interlocking(wdNode, pkt);
+ process_pgpool_remote_failover_command(wdNode, pkt);
return true;
}
-
- if (pkt->type == WD_INTERUNLOCKING_REQUEST)
+
+ if (pkt->type == WD_IPC_ONLINE_RECOVERY_COMMAND)
{
- node_has_resigned_from_interlocking(wdNode, pkt);
+ process_remote_online_recovery_command(wdNode, pkt);
return true;
}
-
+
if (pkt->type == WD_DATA_MESSAGE)
{
ipcCommand = get_wd_IPC_command_from_reply(pkt);
}
return false;
}
+
+ if (pkt->type == WD_CMD_REPLY_IN_DATA)
+ {
+ ipcCommand = get_wd_IPC_command_from_reply(pkt);
+ if (ipcCommand == NULL)
+ return false;
+
+ /* Just forward the data to IPC socket and finish the command */
+ if (write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_OK, pkt->data, pkt->len) == false)
+ ereport(LOG,
+ (errmsg("failed to forward data message to IPC command socket")));
+ /*
+ * ok we are done, delete this command
+ */
+ cleanUpIPCCommand(ipcCommand);
+ return true; /* do not process this packet further */
+ }
-
- if (pkt->type == WD_ACCEPT_MESSAGE ||
- pkt->type == WD_REJECT_MESSAGE ||
- pkt->type == WD_ERROR_MESSAGE)
+ else if (pkt->type == WD_ACCEPT_MESSAGE ||
+ pkt->type == WD_REJECT_MESSAGE ||
+ pkt->type == WD_ERROR_MESSAGE)
{
ipcCommand = get_wd_IPC_command_from_reply(pkt);
+
if (ipcCommand == NULL)
- {
return false;
+
+ if (ipcCommand->commandPacket.type == WD_IPC_FAILOVER_COMMAND)
+ {
+ if (pkt->type == WD_ACCEPT_MESSAGE)
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
+ else
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_MASTER_REJECTED, 0);
+ return true;
}
-
- if (ipcCommand->type == WD_INTERLOCKING_REQUEST ||
- ipcCommand->type == WD_INTERUNLOCKING_REQUEST ||
- ipcCommand->type == WD_FAILOVER_CMD_SYNC_REQUEST)
+
+ else if (ipcCommand->commandPacket.type == WD_FAILOVER_LOCKING_REQUEST)
{
/*
* we are expecting only one reply for this
*/
char res_type = WD_IPC_CMD_RESULT_BAD;
if (pkt->type == WD_ACCEPT_MESSAGE)
- {
- /* okay we are the lock holder */
- g_cluster.lockHolderNode = g_cluster.localNode;
res_type = WD_IPC_CMD_RESULT_OK;
- }
+ else
+ res_type = WD_IPC_CMD_RESULT_BAD;
if (write_ipc_command_with_result_data(ipcCommand, res_type, NULL, 0) == false)
ereport(LOG,
return true; /* do not process this packet further */
}
- if (ipcCommand->type == WD_REPLICATE_VARIABLE_REQUEST)
+ else if (ipcCommand->commandPacket.type == WD_REMOTE_FAILOVER_REQUEST ||
+ ipcCommand->commandPacket.type == WD_IPC_ONLINE_RECOVERY_COMMAND)
+ {
return reply_is_received_for_pgpool_replicate_command(wdNode, pkt, ipcCommand);
+ }
}
return false;
static int watchdog_state_machine(WD_EVENTS event, WatchdogNode* wdNode, WDPacketData* pkt)
{
ereport(DEBUG1,
- (errmsg("STATE MACHINE INVOKED WITH EVENT = %s Current State = %s",wd_event_name[event], wd_state_names[get_local_node_state()])));
+ (errmsg("STATE MACHINE INVOKED WITH EVENT = %s Current State = %s",
+ wd_event_name[event], wd_state_names[get_local_node_state()])));
if (event == WD_EVENT_REMOTE_NODE_LOST)
{
+ /* close all socket connections to the node */
+ close_socket_connection(&wdNode->client_socket);
+ close_socket_connection(&wdNode->server_socket);
+
if (wdNode->state == WD_SHUTDOWN)
{
ereport(LOG,
(errmsg("watchdog cluster has lost the coordinator node")));
g_cluster.masterNode = NULL;
}
+
+ /* clear the wait timer on the node */
+ wdNode->last_sent_time.tv_sec = 0;
+ wdNode->last_sent_time.tv_usec = 0;
+ node_lost_while_ipc_command(wdNode);
+
}
else if (event == WD_EVENT_PACKET_RCV)
{
standard_packet_processor(wdNode, pkt);
}
- if (pkt->type == WD_INFORM_I_AM_GOING_DOWN) /* TODO do it better way */
+ if (pkt->type == WD_INFORM_I_AM_GOING_DOWN)
{
wdNode->state = WD_SHUTDOWN;
return watchdog_state_machine(WD_EVENT_REMOTE_NODE_LOST, wdNode, NULL);
}
+
if (watchdog_internal_command_packet_processor(wdNode,pkt) == true)
{
return 0;
}
}
free_packet(addPkt);
- set_timeout(4);
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
}
break;
{
case WD_EVENT_WD_STATE_CHANGED:
g_cluster.masterNode = NULL;
- send_cluster_command(NULL, WD_REQ_INFO_MESSAGE, 5);
- set_timeout(5);
+ try_connecting_with_all_unreachable_nodes();
+ send_cluster_command(NULL, WD_REQ_INFO_MESSAGE, 4);
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
break;
case WD_EVENT_TIMEOUT:
case WD_EVENT_COMMAND_FINISHED:
{
- if (g_cluster.currentCommand.packet.type == WD_REQ_INFO_MESSAGE)
+ if (g_cluster.currentCommand.commandPacket.type == WD_REQ_INFO_MESSAGE)
set_state(WD_INITIALIZING);
}
break;
if (wdNode->state == WD_ADD_MESSAGE_SENT)
ereport(FATAL,
(return_code(POOL_EXIT_FATAL),
- errmsg("Add to watchdog cluster request is rejected by node \"%s:%d\"",wdNode->hostname,wdNode->wd_port),
+ errmsg("add to watchdog cluster request is rejected by node \"%s:%d\"",wdNode->hostname,wdNode->wd_port),
errhint("check the watchdog configurations.")));
break;
switch (event)
{
case WD_EVENT_WD_STATE_CHANGED:
- send_cluster_command(NULL, WD_STAND_FOR_COORDINATOR_MESSAGE, 5);
+ send_cluster_command(NULL, WD_STAND_FOR_COORDINATOR_MESSAGE, 4);
/* wait for 5 seconds if someone rejects us*/
- set_timeout(5);
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
break;
case WD_EVENT_COMMAND_FINISHED:
{
- if (g_cluster.currentCommand.packet.type == WD_STAND_FOR_COORDINATOR_MESSAGE)
+ if (g_cluster.currentCommand.commandPacket.type == WD_STAND_FOR_COORDINATOR_MESSAGE)
{
if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
case WD_EVENT_WD_STATE_CHANGED:
{
int i;
- send_cluster_command(NULL, WD_DECLARE_COORDINATOR_MESSAGE, 5);
- set_timeout(10);
+ send_cluster_command(NULL, WD_DECLARE_COORDINATOR_MESSAGE, 4);
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
ereport(LOG,
(errmsg("I am announcing my self as master/coordinator watchdog node")));
case WD_EVENT_COMMAND_FINISHED:
{
- if (g_cluster.currentCommand.packet.type == WD_DECLARE_COORDINATOR_MESSAGE)
+ if (g_cluster.currentCommand.commandPacket.type == WD_DECLARE_COORDINATOR_MESSAGE)
{
if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
}
}
- else if (g_cluster.currentCommand.packet.type == WD_IAM_COORDINATOR_MESSAGE)
+ else if (g_cluster.currentCommand.commandPacket.type == WD_IAM_COORDINATOR_MESSAGE)
{
if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED)
{
break;
case WD_EVENT_TIMEOUT:
- send_cluster_command(NULL, WD_IAM_COORDINATOR_MESSAGE, BEACON_MESSAGE_INTERVAL_SECONDS);
- set_timeout(BEACON_MESSAGE_INTERVAL_SECONDS);
+ if (is_cluster_command_in_progress())
+ {
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
+ }
+ else
+ {
+ send_cluster_command(NULL, WD_IAM_COORDINATOR_MESSAGE, 5);
+ set_timeout(BEACON_MESSAGE_INTERVAL_SECONDS);
+ }
break;
case WD_EVENT_REMOTE_NODE_LOST:
{
case WD_EVENT_WD_STATE_CHANGED:
clear_current_command();
- set_timeout(6);
+ set_timeout(MAX_SECS_WAIT_FOR_REPLY_FROM_NODE);
break;
case WD_EVENT_TIMEOUT:
case WD_EVENT_COMMAND_FINISHED:
{
- if (g_cluster.currentCommand.packet.type == WD_JOIN_COORDINATOR_MESSAGE)
+ if (g_cluster.currentCommand.commandPacket.type == WD_JOIN_COORDINATOR_MESSAGE)
{
if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
}
-static void allocate_resultNodes_in_IPCCommand(WDIPCCommandData* ipcCommand)
+static void allocate_resultNodes_in_IPCCommand(WDCommandData* ipcCommand)
{
MemoryContext oldCxt;
int i;
MemoryContextSwitchTo(oldCxt);
}
-static IPC_CMD_PREOCESS_RES execute_replicate_command(WDIPCCommandData* ipcCommand)
-{
- int i;
- IPC_CMD_PREOCESS_RES res;
-
- WDPacketData wdPacket;
- init_wd_packet(&wdPacket);
- set_message_type(&wdPacket, WD_REPLICATE_VARIABLE_REQUEST);
- set_next_commandID_in_message(&wdPacket);
- set_message_data(&wdPacket,ipcCommand->data_buf, ipcCommand->data_len);
-
- allocate_resultNodes_in_IPCCommand(ipcCommand);
- ipcCommand->sendTo_count = 0;
- ipcCommand->reply_from_count = 0;
- ipcCommand->internal_command_id = wdPacket.command_id;
- ipcCommand->type = wdPacket.type;
-
- for (i=0; i< g_cluster.remoteNodeCount; i++)
- {
- WDCommandNodeResult* nodeResult = &ipcCommand->nodeResults[i];
- if (send_message_to_node(nodeResult->wdNode, &wdPacket) == true)
- {
- nodeResult->cmdState = COMMAND_STATE_SENT;
- ipcCommand->sendTo_count++;
- }
- else
- nodeResult->cmdState = COMMAND_STATE_SEND_ERROR;
- }
- /*
- * The current quorum status of standby node can be out of sync,
- * so update it before making any decision on command success
- */
- if (get_local_node_state() == WD_STANDBY)
- update_quorum_status();
-
- if (ipcCommand->sendTo_count == 0)
- {
- /* We are not able to send the message to any node.
- * But this does not straight away means we are failed.
- * There are two scenarios.
- *
- * 1- The current cluster setting requires only single node
- * to complete the quorum.
- *
- * 2- Currrently the cluster does not holds the quorum and
- * I am the only node alive
- *
- * in these both of these above cases the command will be marked as successful
- * even if we are not able to send to any node
- */
- if (get_mimimum_nodes_required_for_quorum() == 0)
- res = IPC_CMD_COMPLETE;
- /*
- * If quorum is not present at the moment, Sending to all connected nodes
- * is enough to mark it as success
- */
- else if (g_cluster.quorum_status < 0 && get_cluster_node_count() == 0)
- res = IPC_CMD_COMPLETE;
- else
- res = IPC_CMD_ERROR;
- }
- else if (ipcCommand->sendTo_count < get_mimimum_nodes_required_for_quorum() )
- {
- if (g_cluster.quorum_status < 0 && get_cluster_node_count() == ipcCommand->sendTo_count)
- res = IPC_CMD_PROCESSING;
- else
- res = IPC_CMD_ERROR;
- }
- else
- {
- res = IPC_CMD_PROCESSING;
- }
- return res;
-}
-
-static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt)
+/*
+ * Handle a failover request packet received from a remote watchdog node.
+ *
+ * The packet data is parsed with parse_wd_node_function_json() and the
+ * resulting function name and node id list are handed to
+ * process_wd_command_function(), with the packet's command_id passed as the
+ * failover id. A WD_ERROR_MESSAGE reply is sent to wdNode when the packet
+ * carries no data, when wdNode is not g_cluster.masterNode, or when the
+ * data can not be parsed.
+ */
+static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacketData* pkt)
{
- char* func_name;
+ /* start from NULL so the cleanup at the end never tests an unset pointer */
+ char* func_name = NULL;
int node_count = 0;
int *node_id_list = NULL;
- bool ret = false;
if (pkt->data == NULL || pkt->len == 0)
{
ereport(LOG,
- (errmsg("watchdog is unable to process pgpool replicate command"),
+ (errmsg("watchdog is unable to process pgpool failover command"),
errdetail("command packet contains no data")));
- return false;
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ return;
}
+ if (wdNode != g_cluster.masterNode)
+ {
+ /*
+ * This branch is also taken when g_cluster.masterNode is NULL,
+ * so the master's name must not be read without a check.
+ */
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool failover command received from \"%s\"",wdNode->nodeName),
+ errdetail("only master/coordinator (\"%s\") node can send the replicate commands",
+ g_cluster.masterNode?g_cluster.masterNode->nodeName:"not set")));
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ return;
+ }
if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
- ret = process_wd_command_function(wdNode, pkt, func_name, node_count, node_id_list);
+ {
+ ereport(LOG,
+ (errmsg("watchdog received the failover command from \"%s\"",wdNode->nodeName)));
+ process_wd_command_function(wdNode, pkt, func_name, node_count, node_id_list, pkt->command_id);
+ }
else
+ {
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool failover command"),
+ errdetail("command packet contains invalid data")));
reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ }
if (func_name)
pfree(func_name);
if (node_id_list)
pfree(node_id_list);
-
- return ret;
}
-static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, char* func_name, int node_count, int* node_id_list)
+/*
+ * Handle an online recovery request packet received from a remote
+ * watchdog node and reply to wdNode.
+ *
+ * WD_FUNCTION_START_RECOVERY:
+ *   - rejected when *InRecovery is not RECOVERY_INIT;
+ *   - otherwise *InRecovery becomes RECOVERY_ONLINE and the request is
+ *     accepted at once when Req_info->conn_counter is 0, rejected when
+ *     pool_config->recovery_timeout is <= 0, or else registered with
+ *     add_wd_command_for_timer_events() so the reply is produced later.
+ * WD_FUNCTION_END_RECOVERY:
+ *   - *InRecovery is reset to RECOVERY_INIT, the request is accepted and
+ *     SIGUSR2 is sent to the parent process.
+ *
+ * A WD_ERROR_MESSAGE reply is sent for an empty packet, unparsable data
+ * or any other function name.
+ */
+static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacketData* pkt)
{
- if (strcasecmp(WD_FUNCTION_START_RECOVERY, func_name) == 0)
+ /* start from NULL so the cleanup at the end never tests an unset pointer */
+ char* func_name = NULL;
+ int node_count = 0;
+ int *node_id_list = NULL;
+
+ if (pkt->data == NULL || pkt->len == 0)
{
- if (*InRecovery != RECOVERY_INIT)
- {
- reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
- }
- else
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool online recovery command"),
+ errdetail("command packet contains no data")));
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ return;
+ }
+
+ ereport(LOG,
+ (errmsg("watchdog received online recovery request from \"%s\"",wdNode->nodeName)));
+
+ if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+ {
+ if (strcasecmp(WD_FUNCTION_START_RECOVERY, func_name) == 0)
{
- *InRecovery = RECOVERY_ONLINE;
- if (Req_info->conn_counter == 0)
- {
- reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
- }
- else if(pool_config->recovery_timeout <= 0)
+ if (*InRecovery != RECOVERY_INIT)
{
reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
}
else
{
- WDFunctionCommandData* wd_func_command;
- MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
-
- wd_func_command = palloc(sizeof(WDFunctionCommandData));
- wd_func_command->commandType = pkt->type;
- wd_func_command->commandID = pkt->command_id;
- wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name);
- wd_func_command->wdNode = wdNode;
+ *InRecovery = RECOVERY_ONLINE;
+ if (Req_info->conn_counter == 0)
+ {
+ reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
+ }
+ else if(pool_config->recovery_timeout <= 0)
+ {
+ reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
+ }
+ else
+ {
+ WDFunctionCommandData* wd_func_command;
+ MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
- /* Add this command for timer tick */
- add_wd_command_for_timer_events(pool_config->recovery_timeout, true, wd_func_command);
+ wd_func_command = palloc(sizeof(WDFunctionCommandData));
+ wd_func_command->commandType = pkt->type;
+ wd_func_command->commandID = pkt->command_id;
+ wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name);
+ wd_func_command->wdNode = wdNode;
- MemoryContextSwitchTo(oldCxt);
-
+ /* Add this command for timer tick */
+ add_wd_command_for_timer_events(pool_config->recovery_timeout, true, wd_func_command);
+
+ MemoryContextSwitchTo(oldCxt);
+ }
}
}
+ else if (strcasecmp(WD_FUNCTION_END_RECOVERY, func_name) == 0)
+ {
+ *InRecovery = RECOVERY_INIT;
+ reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
+ kill(getppid(), SIGUSR2);
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("watchdog failed to process online recovery request"),
+ errdetail("invalid command [%s] in online recovery request from \"%s\"",func_name,wdNode->nodeName)));
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+ }
}
- else if (strcasecmp(WD_FUNCTION_END_RECOVERY, func_name) == 0)
+ else
{
- *InRecovery = RECOVERY_INIT;
- reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
- kill(getppid(), SIGUSR2);
+ ereport(LOG,
+ (errmsg("watchdog failed to process online recovery request"),
+ errdetail("invalid data in online recovery request from \"%s\"",wdNode->nodeName)));
+ reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
}
- else if (strcasecmp(WD_FUNCTION_FAILBACK_REQUEST, func_name) == 0)
+ if (func_name)
+ pfree(func_name);
+ if (node_id_list)
+ pfree(node_id_list);
+}
+
+static void process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, char* func_name,
+ int node_count, int* node_id_list, unsigned int failover_id)
+{
+ if (strcasecmp(WD_FUNCTION_FAILBACK_REQUEST, func_name) == 0)
{
if (Req_info->switching)
{
else
{
reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
- wd_set_node_mask_for_failback_req(node_id_list, node_count);
- send_failback_request(node_id_list[0],false);
+ send_failback_request(node_id_list[0],false, failover_id);
}
}
else
{
reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
- wd_set_node_mask_for_degenerate_req(node_id_list, node_count);
- degenerate_backend_set(node_id_list, node_count, false);
+ degenerate_backend_set(node_id_list, node_count, false, failover_id);
}
}
-
+
else if (strcasecmp(WD_FUNCTION_PROMOTE_REQUEST, func_name) == 0)
{
if (Req_info->switching)
else
{
reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
- wd_set_node_mask_for_promote_req(node_id_list, node_count);
- promote_backend(node_id_list[0]);
+ promote_backend(node_id_list[0], failover_id);
}
}
else
/* This is not supported function */
reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
}
- return true;
}
-static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt, WDIPCCommandData* ipcCommand)
+static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* pkt, WDCommandData* ipcCommand)
{
int i;
WDCommandNodeResult* nodeResult = NULL;
(errmsg("unable to find result node for pgpool-II replicate command packet received from watchdog node \"%s\"",wdNode->nodeName)));
return true;
}
+
nodeResult->result_type = pkt->type;
nodeResult->cmdState = COMMAND_STATE_REPLIED;
- ipcCommand->reply_from_count++;
+ ipcCommand->commandReplyFromCount++;
ereport(DEBUG2,
(errmsg("watchdog node \"%s\" has replied for pgpool-II replicate command packet",wdNode->nodeName),
- errdetail("command was sent to %d nodes and %d nodes have replied to it",ipcCommand->sendTo_count,ipcCommand->reply_from_count)));
+ errdetail("command was sent to %d nodes and %d nodes have replied to it",ipcCommand->commandSendToCount,ipcCommand->commandReplyFromCount)));
- if (ipcCommand->reply_from_count >= ipcCommand->sendTo_count)
+ if (pkt->type != WD_ACCEPT_MESSAGE)
+ {
+ /* reject message from any node finishes the command */
+ ipcCommand->commandStatus = COMMAND_FINISHED_NODE_REJECTED;
+ wd_command_is_complete(ipcCommand);
+ cleanUpIPCCommand(ipcCommand);
+ }
+ else if (ipcCommand->commandReplyFromCount >= ipcCommand->commandSendToCount)
{
/*
* we have received results from all nodes
* analyze the result
*/
-
- char res_type = WD_IPC_CMD_RESULT_OK;
-
- for (i=0; i< g_cluster.remoteNodeCount; i++)
- {
- nodeResult = &ipcCommand->nodeResults[i];
- if (nodeResult->cmdState == COMMAND_STATE_REPLIED &&
- nodeResult->result_type != WD_ACCEPT_MESSAGE)
- {
- res_type = WD_IPC_CMD_RESULT_BAD;
- break;
- }
- }
- if (write_ipc_command_with_result_data(ipcCommand, res_type, NULL, 0) == false)
- ereport(LOG,
- (errmsg("failed to forward message to IPC command socket")));
-
+ ipcCommand->commandStatus = COMMAND_FINISHED_ALL_REPLIED;
+ wd_command_is_complete(ipcCommand);
cleanUpIPCCommand(ipcCommand);
}
*/
static bool process_wd_command_timer_event(bool timer_expired, WDFunctionCommandData* wd_func_command)
{
- if (wd_func_command->commandType == WD_REPLICATE_VARIABLE_REQUEST)
+ if (wd_func_command->commandType == WD_IPC_ONLINE_RECOVERY_COMMAND)
{
if (wd_func_command->funcName && strcasecmp("START_RECOVERY", wd_func_command->funcName) == 0)
{
WDPacketData emptyPkt;
emptyPkt.command_id = wd_func_command->commandID;
reply_with_minimal_message(wd_func_command->wdNode, WD_ACCEPT_MESSAGE, &emptyPkt);
- /* TODO delete command object */
return true;
}
else if (timer_expired)
* this one also informs the calling client about the failure
*/
-static bool check_and_report_IPC_authentication(WDIPCCommandData* ipcCommand)
+static bool check_and_report_IPC_authentication(WDCommandData* ipcCommand)
{
json_value *root = NULL;
bool internal_client_only = false;
return false; /* should never happen*/
/* first identify the command type */
- switch(ipcCommand->type)
+ switch(ipcCommand->sourcePacket.type)
{
case WD_NODE_STATUS_CHANGE_COMMAND:
case WD_REGISTER_FOR_NOTIFICATION:
internal_client_only = false;
break;
- case WD_FUNCTION_COMMAND:
- case WD_FAILOVER_CMD_SYNC_REQUEST:
+ case WD_IPC_FAILOVER_COMMAND:
+ case WD_IPC_ONLINE_RECOVERY_COMMAND:
+ case WD_FAILOVER_LOCKING_REQUEST:
case WD_GET_MASTER_DATA_REQUEST:
/* only allowed internaly.*/
internal_client_only = true;
return true;
}
- if (ipcCommand->data_len <= 0 || ipcCommand->data_buf == NULL)
+ if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
{
ereport(LOG,
(errmsg("authentication failed"),
errdetail("IPC command contains no data")));
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+ "authentication failed: invalid data");
+
return false;
}
- root = json_parse(ipcCommand->data_buf,ipcCommand->data_len);
+ root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
/* The root node must be object */
if (root == NULL || root->type != json_object)
{
ereport(LOG,
(errmsg("authentication failed"),
errdetail("IPC command contains an invalid data")));
+
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+ "authentication failed: invalid data");
+
return false;
}
if (ret == false)
{
- char* error_json;
ereport(WARNING,
(errmsg("authentication failed"),
errdetail("invalid IPC key")));
-
- error_json = get_wd_simple_error_message_json("IPC client authentication failed");
- if (write_ipc_command_with_result_data(ipcCommand, WD_IPC_CMD_RESULT_BAD,
- error_json, strlen(error_json) +1))
- {
- ereport(LOG,
- (errmsg("failed to forward error message to IPC socket")));
- }
- pfree(error_json);
+ ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext,
+ "authentication failed: invalid KEY");
}
return ret;
}
pkt_type?pkt_type->name:"UNKNOWN",
wd_state_names[get_local_node_state()])));
}
+
+/*
+ * Write ipcCommand->commandPacket to the remote watchdog nodes and record
+ * the per-node outcome in ipcCommand->nodeResults.
+ *
+ * The send/reply/error counters of the command are reset first and the
+ * result nodes are (re)allocated through allocate_resultNodes_in_IPCCommand().
+ * A node is marked COMMAND_STATE_DO_NOT_SEND when the command targets a
+ * different node, when it is the originator of a remote-sourced command and
+ * source_included is false, or when is_node_active() reports it inactive.
+ * Unreachable nodes and failed writes are marked COMMAND_STATE_SEND_ERROR
+ * and counted in commandSendToErrorCount.
+ *
+ * Returns the number of nodes the packet was written to
+ * (ipcCommand->commandSendToCount).
+ */
+static int send_command_packet_to_remote_nodes(WDCommandData* ipcCommand, bool source_included)
+{
+	int			idx;
+
+	ipcCommand->commandSendToCount = 0;
+	ipcCommand->commandReplyFromCount = 0;
+	ipcCommand->commandSendToErrorCount = 0;
+	allocate_resultNodes_in_IPCCommand(ipcCommand);
+
+	ereport(DEBUG2,
+			(errmsg("sending the %c type message to \"%s\"",
+					ipcCommand->commandPacket.type,
+					ipcCommand->sendToNode?ipcCommand->sendToNode->nodeName:"ALL NODES")));
+
+	for (idx = 0; idx < g_cluster.remoteNodeCount; idx++)
+	{
+		WDCommandNodeResult* result = &ipcCommand->nodeResults[idx];
+
+		/* The command is addressed to one specific node and this is not it */
+		if (ipcCommand->sendToNode != NULL && ipcCommand->sendToNode != result->wdNode)
+		{
+			result->cmdState = COMMAND_STATE_DO_NOT_SEND;
+			continue;
+		}
+
+		/*
+		 * Unless the caller asked for it, the watchdog node that started
+		 * a remote command does not get the message back
+		 */
+		if (source_included == false &&
+			ipcCommand->sourceWdNode == result->wdNode &&
+			ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
+		{
+			ereport(DEBUG1,
+					(errmsg("not sending the %c type message to command originator node \"%s\"",
+							ipcCommand->commandPacket.type,result->wdNode->nodeName)));
+			result->cmdState = COMMAND_STATE_DO_NOT_SEND;
+			continue;
+		}
+
+		if (is_node_active(result->wdNode) == false)
+		{
+			result->cmdState = COMMAND_STATE_DO_NOT_SEND;
+			continue;
+		}
+
+		/* an unreachable node is never written to; both cases count as send errors */
+		if (is_node_reachable(result->wdNode) == false ||
+			send_message_to_node(result->wdNode, &ipcCommand->commandPacket) != true)
+		{
+			result->cmdState = COMMAND_STATE_SEND_ERROR;
+			ipcCommand->commandSendToErrorCount++;
+			continue;
+		}
+
+		ereport(DEBUG2,
+				(errmsg("%c type message written to socket for node \"%s\"",
+						ipcCommand->commandPacket.type,result->wdNode->nodeName)));
+
+		result->cmdState = COMMAND_STATE_SENT;
+		ipcCommand->commandSendToCount++;
+	}
+
+	return ipcCommand->commandSendToCount;
+}
+
+/*
+ * Report whether the command held in g_cluster.currentCommand currently
+ * has the COMMAND_IN_PROGRESS status.
+ */
+static bool is_cluster_command_in_progress(void)
+{
+	if (g_cluster.currentCommand.commandStatus != COMMAND_IN_PROGRESS)
+		return false;
+	return true;
+}
+