From cf8fea5a3f3eba33afe15fed370fd6a10dca9f7d Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 11 Jul 2022 15:44:48 -0700 Subject: [PATCH] Address PR comments --- .../distributed/operations/shard_split.c | 108 ++++++++++++++---- .../operations/worker_shard_copy.c | 12 +- .../operations/worker_split_copy_udf.c | 11 +- .../expected/worker_split_copy_test.out | 46 ++++++++ .../regress/sql/worker_split_copy_test.sql | 43 +++++++ 5 files changed, 188 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 7411e39a6..22cbbbc97 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -32,6 +32,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" +#include "distributed/deparse_shard_query.h" /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, @@ -70,6 +71,8 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, + WorkerNode *workerNode); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -354,7 +357,7 @@ SplitShard(SplitMode splitMode, shardSplitPointsList, nodeIdsForPlacementList); - List *workersForPlacementList = NULL; + List *workersForPlacementList = NIL; Datum nodeId; foreach_int(nodeId, nodeIdsForPlacementList) { @@ -472,13 +475,19 @@ static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard interval list for shard group */ - List *shardIntervalList = NULL; + /* + * Iterate over all the shards in the shard group. + */ + List *shardIntervalList = NIL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards DDL command list for a given shard + * and create them on corresponding workerPlacementNode. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -498,6 +507,24 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, } +/* Create a DDL task with corresponding task list on given worker node */ +static Task * +CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode) +{ + Task *ddlTask = CitusMakeNode(Task); + ddlTask->jobId = jobId; + ddlTask->taskType = DDL_TASK; + ddlTask->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(ddlTask, ddlCommandList); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, workerNode); + ddlTask->taskPlacementList = list_make1(taskPlacement); + + return ddlTask; +} + + /* Create ShardGroup auxiliary structures (indexes, stats, replicaindentities, triggers) * on a list of corresponding workers. */ @@ -505,29 +532,45 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { + List *shardIntervalList = NIL; + List *ddlTaskExecList = NIL; + /* - * Create auxiliary structures post copy. + * Iterate over all the shards in the shard group. */ - List *shardIntervalList = NULL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shard interval list for given shard and create tasks + * for every single split shard in a shard group. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { - List *indexCommandList = GetPostLoadTableCreationCommands( + List *ddlCommandList = GetPostLoadTableCreationCommands( shardInterval->relationId, true /* includeIndexes */, true /* includeReplicaIdentity */); - indexCommandList = WorkerApplyShardDDLCommandList( - indexCommandList, + ddlCommandList = WorkerApplyShardDDLCommandList( + ddlCommandList, shardInterval->shardId); - CreateObjectOnPlacement(indexCommandList, workerPlacementNode); + uint64 jobId = shardInterval->shardId; + Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, + workerPlacementNode); + + ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); } } + + ExecuteTaskListOutsideTransaction( + ROW_MODIFY_NONE, + ddlTaskExecList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); } @@ -565,10 +608,10 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { ShardInterval *sourceShardIntervalToCopy = NULL; - List *splitShardIntervalList = NULL; + List *splitShardIntervalList = NIL; int taskId = 0; - List *splitCopyTaskList = NULL; + List *splitCopyTaskList = NIL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, splitShardIntervalList, shardGroupSplitIntervalListList) { @@ -690,12 +733,12 @@ static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, List *splitPointsForShard) { - List *shardGroupSplitIntervalListList = NULL; + List *shardGroupSplitIntervalListList = NIL; ShardInterval *shardToSplitInterval = NULL; foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) { - List *shardSplitIntervalList = NULL; + List *shardSplitIntervalList = NIL; CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard, &shardSplitIntervalList); @@ -761,12 +804,17 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard intervals for shard group */ - List *shardIntervalList = NULL; - List *syncedShardList = NULL; + List *shardIntervalList = NIL; + List *syncedShardList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ + /* + * Iterate on split shards list for a given shard and insert metadata. + */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, @@ -811,12 +859,19 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { /* Create constraints between shards */ - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and create constraints. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -922,12 +977,19 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and perform drop. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 473f3b670..a90c74ef9 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -71,8 +71,9 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat); +static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, + bool + useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState @@ -102,7 +103,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - StringInfo copyStatement = ConstructCopyStatement( + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); @@ -344,11 +345,12 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) /* - * ConstructCopyStatement constructs the text of a COPY statement + * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo -ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat) +ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool + useBinaryFormat) { char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 63f7656fd..17efac4c0 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -12,6 +12,7 @@ #include "utils/lsyscache.h" #include "utils/array.h" #include "utils/builtins.h" +#include "distributed/utils/array_type.h" #include "distributed/listutils.h" #include "distributed/multi_executor.h" #include "distributed/worker_shard_copy.h" @@ -54,10 +55,12 @@ worker_split_copy(PG_FUNCTION_ARGS) ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy); ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1); - if (array_contains_nulls(splitCopyInfoArrayObject)) + bool arrayHasNull = ARR_HASNULL(splitCopyInfoArrayObject); + if (arrayHasNull) { - ereport(ERROR, - (errmsg("Shard Copy Info cannot have null values."))); + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg( + "citus.split_copy_info array cannot contain null values"))); } const int slice_ndim = 0; @@ -67,7 +70,7 @@ worker_split_copy(PG_FUNCTION_ARGS) mState); Datum copyInfoDatum = 0; bool isnull = false; - List *splitCopyInfoList = NULL; + List *splitCopyInfoList = NIL; while (array_iterate(copyInfo_iterator, ©InfoDatum, &isnull)) { SplitCopyInfo *splitCopyInfo = NULL; diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 52f93d5c1..60c145069 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -51,6 +51,52 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); +ERROR: could not find valid entry for shard xxxxx +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); +ERROR: cannot determine type of empty array +HINT: Explicitly cast to the desired type, for example ARRAY[]::integer[]. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); +ERROR: function worker_split_copy(integer, text[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::citus.split_copy_info]-- empty array + ); +ERROR: citus.split_copy_info array cannot contain null values +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); +ERROR: function worker_split_copy(integer, record[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ); +ERROR: destination_shard_id for split_copy_info cannot be null. +-- END: Test Negative scenario -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. SET citus.enable_binary_protocol = false; diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 324cc87c2..ff27faae1 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -35,6 +35,49 @@ SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::citus.split_copy_info]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ); +-- END: Test Negative scenario + -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. SET citus.enable_binary_protocol = false;