22 * collector.c
33 * Collector of wait event history and profile.
44 *
5- * Copyright (c) 2015-2016 , Postgres Professional
5+ * Copyright (c) 2015-2025 , Postgres Professional
66 *
77 * IDENTIFICATION
88 * contrib/pg_wait_sampling/pg_wait_sampling.c
99 */
1010#include "postgres.h"
1111
12- #include "catalog/pg_type.h"
13- #if PG_VERSION_NUM >= 130000
14- #include "common/hashfn.h"
15- #endif
16- #include "funcapi.h"
12+ #include <signal.h>
13+
14+ #include "compat.h"
1715#include "miscadmin.h"
16+ #include "pg_wait_sampling.h"
17+ #include "pgstat.h"
1818#include "postmaster/bgworker.h"
1919#include "postmaster/interrupt.h"
2020#include "storage/ipc.h"
21- #include "storage/procarray.h"
21+ #include "storage/latch.h"
22+ #include "storage/lock.h"
23+ #include "storage/lwlock.h"
24+ #include "storage/proc.h"
2225#include "storage/procsignal.h"
2326#include "storage/shm_mq.h"
24- #include "storage/shm_toc .h"
25- #include "storage/spin .h"
27+ #include "utils/guc .h"
28+ #include "utils/hsearch .h"
2629#include "utils/memutils.h"
2730#include "utils/resowner.h"
28- #include "pgstat.h"
29-
30- #include "compat.h"
31- #include "pg_wait_sampling.h"
31+ #include "utils/timestamp.h"
3232
3333static volatile sig_atomic_t shutdown_requested = false;
3434
@@ -73,10 +73,10 @@ alloc_history(History *observations, int count)
7373static void
7474realloc_history (History * observations , int count )
7575{
76- HistoryItem * newitems ;
77- int copyCount ,
78- i ,
79- j ;
76+ HistoryItem * newitems ;
77+ int copyCount ,
78+ i ,
79+ j ;
8080
8181 /* Allocate new array for history */
8282 newitems = (HistoryItem * ) palloc0 (sizeof (HistoryItem ) * count );
@@ -114,7 +114,8 @@ realloc_history(History *observations, int count)
114114static void
115115handle_sigterm (SIGNAL_ARGS )
116116{
117- int save_errno = errno ;
117+ int save_errno = errno ;
118+
118119 shutdown_requested = true;
119120 if (MyProc )
120121 SetLatch (& MyProc -> procLatch );
@@ -129,6 +130,7 @@ get_next_observation(History *observations)
129130{
130131 HistoryItem * result ;
131132
133+ /* Check for wraparound */
132134 if (observations -> index >= observations -> count )
133135 {
134136 observations -> index = 0 ;
@@ -149,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
149151{
150152 int i ,
151153 newSize ;
152- TimestampTz ts = GetCurrentTimestamp ();
154+ TimestampTz ts = GetCurrentTimestamp ();
153155
154156 /* Realloc waits history if needed */
155157 newSize = pgws_historySize ;
@@ -160,9 +162,9 @@ probe_waits(History *observations, HTAB *profile_hash,
160162 LWLockAcquire (ProcArrayLock , LW_SHARED );
161163 for (i = 0 ; i < ProcGlobal -> allProcCount ; i ++ )
162164 {
163- HistoryItem item ,
164- * observation ;
165- PGPROC * proc = & ProcGlobal -> allProcs [i ];
165+ HistoryItem item ,
166+ * observation ;
167+ PGPROC * proc = & ProcGlobal -> allProcs [i ];
166168
167169 if (!pgws_should_sample_proc (proc , & item .pid , & item .wait_event_info ))
168170 continue ;
@@ -184,8 +186,8 @@ probe_waits(History *observations, HTAB *profile_hash,
184186 /* Write to the profile if needed */
185187 if (write_profile )
186188 {
187- ProfileItem * profileItem ;
188- bool found ;
189+ ProfileItem * profileItem ;
190+ bool found ;
189191
190192 if (!profile_pid )
191193 item .pid = 0 ;
@@ -206,15 +208,16 @@ probe_waits(History *observations, HTAB *profile_hash,
206208static void
207209send_history (History * observations , shm_mq_handle * mqh )
208210{
209- Size count ,
210- i ;
211- shm_mq_result mq_result ;
211+ Size count ,
212+ i ;
213+ shm_mq_result mq_result ;
212214
213215 if (observations -> wraparound )
214216 count = observations -> count ;
215217 else
216218 count = observations -> index ;
217219
220+ /* Send array size first since receive_array expects this */
218221 mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
219222 if (mq_result == SHM_MQ_DETACHED )
220223 {
@@ -226,10 +229,10 @@ send_history(History *observations, shm_mq_handle *mqh)
226229 for (i = 0 ; i < count ; i ++ )
227230 {
228231 mq_result = shm_mq_send_compat (mqh ,
229- sizeof (HistoryItem ),
230- & observations -> items [i ],
231- false,
232- true);
232+ sizeof (HistoryItem ),
233+ & observations -> items [i ],
234+ false,
235+ true);
233236 if (mq_result == SHM_MQ_DETACHED )
234237 {
235238 ereport (WARNING ,
@@ -246,11 +249,12 @@ send_history(History *observations, shm_mq_handle *mqh)
246249static void
247250send_profile (HTAB * profile_hash , shm_mq_handle * mqh )
248251{
249- HASH_SEQ_STATUS scan_status ;
250- ProfileItem * item ;
251- Size count = hash_get_num_entries (profile_hash );
252- shm_mq_result mq_result ;
252+ HASH_SEQ_STATUS scan_status ;
253+ ProfileItem * item ;
254+ Size count = hash_get_num_entries (profile_hash );
255+ shm_mq_result mq_result ;
253256
257+ /* Send array size first since receive_array expects this */
254258 mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
255259 if (mq_result == SHM_MQ_DETACHED )
256260 {
@@ -281,10 +285,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
281285static HTAB *
282286make_profile_hash ()
283287{
284- HASHCTL hash_ctl ;
285-
286- hash_ctl .hash = tag_hash ;
287- hash_ctl .hcxt = TopMemoryContext ;
288+ HASHCTL hash_ctl ;
288289
289290 if (pgws_profileQueries )
290291 hash_ctl .keysize = offsetof(ProfileItem , count );
@@ -293,7 +294,7 @@ make_profile_hash()
293294
294295 hash_ctl .entrysize = sizeof (ProfileItem );
295296 return hash_create ("Waits profile hash" , 1024 , & hash_ctl ,
296- HASH_FUNCTION | HASH_ELEM );
297+ HASH_ELEM | HASH_BLOBS );
297298}
298299
299300/*
@@ -302,8 +303,8 @@ make_profile_hash()
302303static int64
303304millisecs_diff (TimestampTz tz1 , TimestampTz tz2 )
304305{
305- long secs ;
306- int microsecs ;
306+ long secs ;
307+ int microsecs ;
307308
308309 TimestampDifference (tz1 , tz2 , & secs , & microsecs );
309310
@@ -317,26 +318,19 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
317318void
318319pgws_collector_main (Datum main_arg )
319320{
320- HTAB * profile_hash = NULL ;
321- History observations ;
322- MemoryContext old_context ,
323- collector_context ;
324- TimestampTz current_ts ,
325- history_ts ,
326- profile_ts ;
321+ HTAB * profile_hash = NULL ;
322+ History observations ;
323+ MemoryContext old_context ,
324+ collector_context ;
325+ TimestampTz current_ts ,
326+ history_ts ,
327+ profile_ts ;
327328
328329 /*
329330 * Establish signal handlers.
330331 *
331- * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
332- * it would a normal user backend. To make that happen, we establish a
333- * signal handler that is a stripped-down version of die(). We don't have
334- * any equivalent of the backend's command-read loop, where interrupts can
335- * be processed immediately, so make sure ImmediateInterruptOK is turned
336- * off.
337- *
338- * We also want to respond to the ProcSignal notifications. This is done
339- * in the upstream provided procsignal_sigusr1_handler, which is
332+ * We want to respond to the ProcSignal notifications. This is done in
333+ * the upstream provided procsignal_sigusr1_handler, which is
340334 * automatically used if a bgworker connects to a database. But since our
341335 * worker doesn't connect to any database even though it calls
342336 * InitPostgres, which will still initializze a new backend and thus
@@ -357,7 +351,7 @@ pgws_collector_main(Datum main_arg)
357351
358352 CurrentResourceOwner = ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
359353 collector_context = AllocSetContextCreate (TopMemoryContext ,
360- "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
354+ "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
361355 old_context = MemoryContextSwitchTo (collector_context );
362356 alloc_history (& observations , pgws_historySize );
363357 MemoryContextSwitchTo (old_context );
@@ -369,12 +363,12 @@ pgws_collector_main(Datum main_arg)
369363
370364 while (1 )
371365 {
372- int rc ;
373- shm_mq_handle * mqh ;
374- int64 history_diff ,
375- profile_diff ;
376- bool write_history ,
377- write_profile ;
366+ int rc ;
367+ shm_mq_handle * mqh ;
368+ int64 history_diff ,
369+ profile_diff ;
370+ bool write_history ,
371+ write_profile ;
378372
379373 /* We need an explicit call for at least ProcSignal notifications. */
380374 CHECK_FOR_INTERRUPTS ();
@@ -385,14 +379,14 @@ pgws_collector_main(Datum main_arg)
385379 ProcessConfigFile (PGC_SIGHUP );
386380 }
387381
388- /* Wait calculate time to next sample for history or profile */
382+ /* Calculate time to next sample for history or profile */
389383 current_ts = GetCurrentTimestamp ();
390384
391385 history_diff = millisecs_diff (history_ts , current_ts );
392386 profile_diff = millisecs_diff (profile_ts , current_ts );
393387
394- write_history = (history_diff >= (int64 )pgws_historyPeriod );
395- write_profile = (profile_diff >= (int64 )pgws_profilePeriod );
388+ write_history = (history_diff >= (int64 ) pgws_historyPeriod );
389+ write_profile = (profile_diff >= (int64 ) pgws_profilePeriod );
396390
397391 if (write_history || write_profile )
398392 {
@@ -421,8 +415,8 @@ pgws_collector_main(Datum main_arg)
421415 * shared memory.
422416 */
423417 rc = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
424- Min (pgws_historyPeriod - (int )history_diff ,
425- pgws_historyPeriod - (int )profile_diff ), PG_WAIT_EXTENSION );
418+ Min (pgws_historyPeriod - (int ) history_diff ,
419+ pgws_historyPeriod - (int ) profile_diff ), PG_WAIT_EXTENSION );
426420
427421 if (rc & WL_POSTMASTER_DEATH )
428422 proc_exit (1 );
@@ -443,7 +437,7 @@ pgws_collector_main(Datum main_arg)
443437
444438 if (request == HISTORY_REQUEST || request == PROFILE_REQUEST )
445439 {
446- shm_mq_result mq_result ;
440+ shm_mq_result mq_result ;
447441
448442 /* Send history or profile */
449443 shm_mq_set_sender (pgws_collector_mq , MyProc );
@@ -487,12 +481,6 @@ pgws_collector_main(Datum main_arg)
487481
488482 MemoryContextReset (collector_context );
489483
490- /*
491- * We're done. Explicitly detach the shared memory segment so that we
492- * don't get a resource leak warning at commit time. This will fire any
493- * on_dsm_detach callbacks we've registered, as well. Once that's done,
494- * we can go ahead and exit.
495- */
496484 ereport (LOG , (errmsg ("pg_wait_sampling collector shutting down" )));
497485 proc_exit (0 );
498486}
0 commit comments