2 changes: 1 addition & 1 deletion Makefile
@@ -4,7 +4,7 @@ MODULE_big = pg_wait_sampling
OBJS = pg_wait_sampling.o collector.o

EXTENSION = pg_wait_sampling
DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql
DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql pg_wait_sampling--1.1--1.2.sql

REGRESS = load queries

91 changes: 58 additions & 33 deletions README.md
@@ -30,9 +30,9 @@ When `pg_wait_sampling` is enabled, it collects two kinds of statistics.
recent samples depending on history size (configurable). Assuming there is
a client who periodically reads this history and dumps it somewhere, the user
can have a continuous history.
* Waits profile. It's implemented as in-memory hash table where count
of samples are accumulated per each process and each wait event
(and each query with `pg_stat_statements`). This hash
* Waits profile. It's implemented as in-memory hash table where samples
are accumulated and can be grouped by process, wait event, query, user and/or
database (and then joined by queryid with `pg_stat_statements`). This hash
table can be reset by user request. Assuming there is a client who
periodically dumps profile and resets it, user can have statistics of
intensity of wait events over time.
@@ -98,53 +98,67 @@ Usage
`pg_wait_sampling` interacts with user by set of views and functions.

`pg_wait_sampling_current` view – information about current wait events for
all processed including background workers.

| Column name | Column type | Description |
| ----------- | ----------- | ----------------------- |
| pid | int4 | Id of process |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
all processes including background workers.

| Column name | Column type | Description |
| ------------------- | ----------- | --------------------------- |
| pid | int4 | Id of process |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
| role_id | int4 | Id of role |
| database_id | int4 | Id of database |
| leader_pid | int4 | Id of parallel query leader |
| is_regular_backend | bool | True for a regular backend, false otherwise (e.g. a background worker) |

`pg_wait_sampling_get_current(pid int4)` returns the same table for single given
process.

`pg_wait_sampling_history` view – history of wait events obtained by sampling into
in-memory ring buffer.

| Column name | Column type | Description |
| ----------- | ----------- | ----------------------- |
| pid | int4 | Id of process |
| ts | timestamptz | Sample timestamp |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
| Column name | Column type | Description |
| ------------------- | ----------- | --------------------------- |
| pid | int4 | Id of process |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
| role_id | int4 | Id of role |
| database_id | int4 | Id of database |
| leader_pid | int4 | Id of parallel query leader |
| is_regular_backend | bool | True for a regular backend, false otherwise (e.g. a background worker) |
| ts | timestamptz | Sample timestamp |

`pg_wait_sampling_profile` view – profile of wait events obtained by sampling into
in-memory hash table.

| Column name | Column type | Description |
| ----------- | ----------- | ----------------------- |
| pid | int4 | Id of process |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
| count | text | Count of samples |
| Column name | Column type | Description |
| ------------------- | ----------- | --------------------------- |
| pid | int4 | Id of process |
| event_type | text | Name of wait event type |
| event | text | Name of wait event |
| queryid | int8 | Id of query |
| role_id | int4 | Id of role |
| database_id | int4 | Id of database |
| leader_pid | int4 | Id of parallel query leader |
| is_regular_backend | bool | True for a regular backend, false otherwise (e.g. a background worker) |
| count | text | Count of samples |

`pg_wait_sampling_reset_profile()` function resets the profile.

The work of wait event statistics collector worker is controlled by following
GUCs.

| Parameter name | Data type | Description | Default value |
|----------------------------------| --------- |---------------------------------------------|--------------:|
| pg_wait_sampling.history_size | int4 | Size of history in-memory ring buffer | 5000 |
| pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 |
| pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 |
| pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true |
| pg_wait_sampling.profile_queries | enum | Whether profile should be per query | top |
| pg_wait_sampling.sample_cpu | bool | Whether on CPU backends should be sampled | true |
| Parameter name | Data type | Description | Default value |
|-------------------------------------| --------- |---------------------------------------------|-----------------------|
| pg_wait_sampling.history_size | int4 | Size of history in-memory ring buffer | 5000 |
| pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 |
| pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 |
| pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true |
| pg_wait_sampling.profile_queries | enum | Whether profile should be per query | top |
| pg_wait_sampling.sample_cpu | bool | Whether on CPU backends should be sampled | true |
| pg_wait_sampling.history_dimensions | text | Columns that are sampled for history | 'pid, event, queryid' |
| pg_wait_sampling.profile_dimensions | text | Columns that are sampled for profile | 'pid, event, queryid' |

If `pg_wait_sampling.profile_pid` is set to false, sampling profile wouldn't be
collected in per-process manner. In this case the value of pid would
@@ -158,10 +172,21 @@ If `pg_wait_sampling.sample_cpu` is set to true then processes that are not
waiting on anything are also sampled. The wait event columns for such processes
will be NULL.

`pg_wait_sampling.history_dimensions` and `pg_wait_sampling.profile_dimensions`
determine which columns are sampled for the `pg_wait_sampling_history` and
`pg_wait_sampling_profile` views, respectively.
Allowed values are `all`, `pid`, `event`, `query_id`, `role_id`,
`database_id`, `leader_pid`, and any comma-separated combination of column names.
`event` turns both the `event` and `event_type` columns on and off.
`all` cannot be combined with any other value and must be used alone.
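
The rules above (a list of column names, with `all` allowed only on its own) can be illustrated with a small standalone parser. This is only a sketch and not the extension's actual GUC check hook: the `DIM_*` flag values and the `parse_dimensions` function are hypothetical names made up for this example, and the accepted tokens are taken from the list of allowed values above.

```c
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical flag values; the extension defines its own constants. */
enum
{
    DIM_PID        = 1 << 0,
    DIM_EVENT      = 1 << 1,
    DIM_QUERY_ID   = 1 << 2,
    DIM_ROLE_ID    = 1 << 3,
    DIM_DB_ID      = 1 << 4,
    DIM_LEADER_PID = 1 << 5,
    DIM_ALL        = (1 << 6) - 1
};

/* Map one token to its flag; returns 0 for an unknown name. */
static int
token_to_flag(const char *tok, size_t len)
{
    static const struct { const char *name; int flag; } table[] = {
        {"pid", DIM_PID},
        {"event", DIM_EVENT},
        {"query_id", DIM_QUERY_ID},
        {"role_id", DIM_ROLE_ID},
        {"database_id", DIM_DB_ID},
        {"leader_pid", DIM_LEADER_PID},
    };

    for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); i++)
        if (strlen(table[i].name) == len &&
            strncmp(table[i].name, tok, len) == 0)
            return table[i].flag;
    return 0;
}

/*
 * Parse a comma/space separated list into a bitmask.
 * Returns -1 for an unknown name or for "all" mixed with other values.
 */
static int
parse_dimensions(const char *value)
{
    int         mask = 0;
    int         saw_all = 0;
    int         ntokens = 0;
    const char *p = value;

    while (*p)
    {
        const char *start;
        size_t      len;
        int         flag;

        /* Skip separators between tokens */
        while (*p == ',' || isspace((unsigned char) *p))
            p++;
        if (!*p)
            break;

        /* Find the end of the current token */
        start = p;
        while (*p && *p != ',' && !isspace((unsigned char) *p))
            p++;
        len = (size_t) (p - start);
        ntokens++;

        if (len == 3 && strncmp(start, "all", 3) == 0)
        {
            saw_all = 1;
            continue;
        }

        flag = token_to_flag(start, len);
        if (flag == 0)
            return -1;
        mask |= flag;
    }

    /* "all" is only valid when it is the single token */
    if (saw_all)
        return ntokens == 1 ? DIM_ALL : -1;
    return mask;
}
```

With this sketch, `parse_dimensions("pid, event, role_id")` yields `DIM_PID | DIM_EVENT | DIM_ROLE_ID`, while `parse_dimensions("all, pid")` is rejected.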

Values of these GUC variables can be changed only in the config file or with ALTER SYSTEM.
You then need to reload the server's configuration (for example, with the `pg_reload_conf` function)
for the changes to take effect.

> [!WARNING]
> Using `pg_reload_conf` will reset `pg_wait_sampling_history` and
> `pg_wait_sampling_profile` views if new dimensions differ from old ones.

See
[PostgreSQL documentation](http://www.postgresql.org/docs/devel/static/monitoring-stats.html#WAIT-EVENT-TABLE)
for list of possible wait events.
128 changes: 88 additions & 40 deletions collector.c
@@ -32,6 +32,9 @@

static volatile sig_atomic_t shutdown_requested = false;

int saved_profile_dimensions;
int saved_history_dimensions;

static void handle_sigterm(SIGNAL_ARGS);

/*
@@ -61,7 +64,8 @@ pgws_register_wait_collector(void)
static void
alloc_history(History *observations, int count)
{
observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
saved_history_dimensions = pgws_history_dimensions;
observations->samples = (Sample *) palloc0(sizeof(Sample) * count);
observations->index = 0;
observations->count = count;
observations->wraparound = false;
@@ -73,13 +77,13 @@ alloc_history(History *observations, int count)
static void
realloc_history(History *observations, int count)
{
HistoryItem *newitems;
Sample *newitems;
int copyCount,
i,
j;

/* Allocate new array for history */
newitems = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
newitems = (Sample *) palloc0(sizeof(Sample) * count);

/* Copy entries from old array to the new */
if (observations->wraparound)
@@ -98,14 +102,14 @@ realloc_history(History *observations, int count)
{
if (j >= observations->count)
j = 0;
memcpy(&newitems[i], &observations->items[j], sizeof(HistoryItem));
memcpy(&newitems[i], &observations->samples[j], sizeof(Sample));
i++;
j++;
}

/* Switch to new history array */
pfree(observations->items);
observations->items = newitems;
pfree(observations->samples);
observations->samples = newitems;
observations->index = copyCount;
observations->count = count;
observations->wraparound = false;
@@ -125,22 +129,55 @@ handle_sigterm(SIGNAL_ARGS)
/*
* Get next item of history with rotation.
*/
static HistoryItem *
static Sample *
get_next_observation(History *observations)
{
HistoryItem *result;
Sample *result;

/* Check for wraparound */
if (observations->index >= observations->count)
{
observations->index = 0;
observations->wraparound = true;
}
result = &observations->items[observations->index];
result = &observations->samples[observations->index];
observations->index++;
return result;
}
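
The rotation performed by `get_next_observation` can be tried in isolation. The following is a minimal sketch with hypothetical `DemoSample` and `DemoHistory` types standing in for the extension's `Sample` and `History`; it keeps the same index/count/wraparound bookkeeping but uses caller-provided storage instead of `palloc0`.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the extension's Sample and History types. */
typedef struct
{
    int pid;
} DemoSample;

typedef struct
{
    DemoSample *samples;    /* storage for 'count' samples */
    int         index;      /* next slot to write */
    int         count;      /* capacity of the buffer */
    bool        wraparound; /* true once the buffer has been filled */
} DemoHistory;

/* Return the next slot, rotating back to the start when the end is hit. */
static DemoSample *
demo_next_slot(DemoHistory *h)
{
    DemoSample *result;

    if (h->index >= h->count)
    {
        h->index = 0;
        h->wraparound = true;
    }
    result = &h->samples[h->index];
    h->index++;
    return result;
}

/* Number of slots that currently hold valid samples. */
static int
demo_valid_count(const DemoHistory *h)
{
    return h->wraparound ? h->count : h->index;
}
```

Writing four samples into a buffer of three overwrites the oldest slot and sets `wraparound`, after which every slot is considered valid.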

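/*
 * Fill the sample fields selected by dimensions_mask. Callers zero the
 * sample beforehand, so fields outside the mask stay 0.
 */
void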
fill_sample(Sample *sample, PGPROC *proc, int pid, uint32 wait_event_info,
uint64 queryId, int dimensions_mask)
{
Oid role_id = proc->roleId;
Oid database_id = proc->databaseId;
PGPROC *lockGroupLeader = proc->lockGroupLeader;
#if PG_VERSION_NUM >= 180000
bool is_regular_backend = proc->isRegularBackend;
#else
bool is_regular_backend = !proc->isBackgroundWorker;
#endif

if (dimensions_mask & PGWS_DIMENSIONS_PID)
sample->pid = pid;
if (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT)
sample->wait_event_info = wait_event_info;
if (pgws_profileQueries || (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID))
sample->queryId = queryId;
/* Copy everything else we need from PGPROC */
if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID)
sample->role_id = role_id;
if (dimensions_mask & PGWS_DIMENSIONS_DB_ID)
sample->database_id = database_id;
if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID)
sample->parallel_leader_pid = (lockGroupLeader &&
lockGroupLeader->pid != pid ?
lockGroupLeader->pid :
0);
if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE)
sample->is_regular_backend = is_regular_backend;
}

/*
* Read current waits from backends and write them to history array
* and/or profile hash.
@@ -162,37 +199,38 @@ probe_waits(History *observations, HTAB *profile_hash,
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
HistoryItem item,
*observation;
PGPROC *proc = &ProcGlobal->allProcs[i];
/* We do not copy PGPROC since it is very big */
PGPROC *proc = &ProcGlobal->allProcs[i];
int pid;
uint32 wait_event_info;

if (!pgws_should_sample_proc(proc, &item.pid, &item.wait_event_info))
if (!pgws_should_sample_proc(proc, &pid, &wait_event_info))
continue;

if (pgws_profileQueries)
item.queryId = pgws_proc_queryids[i];
else
item.queryId = 0;

item.ts = ts;

/* Write to the history if needed */
if (write_history)
{
observation = get_next_observation(observations);
*observation = item;
Sample *observation = get_next_observation(observations);
memset(observation, 0, sizeof(Sample));
fill_sample(observation, proc, pid, wait_event_info,
pgws_proc_queryids[i], saved_history_dimensions);
observation->ts = ts;
}

/* Write to the profile if needed */
if (write_profile)
{
ProfileItem *profileItem;
bool found;
Sample *profileItem;
Sample key;
bool found;

memset(&key, 0, sizeof(Sample));
fill_sample(&key, proc, pid, wait_event_info,
pgws_proc_queryids[i], saved_profile_dimensions);
if (!profile_pid)
item.pid = 0;
key.pid = 0;

profileItem = (ProfileItem *) hash_search(profile_hash, &item, HASH_ENTER, &found);
profileItem = (Sample *) hash_search(profile_hash, &key, HASH_ENTER, &found);
if (found)
profileItem->count++;
else
@@ -229,8 +267,8 @@ send_history(History *observations, shm_mq_handle *mqh)
for (i = 0; i < count; i++)
{
mq_result = shm_mq_send_compat(mqh,
sizeof(HistoryItem),
&observations->items[i],
sizeof(Sample),
&observations->samples[i],
false,
true);
if (mq_result == SHM_MQ_DETACHED)
@@ -249,10 +287,10 @@ send_history(History *observations, shm_mq_handle *mqh)
static void
send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
{
HASH_SEQ_STATUS scan_status;
ProfileItem *item;
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;
HASH_SEQ_STATUS scan_status;
Sample *sample;
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;

/* Send array size first since receive_array expects this */
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
@@ -264,9 +302,9 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
return;
}
hash_seq_init(&scan_status, profile_hash);
while ((item = (ProfileItem *) hash_seq_search(&scan_status)) != NULL)
while ((sample = (Sample *) hash_seq_search(&scan_status)) != NULL)
{
mq_result = shm_mq_send_compat(mqh, sizeof(ProfileItem), item, false,
mq_result = shm_mq_send_compat(mqh, sizeof(Sample), sample, false,
true);
if (mq_result == SHM_MQ_DETACHED)
{
@@ -287,12 +325,10 @@ make_profile_hash()
{
HASHCTL hash_ctl;

if (pgws_profileQueries)
hash_ctl.keysize = offsetof(ProfileItem, count);
else
hash_ctl.keysize = offsetof(ProfileItem, queryId);

hash_ctl.entrysize = sizeof(ProfileItem);
saved_profile_dimensions = pgws_profile_dimensions;
/* Fields that are not in dimensions mask are zero and are included in key */
hash_ctl.keysize = offsetof(Sample, count);
hash_ctl.entrysize = sizeof(Sample);
return hash_create("Waits profile hash", 1024, &hash_ctl,
HASH_ELEM | HASH_BLOBS);
}
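
Because the key now spans every field before `count`, grouping depends on the unselected fields being zero in every key. The sketch below shows the idea outside PostgreSQL, assuming a hypothetical two-dimension `DemoEntry` and a linear table with `memcmp` in place of `hash_search` with `HASH_BLOBS`; none of these names come from the extension.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical dimension flags for this example only. */
enum
{
    DEMO_DIM_PID = 1 << 0,
    DEMO_DIM_DB  = 1 << 1
};

/* Fields before 'count' form the key; this layout has no padding. */
typedef struct
{
    int32_t  pid;
    uint32_t database_id;
    uint64_t count;
} DemoEntry;

#define DEMO_KEY_SIZE offsetof(DemoEntry, count)
#define DEMO_MAX      16

typedef struct
{
    DemoEntry entries[DEMO_MAX];
    int       used;
} DemoProfile;

/* Zero the whole key, then set only the fields selected by the mask. */
static void
demo_fill_key(DemoEntry *key, int pid, uint32_t db, int mask)
{
    memset(key, 0, sizeof(*key));
    if (mask & DEMO_DIM_PID)
        key->pid = pid;
    if (mask & DEMO_DIM_DB)
        key->database_id = db;
}

/* Count one sample; returns -1 if the table is full. */
static int
demo_profile_add(DemoProfile *p, int pid, uint32_t db, int mask)
{
    DemoEntry key;

    demo_fill_key(&key, pid, db, mask);

    /* Byte-wise comparison of the key prefix, like a blob-keyed hash */
    for (int i = 0; i < p->used; i++)
    {
        if (memcmp(&p->entries[i], &key, DEMO_KEY_SIZE) == 0)
        {
            p->entries[i].count++;
            return 0;
        }
    }

    if (p->used == DEMO_MAX)
        return -1;
    key.count = 1;
    p->entries[p->used++] = key;
    return 0;
}
```

With only `DEMO_DIM_DB` selected, samples from different pids in the same database collapse into a single entry, since their `pid` field stays zero in the key.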
@@ -377,6 +413,18 @@ pgws_collector_main(Datum main_arg)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);

/* Reset profile and history if needed */
if (pgws_history_dimensions != saved_history_dimensions)
{
pfree(observations.samples);
alloc_history(&observations, pgws_historySize);
}
if (pgws_profile_dimensions != saved_profile_dimensions)
{
hash_destroy(profile_hash);
profile_hash = make_profile_hash();
}
}

/* Calculate time to next sample for history or profile */