mirror of https://github.com/citusdata/citus.git
copy kinda works
parent
d3c7d30a1e
commit
3020e4d96d
|
@ -123,6 +123,7 @@ static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
|
||||||
static ExecutorStart_hook_type PrevExecutorStartHook = NULL;
|
static ExecutorStart_hook_type PrevExecutorStartHook = NULL;
|
||||||
static ExecutorFinish_hook_type PrevExecutorFinishHook = NULL;
|
static ExecutorFinish_hook_type PrevExecutorFinishHook = NULL;
|
||||||
|
|
||||||
|
static HTAB *ColumnarCopier = NULL;
|
||||||
static ColumnarExecLevel *ColumnarExecLevelStack = NULL;
|
static ColumnarExecLevel *ColumnarExecLevelStack = NULL;
|
||||||
static MemoryContext ColumnarWriterContext = NULL;
|
static MemoryContext ColumnarWriterContext = NULL;
|
||||||
|
|
||||||
|
@ -577,9 +578,18 @@ GetWriteState(Relation relation, TupleDesc tupdesc, bool iscopy)
|
||||||
ALLOCSET_DEFAULT_SIZES);
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HTAB **hashtablep;
|
||||||
|
if (iscopy)
|
||||||
|
{
|
||||||
|
hashtablep = &ColumnarCopier;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
ColumnarExecLevel *level = ColumnarExecLevelStack;
|
ColumnarExecLevel *level = ColumnarExecLevelStack;
|
||||||
|
hashtablep = &level->hashtable;
|
||||||
|
}
|
||||||
|
|
||||||
if (level->hashtable == NULL)
|
if (*hashtablep == NULL)
|
||||||
{
|
{
|
||||||
HASHCTL info;
|
HASHCTL info;
|
||||||
uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);
|
uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);
|
||||||
|
@ -588,22 +598,23 @@ GetWriteState(Relation relation, TupleDesc tupdesc, bool iscopy)
|
||||||
info.entrysize = sizeof(ColumnarWriterEntry);
|
info.entrysize = sizeof(ColumnarWriterEntry);
|
||||||
info.hcxt = ColumnarWriterContext;
|
info.hcxt = ColumnarWriterContext;
|
||||||
|
|
||||||
level->hashtable = hash_create("columnar writers",
|
*hashtablep = hash_create("columnar writers", 64, &info, hashFlags);
|
||||||
64, &info, hashFlags);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool found;
|
bool found;
|
||||||
ColumnarWriterEntry *entry = hash_search(
|
ColumnarWriterEntry *entry = hash_search(
|
||||||
level->hashtable, &relation->rd_node, HASH_ENTER, &found);
|
*hashtablep, &relation->rd_node, HASH_ENTER, &found);
|
||||||
|
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
ColumnarOptions columnarOptions = { 0 };
|
ColumnarOptions columnarOptions = { 0 };
|
||||||
ReadColumnarOptions(relation->rd_id, &columnarOptions);
|
ReadColumnarOptions(relation->rd_id, &columnarOptions);
|
||||||
|
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWriterContext);
|
||||||
entry->writeState = ColumnarBeginWrite(relation->rd_node,
|
entry->writeState = ColumnarBeginWrite(relation->rd_node,
|
||||||
columnarOptions,
|
columnarOptions,
|
||||||
tupdesc);
|
tupdesc);
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
return entry->writeState;
|
return entry->writeState;
|
||||||
|
@ -1772,7 +1783,8 @@ ColumnarExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||||
{
|
{
|
||||||
PrevExecutorStartHook(queryDesc, eflags);
|
PrevExecutorStartHook(queryDesc, eflags);
|
||||||
|
|
||||||
ColumnarExecLevel *level = palloc0(sizeof(ColumnarExecLevel));
|
ColumnarExecLevel *level = MemoryContextAllocZero(TopMemoryContext,
|
||||||
|
sizeof(ColumnarExecLevel));
|
||||||
/* initialize hashtable lazily */
|
/* initialize hashtable lazily */
|
||||||
level->next = ColumnarExecLevelStack;
|
level->next = ColumnarExecLevelStack;
|
||||||
ColumnarExecLevelStack = level;
|
ColumnarExecLevelStack = level;
|
||||||
|
@ -1785,6 +1797,11 @@ ColumnarFlushWriters(HTAB *hashtable)
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
ColumnarWriterEntry *entry;
|
ColumnarWriterEntry *entry;
|
||||||
|
|
||||||
|
if (hashtable == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
hash_seq_init(&status, hashtable);
|
hash_seq_init(&status, hashtable);
|
||||||
while ((entry = hash_seq_search(&status)) != 0)
|
while ((entry = hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
|
@ -1800,11 +1817,10 @@ ColumnarExecutorFinish(QueryDesc *queryDesc)
|
||||||
ColumnarExecLevel *level = ColumnarExecLevelStack;
|
ColumnarExecLevel *level = ColumnarExecLevelStack;
|
||||||
|
|
||||||
Assert(level != NULL);
|
Assert(level != NULL);
|
||||||
if (level->hashtable != NULL)
|
|
||||||
{
|
|
||||||
ColumnarFlushWriters(level->hashtable);
|
ColumnarFlushWriters(level->hashtable);
|
||||||
}
|
|
||||||
ColumnarExecLevelStack = level->next;
|
ColumnarExecLevelStack = level->next;
|
||||||
|
pfree(level);
|
||||||
|
|
||||||
PrevExecutorFinishHook(queryDesc);
|
PrevExecutorFinishHook(queryDesc);
|
||||||
}
|
}
|
||||||
|
@ -2025,6 +2041,9 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
|
||||||
|
|
||||||
PrevProcessUtilityHook(pstmt, queryString, context,
|
PrevProcessUtilityHook(pstmt, queryString, context,
|
||||||
params, queryEnv, dest, completionTag);
|
params, queryEnv, dest, completionTag);
|
||||||
|
|
||||||
|
ColumnarFlushWriters(ColumnarCopier);
|
||||||
|
ColumnarCopier = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue