diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index ca1ee5cca..07e273544 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -27,15 +27,15 @@ /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points); -static SplitMode LookupSplitMode(Oid shardSplitModeOid); +static SplitMode LookupSplitMode(Oid shardTransferModeOid); /* - * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], split_mode citus.split_mode) + * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], shard_transfer_mode citus.shard_transfer_mode) * Split source shard into multiple shards using the given split points. * 'shard_id' is the id of source shard to split. * 'split_points' is an array that represents the split points. * 'node_ids' is an array that represents the placement node ids of the new shards. - * 'split_mode citus.split_mode' is the mode of split. + * 'shard_transfer_mode citus.shard_transfer_mode' is the transfer mode for split. */ Datum citus_split_shard_by_split_points(PG_FUNCTION_ARGS) @@ -51,8 +51,8 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2); List *nodeIdsForPlacementList = IntegerArrayTypeToList(nodeIdsArrayObject); - Oid shardSplitModeOid = PG_GETARG_OID(3); - SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid); + Oid shardTransferModeOid = PG_GETARG_OID(3); + SplitMode shardSplitMode = LookupSplitMode(shardTransferModeOid); SplitShard( shardSplitMode, @@ -66,31 +66,33 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) /* - * LookupSplitMode maps the oids of citus.shard_split_mode enum - * values to an enum. + * LookupSplitMode maps the oids of citus.shard_transfer_mode to SplitMode enum. */ SplitMode -LookupSplitMode(Oid shardSplitModeOid) +LookupSplitMode(Oid shardTransferModeOid) { SplitMode shardSplitMode = BLOCKING_SPLIT; - Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardSplitModeOid); + Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); char *enumLabel = DatumGetCString(enumLabelDatum); /* Extend with other modes as we support them */ - if (strncmp(enumLabel, "blocking", NAMEDATALEN) == 0) + if (strncmp(enumLabel, "block_writes", NAMEDATALEN) == 0) { shardSplitMode = BLOCKING_SPLIT; } - else if (strncmp(enumLabel, "non_blocking", NAMEDATALEN) == 0) + /* Extend with other modes as we support them */ + else if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0 || + strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0) { shardSplitMode = NON_BLOCKING_SPLIT; } - /* Extend with other modes as we support them */ else { - ereport(ERROR, (errmsg("Invalid split mode: %s. Expected split mode is blocking.", - enumLabel))); + /* We will not get here as postgres will validate the enum value. */ + ereport(ERROR, (errmsg( + "Invalid shard tranfer mode: '%s'. 
Expected split mode is 'block_writes/auto/force_logical'.", + enumLabel))); } return shardSplitMode; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index c98907875..e91a8ba90 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -17,6 +17,7 @@ #include "lib/stringinfo.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "distributed/shared_library_init.h" #include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" @@ -35,6 +36,7 @@ #include "distributed/multi_physical_planner.h" #include "commands/dbcommands.h" #include "distributed/shardsplit_logical_replication.h" +#include "distributed/deparse_shard_query.h" /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, @@ -89,6 +91,8 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, + WorkerNode *workerNode); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -239,7 +243,8 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, int splitPointsCount = list_length(shardSplitPointsList); int nodeIdsCount = list_length(nodeIdsForPlacementList); - if (nodeIdsCount != splitPointsCount + 1) + int shardsCount = splitPointsCount + 1; + if (nodeIdsCount != shardsCount) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -250,6 +255,12 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, splitPointsCount))); } + if (shardsCount > MAX_SHARD_COUNT) + { + ereport(ERROR, (errmsg( + "Resulting shard count '%d' with split is greater than max shard count '%d' limit.", + shardsCount, MAX_SHARD_COUNT))); + } Assert(shardIntervalToSplit->minValueExists); Assert(shardIntervalToSplit->maxValueExists); @@ -373,7 +384,7 @@ SplitShard(SplitMode splitMode, shardSplitPointsList, nodeIdsForPlacementList); - List *workersForPlacementList = NULL; + List *workersForPlacementList = NIL; Datum nodeId; foreach_int(nodeId, nodeIdsForPlacementList) { @@ -494,13 +505,19 @@ static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard interval list for shard group */ - List *shardIntervalList = NULL; + /* + * Iterate over all the shards in the shard group. + */ + List *shardIntervalList = NIL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards DDL command list for a given shard + * and create them on corresponding workerPlacementNode. 
+ */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -520,6 +537,24 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, } +/* Create a DDL task with corresponding task list on given worker node */ +static Task * +CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode) +{ + Task *ddlTask = CitusMakeNode(Task); + ddlTask->jobId = jobId; + ddlTask->taskType = DDL_TASK; + ddlTask->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(ddlTask, ddlCommandList); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, workerNode); + ddlTask->taskPlacementList = list_make1(taskPlacement); + + return ddlTask; +} + + /* Create ShardGroup auxiliary structures (indexes, stats, replicaindentities, triggers) * on a list of corresponding workers. */ @@ -527,29 +562,52 @@ void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { + List *shardIntervalList = NIL; + List *ddlTaskExecList = NIL; + /* - * Create auxiliary structures post copy. + * Iterate over all the shards in the shard group. */ - List *shardIntervalList = NULL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shard interval list for given shard and create tasks + * for every single split shard in a shard group. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { - List *indexCommandList = GetPostLoadTableCreationCommands( + List *ddlCommandList = GetPostLoadTableCreationCommands( shardInterval->relationId, true /* includeIndexes */, true /* includeReplicaIdentity */); - indexCommandList = WorkerApplyShardDDLCommandList( - indexCommandList, + ddlCommandList = WorkerApplyShardDDLCommandList( + ddlCommandList, shardInterval->shardId); - CreateObjectOnPlacement(indexCommandList, workerPlacementNode); + /* + * A task is expected to be instantiated with a non-null 'ddlCommandList'. + * The list can be empty, if no auxiliary structures are present. 
+ */ + if (ddlCommandList != NULL) + { + uint64 jobId = shardInterval->shardId; + Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, + workerPlacementNode); + + ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); + } } } + + ExecuteTaskListOutsideTransaction( + ROW_MODIFY_NONE, + ddlTaskExecList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); } @@ -587,10 +645,10 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { ShardInterval *sourceShardIntervalToCopy = NULL; - List *splitShardIntervalList = NULL; + List *splitShardIntervalList = NIL; int taskId = 0; - List *splitCopyTaskList = NULL; + List *splitCopyTaskList = NIL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, splitShardIntervalList, shardGroupSplitIntervalListList) { @@ -632,12 +690,12 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, * ROW(81060015, -- destination shard id * -2147483648, -- split range begin * 1073741823, --split range end - * 10 -- worker node id)::citus.split_copy_info, + * 10 -- worker node id)::pg_catalog.split_copy_info, * -- split copy info for split children 2 * ROW(81060016, --destination shard id * 1073741824, --split range begin * 2147483647, --split range end - * 11 -- workef node id)::citus.split_copy_info + * 11 -- workef node id)::pg_catalog.split_copy_info * ] * ); */ @@ -662,7 +720,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, StringInfo splitCopyInfoRow = makeStringInfo(); appendStringInfo(splitCopyInfoRow, - "ROW(%lu, %d, %d, %u)::citus.split_copy_info", + "ROW(%lu, %d, %d, %u)::pg_catalog.split_copy_info", splitChildShardInterval->shardId, DatumGetInt32(splitChildShardInterval->minValue), DatumGetInt32(splitChildShardInterval->maxValue), @@ -712,12 +770,12 @@ static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, List *splitPointsForShard) { - List *shardGroupSplitIntervalListList = NULL; + List *shardGroupSplitIntervalListList = NIL; ShardInterval *shardToSplitInterval = NULL; foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) { - List *shardSplitIntervalList = NULL; + List *shardSplitIntervalList = NIL; CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard, &shardSplitIntervalList); @@ -783,12 +841,17 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - /* Iterate on shard intervals for shard group */ - List *shardIntervalList = NULL; - List *syncedShardList = NULL; + List *shardIntervalList = NIL; + List *syncedShardList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ + /* + * Iterate on split shards list for a given shard and insert metadata. + */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, @@ -833,12 +896,19 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { /* Create constraints between shards */ - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. 
+ */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split children shards along with the respective placement workers */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and create constraints. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { @@ -944,12 +1014,19 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - List *shardIntervalList = NULL; + List *shardIntervalList = NIL; + + /* + * Iterate over all the shards in the shard group. + */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { - /* Iterate on split shard interval list and corresponding placement worker */ ShardInterval *shardInterval = NULL; WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shards list for a given shard and perform drop. + */ forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, workersForPlacementList) { diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 473f3b670..9239caffb 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -70,9 +70,10 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); -static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat); +static bool CanUseLocalCopy(uint32_t destinationNodeId); +static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, + bool + useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState @@ -80,10 +81,10 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static bool -CanUseLocalCopy(uint64 destinationNodeId) +CanUseLocalCopy(uint32_t destinationNodeId) { /* If destination node is same as source, use local copy */ - return GetLocalNodeId() == destinationNodeId; + return GetLocalNodeId() == (int32) destinationNodeId; } @@ -102,7 +103,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - StringInfo copyStatement = ConstructCopyStatement( + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); @@ -344,11 +345,12 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) /* - * ConstructCopyStatement constructs the text of a COPY statement + * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo -ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat) +ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool + useBinaryFormat) { char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char 
*destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 63f7656fd..2b33654f9 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -12,6 +12,7 @@ #include "utils/lsyscache.h" #include "utils/array.h" #include "utils/builtins.h" +#include "distributed/utils/array_type.h" #include "distributed/listutils.h" #include "distributed/multi_executor.h" #include "distributed/worker_shard_copy.h" @@ -42,7 +43,7 @@ static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValue ArrayType **maxValueArray); /* - * worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[]) + * worker_split_copy(source_shard_id bigint, splitCopyInfo pg_catalog.split_copy_info[]) * UDF to split copy shard to list of destination shards. * 'source_shard_id' : Source ShardId to split copy. * 'splitCopyInfos' : Array of Split Copy Info (destination_shard's id, min/max ranges and node_id) @@ -54,10 +55,12 @@ worker_split_copy(PG_FUNCTION_ARGS) ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy); ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1); - if (array_contains_nulls(splitCopyInfoArrayObject)) + bool arrayHasNull = ARR_HASNULL(splitCopyInfoArrayObject); + if (arrayHasNull) { - ereport(ERROR, - (errmsg("Shard Copy Info cannot have null values."))); + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg( + "pg_catalog.split_copy_info array cannot contain null values"))); } const int slice_ndim = 0; @@ -67,7 +70,7 @@ worker_split_copy(PG_FUNCTION_ARGS) mState); Datum copyInfoDatum = 0; bool isnull = false; - List *splitCopyInfoList = NULL; + List *splitCopyInfoList = NIL; while (array_iterate(copyInfo_iterator, ©InfoDatum, &isnull)) { SplitCopyInfo *splitCopyInfo = NULL; @@ -118,7 +121,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_id for split_copy_info cannot be null."))); + "destination_shard_id for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum); @@ -127,7 +130,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_min_value for split_copy_info cannot be null."))); + "destination_shard_min_value for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardMinHashValue = minValueDatum; @@ -136,7 +139,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_max_value for split_copy_info cannot be null."))); + "destination_shard_max_value for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardMaxHashValue = maxValueDatum; @@ -145,7 +148,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) if (isnull) { ereport(ERROR, (errmsg( - "destination_shard_node_id for split_copy_info cannot be null."))); + "destination_shard_node_id for pg_catalog.split_copy_info cannot be null."))); } copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum); diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql 
b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index f7dc5ca07..c1cf1afa4 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -50,12 +50,11 @@ DROP FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], node_ids integer[], - split_mode citus.split_mode); + shard_transfer_mode citus.shard_transfer_mode); DROP FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]); -DROP TYPE citus.split_mode; -DROP TYPE citus.split_copy_info; + splitCopyInfos pg_catalog.split_copy_info[]); +DROP TYPE pg_catalog.split_copy_info; #include "../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql" diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql index cd6701e2d..36624c40e 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.1-1.sql @@ -1,23 +1,14 @@ -DROP TYPE IF EXISTS citus.split_mode; - --- Three modes to be implemented: blocking, non_blocking and auto. --- Currently, the default / only supported mode is blocking. -CREATE TYPE citus.split_mode AS ENUM ( - 'blocking', - 'non_blocking' -); - CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. node_ids integer[], - -- Three modes to be implemented: blocking, non_blocking and auto. - -- Currently, the default / only supported mode is blocking. - split_mode citus.split_mode default 'blocking') + -- Three modes to be implemented: block_writes, force_logical and auto. + -- Currently, the default / only supported mode is block_writes. + shard_transfer_mode citus.shard_transfer_mode default 'block_writes') RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$; -COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.split_mode) +COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.shard_transfer_mode) IS 'split a shard using split mode.'; diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql index cd6701e2d..36624c40e 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql @@ -1,23 +1,14 @@ -DROP TYPE IF EXISTS citus.split_mode; - --- Three modes to be implemented: blocking, non_blocking and auto. --- Currently, the default / only supported mode is blocking. -CREATE TYPE citus.split_mode AS ENUM ( - 'blocking', - 'non_blocking' -); - CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( shard_id bigint, split_points text[], -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. 
-- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. node_ids integer[], - -- Three modes to be implemented: blocking, non_blocking and auto. - -- Currently, the default / only supported mode is blocking. - split_mode citus.split_mode default 'blocking') + -- Three modes to be implemented: block_writes, force_logical and auto. + -- Currently, the default / only supported mode is block_writes. + shard_transfer_mode citus.shard_transfer_mode default 'block_writes') RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$; -COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.split_mode) +COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points text[], nodeIds integer[], citus.shard_transfer_mode) IS 'split a shard using split mode.'; diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql index b9c5869d2..0ecad4a07 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/11.1-1.sql @@ -1,3 +1,7 @@ +-- We want to create the type in pg_catalog but doing that leads to an error +-- "ERROR: permission denied to create "pg_catalog.split_copy_info" +-- "DETAIL: System catalog modifications are currently disallowed. "" +-- As a workaround, we create the type in the citus schema and then later modify it to pg_catalog. DROP TYPE IF EXISTS citus.split_copy_info; CREATE TYPE citus.split_copy_info AS ( destination_shard_id bigint, @@ -6,12 +10,13 @@ CREATE TYPE citus.split_copy_info AS ( -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. destination_shard_node_id integer); +ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog; CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]) + splitCopyInfos pg_catalog.split_copy_info[]) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; -COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[]) IS 'Perform split copy for shard'; diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql index b9c5869d2..0ecad4a07 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql @@ -1,3 +1,7 @@ +-- We want to create the type in pg_catalog but doing that leads to an error +-- "ERROR: permission denied to create "pg_catalog.split_copy_info" +-- "DETAIL: System catalog modifications are currently disallowed. "" +-- As a workaround, we create the type in the citus schema and then later modify it to pg_catalog. DROP TYPE IF EXISTS citus.split_copy_info; CREATE TYPE citus.split_copy_info AS ( destination_shard_id bigint, @@ -6,12 +10,13 @@ CREATE TYPE citus.split_copy_info AS ( -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. 
-- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. destination_shard_node_id integer); +ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog; CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( source_shard_id bigint, - splitCopyInfos citus.split_copy_info[]) + splitCopyInfos pg_catalog.split_copy_info[]) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; -COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[]) IS 'Perform split copy for shard'; diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 8adbb5f69..743996160 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -11,6 +11,9 @@ Here is a high level overview of test plan: 8. Split an already split shard second time on a different schema. */ CREATE SCHEMA "citus_split_test_schema"; +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981000; SET citus.next_placement_id TO 8610000; @@ -91,7 +94,6 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Load data into tables. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -194,6 +196,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN : Move one shard before we split it. \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -214,7 +217,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-1073741824'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -225,7 +228,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981001, ARRAY['536870911', '1610612735'], ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -241,7 +244,6 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- END : Move a shard post split. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? 
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -374,6 +376,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; @@ -388,7 +391,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981007, ARRAY['-2100000000'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out index d8b006741..5986fa74b 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points_negative.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points_negative.out @@ -32,6 +32,25 @@ SELECT create_distributed_table('table_to_split','id'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- UDF fails for any other shard_transfer_mode other than block_writes. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'auto'); +ERROR: Shard Tranfer mode: 'auto' is not supported. Please use 'block_writes' instead. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'force_logical'); +ERROR: Shard Tranfer mode: 'force_logical' is not supported. Please use 'block_writes' instead. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'gibberish'); +ERROR: invalid input value for enum citus.shard_transfer_mode: "gibberish" -- UDF fails for range partitioned tables. SELECT citus_split_shard_by_split_points( 60761300, @@ -83,6 +102,15 @@ SELECT citus_split_shard_by_split_points( ARRAY['-1073741825'], -- Split point equals shard's max value. ARRAY[:worker_1_node, :worker_2_node]); ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead. +-- UDF fails if resulting shard count from split greater than MAX_SHARD_COUNT (64000) +-- 64000 split point definee 64000+1 way split (64001 worker nodes needed). +WITH shard_ranges AS (SELECT ((-2147483648 + indx))::text as split_points, :worker_1_node as node_ids FROM generate_series(1,64000) indx ) +SELECT citus_split_shard_by_split_points( + 49761300, + array_agg(split_points), + array_agg(node_ids) || :worker_1_node) --placement node list should exceed split points by one. +FROM shard_ranges; +ERROR: Resulting shard count '64001' with split is greater than max shard count '64000' limit. -- UDF fails where source shard cannot be split further i.e min and max range is equal. 
-- Create a Shard where range cannot be split further SELECT isolate_tenant_to_new_shard('table_to_split', 1); diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out index ff3c250fd..02a23174e 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -42,7 +42,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -126,7 +126,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -200,7 +200,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -280,7 +280,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -359,7 +359,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -439,7 +439,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -509,7 +509,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -585,7 +585,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -657,7 +657,7 @@ step s1-blocking-shard-split: 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -669,7 +669,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); step s1-commit: COMMIT; @@ -732,7 +732,7 @@ step s1-blocking-shard-split: 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -744,7 +744,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); step s1-commit: COMMIT; @@ -812,7 +812,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -895,7 +895,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git 
a/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out b/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out index 410b9c2a0..9a6ed53eb 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out +++ b/src/test/regress/expected/isolation_blocking_shard_split_with_fkey_to_reference.out @@ -20,7 +20,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -80,7 +80,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -140,7 +140,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -200,7 +200,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -260,7 +260,7 @@ step s2-blocking-shard-split: 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index a7b75811d..780a2c705 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1095,24 +1095,23 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.chunk_group | table columnar.options | table columnar.stripe | - | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.split_mode) void + | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function columnar.get_storage_id(regclass) bigint | function columnar_internal.columnar_handler(internal) table_am_handler - | function worker_split_copy(bigint,citus.split_copy_info[]) void + | function worker_split_copy(bigint,split_copy_info[]) void | schema columnar_internal | sequence columnar_internal.storageid_seq | table columnar_internal.chunk | table columnar_internal.chunk_group | table columnar_internal.options | table columnar_internal.stripe - | type citus.split_copy_info - | type citus.split_mode + | type split_copy_info | view columnar.chunk | view columnar.chunk_group | view columnar.options | view columnar.storage | view columnar.stripe -(31 rows) +(30 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index e14b98282..fb034c632 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -113,7 +113,7 @@ ORDER BY 1; function citus_shard_indexes_on_worker() function citus_shard_sizes() function citus_shards_on_worker() - function citus_split_shard_by_split_points(bigint,text[],integer[],citus.split_mode) + function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) function 
citus_stat_activity() function citus_stat_statements() function citus_stat_statements_reset() @@ -243,7 +243,7 @@ ORDER BY 1; function worker_record_sequence_dependency(regclass,regclass,name) function worker_save_query_explain_analyze(text,jsonb) function worker_split_shard_replication_setup(citus.split_shard_info[]) - function worker_split_copy(bigint,citus.split_copy_info[]) + function worker_split_copy(bigint,split_copy_info[]) schema citus schema citus_internal schema columnar @@ -273,10 +273,9 @@ ORDER BY 1; type citus.distribution_type type citus.shard_transfer_mode type citus.split_shard_info - type citus.split_copy_info - type citus.split_mode type citus_copy_format type noderole + type split_copy_info view citus_dist_stat_activity view citus_lock_waits view citus_schema.citus_tables @@ -292,5 +291,5 @@ ORDER BY 1; view columnar.stripe view pg_dist_shard_placement view time_partitions -(274 rows) +(273 rows) diff --git a/src/test/regress/expected/worker_shard_binary_copy_test.out b/src/test/regress/expected/worker_shard_binary_copy_test.out index 800d06839..fc9b2cd86 100644 --- a/src/test/regress/expected/worker_shard_binary_copy_test.out +++ b/src/test/regress/expected/worker_shard_binary_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_shard_text_copy_test.out b/src/test/regress/expected/worker_shard_text_copy_test.out index 1ed623ad6..52b26cbb9 100644 --- a/src/test/regress/expected/worker_shard_text_copy_test.out +++ b/src/test/regress/expected/worker_shard_text_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git 
a/src/test/regress/expected/worker_split_binary_copy_test.out b/src/test/regress/expected/worker_split_binary_copy_test.out index 0453530d1..07dacbdb1 100644 --- a/src/test/regress/expected/worker_split_binary_copy_test.out +++ b/src/test/regress/expected/worker_split_binary_copy_test.out @@ -3,6 +3,43 @@ SET search_path TO worker_split_binary_copy_test; SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81060000; +-- Remove extra nodes added, otherwise GetLocalNodeId() does not bahave correctly. +SELECT citus_remove_node('localhost', 8887); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9995); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9992); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9998); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 9997); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_remove_node('localhost', 8888); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + -- BEGIN: Create distributed table and insert data. CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy ( l_orderkey bigint not null, @@ -154,12 +191,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +213,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 52f93d5c1..c17ef5aa4 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -51,6 +51,52 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. 
+ ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::pg_catalog.split_copy_info + ] + ); +ERROR: could not find valid entry for shard xxxxx +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); +ERROR: cannot determine type of empty array +HINT: Explicitly cast to the desired type, for example ARRAY[]::integer[]. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); +ERROR: function worker_split_copy(integer, text[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::pg_catalog.split_copy_info]-- empty array + ); +ERROR: pg_catalog.split_copy_info array cannot contain null values +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); +ERROR: function worker_split_copy(integer, record[]) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::pg_catalog.split_copy_info] -- empty array + ); +ERROR: destination_shard_id for pg_catalog.split_copy_info cannot be null. +-- END: Test Negative scenario -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. 
SET citus.enable_binary_protocol = false; @@ -61,12 +107,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/expected/worker_split_text_copy_test.out b/src/test/regress/expected/worker_split_text_copy_test.out index 80aad97d8..164d3a6d7 100644 --- a/src/test/regress/expected/worker_split_text_copy_test.out +++ b/src/test/regress/expected/worker_split_text_copy_test.out @@ -154,12 +154,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); worker_split_copy @@ -176,12 +176,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); worker_split_copy diff --git a/src/test/regress/spec/isolation_blocking_shard_split.spec b/src/test/regress/spec/isolation_blocking_shard_split.spec index a06824886..ddac66f5b 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split.spec @@ -70,7 +70,7 @@ step "s1-blocking-shard-split" 1500001, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s1-commit" @@ -91,7 +91,7 @@ step "s2-blocking-shard-split" 1500002, ARRAY['1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s2-commit" diff --git a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec index 243d8ef05..49b56c4a5 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec @@ -67,7 +67,7 @@ step "s2-blocking-shard-split" 1500002, ARRAY['-1073741824'], ARRAY[1, 2], - 'blocking'); + 'block_writes'); } step "s2-add-fkey" diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index 8d618c3c5..6c2957953 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -12,6 +12,11 @@ Here is a high level overview of test plan: */ CREATE SCHEMA "citus_split_test_schema"; + +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; + SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id 
TO 8981000; SET citus.next_placement_id TO 8610000; @@ -67,7 +72,6 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Load data into tables. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -115,6 +119,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN : Move one shard before we split it. \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -133,14 +138,14 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-1073741824'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); -- Perform 3 way split SELECT pg_catalog.citus_split_shard_by_split_points( 8981001, ARRAY['536870911', '1610612735'], ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); -- END : Split two shards : One with move and One without move. -- BEGIN : Move a shard post split. @@ -148,7 +153,6 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- END : Move a shard post split. -- BEGIN : Display current state. --- TODO(niupre): Can we refactor this to be a function? SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid @@ -196,6 +200,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port +SET ROLE test_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; @@ -211,7 +216,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 8981007, ARRAY['-2100000000'], ARRAY[:worker_1_node, :worker_2_node], - 'blocking'); + 'block_writes'); SET search_path TO "citus_split_test_schema"; SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql index bdaf32682..e730a8c28 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points_negative.sql @@ -23,6 +23,25 @@ SELECT create_distributed_table('table_to_split','id'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- UDF fails for any other shard_transfer_mode other than block_writes. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'auto'); + +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'force_logical'); + +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201], + 'gibberish'); + -- UDF fails for range partitioned tables. SELECT citus_split_shard_by_split_points( 60761300, @@ -74,6 +93,15 @@ SELECT citus_split_shard_by_split_points( ARRAY['-1073741825'], -- Split point equals shard's max value. 
ARRAY[:worker_1_node, :worker_2_node]); +-- UDF fails if resulting shard count from split greater than MAX_SHARD_COUNT (64000) +-- 64000 split point definee 64000+1 way split (64001 worker nodes needed). +WITH shard_ranges AS (SELECT ((-2147483648 + indx))::text as split_points, :worker_1_node as node_ids FROM generate_series(1,64000) indx ) +SELECT citus_split_shard_by_split_points( + 49761300, + array_agg(split_points), + array_agg(node_ids) || :worker_1_node) --placement node list should exceed split points by one. +FROM shard_ranges; + -- UDF fails where source shard cannot be split further i.e min and max range is equal. -- Create a Shard where range cannot be split further SELECT isolate_tenant_to_new_shard('table_to_split', 1); diff --git a/src/test/regress/sql/worker_split_binary_copy_test.sql b/src/test/regress/sql/worker_split_binary_copy_test.sql index 03426f443..a47e968bd 100644 --- a/src/test/regress/sql/worker_split_binary_copy_test.sql +++ b/src/test/regress/sql/worker_split_binary_copy_test.sql @@ -4,6 +4,14 @@ SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81060000; +-- Remove extra nodes added, otherwise GetLocalNodeId() does not bahave correctly. +SELECT citus_remove_node('localhost', 8887); +SELECT citus_remove_node('localhost', 9995); +SELECT citus_remove_node('localhost', 9992); +SELECT citus_remove_node('localhost', 9998); +SELECT citus_remove_node('localhost', 9997); +SELECT citus_remove_node('localhost', 8888); + -- BEGIN: Create distributed table and insert data. CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy ( l_orderkey bigint not null, @@ -157,12 +165,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. @@ -175,12 +183,12 @@ SELECT * from worker_split_copy( ROW(81060015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81060016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way remote shard split copy. diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 324cc87c2..b799eb305 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -35,6 +35,49 @@ SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- END: Set worker_1_node and worker_2_node +-- BEGIN: Test Negative scenario +SELECT * from worker_split_copy( + 101, -- Invalid source shard id. 
+ ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_1_node)::pg_catalog.split_copy_info + ] + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL] -- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[NULL::pg_catalog.split_copy_info]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL)]-- empty array + ); + +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ROW(NULL, NULL, NULL, NULL)::pg_catalog.split_copy_info] -- empty array + ); +-- END: Test Negative scenario + -- BEGIN: Trigger 2-way local shard split copy. -- Ensure we will perform text copy. SET citus.enable_binary_protocol = false; @@ -45,12 +88,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin -1073741824, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id -1073741823, --split range begin -1, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. diff --git a/src/test/regress/sql/worker_split_text_copy_test.sql b/src/test/regress/sql/worker_split_text_copy_test.sql index 1fa5d476e..10791a66d 100644 --- a/src/test/regress/sql/worker_split_text_copy_test.sql +++ b/src/test/regress/sql/worker_split_text_copy_test.sql @@ -157,12 +157,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_1_node)::citus.split_copy_info, + :worker_1_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_1_node)::citus.split_copy_info + :worker_1_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way local shard split copy. @@ -175,12 +175,12 @@ SELECT * from worker_split_copy( ROW(81070015, -- destination shard id -2147483648, -- split range begin 1073741823, --split range end - :worker_2_node)::citus.split_copy_info, + :worker_2_node)::pg_catalog.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id 1073741824, --split range begin 2147483647, --split range end - :worker_2_node)::citus.split_copy_info + :worker_2_node)::pg_catalog.split_copy_info ] ); -- END: Trigger 2-way remote shard split copy.
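For reference, a minimal usage sketch of the UDF signatures as they look after this patch, assuming a psql session with :worker_1_port and :worker_2_port set the way the regression tests above set them; the shard ids, split points, and hash ranges are illustrative only and mirror the test values rather than prescribing real ones:

-- Resolve hypothetical worker node ids, as done in the regression tests.
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport = :worker_2_port \gset

-- 2-way split of an illustrative shard at hash value -1073741824.
-- The renamed fourth argument is citus.shard_transfer_mode and defaults
-- to 'block_writes'; 'auto' and 'force_logical' are not yet supported.
SELECT pg_catalog.citus_split_shard_by_split_points(
    8981000,
    ARRAY['-1073741824'],
    ARRAY[:worker_1_node, :worker_2_node],
    'block_writes');

-- Low-level worker UDF invoked during the split: copy the rows of a source
-- shard into destination shards, each described by a
-- pg_catalog.split_copy_info tuple
-- (destination shard id, split range begin, split range end, node id).
SELECT * FROM worker_split_copy(
    81060000, -- illustrative source shard id
    ARRAY[
        ROW(81060015, -2147483648, 1073741823, :worker_1_node)::pg_catalog.split_copy_info,
        ROW(81060016, 1073741824, 2147483647, :worker_2_node)::pg_catalog.split_copy_info
    ]);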