From cf8fea5a3f3eba33afe15fed370fd6a10dca9f7d Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 11 Jul 2022 15:44:48 -0700 Subject: [PATCH 01/11] Address PR comments --- .../distributed/operations/shard_split.c | 108 ++++++++++++++---- .../operations/worker_shard_copy.c | 12 +- .../operations/worker_split_copy_udf.c | 11 +- .../expected/worker_split_copy_test.out | 46 ++++++++ .../regress/sql/worker_split_copy_test.sql | 43 +++++++ 5 files changed, 188 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 7411e39a6..22cbbbc97 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -32,6 +32,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" +#include "distributed/deparse_shard_query.h" /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, @@ -70,6 +71,8 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, + WorkerNode *workerNode); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -354,7 +357,7 @@ SplitShard(SplitMode splitMode, shardSplitPointsList, nodeIdsForPlacementList); - List *workersForPlacementList = NULL; + List *workersForPlacementList = NIL; Datum nodeId; foreach_int(nodeId, nodeIdsForPlacementList) { @@ -472,13 +475,19 @@ static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard interval list for shard group */ - List *shardIntervalList = NULL; + /* + * Iterate over all the shards in the shard group. + */ + List *shardIntervalList = NIL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards DDL command list for a given shard + * and create them on corresponding workerPlacementNode. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -498,6 +507,24 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, } +/* Create a DDL task with corresponding task list on given worker node */ +static Task * +CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode) +{ + Task *ddlTask = CitusMakeNode(Task); + ddlTask->jobId = jobId; + ddlTask->taskType = DDL_TASK; + ddlTask->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(ddlTask, ddlCommandList); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, workerNode); + ddlTask->taskPlacementList = list_make1(taskPlacement); + + return ddlTask; +} + + /* Create ShardGroup auxiliary structures (indexes, stats, replicaindentities, triggers) * on a list of corresponding workers. */ @@ -505,29 +532,45 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { + List *shardIntervalList = NIL; + List *ddlTaskExecList = NIL; + /* - * Create auxiliary structures post copy. + * Iterate over all the shards in the shard group. */ - List *shardIntervalList = NULL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shard interval list for given shard and create tasks + * for every single split shard in a shard group. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { - List *indexCommandList = GetPostLoadTableCreationCommands( + List *ddlCommandList = GetPostLoadTableCreationCommands( shardInterval->relationId, true /* includeIndexes */, true /* includeReplicaIdentity */); - indexCommandList = WorkerApplyShardDDLCommandList( - indexCommandList, + ddlCommandList = WorkerApplyShardDDLCommandList( + ddlCommandList, shardInterval->shardId); - CreateObjectOnPlacement(indexCommandList, workerPlacementNode); + uint64 jobId = shardInterval->shardId; + Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, + workerPlacementNode); + + ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); } } + + ExecuteTaskListOutsideTransaction( + ROW_MODIFY_NONE, + ddlTaskExecList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); } @@ -565,10 +608,10 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { ShardInterval *sourceShardIntervalToCopy = NULL; - List *splitShardIntervalList = NULL; + List *splitShardIntervalList = NIL; int taskId = 0; - List *splitCopyTaskList = NULL; + List *splitCopyTaskList = NIL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, splitShardIntervalList, shardGroupSplitIntervalListList) { @@ -690,12 +733,12 @@ static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, List *splitPointsForShard) { - List *shardGroupSplitIntervalListList = NULL; + List *shardGroupSplitIntervalListList = NIL; ShardInterval *shardToSplitInterval = NULL; foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) { - List *shardSplitIntervalList = NULL; + List *shardSplitIntervalList = NIL; CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard, &shardSplitIntervalList); @@ -761,12 +804,17 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard intervals for shard group */ - List *shardIntervalList = NULL; - List *syncedShardList = NULL; + List *shardIntervalList = NIL; + List *syncedShardList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ + /* + * Iterate on split shards list for a given shard and insert metadata. + */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, @@ -811,12 +859,19 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { /* Create constraints between shards */ - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and create constraints. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -922,12 +977,19 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and perform drop. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 473f3b670..a90c74ef9 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -71,8 +71,9 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat); +static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, + bool + useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState @@ -102,7 +103,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - StringInfo copyStatement = ConstructCopyStatement( + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); @@ -344,11 +345,12 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) /* - * ConstructCopyStatement constructs the text of a COPY statement + * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo -ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat) +ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool + useBinaryFormat) { char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 63f7656fd..17efac4c0 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -12,6 +12,7 @@ #include "utils/lsyscache.h" #include "utils/array.h" #include "utils/builtins.h" +#include "distributed/utils/array_type.h" #include "distributed/listutils.h" #include "distributed/multi_executor.h" #include "distributed/worker_shard_copy.h" @@ -54,10 +55,12 @@ worker_split_copy(PG_FUNCTION_ARGS) ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy); ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1); - if (array_contains_nulls(splitCopyInfoArrayObject)) + bool arrayHasNull = ARR_HASNULL(splitCopyInfoArrayObject); + if (arrayHasNull) { - ereport(ERROR, - (errmsg("Shard Copy Info cannot have null values."))); + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg( + "citus.split_copy_info array cannot contain null values"))); } const int slice_ndim = 0; @@ -67,7 +70,7 @@ worker_split_copy(PG_FUNCTION_ARGS) mState); Datum copyInfoDatum = 0; bool isnull = false; - List *splitCopyInfoList = NULL; + List *splitCopyInfoList = NIL; while (array_iterate(copyInfo_iterator, ©InfoDatum, &isnull)) { SplitCopyInfo *splitCopyInfo = NULL; diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 52f93d5c1..60c145069 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -51,6 +51,52 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); +ERROR: could not find valid entry for shard xxxxx +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); +ERROR: cannot determine type of empty array +HINT: Explicitly cast to the desired type, for example ARRAY[]::integer[]. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); +ERROR: function worker_split_copy(integer, text[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::citus.split_copy_info]-- empty array + ); +ERROR: citus.split_copy_info array cannot contain null values +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); +ERROR: function worker_split_copy(integer, record[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ); +ERROR: destination_shard_id for split_copy_info cannot be null. +-- END: Test Negative scenario -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. SET citus.enable_binary_protocol = false; diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 324cc87c2..ff27faae1 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -35,6 +35,49 @@ SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::citus.split_copy_info]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ); +-- END: Test Negative scenario + -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. SET citus.enable_binary_protocol = false; From 8f8761ed407a663fe5b9e3c9795b43d98a5578f8 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 11 Jul 2022 17:45:32 -0700 Subject: [PATCH 02/11] Fix null handling when creating AuxiliaryStructures --- src/backend/distributed/operations/shard_split.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 22cbbbc97..a2ba24df4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -558,11 +558,18 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, ddlCommandList, shardInterval->shardId); - uint64 jobId = shardInterval->shardId; - Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, - workerPlacementNode); + /* + * A task is expected to be instantiated with a non-null 'ddlCommandList'. + * The list can be empty, if no auxiliary structures are present. + */ + if (ddlCommandList != NULL) + { + uint64 jobId = shardInterval->shardId; + Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, + workerPlacementNode); - ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); + ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); + } } } From 0445d72cc763aec1cce03d64a48afa91438ccba1 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 10:38:18 -0700 Subject: [PATCH 03/11] Ensure local copy is triggered in tests --- .../operations/worker_shard_copy.c | 6 +-- .../citus_split_shard_by_split_points.out | 2 - .../expected/multi_cluster_management.out | 37 +++++++++++++++++++ .../sql/citus_split_shard_by_split_points.sql | 2 - .../regress/sql/multi_cluster_management.sql | 8 ++++ 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index a90c74ef9..9239caffb 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -70,7 +70,7 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); -static bool CanUseLocalCopy(uint64 destinationNodeId); +static bool CanUseLocalCopy(uint32_t destinationNodeId); static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat); @@ -81,10 +81,10 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static bool -CanUseLocalCopy(uint64 destinationNodeId) +CanUseLocalCopy(uint32_t destinationNodeId) { /* If destination node is same as source, use local copy */ - return GetLocalNodeId() == destinationNodeId; + return GetLocalNodeId() == (int32) destinationNodeId; } diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 8adbb5f69..2d008dc24 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -91,7 +91,6 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Load data into tables. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -241,7 +240,6 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- END : Move a shard post split. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index cb3fe3a5d..1ac2c4965 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1258,3 +1258,40 @@ SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHER t (1 row) +-- Remove extra nodes added, this causes GetLocalNodeId() to behave incorrectly in other tests. +SELECT master_remove_node('localhost', 8887); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_remove_node('localhost', 9995); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_remove_node('localhost', 9992); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_remove_node('localhost', 9998); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_remove_node('localhost', 9997); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_remove_node('localhost', 8888); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index 8d618c3c5..7448a488a 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -67,7 +67,6 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Load data into tables. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -148,7 +147,6 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- END : Move a shard post split. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 6d9b9c589..3d78f1382 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -511,3 +511,11 @@ SELECT start_metadata_sync_to_all_nodes(); -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; + +-- Remove extra nodes added, this causes GetLocalNodeId() to behave incorrectly in other tests. +SELECT master_remove_node('localhost', 8887); +SELECT master_remove_node('localhost', 9995); +SELECT master_remove_node('localhost', 9992); +SELECT master_remove_node('localhost', 9998); +SELECT master_remove_node('localhost', 9997); +SELECT master_remove_node('localhost', 8888); From c4a0d55b4586e4a2166f9b6082de20c0c2f4064f Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 13:42:35 -0700 Subject: [PATCH 04/11] Limit max shards that can be created with split --- src/backend/distributed/operations/shard_split.c | 10 +++++++++- .../citus_split_shard_by_split_points_negative.out | 9 +++++++++ .../sql/citus_split_shard_by_split_points_negative.sql | 9 +++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index a2ba24df4..3578b00e0 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -16,6 +16,7 @@ #include "lib/stringinfo.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "distributed/shared_library_init.h" #include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" @@ -223,7 +224,8 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, int splitPointsCount = list_length(shardSplitPointsList); int nodeIdsCount = list_length(nodeIdsForPlacementList); - if (nodeIdsCount != splitPointsCount + 1) + int shardsCount = splitPointsCount + 1; + if (nodeIdsCount != shardsCount) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -234,6 +236,12 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, splitPointsCount))); } + if (shardsCount > MAX_SHARD_COUNT) + { + ereport(ERROR, (errmsg( + "Resulting shard count '%d' with split is greater than max shard count '%d' limit.", + shardsCount, MAX_SHARD_COUNT))); + } Assert(shardIntervalToSplit->minValueExists); Assert(shardIntervalToSplit->maxValueExists); diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out index d8b006741..148e3a142 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out @@ -83,6 +83,15 @@ SELECT citus_split_shard_by_split_points( ARRAY['-1073741825'], -- Split point equals shard's max value. ARRAY[:worker_1_node, :worker_2_node]); ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead. +-- UDF fails if resulting shard count from split greater than MAX_SHARD_COUNT (64000) +-- 64000 split point definee 64000+1 way split (64001 worker nodes needed). +WITH shard_ranges AS (SELECT ((-2147483648 + indx))::text as split_points, :worker_1_node as node_ids FROM generate_series(1,64000) indx ) +SELECT citus_split_shard_by_split_points( + 49761300, + array_agg(split_points), + array_agg(node_ids) || :worker_1_node) --placement node list should exceed split points by one. +FROM shard_ranges; +ERROR: Resulting shard count '64001' with split is greater than max shard count '64000' limit. -- UDF fails where source shard cannot be split further i.e min and max range is equal. -- Create a Shard where range cannot be split further SELECT isolate_tenant_to_new_shard('table_to_split', 1); diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql index bdaf32682..7f310c055 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql @@ -74,6 +74,15 @@ SELECT citus_split_shard_by_split_points( ARRAY['-1073741825'], -- Split point equals shard's max value. ARRAY[:worker_1_node, :worker_2_node]); +-- UDF fails if resulting shard count from split greater than MAX_SHARD_COUNT (64000) +-- 64000 split point definee 64000+1 way split (64001 worker nodes needed). +WITH shard_ranges AS (SELECT ((-2147483648 + indx))::text as split_points, :worker_1_node as node_ids FROM generate_series(1,64000) indx ) +SELECT citus_split_shard_by_split_points( + 49761300, + array_agg(split_points), + array_agg(node_ids) || :worker_1_node) --placement node list should exceed split points by one. +FROM shard_ranges; + -- UDF fails where source shard cannot be split further i.e min and max range is equal. -- Create a Shard where range cannot be split further SELECT isolate_tenant_to_new_shard('table_to_split', 1); From a0084a6590268453a31ea6b772f03402e11dee1f Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 14:37:46 -0700 Subject: [PATCH 05/11] Test failure fix --- .../expected/multi_cluster_management.out | 37 ------------------- .../worker_split_binary_copy_test.out | 37 +++++++++++++++++++ .../regress/sql/multi_cluster_management.sql | 8 ---- .../sql/worker_split_binary_copy_test.sql | 8 ++++ 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 1ac2c4965..cb3fe3a5d 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1258,40 +1258,3 @@ SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHER t (1 row) --- Remove extra nodes added, this causes GetLocalNodeId() to behave incorrectly in other tests. -SELECT master_remove_node('localhost', 8887); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - -SELECT master_remove_node('localhost', 9995); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - -SELECT master_remove_node('localhost', 9992); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - -SELECT master_remove_node('localhost', 9998); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - -SELECT master_remove_node('localhost', 9997); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - -SELECT master_remove_node('localhost', 8888); - master_remove_node ---------------------------------------------------------------------- - -(1 row) - diff --git a/src/test/regress/expected/worker_split_binary_copy_test.out b/src/test/regress/expected/worker_split_binary_copy_test.out index 0453530d1..997918cb6 100644 --- a/src/test/regress/expected/worker_split_binary_copy_test.out +++ b/src/test/regress/expected/worker_split_binary_copy_test.out @@ -3,6 +3,43 @@ SET search_path TO worker_split_binary_copy_test; SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81060000; +-- Remove extra nodes added, otherwise GetLocalNodeId() does not bahave correctly. +SELECT citus_remove_node('localhost', 8887); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9995); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9992); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9998); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9997); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 8888); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + -- BEGIN: Create distributed table and insert data. CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy ( l_orderkey bigint not null, diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 3d78f1382..6d9b9c589 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -511,11 +511,3 @@ SELECT start_metadata_sync_to_all_nodes(); -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; - --- Remove extra nodes added, this causes GetLocalNodeId() to behave incorrectly in other tests. -SELECT master_remove_node('localhost', 8887); -SELECT master_remove_node('localhost', 9995); -SELECT master_remove_node('localhost', 9992); -SELECT master_remove_node('localhost', 9998); -SELECT master_remove_node('localhost', 9997); -SELECT master_remove_node('localhost', 8888); diff --git a/src/test/regress/sql/worker_split_binary_copy_test.sql b/src/test/regress/sql/worker_split_binary_copy_test.sql index 03426f443..9e969f1d5 100644 --- a/src/test/regress/sql/worker_split_binary_copy_test.sql +++ b/src/test/regress/sql/worker_split_binary_copy_test.sql @@ -4,6 +4,14 @@ SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81060000; +-- Remove extra nodes added, otherwise GetLocalNodeId() does not bahave correctly. +SELECT citus_remove_node('localhost', 8887); +SELECT citus_remove_node('localhost', 9995); +SELECT citus_remove_node('localhost', 9992); +SELECT citus_remove_node('localhost', 9998); +SELECT citus_remove_node('localhost', 9997); +SELECT citus_remove_node('localhost', 8888); + -- BEGIN: Create distributed table and insert data. CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy ( l_orderkey bigint not null, From 6fc7544f18eac351941422ef6a980582d9f7df02 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 15:42:20 -0700 Subject: [PATCH 06/11] Remove split_mode and use shard_transfer_mode instead' --- .../citus_split_shard_by_split_points.c | 26 ++++++++++------- .../sql/downgrades/citus--11.1-1--11.0-3.sql | 3 +- .../11.1-1.sql | 16 +++-------- .../latest.sql | 16 +++-------- .../citus_split_shard_by_split_points.out | 6 ++-- ...s_split_shard_by_split_points_negative.out | 19 +++++++++++++ .../isolation_blocking_shard_split.out | 28 +++++++++---------- ...ing_shard_split_with_fkey_to_reference.out | 10 +++---- src/test/regress/expected/multi_extension.out | 3 +- .../expected/upgrade_list_citus_objects.out | 3 +- .../spec/isolation_blocking_shard_split.spec | 4 +-- ...ng_shard_split_with_fkey_to_reference.spec | 2 +- .../sql/citus_split_shard_by_split_points.sql | 6 ++-- ...s_split_shard_by_split_points_negative.sql | 19 +++++++++++++ 14 files changed, 93 insertions(+), 68 deletions(-) diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index da9c12497..d40245f48 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -30,12 +30,12 @@ PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points); static SplitMode LookupSplitMode(Oid shardSplitModeOid); /* - * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], split_mode citus.split_mode) + * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], shard_transfer_mode citus.shard_transfer_mode) * Split source shard into multiple shards using the given split points. * 'shard_id' is the id of source shard to split. * 'split_points' is an array that represents the split points. * 'node_ids' is an array that represents the placement node ids of the new shards. - * 'split_mode citus.split_mode' is the mode of split. + * 'shard_transfer_mode citus.shard_transfer_mode' is the transfer mode for split. */ Datum citus_split_shard_by_split_points(PG_FUNCTION_ARGS) @@ -51,8 +51,8 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2); List *nodeIdsForPlacementList = IntegerArrayTypeToList(nodeIdsArrayObject); - Oid shardSplitModeOid = PG_GETARG_OID(3); - SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid); + Oid shardTransferModeOid = PG_GETARG_OID(3); + SplitMode shardSplitMode = LookupSplitMode(shardTransferModeOid); SplitShard( shardSplitMode, @@ -66,25 +66,31 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) /* - * LookupSplitMode maps the oids of citus.shard_split_mode enum - * values to an enum. + * LookupSplitMode maps the oids of citus.shard_transfer_mode to SplitMode enum. */ SplitMode -LookupSplitMode(Oid shardSplitModeOid) +LookupSplitMode(Oid shardTransferModeOid) { SplitMode shardSplitMode = BLOCKING_SPLIT; - Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardSplitModeOid); + Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); char *enumLabel = DatumGetCString(enumLabelDatum); /* Extend with other modes as we support them */ - if (strncmp(enumLabel, "blocking", NAMEDATALEN) == 0) + if (strncmp(enumLabel, "block_writes", NAMEDATALEN) == 0) { shardSplitMode = BLOCKING_SPLIT; } + else if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0 || + strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0) + { + ereport(ERROR, (errmsg("Shard Tranfer mode: '%s' is not supported. Please use 'block_writes' instead.", + enumLabel))); + } else { - ereport(ERROR, (errmsg("Invalid split mode: %s. Expected split mode is blocking.", + // We will not get here as postgres will validate the enum value. + ereport(ERROR, (errmsg("Invalid shard tranfer mode: '%s'. Expected split mode is 'block_writes'.", enumLabel))); } diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index f7dc5ca07..90f54c2c5 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -50,11 +50,10 @@ DROP FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], node_ids integer[], - split_mode citus.split_mode); + shard_transfer_mode citus.shard_transfer_mode); DROP FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, splitCopyInfos citus.split_copy_info[]); -DROP TYPE citus.split_mode; DROP TYPE citus.split_copy_info; #include "../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql" diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql index 559769260..36624c40e 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql @@ -1,22 +1,14 @@ -DROP TYPE IF EXISTS citus.split_mode; - --- Three modes to be implemented: blocking, non_blocking and auto. --- Currently, the default / only supported mode is blocking. -CREATE TYPE citus.split_mode AS ENUM ( - 'blocking' -); - CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. node_ids integer[], - -- Three modes to be implemented: blocking, non_blocking and auto. - -- Currently, the default / only supported mode is blocking. - split_mode citus.split_mode default 'blocking') + -- Three modes to be implemented: block_writes, force_logical and auto. + -- Currently, the default / only supported mode is block_writes. + shard_transfer_mode citus.shard_transfer_mode default 'block_writes') RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$; -COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.split_mode) +COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.shard_transfer_mode) IS 'split a shard using split mode.'; diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql index 559769260..36624c40e 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql @@ -1,22 +1,14 @@ -DROP TYPE IF EXISTS citus.split_mode; - --- Three modes to be implemented: blocking, non_blocking and auto. --- Currently, the default / only supported mode is blocking. -CREATE TYPE citus.split_mode AS ENUM ( - 'blocking' -); - CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. node_ids integer[], - -- Three modes to be implemented: blocking, non_blocking and auto. - -- Currently, the default / only supported mode is blocking. - split_mode citus.split_mode default 'blocking') + -- Three modes to be implemented: block_writes, force_logical and auto. + -- Currently, the default / only supported mode is block_writes. + shard_transfer_mode citus.shard_transfer_mode default 'block_writes') RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$; -COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.split_mode) +COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.shard_transfer_mode) IS 'split a shard using split mode.'; diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 2d008dc24..85bae93e4 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -213,7 +213,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-1073741824'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -224,7 +224,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981001, ARRAY['536870911', '1610612735'], ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -386,7 +386,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981007, ARRAY['-2100000000'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out index 148e3a142..5986fa74b 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out @@ -32,6 +32,25 @@ SELECT create_distributed_table('table_to_split','id'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- UDF fails for any other shard_transfer_mode other than block_writes. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'auto'); +ERROR: Shard Tranfer mode: 'auto' is not supported. Please use 'block_writes' instead. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'force_logical'); +ERROR: Shard Tranfer mode: 'force_logical' is not supported. Please use 'block_writes' instead. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'gibberish'); +ERROR: invalid input value for enum citus.shard_transfer_mode: "gibberish" -- UDF fails for range partitioned tables. SELECT citus_split_shard_by_split_points( 60761300, diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out index ff3c250fd..02a23174e 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -42,7 +42,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -126,7 +126,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -200,7 +200,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -280,7 +280,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -359,7 +359,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -439,7 +439,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -509,7 +509,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -585,7 +585,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -657,7 +657,7 @@ step s1-blocking-shard-split: 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -669,7 +669,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); step s1-commit: COMMIT; @@ -732,7 +732,7 @@ step s1-blocking-shard-split: 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -744,7 +744,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); step s1-commit: COMMIT; @@ -812,7 +812,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -895,7 +895,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out b/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out index 410b9c2a0..9a6ed53eb 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out +++ b/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out @@ -20,7 +20,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -80,7 +80,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -140,7 +140,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -200,7 +200,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -260,7 +260,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 19004dd8f..ce728db90 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1095,7 +1095,7 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.chunk_group | table columnar.options | table columnar.stripe | - | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.split_mode) void + | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function columnar.get_storage_id(regclass) bigint | function columnar_internal.columnar_handler(internal) table_am_handler | function worker_split_copy(bigint,citus.split_copy_info[]) void @@ -1106,7 +1106,6 @@ SELECT * FROM multi_extension.print_extension_changes(); | table columnar_internal.options | table columnar_internal.stripe | type citus.split_copy_info - | type citus.split_mode | view columnar.chunk | view columnar.chunk_group | view columnar.options diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 6063ac3ec..614d95a9b 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -113,7 +113,7 @@ ORDER BY 1; function citus_shard_indexes_on_worker() function citus_shard_sizes() function citus_shards_on_worker() - function citus_split_shard_by_split_points(bigint,text[],integer[],citus.split_mode) + function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) function citus_stat_activity() function citus_stat_statements() function citus_stat_statements_reset() @@ -272,7 +272,6 @@ ORDER BY 1; type citus.distribution_type type citus.shard_transfer_mode type citus.split_copy_info - type citus.split_mode type citus_copy_format type noderole view citus_dist_stat_activity diff --git a/src/test/regress/spec/isolation_blocking_shard_split.spec b/src/test/regress/spec/isolation_blocking_shard_split.spec index a06824886..ddac66f5b 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split.spec @@ -70,7 +70,7 @@ step "s1-blocking-shard-split" 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s1-commit" @@ -91,7 +91,7 @@ step "s2-blocking-shard-split" 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s2-commit" diff --git a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec index 243d8ef05..49b56c4a5 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec @@ -67,7 +67,7 @@ step "s2-blocking-shard-split" 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s2-add-fkey" diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index 7448a488a..ec553c0da 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -132,14 +132,14 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-1073741824'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); -- Perform 3 way split SELECT pg_catalog.citus_split_shard_by_split_points( 8981001, ARRAY['536870911', '1610612735'], ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); -- END : Split two shards : One with move and One without move. -- BEGIN : Move a shard post split. @@ -209,7 +209,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981007, ARRAY['-2100000000'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); SET search_path TO "citus_split_test_schema"; SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql index 7f310c055..e730a8c28 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql @@ -23,6 +23,25 @@ SELECT create_distributed_table('table_to_split','id'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- UDF fails for any other shard_transfer_mode other than block_writes. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'auto'); + +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'force_logical'); + +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'gibberish'); + -- UDF fails for range partitioned tables. SELECT citus_split_shard_by_split_points( 60761300, From 8244db7d7c73a001634d209adf0c4530c5950990 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 15:54:04 -0700 Subject: [PATCH 07/11] Fix test failure --- .../citus_split_shard_by_split_points.c | 16 +++++++++------- .../expected/upgrade_list_citus_objects.out | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index d40245f48..11e979900 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -27,7 +27,7 @@ /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points); -static SplitMode LookupSplitMode(Oid shardSplitModeOid); +static SplitMode LookupSplitMode(Oid shardTransferModeOid); /* * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], shard_transfer_mode citus.shard_transfer_mode) @@ -82,16 +82,18 @@ LookupSplitMode(Oid shardTransferModeOid) shardSplitMode = BLOCKING_SPLIT; } else if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0 || - strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0) + strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0) { - ereport(ERROR, (errmsg("Shard Tranfer mode: '%s' is not supported. Please use 'block_writes' instead.", - enumLabel))); + ereport(ERROR, (errmsg( + "Shard Tranfer mode: '%s' is not supported. Please use 'block_writes' instead.", + enumLabel))); } else { - // We will not get here as postgres will validate the enum value. - ereport(ERROR, (errmsg("Invalid shard tranfer mode: '%s'. Expected split mode is 'block_writes'.", - enumLabel))); + /* We will not get here as postgres will validate the enum value. */ + ereport(ERROR, (errmsg( + "Invalid shard tranfer mode: '%s'. Expected split mode is 'block_writes'.", + enumLabel))); } return shardSplitMode; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 614d95a9b..fb0ff32cb 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -289,5 +289,5 @@ ORDER BY 1; view columnar.stripe view pg_dist_shard_placement view time_partitions -(274 rows) +(273 rows) From 8f56a30a215bc9eeaa808161a90b8db4d1be434d Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 12 Jul 2022 16:01:30 -0700 Subject: [PATCH 08/11] Fix test failure --- src/test/regress/expected/multi_extension.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ce728db90..902f0870b 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1111,7 +1111,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | view columnar.options | view columnar.storage | view columnar.stripe -(31 rows) +(30 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version From aca0a03d21967b987e60004398ee1fae5f3441c2 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Wed, 13 Jul 2022 14:09:02 -0700 Subject: [PATCH 09/11] Fixing permission issue when splitting non-superuser owned tables --- src/backend/distributed/operations/shard_split.c | 6 +++--- .../operations/worker_split_copy_udf.c | 12 ++++++------ .../sql/downgrades/citus--11.1-1--11.0-3.sql | 4 ++-- .../sql/udfs/worker_split_copy/11.1-1.sql | 9 +++++++-- .../sql/udfs/worker_split_copy/latest.sql | 9 +++++++-- .../citus_split_shard_by_split_points.out | 5 +++++ src/test/regress/expected/multi_extension.out | 4 ++-- .../expected/upgrade_list_citus_objects.out | 4 ++-- .../expected/worker_shard_binary_copy_test.out | 8 ++++---- .../expected/worker_shard_text_copy_test.out | 8 ++++---- .../expected/worker_split_binary_copy_test.out | 8 ++++---- .../regress/expected/worker_split_copy_test.out | 16 ++++++++-------- .../expected/worker_split_text_copy_test.out | 8 ++++---- .../sql/citus_split_shard_by_split_points.sql | 7 +++++++ .../sql/worker_split_binary_copy_test.sql | 8 ++++---- src/test/regress/sql/worker_split_copy_test.sql | 12 ++++++------ .../regress/sql/worker_split_text_copy_test.sql | 8 ++++---- 17 files changed, 79 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 3578b00e0..9c0b6a26d 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -668,12 +668,12 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, * ROW(81060015, -- destination shard id * -2147483648, -- split range begin * 1073741823, --split range end - * 10 -- worker node id)::citus.split_copy_info, + * 10 -- worker node id)::pg_catalog.split_copy_info, * -- split copy info for split children 2 * ROW(81060016, --destination shard id * 1073741824, --split range begin * 2147483647, --split range end - * 11 -- workef node id)::citus.split_copy_info + * 11 -- workef node id)::pg_catalog.split_copy_info * ] * ); */ @@ -698,7 +698,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, StringInfo splitCopyInfoRow = makeStringInfo(); appendStringInfo(splitCopyInfoRow, - "ROW(%lu, %d, %d, %u)::citus.split_copy_info", + "ROW(%lu, %d, %d, %u)::pg_catalog.split_copy_info", splitChildShardInterval->shardId, DatumGetInt32(splitChildShardInterval->minValue), DatumGetInt32(splitChildShardInterval->maxValue), diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 17efac4c0..2b33654f9 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -43,7 +43,7 @@ static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValue ArrayType **maxValueArray); /* - * worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[]) + * worker_split_copy(source_shard_id bigint, splitCopyInfo pg_catalog.split_copy_info[]) * UDF to split copy shard to list of destination shards. * 'source_shard_id' : Source ShardId to split copy. * 'splitCopyInfos' : Array of Split Copy Info (destination_shard's id, min/max ranges and node_id) @@ -60,7 +60,7 @@ worker_split_copy(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg( - "citus.split_copy_info array cannot contain null values"))); + "pg_catalog.split_copy_info array cannot contain null values"))); } const int slice_ndim = 0; @@ -121,7 +121,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_id for split_copy_info cannot be null."))); + "destination_shard_id for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum); @@ -130,7 +130,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_min_value for split_copy_info cannot be null."))); + "destination_shard_min_value for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardMinHashValue = minValueDatum; @@ -139,7 +139,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_max_value for split_copy_info cannot be null."))); + "destination_shard_max_value for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardMaxHashValue = maxValueDatum; @@ -148,7 +148,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_node_id for split_copy_info cannot be null."))); + "destination_shard_node_id for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum); diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index 90f54c2c5..c1cf1afa4 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -53,8 +53,8 @@ DROP FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_transfer_mode citus.shard_transfer_mode); DROP FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]); -DROP TYPE citus.split_copy_info; + splitCopyInfos pg_catalog.split_copy_info[]); +DROP TYPE pg_catalog.split_copy_info; #include "../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql" diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql index b9c5869d2..0ecad4a07 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql @@ -1,3 +1,7 @@ +-- We want to create the type in pg_catalog but doing that leads to an error +-- "ERROR: permission denied to create "pg_catalog.split_copy_info" +-- "DETAIL: System catalog modifications are currently disallowed. "" +-- As a workaround, we create the type in the citus schema and then later modify it to pg_catalog. DROP TYPE IF EXISTS citus.split_copy_info; CREATE TYPE citus.split_copy_info AS ( destination_shard_id bigint, @@ -6,12 +10,13 @@ CREATE TYPE citus.split_copy_info AS ( -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. destination_shard_node_id integer); +ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog; CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]) + splitCopyInfos pg_catalog.split_copy_info[]) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; -COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[]) IS 'Perform split copy for shard'; diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql index b9c5869d2..0ecad4a07 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql @@ -1,3 +1,7 @@ +-- We want to create the type in pg_catalog but doing that leads to an error +-- "ERROR: permission denied to create "pg_catalog.split_copy_info" +-- "DETAIL: System catalog modifications are currently disallowed. "" +-- As a workaround, we create the type in the citus schema and then later modify it to pg_catalog. DROP TYPE IF EXISTS citus.split_copy_info; CREATE TYPE citus.split_copy_info AS ( destination_shard_id bigint, @@ -6,12 +10,13 @@ CREATE TYPE citus.split_copy_info AS ( -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. destination_shard_node_id integer); +ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog; CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]) + splitCopyInfos pg_catalog.split_copy_info[]) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; -COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[]) IS 'Perform split copy for shard'; diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 85bae93e4..743996160 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -11,6 +11,9 @@ Here is a high level overview of test plan: 8. Split an already split shard second time on a different schema. */ CREATE SCHEMA "citus_split_test_schema"; +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981000; SET citus.next_placement_id TO 8610000; @@ -193,6 +196,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN : Move one shard before we split it. \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -372,6 +376,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 902f0870b..c7d13899a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1098,14 +1098,14 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function columnar.get_storage_id(regclass) bigint | function columnar_internal.columnar_handler(internal) table_am_handler - | function worker_split_copy(bigint,citus.split_copy_info[]) void + | function worker_split_copy(bigint,pg_catalog.split_copy_info[]) void | schema columnar_internal | sequence columnar_internal.storageid_seq | table columnar_internal.chunk | table columnar_internal.chunk_group | table columnar_internal.options | table columnar_internal.stripe - | type citus.split_copy_info + | type pg_catalog.split_copy_info | view columnar.chunk | view columnar.chunk_group | view columnar.options diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index fb0ff32cb..7e8990497 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -242,7 +242,7 @@ ORDER BY 1; function worker_partitioned_table_size(regclass) function worker_record_sequence_dependency(regclass,regclass,name) function worker_save_query_explain_analyze(text,jsonb) - function worker_split_copy(bigint,citus.split_copy_info[]) + function worker_split_copy(bigint,pg_catalog.split_copy_info[]) schema citus schema citus_internal schema columnar @@ -271,7 +271,7 @@ ORDER BY 1; table pg_dist_transaction type citus.distribution_type type citus.shard_transfer_mode - type citus.split_copy_info + type pg_catalog.split_copy_info type citus_copy_format type noderole view citus_dist_stat_activity diff --git a/src/test/regress/expected/worker_shard_binary_copy_test.out b/src/test/regress/expected/worker_shard_binary_copy_test.out index 800d06839..fc9b2cd86 100644 --- a/src/test/regress/expected/worker_shard_binary_copy_test.out +++ b/src/test/regress/expected/worker_shard_binary_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_shard_text_copy_test.out b/src/test/regress/expected/worker_shard_text_copy_test.out index 1ed623ad6..52b26cbb9 100644 --- a/src/test/regress/expected/worker_shard_text_copy_test.out +++ b/src/test/regress/expected/worker_shard_text_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_split_binary_copy_test.out b/src/test/regress/expected/worker_split_binary_copy_test.out index 997918cb6..07dacbdb1 100644 --- a/src/test/regress/expected/worker_split_binary_copy_test.out +++ b/src/test/regress/expected/worker_split_binary_copy_test.out @@ -191,12 +191,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -213,12 +213,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 60c145069..c17ef5aa4 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -59,12 +59,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); ERROR: could not find valid entry for shard xxxxx @@ -82,9 +82,9 @@ ERROR: function worker_split_copy(integer, text[]) does not exist HINT: No function matches the given name and argument types. You might need to add explicit type casts. SELECT * from worker_split_copy( 81070000, -- source shard id to copy - ARRAY[NULL::citus.split_copy_info]-- empty array + ARRAY[NULL::pg_catalog.split_copy_info]-- empty array ); -ERROR: citus.split_copy_info array cannot contain null values +ERROR: pg_catalog.split_copy_info array cannot contain null values SELECT * from worker_split_copy( 81070000, -- source shard id to copy ARRAY[ROW(NULL)]-- empty array @@ -93,9 +93,9 @@ ERROR: function worker_split_copy(integer, record[]) does not exist HINT: No function matches the given name and argument types. You might need to add explicit type casts. SELECT * from worker_split_copy( 81070000, -- source shard id to copy - ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ARRAY[ROW(NULL, NULL, NULL, NULL)::pg_catalog.split_copy_info] -- empty array ); -ERROR: destination_shard_id for split_copy_info cannot be null. +ERROR: destination_shard_id for pg_catalog.split_copy_info cannot be null. -- END: Test Negative scenario -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. @@ -107,12 +107,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_split_text_copy_test.out b/src/test/regress/expected/worker_split_text_copy_test.out index 80aad97d8..164d3a6d7 100644 --- a/src/test/regress/expected/worker_split_text_copy_test.out +++ b/src/test/regress/expected/worker_split_text_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index ec553c0da..6c2957953 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -12,6 +12,11 @@ Here is a high level overview of test plan: */ CREATE SCHEMA "citus_split_test_schema"; + +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; + SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981000; SET citus.next_placement_id TO 8610000; @@ -114,6 +119,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN : Move one shard before we split it. \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -194,6 +200,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; diff --git a/src/test/regress/sql/worker_split_binary_copy_test.sql b/src/test/regress/sql/worker_split_binary_copy_test.sql index 9e969f1d5..a47e968bd 100644 --- a/src/test/regress/sql/worker_split_binary_copy_test.sql +++ b/src/test/regress/sql/worker_split_binary_copy_test.sql @@ -165,12 +165,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. @@ -183,12 +183,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way remote shard split copy. diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index ff27faae1..b799eb305 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -43,12 +43,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); @@ -64,7 +64,7 @@ SELECT * from worker_split_copy( SELECT * from worker_split_copy( 81070000, -- source shard id to copy - ARRAY[NULL::citus.split_copy_info]-- empty array + ARRAY[NULL::pg_catalog.split_copy_info]-- empty array ); SELECT * from worker_split_copy( @@ -74,7 +74,7 @@ SELECT * from worker_split_copy( SELECT * from worker_split_copy( 81070000, -- source shard id to copy - ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- empty array + ARRAY[ROW(NULL, NULL, NULL, NULL)::pg_catalog.split_copy_info] -- empty array ); -- END: Test Negative scenario @@ -88,12 +88,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. diff --git a/src/test/regress/sql/worker_split_text_copy_test.sql b/src/test/regress/sql/worker_split_text_copy_test.sql index 1fa5d476e..10791a66d 100644 --- a/src/test/regress/sql/worker_split_text_copy_test.sql +++ b/src/test/regress/sql/worker_split_text_copy_test.sql @@ -157,12 +157,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. @@ -175,12 +175,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way remote shard split copy. From e87a8cd3037804a5d01ee1cc72e35117053629d7 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Wed, 13 Jul 2022 14:19:20 -0700 Subject: [PATCH 10/11] Fix test expected output --- src/test/regress/expected/multi_extension.out | 4 ++-- src/test/regress/expected/upgrade_list_citus_objects.out | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c7d13899a..79dafe9db 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1098,14 +1098,14 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function columnar.get_storage_id(regclass) bigint | function columnar_internal.columnar_handler(internal) table_am_handler - | function worker_split_copy(bigint,pg_catalog.split_copy_info[]) void + | function worker_split_copy(bigint,split_copy_info[]) void | schema columnar_internal | sequence columnar_internal.storageid_seq | table columnar_internal.chunk | table columnar_internal.chunk_group | table columnar_internal.options | table columnar_internal.stripe - | type pg_catalog.split_copy_info + | type split_copy_info | view columnar.chunk | view columnar.chunk_group | view columnar.options diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 7e8990497..a7254bdc3 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -242,7 +242,7 @@ ORDER BY 1; function worker_partitioned_table_size(regclass) function worker_record_sequence_dependency(regclass,regclass,name) function worker_save_query_explain_analyze(text,jsonb) - function worker_split_copy(bigint,pg_catalog.split_copy_info[]) + function worker_split_copy(bigint,split_copy_info[]) schema citus schema citus_internal schema columnar @@ -271,9 +271,9 @@ ORDER BY 1; table pg_dist_transaction type citus.distribution_type type citus.shard_transfer_mode - type pg_catalog.split_copy_info type citus_copy_format type noderole + type split_copy_info view citus_dist_stat_activity view citus_lock_waits view citus_schema.citus_tables From 4b89493e36236575c0638a05896307c4ec359655 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Wed, 13 Jul 2022 14:29:26 -0700 Subject: [PATCH 11/11] Remove extra space --- src/test/regress/expected/multi_extension.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 79dafe9db..d35f84e3d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1105,7 +1105,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | table columnar_internal.chunk_group | table columnar_internal.options | table columnar_internal.stripe - | type split_copy_info + | type split_copy_info | view columnar.chunk | view columnar.chunk_group | view columnar.options