2121#include "postgres.h"
2222#include "fmgr.h"
2323#include "miscadmin.h"
24- // #include "common/pg_socket.h"
2524#include "pqexpbuffer.h"
2625#include "access/xact.h"
2726#include "access/clog.h"
@@ -67,8 +66,8 @@ bool MtmIsReceiver;
6766
6867typedef struct MtmFlushPosition
6968{
70- dlist_node node ;
71- int node_id ;
69+ dlist_node node ;
70+ int node_id ;
7271 XLogRecPtr local_end ;
7372 XLogRecPtr remote_end ;
7473} MtmFlushPosition ;
@@ -81,7 +80,7 @@ char const* const MtmReplicationModeName[] =
8180
8281static dlist_head MtmLsnMapping = DLIST_STATIC_INIT (MtmLsnMapping );
8382
84- MtmConfig * receiver_mtm_cfg ;
83+ MtmConfig * receiver_mtm_cfg ;
8584bool receiver_mtm_cfg_valid ;
8685
8786/* Signal handling */
@@ -308,15 +307,15 @@ MtmExecute(void* work, int size, MtmReceiverContext *receiver_ctx, bool no_pool)
308307static bool
309308MtmFilterTransaction (char * record , int size , Syncpoint * spvector , HTAB * filter_map )
310309{
311- StringInfoData s ;
312- uint8 event ;
310+ StringInfoData s ;
311+ uint8 event ;
313312 XLogRecPtr origin_lsn ;
314313 XLogRecPtr end_lsn ;
315314 XLogRecPtr tx_lsn ;
316- int replication_node ;
317- int origin_node ;
318- char const * gid = "" ;
319- char msgtype PG_USED_FOR_ASSERTS_ONLY ;
315+ int replication_node ;
316+ int origin_node ;
317+ char const * gid = "" ;
318+ char msgtype PG_USED_FOR_ASSERTS_ONLY ;
320319
321320 s .data = record ;
322321 s .len = size ;
@@ -377,12 +376,10 @@ MtmFilterTransaction(char *record, int size, Syncpoint *spvector, HTAB *filter_m
377376
378377 hash_search (filter_map , & entry , HASH_FIND , & found );
379378
380- {
381- mtm_log (MtmReceiverFilter ,
382- "Filter (map) transaction %s from node %d event=%x (restrt=%" INT64_MODIFIER "x, tx=%d/%" INT64_MODIFIER "x) -> %d" ,
383- gid , replication_node , event ,
384- spvector [origin_node - 1 ].origin_lsn , origin_node , tx_lsn , found );
385- }
379+ mtm_log (MtmReceiverFilter ,
380+ "Filter (map) transaction %s from node %d event=%x (restrt=%" INT64_MODIFIER "x, tx=%d/%" INT64_MODIFIER "x) -> %d" ,
381+ gid , replication_node , event ,
382+ spvector [origin_node - 1 ].origin_lsn , origin_node , tx_lsn , found );
386383
387384 return found ;
388385 }
@@ -422,10 +419,10 @@ MtmEndSession(int nodeId, bool unlock)
422419static PGconn *
423420receiver_connect (char * conninfo )
424421{
425- PGconn * conn ;
426- ConnStatusType status ;
427- const char * keys [] = {"dbname" , "replication" , NULL };
428- const char * vals [] = {conninfo , "database" , NULL };
422+ PGconn * conn ;
423+ ConnStatusType status ;
424+ const char * keys [] = {"dbname" , "replication" , NULL };
425+ const char * vals [] = {conninfo , "database" , NULL };
429426
430427 conn = PQconnectdbParams (keys , vals , /* expand_dbname = */ true);
431428 status = PQstatus (conn );
@@ -447,9 +444,9 @@ receiver_connect(char *conninfo)
447444void
448445MtmReceiverCreateSlot (char * conninfo , int my_node_id )
449446{
450- StringInfoData cmd ;
451- PGresult * res ;
452- PGconn * conn = receiver_connect (conninfo );
447+ StringInfoData cmd ;
448+ PGresult * res ;
449+ PGconn * conn = receiver_connect (conninfo );
453450
454451 if (!conn )
455452 mtm_log (ERROR , "Could not connect to '%s'" , conninfo );
@@ -846,15 +843,11 @@ pglogical_receiver_main(Datum main_arg)
846843 }
847844 if (stmt [0 ] == 'Z' || (stmt [0 ] == 'M' && (stmt [1 ] == 'L' || stmt [1 ] == 'P' || stmt [1 ] == 'C' || stmt [1 ] == 'S' ))) {
848845 if (stmt [0 ] == 'M' && stmt [1 ] == 'C' )
849- {
850846 /* concurrent DDL should be executed by parallel workers */
851847 MtmExecute (stmt , msg_len , & receiver_ctx , false);
852- }
853848 else
854- {
855849 /* all other messages should be processed by receiver itself */
856850 MtmExecute (stmt , msg_len , & receiver_ctx , true);
857- }
858851 }
859852 else
860853 {
@@ -877,7 +870,8 @@ pglogical_receiver_main(Datum main_arg)
877870 else
878871 MtmExecute (buf .data , buf .used , & receiver_ctx , false);
879872
880- } else if (spill_file >= 0 )
873+ }
874+ else if (spill_file >= 0 )
881875 {
882876 MtmCloseSpillFile (spill_file );
883877 resetStringInfo (& spill_info );
@@ -899,16 +893,16 @@ pglogical_receiver_main(Datum main_arg)
899893 * not more than the specified timeout, so that we can send a
900894 * response back to the client.
901895 */
902- int r ;
903- fd_set input_mask ;
904- int64 message_target = 0 ;
905- int64 fsync_target = 0 ;
906- struct timeval timeout ;
907- struct timeval * timeoutptr = NULL ;
908- int64 targettime ;
909- long secs ;
910- int usecs ;
911- int64 now ;
896+ int r ;
897+ fd_set input_mask ;
898+ int64 message_target = 0 ;
899+ int64 fsync_target = 0 ;
900+ struct timeval timeout ;
901+ struct timeval * timeoutptr = NULL ;
902+ int64 targettime ;
903+ long secs ;
904+ int usecs ;
905+ int64 now ;
912906
913907 FD_ZERO (& input_mask );
914908 FD_SET (PQsocket (conn ), & input_mask );
@@ -919,10 +913,7 @@ pglogical_receiver_main(Datum main_arg)
919913 if (fsync_target > 0 && fsync_target < targettime )
920914 targettime = fsync_target ;
921915 now = feGetCurrentTimestamp ();
922- feTimestampDifference (now ,
923- targettime ,
924- & secs ,
925- & usecs );
916+ feTimestampDifference (now , targettime , & secs , & usecs );
926917 if (secs <= 0 )
927918 timeout .tv_sec = 1 ; /* Always sleep at least 1 sec */
928919 else
@@ -940,14 +931,13 @@ pglogical_receiver_main(Datum main_arg)
940931 sendFeedback (conn , now , nodeId );
941932 }
942933 else if (r < 0 && errno == EINTR )
943- {
944934 /*
945935 * Got a timeout or signal. Continue the loop and either
946936 * deliver a status packet to the server or just go back into
947937 * blocking.
948938 */
949939 continue ;
950- }
940+
951941 else if (r < 0 )
952942 {
953943 ereport (LOG , (MTM_ERRMSG ("%s: Incorrect status received." ,
@@ -1003,17 +993,16 @@ pglogical_receiver_main(Datum main_arg)
1003993 BgwPoolCancel (& Mtm -> pools [nodeId - 1 ]);
1004994 MtmSleep (RECEIVER_SUSPEND_TIMEOUT );
1005995 }
1006- // ByteBufferFree(&buf);
1007- /* Never reach that point */
1008996
997+ /* Never reach that point */
1009998 proc_exit (2 );
1010999}
10111000
10121001BackgroundWorkerHandle *
10131002MtmStartReceiver (int nodeId , Oid db_id , Oid user_id , pid_t monitor_pid )
10141003{
1015- BackgroundWorker worker ;
1016- BackgroundWorkerHandle * handle ;
1004+ BackgroundWorker worker ;
1005+ BackgroundWorkerHandle * handle ;
10171006
10181007 MemSet (& worker , 0 , sizeof (BackgroundWorker ));
10191008 worker .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION ;
0 commit comments