mirror of https://github.com/citusdata/citus.git
unify utility and exec
parent
79236ab81f
commit
dab706280b
|
@ -124,7 +124,6 @@ static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
|
|||
static ExecutorStart_hook_type PrevExecutorStartHook = NULL;
|
||||
static ExecutorFinish_hook_type PrevExecutorFinishHook = NULL;
|
||||
|
||||
static HTAB *ColumnarCopier = NULL;
|
||||
static int ColumnarExecDepth = 0;
|
||||
static ColumnarExecLevel *ColumnarExecLevelStack = NULL;
|
||||
static MemoryContext ColumnarWriterContext = NULL;
|
||||
|
@ -172,7 +171,8 @@ static void ColumnarReadMissingRowsIntoIndex(TableScanDesc scan, Relation indexR
|
|||
ValidateIndexState *state);
|
||||
static ItemPointerData TupleSortSkipSmallerItemPointers(Tuplesortstate *tupleSort,
|
||||
ItemPointer targetItemPointer);
|
||||
|
||||
static void ColumnarExecLevelPush(void);
|
||||
static void ColumnarExecLevelPop(void);
|
||||
|
||||
/* Custom tuple slot ops used for columnar. Initialized in columnar_tableam_init(). */
|
||||
static TupleTableSlotOps TTSOpsColumnar;
|
||||
|
@ -600,22 +600,13 @@ GetCurrentExecLevel()
|
|||
}
|
||||
|
||||
static HTAB *
|
||||
GetWriterHashTable(bool iscopy)
|
||||
GetWriterHashTable()
|
||||
{
|
||||
AllocateWriterContext();
|
||||
|
||||
HTAB **hashtablep;
|
||||
if (iscopy)
|
||||
{
|
||||
hashtablep = &ColumnarCopier;
|
||||
}
|
||||
else
|
||||
{
|
||||
ColumnarExecLevel *level = GetCurrentExecLevel();
|
||||
hashtablep = &level->hashtable;
|
||||
}
|
||||
|
||||
if (*hashtablep == NULL)
|
||||
if (level->hashtable == NULL)
|
||||
{
|
||||
HASHCTL info;
|
||||
uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);
|
||||
|
@ -624,16 +615,16 @@ GetWriterHashTable(bool iscopy)
|
|||
info.entrysize = sizeof(ColumnarWriterEntry);
|
||||
info.hcxt = ColumnarWriterContext;
|
||||
|
||||
*hashtablep = hash_create("columnar writers", 64, &info, hashFlags);
|
||||
level->hashtable = hash_create("columnar writers", 64, &info, hashFlags);
|
||||
}
|
||||
|
||||
return *hashtablep;
|
||||
return level->hashtable;
|
||||
}
|
||||
|
||||
static ColumnarWriteState *
|
||||
GetWriteState(Relation relation, TupleDesc tupdesc, bool iscopy)
|
||||
GetWriteState(Relation relation, TupleDesc tupdesc)
|
||||
{
|
||||
HTAB *hashtable = GetWriterHashTable(iscopy);
|
||||
HTAB *hashtable = GetWriterHashTable();
|
||||
|
||||
bool found;
|
||||
ColumnarWriterEntry *entry = hash_search(
|
||||
|
@ -665,7 +656,7 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
|||
* columnar_init_write_state allocates the write state in a longer
|
||||
* lasting context, so no need to worry about it.
|
||||
*/
|
||||
ColumnarWriteState *writeState = GetWriteState(relation, RelationGetDescr(relation), false);
|
||||
ColumnarWriteState *writeState = GetWriteState(relation, RelationGetDescr(relation));
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
|
||||
writeState));
|
||||
|
@ -707,7 +698,7 @@ columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
|||
CommandId cid, int options, BulkInsertState bistate)
|
||||
{
|
||||
ColumnarWriteState *writeState = GetWriteState(relation,
|
||||
RelationGetDescr(relation), true);
|
||||
RelationGetDescr(relation));
|
||||
|
||||
ColumnarCheckLogicalReplication(relation);
|
||||
|
||||
|
@ -1817,9 +1808,8 @@ ColumnarSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
|
|||
static void
|
||||
ColumnarExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||
{
|
||||
ColumnarExecLevelPush();
|
||||
PrevExecutorStartHook(queryDesc, eflags);
|
||||
|
||||
ColumnarExecDepth++;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1844,7 +1834,14 @@ ColumnarFlushWriters(HTAB *hashtable)
|
|||
|
||||
|
||||
static void
|
||||
ColumnarExecutorFinish(QueryDesc *queryDesc)
|
||||
ColumnarExecLevelPush()
|
||||
{
|
||||
ColumnarExecDepth++;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ColumnarExecLevelPop()
|
||||
{
|
||||
Assert(ColumnarExecDepth > 0);
|
||||
|
||||
|
@ -1858,8 +1855,13 @@ ColumnarExecutorFinish(QueryDesc *queryDesc)
|
|||
}
|
||||
|
||||
ColumnarExecDepth--;
|
||||
}
|
||||
|
||||
static void
|
||||
ColumnarExecutorFinish(QueryDesc *queryDesc)
|
||||
{
|
||||
PrevExecutorFinishHook(queryDesc);
|
||||
ColumnarExecLevelPop();
|
||||
}
|
||||
|
||||
|
||||
|
@ -2054,6 +2056,8 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
|
|||
DestReceiver *dest,
|
||||
QueryCompletionCompat *completionTag)
|
||||
{
|
||||
ColumnarExecLevelPush();
|
||||
|
||||
Node *parsetree = pstmt->utilityStmt;
|
||||
|
||||
if (IsA(parsetree, IndexStmt))
|
||||
|
@ -2079,8 +2083,7 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
|
|||
PrevProcessUtilityHook(pstmt, queryString, context,
|
||||
params, queryEnv, dest, completionTag);
|
||||
|
||||
ColumnarFlushWriters(ColumnarCopier);
|
||||
ColumnarCopier = NULL;
|
||||
ColumnarExecLevelPop();
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue