diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 39e852c55..3843e4cd6 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -40,6 +40,12 @@ #include "cstore_metadata_serialization.h" +typedef struct +{ + Relation rel; + EState *estate; +} ModifyState; + static Oid CStoreStripeAttrRelationId(void); static Oid CStoreStripeAttrIndexRelationId(void); static Oid CStoreStripesRelationId(void); @@ -47,13 +53,13 @@ static Oid CStoreStripesIndexRelationId(void); static Oid CStoreTablesRelationId(void); static Oid CStoreTablesIndexRelationId(void); static Oid CStoreNamespaceId(void); -static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, - uint64 existsSize, uint64 valuesSize, - uint64 skiplistSize); static int TableBlockRowCount(Oid relid); static void DeleteTableMetadataRowIfExists(Oid relid); -static void InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); -static void DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); +static ModifyState * StartModifyRelation(Relation rel); +static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, + bool *nulls); +static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple); +static void FinishModifyRelation(ModifyState *state); static EState * create_estate_for_relation(Relation rel); /* constants for cstore_stripe_attr */ @@ -86,10 +92,9 @@ static EState * create_estate_for_relation(Relation rel); void InitCStoreTableMetadata(Oid relid, int blockRowCount) { - Oid cstoreTableOid = InvalidOid; - Relation cstoreTable = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple tuple = NULL; + Oid cstoreTablesOid = InvalidOid; + Relation cstoreTables = NULL; + ModifyState *modifyState = NULL; bool nulls[Natts_cstore_tables] = { 0 }; Datum values[Natts_cstore_tables] = { @@ -101,17 +106,16 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount) DeleteTableMetadataRowIfExists(relid); - cstoreTableOid = CStoreTablesRelationId(); - cstoreTable = heap_open(cstoreTableOid, RowExclusiveLock); - tupleDescriptor = RelationGetDescr(cstoreTable); + cstoreTablesOid = CStoreTablesRelationId(); + cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock); - tuple = heap_form_tuple(tupleDescriptor, values, nulls); - - InsertTupleAndEnforceConstraints(cstoreTable, tuple); + modifyState = StartModifyRelation(cstoreTables); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + FinishModifyRelation(modifyState); CommandCounterIncrement(); - heap_close(cstoreTable, NoLock); + heap_close(cstoreTables, NoLock); } @@ -132,11 +136,10 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) Oid cstoreStripesOid = CStoreStripesRelationId(); Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripes); - HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); - - InsertTupleAndEnforceConstraints(cstoreStripes, tuple); + ModifyState *modifyState = StartModifyRelation(cstoreStripes); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + FinishModifyRelation(modifyState); CommandCounterIncrement(); @@ -268,7 +271,9 @@ DeleteTableMetadataRowIfExists(Oid relid) heapTuple = systable_getnext(scanDescriptor); if (HeapTupleIsValid(heapTuple)) { - DeleteTupleAndEnforceConstraints(cstoreTables, heapTuple); + ModifyState *modifyState = StartModifyRelation(cstoreTables); + DeleteTupleAndEnforceConstraints(modifyState, heapTuple); + FinishModifyRelation(modifyState); } systable_endscan_ordered(scanDescriptor); @@ -277,144 +282,33 @@ DeleteTableMetadataRowIfExists(Oid relid) } -/* - * InsertTupleAndEnforceConstraints inserts a tuple into a relation and - * makes sure constraints (e.g. FK constraints, NOT NULL, ...) are enforced. - */ -static void -InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) -{ - EState *estate = NULL; - TupleTableSlot *slot = NULL; - - estate = create_estate_for_relation(rel); - slot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel), &TTSOpsHeapTuple); - ExecStoreHeapTuple(heapTuple, slot, false); - - ExecOpenIndices(estate->es_result_relation_info, false); - - /* ExecSimpleRelationInsert executes any constraints */ - ExecSimpleRelationInsert(estate, slot); - - ExecCloseIndices(estate->es_result_relation_info); - - AfterTriggerEndQuery(estate); - ExecCleanUpTriggerState(estate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); -} - - - -/* - * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and - * makes sure constraints (e.g. FK constraints) are enforced. - */ -static void -DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) -{ - EState *estate = create_estate_for_relation(rel); - ResultRelInfo *resultRelInfo = estate->es_result_relation_info; - - ItemPointer tid = &(heapTuple->t_self); - simple_table_tuple_delete(rel, tid, estate->es_snapshot); - - /* execute AFTER ROW DELETE Triggers to enforce constraints */ - ExecARDeleteTriggers(estate, resultRelInfo, - tid, NULL, NULL); - - AfterTriggerEndQuery(estate); - ExecCleanUpTriggerState(estate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); -} - - -/* - * Based on a similar function from - * postgres/src/backend/replication/logical/worker.c. - * - * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. - * - * This is based on similar code in copy.c - */ -static EState * -create_estate_for_relation(Relation rel) -{ - EState *estate; - ResultRelInfo *resultRelInfo; - RangeTblEntry *rte; - - estate = CreateExecutorState(); - - rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(rel); - rte->relkind = rel->rd_rel->relkind; - rte->rellockmode = AccessShareLock; - ExecInitRangeTable(estate, list_make1(rte)); - - resultRelInfo = makeNode(ResultRelInfo); - InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); - - estate->es_result_relations = resultRelInfo; - estate->es_num_result_relations = 1; - estate->es_result_relation_info = resultRelInfo; - - estate->es_output_cid = GetCurrentCommandId(true); - - /* Prepare to catch AFTER triggers. */ - AfterTriggerBeginQuery(); - - return estate; -} - - /* * SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records. */ void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer) { - for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) - { - InsertStripeAttrRow(relid, stripe, attr, - footer->existsSizeArray[attr - 1], - footer->valueSizeArray[attr - 1], - footer->skipListSizeArray[attr - 1]); - } -} - - -/* - * InsertStripeAttrRow adds a row to cstore_stripe_attr. - */ -static void -InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, - uint64 existsSize, uint64 valuesSize, - uint64 skiplistSize) -{ - bool nulls[Natts_cstore_stripe_attr] = { 0 }; - Datum values[Natts_cstore_stripe_attr] = { - ObjectIdGetDatum(relid), - Int64GetDatum(stripe), - Int16GetDatum(attr), - Int64GetDatum(existsSize), - Int64GetDatum(valuesSize), - Int64GetDatum(skiplistSize) - }; - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs); - HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); + ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs); - InsertTupleAndEnforceConstraints(cstoreStripeAttrs, tuple); + for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) + { + bool nulls[Natts_cstore_stripe_attr] = { 0 }; + Datum values[Natts_cstore_stripe_attr] = { + ObjectIdGetDatum(relid), + Int64GetDatum(stripe), + Int16GetDatum(attr), + Int64GetDatum(footer->existsSizeArray[attr - 1]), + Int64GetDatum(footer->valueSizeArray[attr - 1]), + Int64GetDatum(footer->skipListSizeArray[attr - 1]) + }; - CommandCounterIncrement(); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + } + FinishModifyRelation(modifyState); heap_close(cstoreStripeAttrs, NoLock); } @@ -489,6 +383,118 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount) } +/* + * StartModifyRelation allocates resources for modifications. + */ +static ModifyState * +StartModifyRelation(Relation rel) +{ + ModifyState *modifyState = NULL; + EState *estate = create_estate_for_relation(rel); + + /* ExecSimpleRelationInsert, ... require caller to open indexes */ + ExecOpenIndices(estate->es_result_relation_info, false); + + modifyState = palloc(sizeof(ModifyState)); + modifyState->rel = rel; + modifyState->estate = estate; + + return modifyState; +} + + +/* + * InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes + * sure constraints are enforced and indexes are updated. + */ +static void +InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls) +{ + TupleDesc tupleDescriptor = RelationGetDescr(state->rel); + HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); + TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor, + &TTSOpsHeapTuple); + ExecStoreHeapTuple(tuple, slot, false); + + /* use ExecSimpleRelationInsert to enforce constraints */ + ExecSimpleRelationInsert(state->estate, slot); +} + + +/* + * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and + * makes sure constraints (e.g. FK constraints) are enforced. + */ +static void +DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple) +{ + EState *estate = state->estate; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + ItemPointer tid = &(heapTuple->t_self); + simple_table_tuple_delete(state->rel, tid, estate->es_snapshot); + + /* execute AFTER ROW DELETE Triggers to enforce constraints */ + ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL); +} + + +/* + * FinishModifyRelation cleans up resources after modifications are done. + */ +static void +FinishModifyRelation(ModifyState *state) +{ + ExecCloseIndices(state->estate->es_result_relation_info); + + AfterTriggerEndQuery(state->estate); + ExecCleanUpTriggerState(state->estate); + ExecResetTupleTable(state->estate->es_tupleTable, false); + FreeExecutorState(state->estate); +} + + +/* + * Based on a similar function from + * postgres/src/backend/replication/logical/worker.c. + * + * Executor state preparation for evaluation of constraint expressions, + * indexes and triggers. + * + * This is based on similar code in copy.c + */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + ResultRelInfo *resultRelInfo; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + return estate; +} + + /* * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr. * TODO: should we cache this similar to citus? @@ -555,6 +561,10 @@ CStoreTablesIndexRelationId(void) } +/* + * CStoreNamespaceId returns namespace id of the schema we store cstore + * related tables. + */ static Oid CStoreNamespaceId(void) {