From 4a05f1f1e87ade82a9ba179a1288e15cc14fe74f Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Sun, 19 Jun 2022 17:07:26 -0700 Subject: [PATCH] Fixing executor and misc --- .../distributed/operations/shard_split.c | 2 +- .../operations/worker_shard_copy.c | 100 +++++++++++------- .../operations/worker_split_copy.c | 50 +++++---- .../operations/worker_split_copy_udf.c | 7 +- .../distributed/sql/citus--11.0-2--11.1-1.sql | 1 + .../11.0-2.sql | 2 + .../latest.sql | 2 + src/include/distributed/listutils.h | 1 - src/include/distributed/worker_shard_copy.h | 1 + src/include/distributed/worker_split_copy.h | 6 +- 10 files changed, 105 insertions(+), 67 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 79e577aa3..b8cbc56fb 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -473,7 +473,7 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, /* Perform Split Copy */ DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); - // TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely. + // TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? 
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { ShardInterval *shardInterval = NULL; diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index dcc14a321..2f74b2417 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -56,6 +56,9 @@ typedef struct ShardCopyDestReceiver /* local copy if destination shard in same node */ bool useLocalCopy; + /* EState for per-tuple memory allocation */ + EState *executorState; + /* * Connection for destination shard (NULL if useLocalCopy is true) */ @@ -70,7 +73,7 @@ static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat); -static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); +static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static bool ShouldSendCopyNow(StringInfo buffer); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); @@ -96,9 +99,14 @@ static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; - CopyOutState copyOutState = copyDest->copyOutState; - TupleDesc tupleDescriptor = copyDest->tupleDescriptor; - FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + + /* + * Switch to a per-tuple memory context. When used in + * context of Split Copy, this is a no-op as switch is already done. 
+ */ + EState *executorState = copyDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); /* Create connection lazily */ if(copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy)) @@ -110,11 +118,11 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) workerNode->workerName, workerNode->workerPort, currentUser, - NULL); + NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName, - copyDest->destinationNodeId); + copyDest->copyOutState->binary); ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data); } @@ -122,9 +130,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; + CopyOutState copyOutState = copyDest->copyOutState; if(copyDest->useLocalCopy) { - WriteLocalTuple(slot, copyDest, copyOutState); + WriteLocalTuple(slot, copyDest); if (ShouldSendCopyNow(copyOutState->fe_msgbuf)) { LocalCopyToShard(copyDest, copyOutState); @@ -132,19 +141,25 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) } else { + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */); if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len)) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Failed to COPY to shard %s,", copyDest->destinationShardFullyQualifiedName), - errdetail("failed to send %d bytes %s", copyOutState->fe_msgbuf->len, - copyOutState->fe_msgbuf->data))); + 
errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len, + copyOutState->fe_msgbuf->data, + copyDest->destinationNodeId))); } } + MemoryContextSwitchTo(oldContext); + ResetPerTupleExprContext(executorState); + copyDest->tuplesSent++; return true; } @@ -163,16 +178,13 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputT CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor); copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->delim = (char *) delimiterCharacter; - // not used for shard copy - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->rowcontext = NULL; - copyDest->copyOutState = copyOutState; - - // TODO(niupre): Explain why this is needed. + copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); + copyDest->copyOutState = copyOutState; } static void @@ -217,6 +229,31 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) } } +DestReceiver * CreateShardCopyDestReceiver( + EState *executorState, + char* destinationShardFullyQualifiedName, + uint32_t destinationNodeId) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( + sizeof(ShardCopyDestReceiver)); + + /* set up the DestReceiver function pointers */ + copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive; + copyDest->pub.rStartup = ShardCopyDestReceiverStartup; + copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown; + copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy; + copyDest->pub.mydest = DestCopyOut; + copyDest->executorState = executorState; + + copyDest->destinationNodeId = destinationNodeId; + copyDest->destinationShardFullyQualifiedName = 
destinationShardFullyQualifiedName; + copyDest->tuplesSent = 0; + copyDest->connection = NULL; + copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); + + return (DestReceiver *) copyDest; +} + static void ShardCopyDestReceiverDestroy(DestReceiver *dest) { @@ -248,37 +285,20 @@ ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryF if(useBinaryFormat) { - appendStringInfo(command, "WITH (format binary)"); + appendStringInfo(command, "WITH (format binary);"); + } + else + { + appendStringInfo(command, ";"); } return command; } -DestReceiver * CreateShardCopyDestReceiver( - char* destinationShardFullyQualifiedName, - uint32_t destinationNodeId) +static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) { - ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( - sizeof(ShardCopyDestReceiver)); + CopyOutState localCopyOutState = copyDest->copyOutState; - /* set up the DestReceiver function pointers */ - copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive; - copyDest->pub.rStartup = ShardCopyDestReceiverStartup; - copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown; - copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy; - copyDest->pub.mydest = DestCopyOut; - - copyDest->destinationNodeId = destinationNodeId; - copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName; - copyDest->tuplesSent = 0; - copyDest->connection = NULL; - copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); - - return (DestReceiver *) copyDest; -} - -static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) -{ /* * Since we are doing a local copy, the following statements should * use local execution to see the changes diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c index 38168600e..4df59e62a 100644 --- 
a/src/backend/distributed/operations/worker_split_copy.c +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -36,8 +36,8 @@ typedef struct SplitCopyDestReceiver /* Split factor */ uint splitFactor; - /* Source shard name */ - char *sourceShardName; + /* EState for per-tuple memory allocation */ + EState *executorState; /* Source shard Oid */ Oid sourceShardRelationOid; @@ -51,7 +51,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, static void SplitCopyDestReceiverShutdown(DestReceiver *dest); static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); -DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList) +DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList) { SplitCopyDestReceiver *splitCopyDest = palloc0(sizeof(SplitCopyDestReceiver)); @@ -62,6 +62,7 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown; splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy; + splitCopyDest->executorState = executorState; splitCopyDest->splitFactor = splitCopyInfoList->length; ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy); splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId; @@ -83,6 +84,7 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy); DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( + executorState, destinationShardFullyQualifiedName, splitCopyInfo->destinationShardNodeId); @@ -99,24 +101,29 @@ DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* spl static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) { - SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + 
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - for (int index = 0; index < self->splitFactor; index++) + for (int index = 0; index < copyDest->splitFactor; index++) { - DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; shardCopyDest->rStartup(shardCopyDest, operation, inputTupleDescriptor); } } static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { - SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardRelationOid); + /* Switch to a per-tuple memory context */ + EState *executorState = copyDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(copyDest->sourceShardRelationOid); if (cacheEntry == NULL) { ereport(ERROR, errmsg("Could not find shard %s for split copy.", - self->sourceShardName)); + get_rel_name(copyDest->sourceShardRelationOid))); } /* Partition Column Metadata on source shard */ @@ -131,48 +138,51 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des if (columnNulls[partitionColumnIndex]) { ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.", - self->sourceShardName)); + get_rel_name(copyDest->sourceShardRelationOid))); } Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]); int32_t hashedValue = DatumGetInt32(hashedValueDatum); - for(int index = 0 ; index < self->splitFactor; index++) + for(int index = 0 ; index < copyDest->splitFactor; index++) { - SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index]; + SplitCopyInfo *splitCopyInfo = 
copyDest->splitCopyInfoArray[index]; if (splitCopyInfo->destinationShardMinHashValue <= hashedValue && splitCopyInfo->destinationShardMaxHashValue >= hashedValue) { - DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index]; + DestReceiver *shardCopyDestReceiver = copyDest->shardCopyDestReceiverArray[index]; shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver); } } + MemoryContextSwitchTo(oldContext); + ResetPerTupleExprContext(executorState); + return true; } static void SplitCopyDestReceiverShutdown(DestReceiver *dest) { - SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - for (int index = 0; index < self->splitFactor; index++) + for (int index = 0; index < copyDest->splitFactor; index++) { - DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; shardCopyDest->rShutdown(shardCopyDest); } } static void SplitCopyDestReceiverDestroy(DestReceiver *dest) { - SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - for (int index = 0; index < self->splitFactor; index++) + for (int index = 0; index < copyDest->splitFactor; index++) { - DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; shardCopyDest->rDestroy(shardCopyDest); pfree(shardCopyDest); - pfree(self->splitCopyInfoArray[index]); + pfree(copyDest->splitCopyInfoArray[index]); } } diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 28e5b5254..10272fd8d 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -15,7 +15,6 @@ #include "distributed/worker_split_copy.h" #include 
"distributed/citus_ruleutils.h" -PG_FUNCTION_INFO_V1(worker_split_shardgroup_copy); PG_FUNCTION_INFO_V1(worker_split_copy); static void @@ -48,7 +47,9 @@ worker_split_copy(PG_FUNCTION_ARGS) splitCopyInfoList = lappend(splitCopyInfoList, splitCopyInfo); } - DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(shardIdToSplitCopy, splitCopyInfoList); + + EState *executor = CreateExecutorState(); + DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, shardIdToSplitCopy, splitCopyInfoList); StringInfo selectShardQueryForCopy = makeStringInfo(); appendStringInfo(selectShardQueryForCopy, @@ -58,6 +59,8 @@ worker_split_copy(PG_FUNCTION_ARGS) ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver); + FreeExecutorState(executor); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql index 458cef095..e7d2c67ff 100644 --- a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql @@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); #include "../../columnar/sql/columnar--11.0-2--11.1-1.sql" #include "udfs/citus_split_shard_by_split_points/11.0-2.sql" +#include "udfs/worker_split_copy/11.0-2.sql" diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql index ca100f04e..9369d356c 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql @@ -1,3 +1,5 @@ +DROP TYPE IF EXISTS citus.split_mode; + -- Three modes to be implemented: blocking, non_blocking and auto. -- Currently, the default / only supported mode is blocking. 
CREATE TYPE citus.split_mode AS ENUM ( diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql index ca100f04e..9369d356c 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql @@ -1,3 +1,5 @@ +DROP TYPE IF EXISTS citus.split_mode; + -- Three modes to be implemented: blocking, non_blocking and auto. -- Currently, the default / only supported mode is blocking. CREATE TYPE citus.split_mode AS ENUM ( diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index ad70fbabc..e4a185b4d 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -97,7 +97,6 @@ typedef struct ListCellAndListWrapper var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \ ) - /* * forboth_ptr_oid - * a convenience macro which loops through two lists at the same time. 
The diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index 2db6514de..ce2eda51c 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -15,6 +15,7 @@ struct FullRelationName; extern DestReceiver * CreateShardCopyDestReceiver( + EState *executorState, char* destinationShardFullyQualifiedName, uint32_t destinationNodeId); diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h index a9f664f59..97f863164 100644 --- a/src/include/distributed/worker_split_copy.h +++ b/src/include/distributed/worker_split_copy.h @@ -15,11 +15,11 @@ typedef struct SplitCopyInfo { uint64 destinationShardId; /* destination shard id */ - int32 destinationShardMinHashValue; /* min hash value of destination shard */ - int32 destinationShardMaxHashValue; /* max hash value of destination shard */ + int32 destinationShardMinHashValue; /* min hash value of destination shard */ + int32 destinationShardMaxHashValue; /* max hash value of destination shard */ uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ } SplitCopyInfo; -extern DestReceiver* CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList); +extern DestReceiver* CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList); #endif /* WORKER_SPLIT_COPY_H_ */