Fixing executor and misc

pull/6029/head
Nitish Upreti 2022-06-19 17:07:26 -07:00
parent c38de446bb
commit 4a05f1f1e8
10 changed files with 105 additions and 67 deletions

View File

@ -473,7 +473,7 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
/* Perform Split Copy */
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList);
// TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely.
// TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely?
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;

View File

@ -56,6 +56,9 @@ typedef struct ShardCopyDestReceiver
/* local copy if destination shard in same node */
bool useLocalCopy;
/* EState for per-tuple memory allocation */
EState *executorState;
/*
* Connection for destination shard (NULL if useLocalCopy is true)
*/
@ -70,7 +73,7 @@ static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static bool ShouldSendCopyNow(StringInfo buffer);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState);
@ -96,9 +99,14 @@ static bool
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
CopyOutState copyOutState = copyDest->copyOutState;
TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
/*
* Switch to a per-tuple memory memory context. When used in
* context of Split Copy, this is a no-op as switch is already done.
*/
EState *executorState = copyDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
/* Create connection lazily */
if(copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy))
@ -110,11 +118,11 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL);
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName,
copyDest->destinationNodeId);
copyDest->copyOutState->binary);
ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data);
}
@ -122,9 +130,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull;
CopyOutState copyOutState = copyDest->copyOutState;
if(copyDest->useLocalCopy)
{
WriteLocalTuple(slot, copyDest, copyOutState);
WriteLocalTuple(slot, copyDest);
if (ShouldSendCopyNow(copyOutState->fe_msgbuf))
{
LocalCopyToShard(copyDest, copyOutState);
@ -132,19 +141,25 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
}
else
{
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */);
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len))
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %s,",
copyDest->destinationShardFullyQualifiedName),
errdetail("failed to send %d bytes %s", copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data)));
errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data,
copyDest->destinationNodeId)));
}
}
MemoryContextSwitchTo(oldContext);
ResetPerTupleExprContext(executorState);
copyDest->tuplesSent++;
return true;
}
@ -163,16 +178,13 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputT
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor);
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->delim = (char *) delimiterCharacter;
// not used for shard copy
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->rowcontext = NULL;
copyDest->copyOutState = copyOutState;
// TODO(niupre): Explain why this is needed.
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
}
static void
@ -217,6 +229,31 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
}
}
DestReceiver * CreateShardCopyDestReceiver(
EState *executorState,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0(
sizeof(ShardCopyDestReceiver));
/* set up the DestReceiver function pointers */
copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive;
copyDest->pub.rStartup = ShardCopyDestReceiverStartup;
copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown;
copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy;
copyDest->pub.mydest = DestCopyOut;
copyDest->executorState = executorState;
copyDest->destinationNodeId = destinationNodeId;
copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName;
copyDest->tuplesSent = 0;
copyDest->connection = NULL;
copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId);
return (DestReceiver *) copyDest;
}
static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
@ -248,37 +285,20 @@ ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryF
if(useBinaryFormat)
{
appendStringInfo(command, "WITH (format binary)");
appendStringInfo(command, "WITH (format binary);");
}
else
{
appendStringInfo(command, ";");
}
return command;
}
DestReceiver * CreateShardCopyDestReceiver(
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0(
sizeof(ShardCopyDestReceiver));
CopyOutState localCopyOutState = copyDest->copyOutState;
/* set up the DestReceiver function pointers */
copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive;
copyDest->pub.rStartup = ShardCopyDestReceiverStartup;
copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown;
copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy;
copyDest->pub.mydest = DestCopyOut;
copyDest->destinationNodeId = destinationNodeId;
copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName;
copyDest->tuplesSent = 0;
copyDest->connection = NULL;
copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId);
return (DestReceiver *) copyDest;
}
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
{
/*
* Since we are doing a local copy, the following statements should
* use local execution to see the changes

View File

@ -36,8 +36,8 @@ typedef struct SplitCopyDestReceiver
/* Split factor */
uint splitFactor;
/* Source shard name */
char *sourceShardName;
/* EState for per-tuple memory allocation */
EState *executorState;
/* Source shard Oid */
Oid sourceShardRelationOid;
@ -51,7 +51,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList)
DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList)
{
SplitCopyDestReceiver *splitCopyDest =
palloc0(sizeof(SplitCopyDestReceiver));
@ -62,6 +62,7 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl
splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown;
splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy;
splitCopyDest->executorState = executorState;
splitCopyDest->splitFactor = splitCopyInfoList->length;
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy);
splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId;
@ -83,6 +84,7 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl
quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy);
DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
executorState,
destinationShardFullyQualifiedName,
splitCopyInfo->destinationShardNodeId);
@ -99,24 +101,29 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
for (int index = 0; index < copyDest->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index];
shardCopyDest->rStartup(shardCopyDest, operation, inputTupleDescriptor);
}
}
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardRelationOid);
/* Switch to a per-tuple memory memory context */
EState *executorState = copyDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(copyDest->sourceShardRelationOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("Could not find shard %s for split copy.",
self->sourceShardName));
get_rel_name(copyDest->sourceShardRelationOid)));
}
/* Partition Column Metadata on source shard */
@ -131,48 +138,51 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
if (columnNulls[partitionColumnIndex])
{
ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.",
self->sourceShardName));
get_rel_name(copyDest->sourceShardRelationOid)));
}
Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]);
int32_t hashedValue = DatumGetInt32(hashedValueDatum);
for(int index = 0 ; index < self->splitFactor; index++)
for(int index = 0 ; index < copyDest->splitFactor; index++)
{
SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index];
SplitCopyInfo *splitCopyInfo = copyDest->splitCopyInfoArray[index];
if (splitCopyInfo->destinationShardMinHashValue <= hashedValue &&
splitCopyInfo->destinationShardMaxHashValue >= hashedValue)
{
DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index];
DestReceiver *shardCopyDestReceiver = copyDest->shardCopyDestReceiverArray[index];
shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver);
}
}
MemoryContextSwitchTo(oldContext);
ResetPerTupleExprContext(executorState);
return true;
}
static void SplitCopyDestReceiverShutdown(DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
for (int index = 0; index < copyDest->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index];
shardCopyDest->rShutdown(shardCopyDest);
}
}
static void SplitCopyDestReceiverDestroy(DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
for (int index = 0; index < copyDest->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index];
shardCopyDest->rDestroy(shardCopyDest);
pfree(shardCopyDest);
pfree(self->splitCopyInfoArray[index]);
pfree(copyDest->splitCopyInfoArray[index]);
}
}

View File

@ -15,7 +15,6 @@
#include "distributed/worker_split_copy.h"
#include "distributed/citus_ruleutils.h"
PG_FUNCTION_INFO_V1(worker_split_shardgroup_copy);
PG_FUNCTION_INFO_V1(worker_split_copy);
static void
@ -48,7 +47,9 @@ worker_split_copy(PG_FUNCTION_ARGS)
splitCopyInfoList = lappend(splitCopyInfoList, splitCopyInfo);
}
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(shardIdToSplitCopy, splitCopyInfoList);
EState *executor = CreateExecutorState();
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, shardIdToSplitCopy, splitCopyInfoList);
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
@ -58,6 +59,8 @@ worker_split_copy(PG_FUNCTION_ARGS)
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver);
FreeExecutorState(executor);
PG_RETURN_VOID();
}

View File

@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint);
#include "../../columnar/sql/columnar--11.0-2--11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.0-2.sql"
#include "udfs/worker_split_copy/11.0-2.sql"

View File

@ -1,3 +1,5 @@
DROP TYPE IF EXISTS citus.split_mode;
-- Three modes to be implemented: blocking, non_blocking and auto.
-- Currently, the default / only supported mode is blocking.
CREATE TYPE citus.split_mode AS ENUM (

View File

@ -1,3 +1,5 @@
DROP TYPE IF EXISTS citus.split_mode;
-- Three modes to be implemented: blocking, non_blocking and auto.
-- Currently, the default / only supported mode is blocking.
CREATE TYPE citus.split_mode AS ENUM (

View File

@ -97,7 +97,6 @@ typedef struct ListCellAndListWrapper
var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \
)
/*
* forboth_ptr_oid -
* a convenience macro which loops through two lists at the same time. The

View File

@ -15,6 +15,7 @@
struct FullRelationName;
extern DestReceiver * CreateShardCopyDestReceiver(
EState *executorState,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId);

View File

@ -20,6 +20,6 @@ typedef struct SplitCopyInfo
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
extern DestReceiver* CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList);
extern DestReceiver* CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList);
#endif /* WORKER_SPLIT_COPY_H_ */