mirror of https://github.com/citusdata/citus.git
Reuse the same state for multiple inserts
parent
35a52a6fe1
commit
0d4e249c97
|
@ -40,6 +40,12 @@
|
|||
|
||||
#include "cstore_metadata_serialization.h"
|
||||
|
||||
typedef struct
|
||||
{
|
||||
Relation rel;
|
||||
EState *estate;
|
||||
} ModifyState;
|
||||
|
||||
static Oid CStoreStripeAttrRelationId(void);
|
||||
static Oid CStoreStripeAttrIndexRelationId(void);
|
||||
static Oid CStoreStripesRelationId(void);
|
||||
|
@ -47,13 +53,13 @@ static Oid CStoreStripesIndexRelationId(void);
|
|||
static Oid CStoreTablesRelationId(void);
|
||||
static Oid CStoreTablesIndexRelationId(void);
|
||||
static Oid CStoreNamespaceId(void);
|
||||
static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
|
||||
uint64 existsSize, uint64 valuesSize,
|
||||
uint64 skiplistSize);
|
||||
static int TableBlockRowCount(Oid relid);
|
||||
static void DeleteTableMetadataRowIfExists(Oid relid);
|
||||
static void InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple);
|
||||
static void DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple);
|
||||
static ModifyState * StartModifyRelation(Relation rel);
|
||||
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||
bool *nulls);
|
||||
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
|
||||
static void FinishModifyRelation(ModifyState *state);
|
||||
static EState * create_estate_for_relation(Relation rel);
|
||||
|
||||
/* constants for cstore_stripe_attr */
|
||||
|
@ -86,10 +92,9 @@ static EState * create_estate_for_relation(Relation rel);
|
|||
void
|
||||
InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
||||
{
|
||||
Oid cstoreTableOid = InvalidOid;
|
||||
Relation cstoreTable = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
HeapTuple tuple = NULL;
|
||||
Oid cstoreTablesOid = InvalidOid;
|
||||
Relation cstoreTables = NULL;
|
||||
ModifyState *modifyState = NULL;
|
||||
|
||||
bool nulls[Natts_cstore_tables] = { 0 };
|
||||
Datum values[Natts_cstore_tables] = {
|
||||
|
@ -101,17 +106,16 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
|||
|
||||
DeleteTableMetadataRowIfExists(relid);
|
||||
|
||||
cstoreTableOid = CStoreTablesRelationId();
|
||||
cstoreTable = heap_open(cstoreTableOid, RowExclusiveLock);
|
||||
tupleDescriptor = RelationGetDescr(cstoreTable);
|
||||
cstoreTablesOid = CStoreTablesRelationId();
|
||||
cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock);
|
||||
|
||||
tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||
|
||||
InsertTupleAndEnforceConstraints(cstoreTable, tuple);
|
||||
modifyState = StartModifyRelation(cstoreTables);
|
||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||
FinishModifyRelation(modifyState);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
heap_close(cstoreTable, NoLock);
|
||||
heap_close(cstoreTables, NoLock);
|
||||
}
|
||||
|
||||
|
||||
|
@ -132,11 +136,10 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
|||
|
||||
Oid cstoreStripesOid = CStoreStripesRelationId();
|
||||
Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripes);
|
||||
|
||||
HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||
|
||||
InsertTupleAndEnforceConstraints(cstoreStripes, tuple);
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreStripes);
|
||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||
FinishModifyRelation(modifyState);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
|
@ -268,7 +271,9 @@ DeleteTableMetadataRowIfExists(Oid relid)
|
|||
heapTuple = systable_getnext(scanDescriptor);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
DeleteTupleAndEnforceConstraints(cstoreTables, heapTuple);
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreTables);
|
||||
DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
|
||||
FinishModifyRelation(modifyState);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -277,144 +282,33 @@ DeleteTableMetadataRowIfExists(Oid relid)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertTupleAndEnforceConstraints inserts a tuple into a relation and
|
||||
* makes sure constraints (e.g. FK constraints, NOT NULL, ...) are enforced.
|
||||
*/
|
||||
static void
|
||||
InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple)
|
||||
{
|
||||
EState *estate = NULL;
|
||||
TupleTableSlot *slot = NULL;
|
||||
|
||||
estate = create_estate_for_relation(rel);
|
||||
slot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel), &TTSOpsHeapTuple);
|
||||
ExecStoreHeapTuple(heapTuple, slot, false);
|
||||
|
||||
ExecOpenIndices(estate->es_result_relation_info, false);
|
||||
|
||||
/* ExecSimpleRelationInsert executes any constraints */
|
||||
ExecSimpleRelationInsert(estate, slot);
|
||||
|
||||
ExecCloseIndices(estate->es_result_relation_info);
|
||||
|
||||
AfterTriggerEndQuery(estate);
|
||||
ExecCleanUpTriggerState(estate);
|
||||
ExecResetTupleTable(estate->es_tupleTable, false);
|
||||
FreeExecutorState(estate);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* DeleteTupleAndEnforceConstraints deletes a tuple from a relation and
|
||||
* makes sure constraints (e.g. FK constraints) are enforced.
|
||||
*/
|
||||
static void
|
||||
DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple)
|
||||
{
|
||||
EState *estate = create_estate_for_relation(rel);
|
||||
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
||||
|
||||
ItemPointer tid = &(heapTuple->t_self);
|
||||
simple_table_tuple_delete(rel, tid, estate->es_snapshot);
|
||||
|
||||
/* execute AFTER ROW DELETE Triggers to enforce constraints */
|
||||
ExecARDeleteTriggers(estate, resultRelInfo,
|
||||
tid, NULL, NULL);
|
||||
|
||||
AfterTriggerEndQuery(estate);
|
||||
ExecCleanUpTriggerState(estate);
|
||||
ExecResetTupleTable(estate->es_tupleTable, false);
|
||||
FreeExecutorState(estate);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Based on a similar function from
|
||||
* postgres/src/backend/replication/logical/worker.c.
|
||||
*
|
||||
* Executor state preparation for evaluation of constraint expressions,
|
||||
* indexes and triggers.
|
||||
*
|
||||
* This is based on similar code in copy.c
|
||||
*/
|
||||
static EState *
|
||||
create_estate_for_relation(Relation rel)
|
||||
{
|
||||
EState *estate;
|
||||
ResultRelInfo *resultRelInfo;
|
||||
RangeTblEntry *rte;
|
||||
|
||||
estate = CreateExecutorState();
|
||||
|
||||
rte = makeNode(RangeTblEntry);
|
||||
rte->rtekind = RTE_RELATION;
|
||||
rte->relid = RelationGetRelid(rel);
|
||||
rte->relkind = rel->rd_rel->relkind;
|
||||
rte->rellockmode = AccessShareLock;
|
||||
ExecInitRangeTable(estate, list_make1(rte));
|
||||
|
||||
resultRelInfo = makeNode(ResultRelInfo);
|
||||
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
|
||||
|
||||
estate->es_result_relations = resultRelInfo;
|
||||
estate->es_num_result_relations = 1;
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
|
||||
estate->es_output_cid = GetCurrentCommandId(true);
|
||||
|
||||
/* Prepare to catch AFTER triggers. */
|
||||
AfterTriggerBeginQuery();
|
||||
|
||||
return estate;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records.
|
||||
*/
|
||||
void
|
||||
SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
|
||||
{
|
||||
for (AttrNumber attr = 1; attr <= footer->columnCount; attr++)
|
||||
{
|
||||
InsertStripeAttrRow(relid, stripe, attr,
|
||||
footer->existsSizeArray[attr - 1],
|
||||
footer->valueSizeArray[attr - 1],
|
||||
footer->skipListSizeArray[attr - 1]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertStripeAttrRow adds a row to cstore_stripe_attr.
|
||||
*/
|
||||
static void
|
||||
InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
|
||||
uint64 existsSize, uint64 valuesSize,
|
||||
uint64 skiplistSize)
|
||||
{
|
||||
bool nulls[Natts_cstore_stripe_attr] = { 0 };
|
||||
Datum values[Natts_cstore_stripe_attr] = {
|
||||
ObjectIdGetDatum(relid),
|
||||
Int64GetDatum(stripe),
|
||||
Int16GetDatum(attr),
|
||||
Int64GetDatum(existsSize),
|
||||
Int64GetDatum(valuesSize),
|
||||
Int64GetDatum(skiplistSize)
|
||||
};
|
||||
|
||||
Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
|
||||
Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);
|
||||
|
||||
HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs);
|
||||
|
||||
InsertTupleAndEnforceConstraints(cstoreStripeAttrs, tuple);
|
||||
for (AttrNumber attr = 1; attr <= footer->columnCount; attr++)
|
||||
{
|
||||
bool nulls[Natts_cstore_stripe_attr] = { 0 };
|
||||
Datum values[Natts_cstore_stripe_attr] = {
|
||||
ObjectIdGetDatum(relid),
|
||||
Int64GetDatum(stripe),
|
||||
Int16GetDatum(attr),
|
||||
Int64GetDatum(footer->existsSizeArray[attr - 1]),
|
||||
Int64GetDatum(footer->valueSizeArray[attr - 1]),
|
||||
Int64GetDatum(footer->skipListSizeArray[attr - 1])
|
||||
};
|
||||
|
||||
CommandCounterIncrement();
|
||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||
}
|
||||
|
||||
FinishModifyRelation(modifyState);
|
||||
heap_close(cstoreStripeAttrs, NoLock);
|
||||
}
|
||||
|
||||
|
@ -489,6 +383,118 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartModifyRelation allocates resources for modifications.
|
||||
*/
|
||||
static ModifyState *
|
||||
StartModifyRelation(Relation rel)
|
||||
{
|
||||
ModifyState *modifyState = NULL;
|
||||
EState *estate = create_estate_for_relation(rel);
|
||||
|
||||
/* ExecSimpleRelationInsert, ... require caller to open indexes */
|
||||
ExecOpenIndices(estate->es_result_relation_info, false);
|
||||
|
||||
modifyState = palloc(sizeof(ModifyState));
|
||||
modifyState->rel = rel;
|
||||
modifyState->estate = estate;
|
||||
|
||||
return modifyState;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes
|
||||
* sure constraints are enforced and indexes are updated.
|
||||
*/
|
||||
static void
|
||||
InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
|
||||
{
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(state->rel);
|
||||
HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
|
||||
&TTSOpsHeapTuple);
|
||||
ExecStoreHeapTuple(tuple, slot, false);
|
||||
|
||||
/* use ExecSimpleRelationInsert to enforce constraints */
|
||||
ExecSimpleRelationInsert(state->estate, slot);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeleteTupleAndEnforceConstraints deletes a tuple from a relation and
|
||||
* makes sure constraints (e.g. FK constraints) are enforced.
|
||||
*/
|
||||
static void
|
||||
DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
|
||||
{
|
||||
EState *estate = state->estate;
|
||||
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
||||
|
||||
ItemPointer tid = &(heapTuple->t_self);
|
||||
simple_table_tuple_delete(state->rel, tid, estate->es_snapshot);
|
||||
|
||||
/* execute AFTER ROW DELETE Triggers to enforce constraints */
|
||||
ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishModifyRelation cleans up resources after modifications are done.
|
||||
*/
|
||||
static void
|
||||
FinishModifyRelation(ModifyState *state)
|
||||
{
|
||||
ExecCloseIndices(state->estate->es_result_relation_info);
|
||||
|
||||
AfterTriggerEndQuery(state->estate);
|
||||
ExecCleanUpTriggerState(state->estate);
|
||||
ExecResetTupleTable(state->estate->es_tupleTable, false);
|
||||
FreeExecutorState(state->estate);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Based on a similar function from
|
||||
* postgres/src/backend/replication/logical/worker.c.
|
||||
*
|
||||
* Executor state preparation for evaluation of constraint expressions,
|
||||
* indexes and triggers.
|
||||
*
|
||||
* This is based on similar code in copy.c
|
||||
*/
|
||||
static EState *
|
||||
create_estate_for_relation(Relation rel)
|
||||
{
|
||||
EState *estate;
|
||||
ResultRelInfo *resultRelInfo;
|
||||
RangeTblEntry *rte;
|
||||
|
||||
estate = CreateExecutorState();
|
||||
|
||||
rte = makeNode(RangeTblEntry);
|
||||
rte->rtekind = RTE_RELATION;
|
||||
rte->relid = RelationGetRelid(rel);
|
||||
rte->relkind = rel->rd_rel->relkind;
|
||||
rte->rellockmode = AccessShareLock;
|
||||
ExecInitRangeTable(estate, list_make1(rte));
|
||||
|
||||
resultRelInfo = makeNode(ResultRelInfo);
|
||||
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
|
||||
|
||||
estate->es_result_relations = resultRelInfo;
|
||||
estate->es_num_result_relations = 1;
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
|
||||
estate->es_output_cid = GetCurrentCommandId(true);
|
||||
|
||||
/* Prepare to catch AFTER triggers. */
|
||||
AfterTriggerBeginQuery();
|
||||
|
||||
return estate;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
|
||||
* TODO: should we cache this similar to citus?
|
||||
|
@ -555,6 +561,10 @@ CStoreTablesIndexRelationId(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreNamespaceId returns namespace id of the schema we store cstore
|
||||
* related tables.
|
||||
*/
|
||||
static Oid
|
||||
CStoreNamespaceId(void)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue