22 * collector.c
33 * Collector of wait event history and profile.
44 *
5- * Copyright (c) 2015-2016 , Postgres Professional
5+ * Copyright (c) 2015-2025 , Postgres Professional
66 *
77 * IDENTIFICATION
88 * contrib/pg_wait_sampling/pg_wait_sampling.c
99 */
1010#include "postgres.h"
1111
12- #include "catalog/pg_type.h"
13- #if PG_VERSION_NUM >= 130000
14- #include "common/hashfn.h"
15- #endif
16- #include "funcapi.h"
1712#include "miscadmin.h"
13+ #include "pg_wait_sampling.h"
1814#include "postmaster/bgworker.h"
1915#include "postmaster/interrupt.h"
20- #include "storage/ipc.h"
21- #include "storage/procarray.h"
16+ #include "storage/latch.h"
17+ #include "storage/lock.h"
18+ #include "storage/lwlock.h"
19+ #include "storage/proc.h"
2220#include "storage/procsignal.h"
2321#include "storage/shm_mq.h"
24- #include "storage/shm_toc .h"
25- #include "storage/spin .h"
26- #include "utils/memutils .h"
27- #include "utils/resowner.h"
22+ #include "utils/guc .h"
23+ #include "utils/hsearch .h"
24+ #include "utils/timestamp .h"
25+ #if PG_VERSION_NUM < 140000
2826#include "pgstat.h"
27+ #else
28+ #include "utils/wait_event.h"
29+ #endif
2930
30- #include "compat.h"
31- #include "pg_wait_sampling.h"
31+ static inline shm_mq_result
32+ shm_mq_send_compat (shm_mq_handle * mqh , Size nbytes , const void * data ,
33+ bool nowait , bool force_flush )
34+ {
35+ #if PG_VERSION_NUM >= 150000
36+ return shm_mq_send (mqh , nbytes , data , nowait , force_flush );
37+ #else
38+ return shm_mq_send (mqh , nbytes , data , nowait );
39+ #endif
40+ }
3241
33- static volatile sig_atomic_t shutdown_requested = false;
42+ #if PG_VERSION_NUM < 170000
43+ #define INIT_PG_LOAD_SESSION_LIBS 0x0001
44+ #define INIT_PG_OVERRIDE_ALLOW_CONNS 0x0002
45+ #endif
3446
35- static void handle_sigterm (SIGNAL_ARGS );
47+ static inline void
48+ InitPostgresCompat (const char * in_dbname , Oid dboid ,
49+ const char * username , Oid useroid ,
50+ bits32 flags ,
51+ char * out_dbname )
52+ {
53+ #if PG_VERSION_NUM >= 170000
54+ InitPostgres (in_dbname , dboid , username , useroid , flags , out_dbname );
55+ #elif PG_VERSION_NUM >= 150000
56+ InitPostgres (in_dbname , dboid , username , useroid ,
57+ flags & INIT_PG_LOAD_SESSION_LIBS ,
58+ flags & INIT_PG_OVERRIDE_ALLOW_CONNS , out_dbname );
59+ #else
60+ InitPostgres (in_dbname , dboid , username , useroid , out_dbname ,
61+ flags & INIT_PG_OVERRIDE_ALLOW_CONNS );
62+ #endif
63+ }
3664
3765/*
3866 * Register background worker for collecting waits history.
@@ -111,16 +139,6 @@ realloc_history(History *observations, int count)
111139 observations -> wraparound = false;
112140}
113141
114- static void
115- handle_sigterm (SIGNAL_ARGS )
116- {
117- int save_errno = errno ;
118- shutdown_requested = true;
119- if (MyProc )
120- SetLatch (& MyProc -> procLatch );
121- errno = save_errno ;
122- }
123-
124142/*
125143 * Get next item of history with rotation.
126144 */
@@ -129,6 +147,7 @@ get_next_observation(History *observations)
129147{
130148 HistoryItem * result ;
131149
150+ /* Check for wraparound */
132151 if (observations -> index >= observations -> count )
133152 {
134153 observations -> index = 0 ;
@@ -215,6 +234,7 @@ send_history(History *observations, shm_mq_handle *mqh)
215234 else
216235 count = observations -> index ;
217236
237+ /* Send array size first since receive_array expects this */
218238 mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
219239 if (mq_result == SHM_MQ_DETACHED )
220240 {
@@ -251,6 +271,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
251271 Size count = hash_get_num_entries (profile_hash );
252272 shm_mq_result mq_result ;
253273
274+ /* Send array size first since receive_array expects this */
254275 mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
255276 if (mq_result == SHM_MQ_DETACHED )
256277 {
@@ -283,32 +304,11 @@ make_profile_hash()
283304{
284305 HASHCTL hash_ctl ;
285306
286- hash_ctl .hash = tag_hash ;
287- hash_ctl .hcxt = TopMemoryContext ;
288-
289- if (pgws_profileQueries )
290- hash_ctl .keysize = offsetof(ProfileItem , count );
291- else
292- hash_ctl .keysize = offsetof(ProfileItem , queryId );
293-
307+ /* We always include queryId in hash key */
308+ hash_ctl .keysize = offsetof(ProfileItem , count );
294309 hash_ctl .entrysize = sizeof (ProfileItem );
295310 return hash_create ("Waits profile hash" , 1024 , & hash_ctl ,
296- HASH_FUNCTION | HASH_ELEM );
297- }
298-
299- /*
300- * Delta between two timestamps in milliseconds.
301- */
302- static int64
303- millisecs_diff (TimestampTz tz1 , TimestampTz tz2 )
304- {
305- long secs ;
306- int microsecs ;
307-
308- TimestampDifference (tz1 , tz2 , & secs , & microsecs );
309-
310- return secs * 1000 + microsecs / 1000 ;
311-
311+ HASH_ELEM | HASH_BLOBS );
312312}
313313
314314/*
@@ -319,77 +319,49 @@ pgws_collector_main(Datum main_arg)
319319{
320320 HTAB * profile_hash = NULL ;
321321 History observations ;
322- MemoryContext old_context ,
323- collector_context ;
324322 TimestampTz current_ts ,
325323 history_ts ,
326324 profile_ts ;
327325
328- /*
329- * Establish signal handlers.
330- *
331- * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
332- * it would a normal user backend. To make that happen, we establish a
333- * signal handler that is a stripped-down version of die(). We don't have
334- * any equivalent of the backend's command-read loop, where interrupts can
335- * be processed immediately, so make sure ImmediateInterruptOK is turned
336- * off.
337- *
338- * We also want to respond to the ProcSignal notifications. This is done
339- * in the upstream provided procsignal_sigusr1_handler, which is
340- * automatically used if a bgworker connects to a database. But since our
341- * worker doesn't connect to any database even though it calls
342- * InitPostgres, which will still initializze a new backend and thus
343- * partitipate to the ProcSignal infrastructure.
344- */
345- pqsignal (SIGTERM , handle_sigterm );
326+ /* Establish signal handlers */
346327 pqsignal (SIGHUP , SignalHandlerForConfigReload );
347328 pqsignal (SIGUSR1 , procsignal_sigusr1_handler );
348329 BackgroundWorkerUnblockSignals ();
349330 InitPostgresCompat (NULL , InvalidOid , NULL , InvalidOid , 0 , NULL );
350331 SetProcessingMode (NormalProcessing );
351332
352- /* Make pg_wait_sampling recognisable in pg_stat_activity */
353- pgstat_report_appname ("pg_wait_sampling collector" );
354333
355- profile_hash = make_profile_hash ();
356334 pgws_collector_hdr -> latch = & MyProc -> procLatch ;
357335
358- CurrentResourceOwner = ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
359- collector_context = AllocSetContextCreate (TopMemoryContext ,
360- "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
361- old_context = MemoryContextSwitchTo (collector_context );
362336 alloc_history (& observations , pgws_historySize );
363- MemoryContextSwitchTo ( old_context );
337+ profile_hash = make_profile_hash ( );
364338
365- ereport (LOG , ( errmsg ("pg_wait_sampling collector started" ) ));
339+ ereport (LOG , errmsg ("pg_wait_sampling collector started" ));
366340
367341 /* Start counting time for history and profile samples */
368342 profile_ts = history_ts = GetCurrentTimestamp ();
369343
370344 while (1 )
371345 {
372- int rc ;
373346 shm_mq_handle * mqh ;
374347 int64 history_diff ,
375348 profile_diff ;
376349 bool write_history ,
377350 write_profile ;
378351
379- /* We need an explicit call for at least ProcSignal notifications. */
380- CHECK_FOR_INTERRUPTS ();
352+ HandleMainLoopInterrupts ();
381353
382354 if (ConfigReloadPending )
383355 {
384356 ConfigReloadPending = false;
385357 ProcessConfigFile (PGC_SIGHUP );
386358 }
387359
388- /* Wait calculate time to next sample for history or profile */
360+ /* Calculate time to next sample for history or profile */
389361 current_ts = GetCurrentTimestamp ();
390362
391- history_diff = millisecs_diff (history_ts , current_ts );
392- profile_diff = millisecs_diff (profile_ts , current_ts );
363+ history_diff = TimestampDifferenceMilliseconds (history_ts , current_ts );
364+ profile_diff = TimestampDifferenceMilliseconds (profile_ts , current_ts );
393365
394366 write_history = (history_diff >= (int64 )pgws_historyPeriod );
395367 write_profile = (profile_diff >= (int64 )pgws_profilePeriod );
@@ -412,20 +384,15 @@ pgws_collector_main(Datum main_arg)
412384 }
413385 }
414386
415- /* Shutdown if requested */
416- if (shutdown_requested )
417- break ;
418-
419387 /*
420- * Wait until next sample time or request to do something through
388+ * Wait for sample time or until request to do something through
421389 * shared memory.
422390 */
423- rc = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
424- Min (pgws_historyPeriod - (int )history_diff ,
425- pgws_historyPeriod - (int )profile_diff ), PG_WAIT_EXTENSION );
426-
427- if (rc & WL_POSTMASTER_DEATH )
428- proc_exit (1 );
391+ WaitLatch (& MyProc -> procLatch ,
392+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
393+ Min (pgws_historyPeriod - (int )history_diff ,
394+ pgws_profilePeriod - (int )profile_diff ),
395+ PG_WAIT_EXTENSION );
429396
430397 ResetLatch (& MyProc -> procLatch );
431398
@@ -484,15 +451,4 @@ pgws_collector_main(Datum main_arg)
484451 LockRelease (& tag , ExclusiveLock , false);
485452 }
486453 }
487-
488- MemoryContextReset (collector_context );
489-
490- /*
491- * We're done. Explicitly detach the shared memory segment so that we
492- * don't get a resource leak warning at commit time. This will fire any
493- * on_dsm_detach callbacks we've registered, as well. Once that's done,
494- * we can go ahead and exit.
495- */
496- ereport (LOG , (errmsg ("pg_wait_sampling collector shutting down" )));
497- proc_exit (0 );
498454}
0 commit comments