22 * collector.c 
33 *		Collector of wait event history and profile. 
44 * 
5-  * Copyright (c) 2015-2016 , Postgres Professional 
5+  * Copyright (c) 2015-2025 , Postgres Professional 
66 * 
77 * IDENTIFICATION 
88 *	  contrib/pg_wait_sampling/pg_wait_sampling.c 
99 */ 
1010#include  "postgres.h" 
1111
12- #include  "catalog/pg_type.h" 
13- #if  PG_VERSION_NUM  >= 130000 
14- #include  "common/hashfn.h" 
15- #endif 
16- #include  "funcapi.h" 
12+ #include  <signal.h> 
13+ 
14+ #include  "compat.h" 
1715#include  "miscadmin.h" 
16+ #include  "pg_wait_sampling.h" 
17+ #include  "pgstat.h" 
1818#include  "postmaster/bgworker.h" 
1919#include  "postmaster/interrupt.h" 
2020#include  "storage/ipc.h" 
21- #include  "storage/procarray.h" 
21+ #include  "storage/latch.h" 
22+ #include  "storage/lock.h" 
23+ #include  "storage/lwlock.h" 
24+ #include  "storage/proc.h" 
2225#include  "storage/procsignal.h" 
2326#include  "storage/shm_mq.h" 
24- #include  "storage/shm_toc .h" 
25- #include  "storage/spin .h" 
27+ #include  "utils/guc .h" 
28+ #include  "utils/hsearch .h" 
2629#include  "utils/memutils.h" 
2730#include  "utils/resowner.h" 
28- #include  "pgstat.h" 
29- 
30- #include  "compat.h" 
31- #include  "pg_wait_sampling.h" 
31+ #include  "utils/timestamp.h" 
3232
3333static  volatile  sig_atomic_t  shutdown_requested  =  false;
3434
@@ -73,10 +73,10 @@ alloc_history(History *observations, int count)
7373static  void 
7474realloc_history (History  * observations , int  count )
7575{
76- 	HistoryItem 	    * newitems ;
77- 	int 				 copyCount ,
78- 					 i ,
79- 					 j ;
76+ 	HistoryItem  * newitems ;
77+ 	int 			copyCount ,
78+ 				i ,
79+ 				j ;
8080
8181	/* Allocate new array for history */ 
8282	newitems  =  (HistoryItem  * ) palloc0 (sizeof (HistoryItem ) *  count );
@@ -114,7 +114,8 @@ realloc_history(History *observations, int count)
114114static  void 
115115handle_sigterm (SIGNAL_ARGS )
116116{
117- 	int  save_errno  =  errno ;
117+ 	int 			save_errno  =  errno ;
118+ 
118119	shutdown_requested  =  true;
119120	if  (MyProc )
120121		SetLatch (& MyProc -> procLatch );
@@ -129,6 +130,7 @@ get_next_observation(History *observations)
129130{
130131	HistoryItem  * result ;
131132
133+ 	/* Check for wraparound */ 
132134	if  (observations -> index  >= observations -> count )
133135	{
134136		observations -> index  =  0 ;
@@ -149,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
149151{
150152	int 			i ,
151153				newSize ;
152- 	TimestampTz 	 ts  =  GetCurrentTimestamp ();
154+ 	TimestampTz   ts  =  GetCurrentTimestamp ();
153155
154156	/* Realloc waits history if needed */ 
155157	newSize  =  pgws_historySize ;
@@ -160,9 +162,9 @@ probe_waits(History *observations, HTAB *profile_hash,
160162	LWLockAcquire (ProcArrayLock , LW_SHARED );
161163	for  (i  =  0 ; i  <  ProcGlobal -> allProcCount ; i ++ )
162164	{
163- 		HistoryItem 		 item ,
164- 					    * observation ;
165- 		PGPROC 		    * proc  =  & ProcGlobal -> allProcs [i ];
165+ 		HistoryItem   item ,
166+ 				   * observation ;
167+ 		PGPROC 	   * proc  =  & ProcGlobal -> allProcs [i ];
166168
167169		if  (!pgws_should_sample_proc (proc , & item .pid , & item .wait_event_info ))
168170			continue ;
@@ -184,8 +186,8 @@ probe_waits(History *observations, HTAB *profile_hash,
184186		/* Write to the profile if needed */ 
185187		if  (write_profile )
186188		{
187- 			ProfileItem 	    * profileItem ;
188- 			bool 			 found ;
189+ 			ProfileItem  * profileItem ;
190+ 			bool 		found ;
189191
190192			if  (!profile_pid )
191193				item .pid  =  0 ;
@@ -206,15 +208,16 @@ probe_waits(History *observations, HTAB *profile_hash,
206208static  void 
207209send_history (History  * observations , shm_mq_handle  * mqh )
208210{
209- 	Size 	count ,
210- 			i ;
211- 	shm_mq_result 	 mq_result ;
211+ 	Size 		 count ,
212+ 				 i ;
213+ 	shm_mq_result   mq_result ;
212214
213215	if  (observations -> wraparound )
214216		count  =  observations -> count ;
215217	else 
216218		count  =  observations -> index ;
217219
220+ 	/* Send array size first since receive_array expects this */ 
218221	mq_result  =  shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
219222	if  (mq_result  ==  SHM_MQ_DETACHED )
220223	{
@@ -226,10 +229,10 @@ send_history(History *observations, shm_mq_handle *mqh)
226229	for  (i  =  0 ; i  <  count ; i ++ )
227230	{
228231		mq_result  =  shm_mq_send_compat (mqh ,
229- 								sizeof (HistoryItem ),
230- 								& observations -> items [i ],
231- 								false,
232- 								true);
232+ 									    sizeof (HistoryItem ),
233+ 									    & observations -> items [i ],
234+ 									    false,
235+ 									    true);
233236		if  (mq_result  ==  SHM_MQ_DETACHED )
234237		{
235238			ereport (WARNING ,
@@ -246,11 +249,12 @@ send_history(History *observations, shm_mq_handle *mqh)
246249static  void 
247250send_profile (HTAB  * profile_hash , shm_mq_handle  * mqh )
248251{
249- 	HASH_SEQ_STATUS 	 scan_status ;
250- 	ProfileItem 	    * item ;
251- 	Size 			 count  =  hash_get_num_entries (profile_hash );
252- 	shm_mq_result 	 mq_result ;
252+ 	HASH_SEQ_STATUS   scan_status ;
253+ 	ProfileItem  * item ;
254+ 	Size 		count  =  hash_get_num_entries (profile_hash );
255+ 	shm_mq_result   mq_result ;
253256
257+ 	/* Send array size first since receive_array expects this */ 
254258	mq_result  =  shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
255259	if  (mq_result  ==  SHM_MQ_DETACHED )
256260	{
@@ -281,10 +285,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
281285static  HTAB  * 
282286make_profile_hash ()
283287{
284- 	HASHCTL  hash_ctl ;
285- 
286- 	hash_ctl .hash  =  tag_hash ;
287- 	hash_ctl .hcxt  =  TopMemoryContext ;
288+ 	HASHCTL 		hash_ctl ;
288289
289290	if  (pgws_profileQueries )
290291		hash_ctl .keysize  =  offsetof(ProfileItem , count );
@@ -293,7 +294,7 @@ make_profile_hash()
293294
294295	hash_ctl .entrysize  =  sizeof (ProfileItem );
295296	return  hash_create ("Waits profile hash" , 1024 , & hash_ctl ,
296- 					   HASH_FUNCTION  | HASH_ELEM );
297+ 					   HASH_ELEM  | HASH_BLOBS );
297298}
298299
299300/* 
@@ -302,8 +303,8 @@ make_profile_hash()
302303static  int64 
303304millisecs_diff (TimestampTz  tz1 , TimestampTz  tz2 )
304305{
305- 	long 	secs ;
306- 	int 		microsecs ;
306+ 	long 		 secs ;
307+ 	int 			 microsecs ;
307308
308309	TimestampDifference (tz1 , tz2 , & secs , & microsecs );
309310
@@ -317,26 +318,19 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
317318void 
318319pgws_collector_main (Datum  main_arg )
319320{
320- 	HTAB 		    * profile_hash  =  NULL ;
321- 	History 			 observations ;
322- 	MemoryContext 	 old_context ,
323- 					 collector_context ;
324- 	TimestampTz 		 current_ts ,
325- 					 history_ts ,
326- 					 profile_ts ;
321+ 	HTAB 	   * profile_hash  =  NULL ;
322+ 	History 		observations ;
323+ 	MemoryContext   old_context ,
324+ 				collector_context ;
325+ 	TimestampTz   current_ts ,
326+ 				history_ts ,
327+ 				profile_ts ;
327328
328329	/* 
329330	 * Establish signal handlers. 
330331	 * 
331- 	 * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as 
332- 	 * it would a normal user backend.  To make that happen, we establish a 
333- 	 * signal handler that is a stripped-down version of die().  We don't have 
334- 	 * any equivalent of the backend's command-read loop, where interrupts can 
335- 	 * be processed immediately, so make sure ImmediateInterruptOK is turned 
336- 	 * off. 
337- 	 * 
338- 	 * We also want to respond to the ProcSignal notifications.  This is done 
339- 	 * in the upstream provided procsignal_sigusr1_handler, which is 
332+ 	 * We want to respond to the ProcSignal notifications.  This is done in 
333+ 	 * the upstream provided procsignal_sigusr1_handler, which is 
340334	 * automatically used if a bgworker connects to a database.  But since our 
341335	 * worker doesn't connect to any database even though it calls 
342336	 * InitPostgres, which will still initializze a new backend and thus 
@@ -357,7 +351,7 @@ pgws_collector_main(Datum main_arg)
357351
358352	CurrentResourceOwner  =  ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
359353	collector_context  =  AllocSetContextCreate (TopMemoryContext ,
360- 			"pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
354+ 											   "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
361355	old_context  =  MemoryContextSwitchTo (collector_context );
362356	alloc_history (& observations , pgws_historySize );
363357	MemoryContextSwitchTo (old_context );
@@ -369,12 +363,12 @@ pgws_collector_main(Datum main_arg)
369363
370364	while  (1 )
371365	{
372- 		int 				 rc ;
373- 		shm_mq_handle    * mqh ;
374- 		int64 			 history_diff ,
375- 						 profile_diff ;
376- 		bool 			 write_history ,
377- 						 write_profile ;
366+ 		int 			rc ;
367+ 		shm_mq_handle  * mqh ;
368+ 		int64 		history_diff ,
369+ 					profile_diff ;
370+ 		bool 		write_history ,
371+ 					write_profile ;
378372
379373		/* We need an explicit call for at least ProcSignal notifications. */ 
380374		CHECK_FOR_INTERRUPTS ();
@@ -385,14 +379,14 @@ pgws_collector_main(Datum main_arg)
385379			ProcessConfigFile (PGC_SIGHUP );
386380		}
387381
388- 		/* Wait calculate  time to next sample for history or profile */ 
382+ 		/* Calculate  time to next sample for history or profile */ 
389383		current_ts  =  GetCurrentTimestamp ();
390384
391385		history_diff  =  millisecs_diff (history_ts , current_ts );
392386		profile_diff  =  millisecs_diff (profile_ts , current_ts );
393387
394- 		write_history  =  (history_diff  >= (int64 )pgws_historyPeriod );
395- 		write_profile  =  (profile_diff  >= (int64 )pgws_profilePeriod );
388+ 		write_history  =  (history_diff  >= (int64 )  pgws_historyPeriod );
389+ 		write_profile  =  (profile_diff  >= (int64 )  pgws_profilePeriod );
396390
397391		if  (write_history  ||  write_profile )
398392		{
@@ -421,8 +415,8 @@ pgws_collector_main(Datum main_arg)
421415		 * shared memory. 
422416		 */ 
423417		rc  =  WaitLatch (& MyProc -> procLatch , WL_LATCH_SET  | WL_TIMEOUT  | WL_POSTMASTER_DEATH ,
424- 				Min (pgws_historyPeriod  -  (int )history_diff ,
425- 					pgws_historyPeriod  -  (int )profile_diff ), PG_WAIT_EXTENSION );
418+ 					    Min (pgws_historyPeriod  -  (int )  history_diff ,
419+ 						    pgws_historyPeriod  -  (int )  profile_diff ), PG_WAIT_EXTENSION );
426420
427421		if  (rc  &  WL_POSTMASTER_DEATH )
428422			proc_exit (1 );
@@ -443,7 +437,7 @@ pgws_collector_main(Datum main_arg)
443437
444438			if  (request  ==  HISTORY_REQUEST  ||  request  ==  PROFILE_REQUEST )
445439			{
446- 				shm_mq_result 	 mq_result ;
440+ 				shm_mq_result   mq_result ;
447441
448442				/* Send history or profile */ 
449443				shm_mq_set_sender (pgws_collector_mq , MyProc );
@@ -487,12 +481,6 @@ pgws_collector_main(Datum main_arg)
487481
488482	MemoryContextReset (collector_context );
489483
490- 	/* 
491- 	 * We're done.  Explicitly detach the shared memory segment so that we 
492- 	 * don't get a resource leak warning at commit time.  This will fire any 
493- 	 * on_dsm_detach callbacks we've registered, as well.  Once that's done, 
494- 	 * we can go ahead and exit. 
495- 	 */ 
496484	ereport (LOG , (errmsg ("pg_wait_sampling collector shutting down" )));
497485	proc_exit (0 );
498486}
0 commit comments