From f111c4f38af06e2819dd3c3ae4e9ca19dd6ae075 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Fri, 9 May 2014 20:19:17 +0800 Subject: [PATCH] bdr: move heap and index access routines into bdr_executor.c --- contrib/bdr/bdr.h | 84 ++++++++---- contrib/bdr/bdr_apply.c | 242 -------------------------------- contrib/bdr/bdr_executor.c | 265 ++++++++++++++++++++++++++++++++++++ contrib/bdr/worker.mk | 3 +- src/tools/msvc/Mkvcbuild.pm | 3 +- 5 files changed, 323 insertions(+), 274 deletions(-) create mode 100644 contrib/bdr/bdr_executor.c diff --git a/contrib/bdr/bdr.h b/contrib/bdr/bdr.h index 1f27e94def..dd29f9201b 100644 --- a/contrib/bdr/bdr.h +++ b/contrib/bdr/bdr.h @@ -28,6 +28,13 @@ */ struct pg_conn; +/* Forward declarations */ +struct TupleTableSlot; /* from executor/tuptable.h */ +struct EState; /* from nodes/execnodes.h */ +struct ScanKeyData; /* from access/skey.h for ScanKey */ +enum LockTupleMode; /* from access/heapam.h */ + + /* * Flags to indicate which fields are present in a commit record sent by the * output plugin. @@ -37,6 +44,14 @@ typedef enum BdrOutputCommitFlags BDR_OUTPUT_COMMIT_HAS_ORIGIN = 1 } BdrOutputCommitFlags; +typedef enum BDRConflictHandlerType +{ + BDRUpdateUpdateConflictHandler, + BDRUpdateDeleteConflictHandler, + BDRInsertInsertConflictHandler, + BDRInsertUpdateConflictHandler +} BDRConflictHandlerType; + /* * BdrApplyWorker describes a BDR worker connection. * @@ -134,6 +149,29 @@ typedef struct BdrConnectionConfig bool is_valid; } BdrConnectionConfig; +typedef struct BDRConflictHandler +{ + Oid handler_oid; + BDRConflictHandlerType handler_type; + uint64 timeframe; +} BDRConflictHandler; + +/* + * This structure is for caching relation specific information, such as + * conflict handlers. + */ +typedef struct BDRRelation +{ + /* hash key */ + Oid reloid; + + Relation rel; + + BDRConflictHandler *conflict_handlers; + size_t conflict_handlers_len; +} BDRRelation; + + /* * Params for every connection in bdr.connections. * @@ -183,6 +221,22 @@ extern void bdr_process_remote_action(StringInfo s); extern void fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli); +/* Index maintenance, heap access, etc */ +extern struct EState * bdr_create_rel_estate(Relation rel); +extern void UserTableUpdateIndexes(struct EState *estate, + struct TupleTableSlot *slot); +extern void UserTableUpdateOpenIndexes(struct EState *estate, + struct TupleTableSlot *slot); +extern void build_index_scan_keys(struct EState *estate, + struct ScanKeyData **scan_keys, + struct TupleTableSlot *slot); +extern void build_index_scan_key(struct ScanKeyData *skey, Relation rel, + Relation idxrel, + struct TupleTableSlot *slot); +extern bool find_pkey_tuple(struct ScanKeyData *skey, BDRRelation *rel, + Relation idxrel, struct TupleTableSlot *slot, bool + lock, enum LockTupleMode mode); + /* sequence support */ extern void bdr_sequencer_shmem_init(int nnodes, int sequencers); extern void bdr_sequencer_init(int seq_slot); @@ -243,36 +297,6 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg, Name out_slot_name, uint64 *out_sysid, TimeLineID* out_timeline, RepNodeId *out_replication_identifier, char **out_snapshot); -typedef enum BDRConflictHandlerType -{ - BDRUpdateUpdateConflictHandler, - BDRUpdateDeleteConflictHandler, - BDRInsertInsertConflictHandler, - BDRInsertUpdateConflictHandler -} BDRConflictHandlerType; - -typedef struct BDRConflictHandler -{ - Oid handler_oid; - BDRConflictHandlerType handler_type; - uint64 timeframe; -} BDRConflictHandler; - -/* - * This structure is for caching relation specific information, such as - * conflict handlers. - */ -typedef struct BDRRelation -{ - /* hash key */ - Oid reloid; - - Relation rel; - - BDRConflictHandler *conflict_handlers; - size_t conflict_handlers_len; -} BDRRelation; - /* use instead of heap_open()/heap_close() */ extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode); extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode); diff --git a/contrib/bdr/bdr_apply.c b/contrib/bdr/bdr_apply.c index 5f4ba4da4a..5866a58c93 100644 --- a/contrib/bdr/bdr_apply.c +++ b/contrib/bdr/bdr_apply.c @@ -22,7 +22,6 @@ #include "pgstat.h" #include "access/committs.h" -#include "access/heapam.h" #include "access/htup_details.h" #include "access/relscan.h" #include "access/xact.h" @@ -32,17 +31,13 @@ #include "catalog/namespace.h" #include "catalog/pg_type.h" -#include "executor/executor.h" - #include "libpq/pqformat.h" -#include "parser/parse_relation.h" #include "parser/parse_type.h" #include "replication/logical.h" #include "replication/replication_identifier.h" -#include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/lwlock.h" @@ -57,7 +52,6 @@ #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" -#include "utils/tqual.h" /* Useful for development: #define VERBOSE_INSERT @@ -99,15 +93,6 @@ BdrApplyWorker *bdr_apply_worker = NULL; */ BdrConnectionConfig *bdr_apply_config = NULL; -static void build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot); -static void build_index_scan_key(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot); -static bool find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idx_rel, TupleTableSlot *slot, - bool lock, LockTupleMode mode); - -static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot); -static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot); -static EState *bdr_create_rel_estate(Relation rel); - static BDRRelation *read_rel(StringInfo s, LOCKMODE mode); extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup); static void read_tuple(StringInfo s, BDRRelation *rel, TupleTableSlot *slot); @@ -1617,233 +1602,6 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple) } } -static EState * -bdr_create_rel_estate(Relation rel) -{ - EState *estate; - ResultRelInfo *resultRelInfo; - - estate = CreateExecutorState(); - - resultRelInfo = makeNode(ResultRelInfo); - resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ - resultRelInfo->ri_RelationDesc = rel; - resultRelInfo->ri_TrigInstrument = NULL; - - estate->es_result_relations = resultRelInfo; - estate->es_num_result_relations = 1; - estate->es_result_relation_info = resultRelInfo; - - return estate; -} - -static void -UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot) -{ - /* HOT update does not require index inserts */ - if (HeapTupleIsHeapOnly(slot->tts_tuple)) - return; - - ExecOpenIndices(estate->es_result_relation_info); - UserTableUpdateOpenIndexes(estate, slot); - ExecCloseIndices(estate->es_result_relation_info); -} - -static void -UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot) -{ - List *recheckIndexes = NIL; - - /* HOT update does not require index inserts */ - if (HeapTupleIsHeapOnly(slot->tts_tuple)) - return; - - if (estate->es_result_relation_info->ri_NumIndices > 0) - { - recheckIndexes = ExecInsertIndexTuples(slot, - &slot->tts_tuple->t_self, - estate); - - if (recheckIndexes != NIL) - ereport(ERROR, - (errmsg("bdr doesn't support index rechecks"))); - } - - /* FIXME: recheck the indexes */ - list_free(recheckIndexes); -} - -static void -build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot) -{ - ResultRelInfo *relinfo; - int i; - - relinfo = estate->es_result_relation_info; - - /* build scankeys for each index */ - for (i = 0; i < relinfo->ri_NumIndices; i++) - { - IndexInfo *ii = relinfo->ri_IndexRelationInfo[i]; - - if (!ii->ii_Unique) - continue; - - scan_keys[i] = palloc(ii->ii_NumIndexAttrs * sizeof(ScanKeyData)); - build_index_scan_key(scan_keys[i], - relinfo->ri_RelationDesc, - relinfo->ri_IndexRelationDescs[i], - slot); - } -} - -/* - * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that - * is setup to match 'rel' (*NOT* idxrel!). - */ -static void -build_index_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *slot) -{ - int attoff; - Datum indclassDatum; - Datum indkeyDatum; - bool isnull; - oidvector *opclass; - int2vector *indkey; - HeapTuple key = slot->tts_tuple; - - indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, - Anum_pg_index_indclass, &isnull); - Assert(!isnull); - opclass = (oidvector *) DatumGetPointer(indclassDatum); - - indkeyDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, - Anum_pg_index_indkey, &isnull); - Assert(!isnull); - indkey = (int2vector *) DatumGetPointer(indkeyDatum); - - - for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++) - { - Oid operator; - Oid opfamily; - RegProcedure regop; - int pkattno = attoff + 1; - int mainattno = indkey->values[attoff]; - Oid atttype = attnumTypeId(rel, mainattno); - Oid optype = get_opclass_input_type(opclass->values[attoff]); - - opfamily = get_opclass_family(opclass->values[attoff]); - - operator = get_opfamily_member(opfamily, optype, - optype, - BTEqualStrategyNumber); - - if (!OidIsValid(operator)) - elog(ERROR, - "could not lookup equality operator for type %u, optype %u in opfamily %u", - atttype, optype, opfamily); - - regop = get_opcode(operator); - - /* FIXME: convert type? */ - ScanKeyInit(&skey[attoff], - pkattno, - BTEqualStrategyNumber, - regop, - fastgetattr(key, mainattno, - RelationGetDescr(rel), &isnull)); - if (isnull) - elog(ERROR, "index tuple with a null column"); - } -} - -/* - * Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'. - * - * If a matching tuple is found setup 'tid' to point to it and return true, - * false is returned otherwise. - */ -static bool -find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idxrel, - TupleTableSlot *slot, bool lock, LockTupleMode mode) -{ - HeapTuple scantuple; - bool found; - IndexScanDesc scan; - SnapshotData snap; - TransactionId xwait; - - InitDirtySnapshot(snap); - scan = index_beginscan(rel->rel, idxrel, - &snap, - RelationGetNumberOfAttributes(idxrel), - 0); - -retry: - found = false; - - index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0); - - if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL) - { - found = true; - /* FIXME: Improve TupleSlot to not require copying the whole tuple */ - ExecStoreTuple(scantuple, slot, InvalidBuffer, false); - ExecMaterializeSlot(slot); - - xwait = TransactionIdIsValid(snap.xmin) ? - snap.xmin : snap.xmax; - - if (TransactionIdIsValid(xwait)) - { - XactLockTableWait(xwait, NULL, NULL, XLTW_None); - goto retry; - } - } - - if (lock && found) - { - Buffer buf; - HeapUpdateFailureData hufd; - HTSU_Result res; - HeapTupleData locktup; - - ItemPointerCopy(&slot->tts_tuple->t_self, &locktup.t_self); - - PushActiveSnapshot(GetLatestSnapshot()); - - res = heap_lock_tuple(rel->rel, &locktup, GetCurrentCommandId(false), mode, - false /* wait */, - false /* don't follow updates */, - &buf, &hufd); - /* the tuple slot already has the buffer pinned */ - ReleaseBuffer(buf); - - PopActiveSnapshot(); - - switch (res) - { - case HeapTupleMayBeUpdated: - break; - case HeapTupleUpdated: - /* XXX: Improve handling here */ - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent update, retrying"))); - goto retry; - default: - elog(ERROR, "unexpected HTSU_Result after locking: %u", res); - break; - } - } - - index_endscan(scan); - - return found; -} - - /* * Read a remote action type and process the action record. * diff --git a/contrib/bdr/bdr_executor.c b/contrib/bdr/bdr_executor.c new file mode 100644 index 0000000000..70b1098f1a --- /dev/null +++ b/contrib/bdr/bdr_executor.c @@ -0,0 +1,265 @@ +/* ------------------------------------------------------------------------- + * + * bdr_executor.c + * Relation and index access and maintenance routines required by bdr + * + * BDR does a lot of direct access to indexes and relations, some of which + * isn't handled by simple calls into the backend. Most of it lives here. + * + * Copyright (C) 2012-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/bdr/bdr_executor.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "bdr.h" + +#include "access/heapam.h" +#include "access/skey.h" +#include "access/xact.h" + +#include "executor/executor.h" +#include "executor/tuptable.h" + +#include "nodes/execnodes.h" + +#include "parser/parse_relation.h" + +#include "storage/bufmgr.h" +#include "storage/lmgr.h" + +#include "utils/lsyscache.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "utils/tqual.h" + +EState * +bdr_create_rel_estate(Relation rel) +{ + EState *estate; + ResultRelInfo *resultRelInfo; + + estate = CreateExecutorState(); + + resultRelInfo = makeNode(ResultRelInfo); + resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ + resultRelInfo->ri_RelationDesc = rel; + resultRelInfo->ri_TrigInstrument = NULL; + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + return estate; +} + +void +UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot) +{ + /* HOT update does not require index inserts */ + if (HeapTupleIsHeapOnly(slot->tts_tuple)) + return; + + ExecOpenIndices(estate->es_result_relation_info); + UserTableUpdateOpenIndexes(estate, slot); + ExecCloseIndices(estate->es_result_relation_info); +} + +void +UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot) +{ + List *recheckIndexes = NIL; + + /* HOT update does not require index inserts */ + if (HeapTupleIsHeapOnly(slot->tts_tuple)) + return; + + if (estate->es_result_relation_info->ri_NumIndices > 0) + { + recheckIndexes = ExecInsertIndexTuples(slot, + &slot->tts_tuple->t_self, + estate); + + if (recheckIndexes != NIL) + ereport(ERROR, + (errmsg("bdr doesn't support index rechecks"))); + } + + /* FIXME: recheck the indexes */ + list_free(recheckIndexes); +} + +void +build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot) +{ + ResultRelInfo *relinfo; + int i; + + relinfo = estate->es_result_relation_info; + + /* build scankeys for each index */ + for (i = 0; i < relinfo->ri_NumIndices; i++) + { + IndexInfo *ii = relinfo->ri_IndexRelationInfo[i]; + + if (!ii->ii_Unique) + continue; + + scan_keys[i] = palloc(ii->ii_NumIndexAttrs * sizeof(ScanKeyData)); + build_index_scan_key(scan_keys[i], + relinfo->ri_RelationDesc, + relinfo->ri_IndexRelationDescs[i], + slot); + } +} + +/* + * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that + * is setup to match 'rel' (*NOT* idxrel!). + */ +void +build_index_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *slot) +{ + int attoff; + Datum indclassDatum; + Datum indkeyDatum; + bool isnull; + oidvector *opclass; + int2vector *indkey; + HeapTuple key = slot->tts_tuple; + + indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + opclass = (oidvector *) DatumGetPointer(indclassDatum); + + indkeyDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, + Anum_pg_index_indkey, &isnull); + Assert(!isnull); + indkey = (int2vector *) DatumGetPointer(indkeyDatum); + + + for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++) + { + Oid operator; + Oid opfamily; + RegProcedure regop; + int pkattno = attoff + 1; + int mainattno = indkey->values[attoff]; + Oid atttype = attnumTypeId(rel, mainattno); + Oid optype = get_opclass_input_type(opclass->values[attoff]); + + opfamily = get_opclass_family(opclass->values[attoff]); + + operator = get_opfamily_member(opfamily, optype, + optype, + BTEqualStrategyNumber); + + if (!OidIsValid(operator)) + elog(ERROR, + "could not lookup equality operator for type %u, optype %u in opfamily %u", + atttype, optype, opfamily); + + regop = get_opcode(operator); + + /* FIXME: convert type? */ + ScanKeyInit(&skey[attoff], + pkattno, + BTEqualStrategyNumber, + regop, + fastgetattr(key, mainattno, + RelationGetDescr(rel), &isnull)); + if (isnull) + elog(ERROR, "index tuple with a null column"); + } +} + +/* + * Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'. + * + * If a matching tuple is found setup 'tid' to point to it and return true, + * false is returned otherwise. + */ +bool +find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idxrel, + TupleTableSlot *slot, bool lock, LockTupleMode mode) +{ + HeapTuple scantuple; + bool found; + IndexScanDesc scan; + SnapshotData snap; + TransactionId xwait; + + InitDirtySnapshot(snap); + scan = index_beginscan(rel->rel, idxrel, + &snap, + RelationGetNumberOfAttributes(idxrel), + 0); + +retry: + found = false; + + index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0); + + if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL) + { + found = true; + /* FIXME: Improve TupleSlot to not require copying the whole tuple */ + ExecStoreTuple(scantuple, slot, InvalidBuffer, false); + ExecMaterializeSlot(slot); + + xwait = TransactionIdIsValid(snap.xmin) ? + snap.xmin : snap.xmax; + + if (TransactionIdIsValid(xwait)) + { + XactLockTableWait(xwait, NULL, NULL, XLTW_None); + goto retry; + } + } + + if (lock && found) + { + Buffer buf; + HeapUpdateFailureData hufd; + HTSU_Result res; + HeapTupleData locktup; + + ItemPointerCopy(&slot->tts_tuple->t_self, &locktup.t_self); + + PushActiveSnapshot(GetLatestSnapshot()); + + res = heap_lock_tuple(rel->rel, &locktup, GetCurrentCommandId(false), mode, + false /* wait */, + false /* don't follow updates */, + &buf, &hufd); + /* the tuple slot already has the buffer pinned */ + ReleaseBuffer(buf); + + PopActiveSnapshot(); + + switch (res) + { + case HeapTupleMayBeUpdated: + break; + case HeapTupleUpdated: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + goto retry; + default: + elog(ERROR, "unexpected HTSU_Result after locking: %u", res); + break; + } + } + + index_endscan(scan); + + return found; +} + diff --git a/contrib/bdr/worker.mk b/contrib/bdr/worker.mk index 47db8ffe1f..042863f5f2 100644 --- a/contrib/bdr/worker.mk +++ b/contrib/bdr/worker.mk @@ -2,7 +2,8 @@ MODULE_big = bdr OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \ - bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o + bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o \ + bdr_executor.o EXTENSION = bdr DATA = bdr--0.5.sql diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 2d2011c713..3148d9d560 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -520,7 +520,8 @@ sub mkvcbuild $bdr_apply->AddFiles('contrib\bdr', 'bdr.c', 'bdr_apply.c', 'bdr_commandfilter.c', 'bdr_compat.c', 'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c', - 'bdr_relcache.c', 'bdr_conflict_handlers.c'); + 'bdr_relcache.c', 'bdr_conflict_handlers.c', + 'bdr_executor.c'); $bdr_apply->AddReference($postgres); $bdr_apply->AddLibrary('wsock32.lib'); $bdr_apply->AddIncludeDir('src\interfaces\libpq'); -- 2.39.5