*/
#include "postgres.h"
-#include "pgpa_identifier.h"
#include "pg_plan_advice.h"
+#include "pgpa_collector.h"
+#include "pgpa_identifier.h"
#include "pgpa_output.h"
#include "pgpa_walker.h"
PG_MODULE_MAGIC;
static pgpa_shared_state *pgpa_state = NULL;
+static dsa_area *pgpa_dsa_area = NULL;
/* GUC variables */
-static int pg_plan_advice_local_collection_limit = 0;
-static int pg_plan_advice_shared_collection_limit = 0;
+int pg_plan_advice_local_collection_limit = 0;
+int pg_plan_advice_shared_collection_limit = 0;
/* Saved hook values */
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
+/* Memory context */
+static MemoryContext pgpa_memory_context = NULL;
+
static bool pg_plan_advice_ExecutorStart(QueryDesc *queryDesc, int eflags);
-static void pgpa_collect_advice(PlannedStmt *pstmt);
+static void pgpa_generate_advice(PlannedStmt *pstmt, const char *query_string);
/*
* Initialize this module.
{
pgpa_shared_state *state = (pgpa_shared_state *) ptr;
- state->area = DSA_HANDLE_INVALID;
LWLockInitialize(&state->lock, LWLockNewTrancheId());
+ state->dsa_tranche = LWLockNewTrancheId();
+ state->area = DSA_HANDLE_INVALID;
state->advice_count = 0;
state->advice_array = InvalidDsaPointer;
}
+/*
+ * Return a pointer to a memory context where long-lived data managed by this
+ * module can be stored.
+ */
+MemoryContext
+pg_plan_advice_get_mcxt(void)
+{
+ if (pgpa_memory_context == NULL)
+ pgpa_memory_context = AllocSetContextCreate(TopMemoryContext,
+ "pg_plan_advice",
+ ALLOCSET_DEFAULT_SIZES);
+
+ return pgpa_memory_context;
+}
+
/*
* Get a pointer to our shared state.
*
* this backend has not yet accessed it, attach to it. Otherwise, just return
* our cached pointer.
*
- * Along the way, make sure the relevant LWLock tranche is registered.
+ * Along the way, make sure the relevant LWLock tranches are registered.
*/
pgpa_shared_state *
pg_plan_advice_attach(void)
{
state = GetNamedDSMSegment("pg_plan_advice", sizeof(pgpa_shared_state),
pgpa_init_shared_state, &found);
- LWLockRegisterTranche(pgpa_state->lock.tranche, "pg_plan_advice");
+ LWLockRegisterTranche(state->lock.tranche, "pg_plan_advice_lock");
+ LWLockRegisterTranche(state->dsa_tranche, "pg_plan_advice_dsa");
pgpa_state = state;
}
return pgpa_state;
}
+dsa_area *
+pg_plan_advice_dsa_area(void)
+{
+ if (pgpa_dsa_area == NULL)
+ {
+ pgpa_shared_state *state = pg_plan_advice_attach();
+ dsa_handle area_handle;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(pg_plan_advice_get_mcxt());
+
+ LWLockAcquire(&state->lock, LW_EXCLUSIVE);
+ area_handle = state->area;
+ if (area_handle == DSA_HANDLE_INVALID)
+ {
+ pgpa_dsa_area = dsa_create(state->dsa_tranche);
+ state->area = dsa_get_handle(pgpa_dsa_area);
+ LWLockRelease(&state->lock);
+ }
+ else
+ {
+ LWLockRelease(&state->lock);
+ pgpa_dsa_area = dsa_attach(area_handle);
+ }
+
+ dsa_pin_mapping(pgpa_dsa_area);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ return pgpa_dsa_area;
+}
+
static bool
pg_plan_advice_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
if (pg_plan_advice_local_collection_limit > 0
|| pg_plan_advice_shared_collection_limit > 0)
- pgpa_collect_advice(pstmt);
+ pgpa_generate_advice(pstmt, queryDesc->sourceText);
if (prev_ExecutorStart)
return prev_ExecutorStart(queryDesc, eflags);
}
static void
-pgpa_collect_advice(PlannedStmt *pstmt)
+pgpa_generate_advice(PlannedStmt *pstmt, const char *query_string)
{
pgpa_plan_walker_context context;
StringInfoData buf;
initStringInfo(&buf);
pgpa_output_advice(&buf, &context, rt_identifiers);
- elog(LOG, "advice:\n%s", buf.data);
+ pgpa_collect_advice(pstmt->queryId, query_string, buf.data);
}
#include "postgres.h"
+#include "pg_plan_advice.h"
+#include "pgpa_collector.h"
+
#include "datatype/timestamp.h"
+#include "funcapi.h"
+#include "miscadmin.h"
#include "nodes/pg_list.h"
+#include "utils/builtins.h"
+#include "utils/timestamp.h"
+
+PG_FUNCTION_INFO_V1(pg_get_collected_local_advice);
+
+#define ADVICE_CHUNK_SIZE 1024
+#define ADVICE_CHUNK_ARRAY_SIZE 64
typedef struct pgpa_collected_advice
{
Oid userid; /* user OID */
Oid dbid; /* database OID */
uint64 queryid; /* query identifier */
- int nesting_level; /* query nesting level */
TimestampTz timestamp; /* query timestamp */
- char advice_offset; /* start of advice in textual data */
+ int advice_offset; /* start of advice in textual data */
char textual_data[FLEXIBLE_ARRAY_MEMBER];
} pgpa_collected_advice;
-static List *locally_collected_advice = NIL;
+typedef struct pgpa_local_advice_chunk
+{
+ pgpa_collected_advice *entries[ADVICE_CHUNK_SIZE];
+} pgpa_local_advice_chunk;
+
+typedef struct pgpa_local_advice
+{
+ uint64 next_id;
+ uint64 oldest_id;
+ uint64 base_id;
+ int chunk_array_allocated_size;
+ pgpa_local_advice_chunk **chunks;
+} pgpa_local_advice;
+
+static pgpa_local_advice *local_advice;
-extern void
-pgpa_save_collected_advice(Oid userid, Oid dbid, uint64 queryId,
- int nesting_level, TimestampTz timestamp,
- const char *query_string, const char *advice_string);
+static pgpa_collected_advice *pgpa_make_collected_advice(Oid userid,
+ Oid dbid,
+ uint64 queryId,
+ TimestampTz timestamp,
+ const char *query_string,
+ const char *advice_string,
+ dsa_area *area,
+ dsa_pointer *result);
+static void pgpa_store_local_advice(pgpa_collected_advice *ca);
+static void pgpa_trim_local_advice(void);
-#if 0
static inline const char *
query_string(pgpa_collected_advice *ca)
{
{
return ca->textual_data + ca->advice_offset;
}
-#endif
+
+#define PG_GET_ADVICE_COLUMNS 7
void
-pgpa_save_collected_advice(Oid userid, Oid dbid, uint64 queryId,
- int nesting_level, TimestampTz timestamp,
- const char *query_string, const char *advice_string)
+pgpa_collect_advice(uint64 queryId, const char *query_string,
+ const char *advice_string)
+{
+ Oid userid = GetUserId();
+ Oid dbid = MyDatabaseId;
+ TimestampTz now = GetCurrentTimestamp();
+
+ if (pg_plan_advice_local_collection_limit > 0)
+ {
+ pgpa_collected_advice *ca;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(pg_plan_advice_get_mcxt());
+ ca = pgpa_make_collected_advice(userid, dbid, queryId, now,
+ query_string, advice_string,
+ NULL, NULL);
+ pgpa_store_local_advice(ca);
+ pgpa_trim_local_advice();
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ if (pg_plan_advice_shared_collection_limit > 0)
+ {
+ dsa_area *area = pg_plan_advice_dsa_area();
+ dsa_pointer ca_pointer;
+
+ pgpa_make_collected_advice(userid, dbid, queryId, now,
+ query_string, advice_string, area,
+ &ca_pointer);
+ /* XXX do something */
+ /* XXX over limit? */
+ }
+}
+
+/*
+ * Allocate and fill a new pgpa_collected_advice object.
+ *
+ * If area != NULL, it is used to allocate the new object, and the resulting
+ * dsa_pointer is returned via *result.
+ *
+ * If area == NULL, the new object is allocated in the current memory context,
+ * and result is not examined or modified.
+ */
+static pgpa_collected_advice *
+pgpa_make_collected_advice(Oid userid, Oid dbid, uint64 queryId,
+ TimestampTz timestamp,
+ const char *query_string,
+ const char *advice_string,
+ dsa_area *area, dsa_pointer *result)
{
size_t query_string_length = strlen(query_string) + 1;
size_t advice_string_length = strlen(advice_string) + 1;
total_length = offsetof(pgpa_collected_advice, textual_data)
+ query_string_length + advice_string_length;
- ca = palloc(total_length);
- ca->userid = userid;
- ca->dbid = dbid;
+ if (area == NULL)
+ ca = palloc(total_length);
+ else
+ {
+ *result = dsa_allocate(area, total_length);
+ ca = dsa_get_address(area, *result);
+ }
+
+ ca->userid = GetUserId();
+ ca->dbid = MyDatabaseId;
ca->queryid = queryId;
- ca->nesting_level = nesting_level;
ca->timestamp = timestamp;
+ ca->advice_offset = query_string_length;
memcpy(ca->textual_data, query_string, query_string_length);
- memcpy(&ca->textual_data[query_string_length],
+ memcpy(&ca->textual_data[ca->advice_offset],
advice_string, advice_string_length);
- locally_collected_advice = lappend(locally_collected_advice, ca);
+ return ca;
+}
+
+/*
+ * Add a pg_collected_advice object to our backend-local advice collection.
+ *
+ * Caller is responsible for switching to the appropriate memory context;
+ * the provided object should have been allocated in that same context.
+ */
+static void
+pgpa_store_local_advice(pgpa_collected_advice *ca)
+{
+ uint64 chunk_number;
+ uint64 chunk_offset;
+ pgpa_local_advice *la = local_advice;
+
+ /* If the local advice collector isn't initialized yet, do that now. */
+ if (la == NULL)
+ {
+ la = palloc0(sizeof(pgpa_local_advice));
+ la->chunk_array_allocated_size = ADVICE_CHUNK_ARRAY_SIZE;
+ la->chunks = palloc0_array(pgpa_local_advice_chunk *,
+ la->chunk_array_allocated_size);
+ local_advice = la;
+ }
+
+ /* Compute chunk and offset at which to store this advice. */
+ chunk_number = (la->next_id - la->base_id) / ADVICE_CHUNK_SIZE;
+ chunk_offset = (la->next_id - la->base_id) % ADVICE_CHUNK_SIZE;
+
+ /* Extend chunk array, if needed. */
+ if (chunk_number > la->chunk_array_allocated_size)
+ {
+ int new_size;
+
+ new_size = la->chunk_array_allocated_size + ADVICE_CHUNK_ARRAY_SIZE;
+ la->chunks = repalloc0_array(la->chunks,
+ pgpa_local_advice_chunk *,
+ la->chunk_array_allocated_size,
+ new_size);
+ la->chunk_array_allocated_size = new_size;
+ }
+
+ /* Allocate new chunk, if needed. */
+ if (la->chunks[chunk_number] == NULL)
+ la->chunks[chunk_number] = palloc0_object(pgpa_local_advice_chunk);
+
+ /* Save pointer and bump next-id counter. */
+ Assert(la->chunks[chunk_number]->entries[chunk_offset] == NULL);
+ la->chunks[chunk_number]->entries[chunk_offset] = ca;
+ ++la->next_id;
+}
+
+/*
+ * Remove local advice in excess of pg_plan_advice.local_collection_limit.
+ */
+static void
+pgpa_trim_local_advice(void)
+{
+ pgpa_local_advice *la = local_advice;
+ uint64 current_count;
+ uint64 trim_count;
+ uint64 total_chunk_count;
+ uint64 trim_chunk_count;
+ uint64 remaining_chunk_count;
+
+ /* If we haven't yet reached the limit, there's nothing to do. */
+ current_count = la->next_id - la->oldest_id;
+ if (current_count < pg_plan_advice_local_collection_limit)
+ return;
+
+ /* Free enough entries to get us back down to the limit. */
+ trim_count = current_count - pg_plan_advice_local_collection_limit;
+ while (trim_count > 0)
+ {
+ uint64 chunk_number;
+ uint64 chunk_offset;
+
+ chunk_number = (la->oldest_id - la->base_id) / ADVICE_CHUNK_SIZE;
+ chunk_offset = (la->oldest_id - la->base_id) % ADVICE_CHUNK_SIZE;
+
+ Assert(la->chunks[chunk_number]->entries[chunk_offset] != NULL);
+ pfree(la->chunks[chunk_number]->entries[chunk_offset]);
+ la->chunks[chunk_number]->entries[chunk_offset] = NULL;
+ ++la->oldest_id;
+ --trim_count;
+ }
+
+ /* Free any chunks that are now entirely unused. */
+ trim_chunk_count = (la->oldest_id - la->base_id) / ADVICE_CHUNK_SIZE;
+ for (uint64 n = 0; n < trim_chunk_count; ++n)
+ pfree(la->chunks[n]);
+
+ /* Slide remaining chunk pointers back toward the base of the array. */
+ total_chunk_count = (la->next_id - la->base_id +
+ ADVICE_CHUNK_SIZE - 1) / ADVICE_CHUNK_SIZE;
+ remaining_chunk_count = total_chunk_count - trim_chunk_count;
+ if (remaining_chunk_count > 0)
+ memmove(&la->chunks[0], &la->chunks[trim_chunk_count],
+ sizeof(pgpa_local_advice_chunk *) * remaining_chunk_count);
+
+ /* Adjust base ID value accordingly. */
+ la->base_id += trim_chunk_count * ADVICE_CHUNK_SIZE;
+}
+
+/*
+ * SQL-callable SRF to return locally collected advice
+ */
+Datum
+pg_get_collected_local_advice(PG_FUNCTION_ARGS)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ pgpa_local_advice *la = local_advice;
+
+ /*
+ * XXX. Is this the correct thing from a security point of view?
+ */
+ if (InSecurityRestrictedOperation())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("cannot call \"%s\" within security-restricted operation",
+ "pg_get_collected_local_advice")));
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ if (la == NULL)
+ return (Datum) 0;
+
+ for (uint64 id = la->oldest_id; id < la->next_id; ++id)
+ {
+ uint64 chunk_number;
+ uint64 chunk_offset;
+ pgpa_collected_advice *ca;
+ Datum values[PG_GET_ADVICE_COLUMNS];
+ bool nulls[PG_GET_ADVICE_COLUMNS] = {0};
+
+ chunk_number = (id - la->base_id) / ADVICE_CHUNK_SIZE;
+ chunk_offset = (id - la->base_id) % ADVICE_CHUNK_SIZE;
+
+ ca = la->chunks[chunk_number]->entries[chunk_offset];
+
+ values[0] = UInt64GetDatum(id);
+ values[1] = ObjectIdGetDatum(ca->userid);
+ values[2] = ObjectIdGetDatum(ca->dbid);
+ values[3] = UInt64GetDatum(ca->queryid);
+ values[4] = TimestampGetDatum(ca->timestamp);
+ values[5] = CStringGetTextDatum(query_string(ca));
+ values[6] = CStringGetTextDatum(advice_string(ca));
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+ values, nulls);
+ }
+
+ return (Datum) 0;
}