diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index f9aa42e12..071a41073 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -33,8 +33,8 @@ static SplitMode LookupSplitMode(Oid shardSplitModeOid); * citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[]) * Split source shard into multiple shards using the given split points. * 'shard_id' is the id of source shard to split. - * 'split_points' is an array of integers that represents the split points. - * 'node_ids' is an array of integers that represents the placement node ids of the new shards. + * 'split_points' is an array that represents the split points. + * 'node_ids' is an array that represents the placement node ids of the new shards. * 'split_mode citus.split_mode' is the mode of split. */ Datum @@ -66,7 +66,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) /* * LookupSplitMode maps the oids of citus.shard_split_mode enum - * values to a char. + * values to an enum. 
*/ SplitMode LookupSplitMode(Oid shardSplitModeOid) diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c index 680a3fdf3..4f377a3b3 100644 --- a/src/backend/distributed/operations/isolate_shards.c +++ b/src/backend/distributed/operations/isolate_shards.c @@ -32,6 +32,7 @@ #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "distributed/version_compat.h" +#include "distributed/shard_split.h" #include "nodes/pg_list.h" #include "storage/lock.h" #include "utils/builtins.h" @@ -48,7 +49,6 @@ PG_FUNCTION_INFO_V1(worker_hash); /* local function forward declarations */ static uint64 SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum); -static void ErrorIfCannotSplitShard(ShardInterval *sourceShard); static void CreateSplitOffShards(ShardInterval *sourceShard, int hashedValue, List **splitOffShardList, int *isolatedShardId); static List * ShardTemplateList(ShardInterval *sourceShard, int hashedValue, @@ -57,13 +57,6 @@ static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTempl Oid relationId); static List * SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard); -static void ExecuteCommandListOnPlacements(List *commandList, List *placementList); -static void InsertSplitOffShardMetadata(List *splitOffShardList, - List *sourcePlacementList); -static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); -static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList); -static void DropShardList(List *shardIntervalList); - /* * isolate_tenant_to_new_shard isolates a tenant to its own shard by spliting @@ -245,7 +238,7 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum) /* get locks */ BlockWritesToShardList(colocatedShardList); - ErrorIfCannotSplitShard(sourceShard); + ErrorIfCannotSplitShard(ISOLATE_TENANT_TO_NEW_SHARD, 
sourceShard); /* get hash function name */ CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); @@ -283,154 +276,6 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum) return isolatedShardId; } - -/* - * CreateForeignConstraints creates the foreign constraints on the newly - * created shards via the tenant isolation. - * - * The function treats foreign keys to reference tables and foreign keys to - * co-located distributed tables differently. The former one needs to be - * executed over a single connection to prevent self-deadlocks. The latter - * one can be executed in parallel if there are multiple replicas. - */ -static void -CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList) -{ - ListCell *splitOffShardCell = NULL; - - List *colocatedShardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; - - foreach(splitOffShardCell, splitOffShardList) - { - ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell); - - List *currentColocatedForeignKeyList = NIL; - List *currentReferenceForeignKeyList = NIL; - - CopyShardForeignConstraintCommandListGrouped(splitOffShard, - &currentColocatedForeignKeyList, - &currentReferenceForeignKeyList); - - colocatedShardForeignConstraintCommandList = - list_concat(colocatedShardForeignConstraintCommandList, - currentColocatedForeignKeyList); - referenceTableForeignConstraintList = - list_concat(referenceTableForeignConstraintList, - currentReferenceForeignKeyList); - } - - /* - * We can use parallel connections to while creating co-located foreign keys - * if the source placement . - * However, foreign keys to reference tables need to be created using a single - * connection per worker to prevent self-deadlocks. 
- */ - if (colocatedShardForeignConstraintCommandList != NIL) - { - ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList, - sourcePlacementList); - } - - if (referenceTableForeignConstraintList != NIL) - { - ListCell *shardPlacementCell = NULL; - foreach(shardPlacementCell, sourcePlacementList) - { - ShardPlacement *shardPlacement = - (ShardPlacement *) lfirst(shardPlacementCell); - - char *nodeName = shardPlacement->nodeName; - int32 nodePort = shardPlacement->nodePort; - - /* - * We're using the connections that we've used for dropping the - * source placements within the same coordinated transaction. - */ - ExecuteCommandListOnWorker(nodeName, nodePort, - referenceTableForeignConstraintList); - } - } -} - - -/* - * ExecuteCommandListOnWorker executes the command on the given node within - * the coordinated 2PC. - */ -static void -ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList) -{ - ListCell *commandCell = NULL; - - foreach(commandCell, commandList) - { - char *command = (char *) lfirst(commandCell); - - SendCommandToWorker(nodeName, nodePort, command); - } -} - - -/* - * ErrorIfCannotSplitShard checks relation kind and invalid shards. It errors - * out if we are not able to split the given shard. 
- */ -static void -ErrorIfCannotSplitShard(ShardInterval *sourceShard) -{ - Oid relationId = sourceShard->relationId; - ListCell *colocatedTableCell = NULL; - ListCell *colocatedShardCell = NULL; - - /* checks for table ownership and foreign tables */ - List *colocatedTableList = ColocatedTableList(relationId); - foreach(colocatedTableCell, colocatedTableList) - { - Oid colocatedTableId = lfirst_oid(colocatedTableCell); - - /* check that user has owner rights in all co-located tables */ - EnsureTableOwner(colocatedTableId); - - char relationKind = get_rel_relkind(colocatedTableId); - if (relationKind == RELKIND_FOREIGN_TABLE) - { - char *relationName = get_rel_name(colocatedTableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot isolate tenant because \"%s\" is a " - "foreign table", relationName), - errdetail("Isolating shards backed by foreign tables " - "is not supported."))); - } - } - - /* check shards with inactive placements */ - List *colocatedShardList = ColocatedShardIntervalList(sourceShard); - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell); - uint64 shardId = shardInterval->shardId; - ListCell *shardPlacementCell = NULL; - - List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId); - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - if (placement->shardState != SHARD_STATE_ACTIVE) - { - char *relationName = get_rel_name(shardInterval->relationId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot isolate tenant because relation " - "\"%s\" has an inactive shard placement " - "for the shard %lu", relationName, shardId), - errhint("Use master_copy_shard_placement UDF to " - "repair the inactive shard placement."))); - } - } - } -} - - /* * CreateSplitOffShards gets a shard and a hashed value to pick the split point. 
* First, it creates templates to create new shards. Then, for every colocated @@ -611,212 +456,3 @@ SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard) return splitOffCommandList; } - - -/* - * ExecuteCommandListOnPlacements runs the given command list on the nodes of - * the given shard placement list. First, it creates connections. Then it sends - * commands one by one. For every command, first it send the command to all - * connections and then checks the results. This helps to run long running - * commands in parallel. Finally, it sends commit messages to all connections - * and close them. - */ -static void -ExecuteCommandListOnPlacements(List *commandList, List *placementList) -{ - List *workerConnectionList = NIL; - ListCell *workerConnectionCell = NULL; - ListCell *shardPlacementCell = NULL; - ListCell *commandCell = NULL; - - /* create connections and start transactions */ - foreach(shardPlacementCell, placementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); - char *nodeName = shardPlacement->nodeName; - int32 nodePort = shardPlacement->nodePort; - - int connectionFlags = FORCE_NEW_CONNECTION; - char *currentUser = CurrentUserName(); - - /* create a new connection */ - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, - nodePort, - currentUser, - NULL); - - /* mark connection as critical ans start transaction */ - MarkRemoteTransactionCritical(workerConnection); - RemoteTransactionBegin(workerConnection); - - /* add connection to the list */ - workerConnectionList = lappend(workerConnectionList, workerConnection); - } - - /* send and check results for every command one by one */ - foreach(commandCell, commandList) - { - char *command = lfirst(commandCell); - - /* first only send the command */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - - 
int querySent = SendRemoteCommand(workerConnection, command); - if (querySent == 0) - { - ReportConnectionError(workerConnection, ERROR); - } - } - - /* then check the result separately to run long running commands in parallel */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - bool raiseInterrupts = true; - - PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(workerConnection, result, ERROR); - } - - PQclear(result); - ForgetResults(workerConnection); - } - } - - /* finally commit each transaction and close connections */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - - RemoteTransactionCommit(workerConnection); - CloseConnection(workerConnection); - } -} - - -/* - * InsertSplitOffShardMetadata inserts new shard and shard placement data into - * catolog tables both the coordinator and mx nodes. 
- */ -static void -InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList) -{ - List *syncedShardList = NIL; - ListCell *shardCell = NULL; - ListCell *commandCell = NULL; - - /* add new metadata */ - foreach(shardCell, splitOffShardList) - { - ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell); - Oid relationId = splitOffShard->relationId; - uint64 shardId = splitOffShard->shardId; - char storageType = splitOffShard->storageType; - ListCell *shardPlacementCell = NULL; - - int32 shardMinValue = DatumGetInt32(splitOffShard->minValue); - int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue); - text *shardMinValueText = IntegerToText(shardMinValue); - text *shardMaxValueText = IntegerToText(shardMaxValue); - - InsertShardRow(relationId, shardId, storageType, shardMinValueText, - shardMaxValueText); - - /* split off shard placement metadata */ - foreach(shardPlacementCell, sourcePlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - uint64 shardSize = 0; - - InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE, - shardSize, placement->groupId); - } - - if (ShouldSyncTableMetadata(relationId)) - { - syncedShardList = lappend(syncedShardList, splitOffShard); - } - } - - /* send commands to synced nodes one by one */ - List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList); - foreach(commandCell, splitOffShardMetadataCommandList) - { - char *command = (char *) lfirst(commandCell); - SendCommandToWorkersWithMetadata(command); - } -} - - -/* - * DropShardList drops shards and their metadata from both the coordinator and - * mx nodes. 
- */ -static void -DropShardList(List *shardIntervalList) -{ - ListCell *shardIntervalCell = NULL; - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - ListCell *shardPlacementCell = NULL; - Oid relationId = shardInterval->relationId; - uint64 oldShardId = shardInterval->shardId; - - /* delete metadata from synced nodes */ - if (ShouldSyncTableMetadata(relationId)) - { - ListCell *commandCell = NULL; - - /* send the commands one by one */ - List *shardMetadataDeleteCommandList = ShardDeleteCommandList(shardInterval); - foreach(commandCell, shardMetadataDeleteCommandList) - { - char *command = (char *) lfirst(commandCell); - SendCommandToWorkersWithMetadata(command); - } - } - - /* delete shard placements and drop shards */ - List *shardPlacementList = ActiveShardPlacementList(oldShardId); - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - StringInfo dropQuery = makeStringInfo(); - - DeleteShardPlacementRow(placement->placementId); - - /* get shard name */ - char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - - char storageType = shardInterval->storageType; - if (storageType == SHARD_STORAGE_TABLE) - { - appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); - } - else if (storageType == SHARD_STORAGE_FOREIGN) - { - appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, - qualifiedShardName); - } - - /* drop old shard */ - SendCommandToWorker(workerName, workerPort, dropQuery->data); - } - - /* delete shard row */ - DeleteShardRow(oldShardId); - } -} diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index f1a0d04d2..79e577aa3 100644 --- a/src/backend/distributed/operations/shard_split.c +++ 
b/src/backend/distributed/operations/shard_split.c @@ -33,22 +33,12 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" -/* - * These will be public function when citus-enterprise is merged - */ -static void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); -static void InsertSplitOffShardMetadata(List *splitOffShardList, - List *sourcePlacementList); -static void DropShardList(List *shardIntervalList); -static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); -static void ExecuteCommandListOnPlacements(List *commandList, List *placementList); - /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -static void CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId, +static void CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList, @@ -64,17 +54,19 @@ static void BlockingShardSplit(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *workersForPlacementList); -static void DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static void DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = { - [SHARD_SPLIT_API] = "split" + [SHARD_SPLIT_API] = "split", + [ISOLATE_TENANT_TO_NEW_SHARD] = "isolate", }; static 
const char *const SplitTargetName[] = { - [SHARD_SPLIT_API] = "shard" + [SHARD_SPLIT_API] = "shard", + [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant", }; static const char *const SplitOperationType[] = { @@ -345,10 +337,6 @@ SplitShard(SplitMode splitMode, LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); } - /* - * TODO(niupre): When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API, - * these two methods will be merged. - */ ErrorIfCannotSplitShard(SHARD_SPLIT_API, shardIntervalToSplit); ErrorIfCannotSplitShardExtended( SHARD_SPLIT_API, @@ -386,7 +374,7 @@ SplitShard(SplitMode splitMode, else { /* we only support blocking shard split in this code path for now. */ - ereport(ERROR, (errmsg("Invalid split mode %s.", SplitOperationType[splitMode]))); + ereport(ERROR, (errmsg("Invalid split mode value %d.", splitMode))); } } @@ -415,7 +403,7 @@ BlockingShardSplit(SplitOperation splitOperation, sourceColocatedShardIntervalList, shardSplitPointsList); - // Only single placement allowed (already validated by caller) + /* Only single placement allowed (already validated by caller) */ List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); Assert(sourcePlacementList->length == 1); WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList); @@ -423,7 +411,7 @@ BlockingShardSplit(SplitOperation splitOperation, /* Physically create split children and perform split copy */ List *splitOffShardList = NULL; CreateSplitShardsForShardGroup( - sourceShardToCopyNode->nodeId, + sourceShardToCopyNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, @@ -451,7 +439,7 @@ BlockingShardSplit(SplitOperation splitOperation, /* Create ShardGroup split children on a list of corresponding workers. 
*/ static void -CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId, +CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList, @@ -483,26 +471,48 @@ CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId, } /* Perform Split Copy */ - DoSplitCopy(sourceShardNodeId, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); + DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); // TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely. + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + List *indexCommandList = GetPostLoadTableCreationCommands( + shardInterval->relationId, + true /* includeIndexes */, + true /* includeReplicaIdentity */); + indexCommandList = WorkerApplyShardDDLCommandList( + indexCommandList, + shardInterval->shardId); + + CreateObjectOnPlacement(indexCommandList, workerPlacementNode); + } + } } +/* + * Perform Split Copy from source shard(s) to split children. + * 'sourceShardNode' : Source shard worker node. + * 'sourceColocatedShardIntervalList' : List of source shard intervals from shard group. + * 'shardGroupSplitIntervalListList' : List of shard intervals for split children. + * 'destinationWorkerNodesList' : List of workers for split children placement. 
+ */ static void -DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList) +DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { ShardInterval *sourceShardIntervalToCopy = NULL; List *splitShardIntervalList = NULL; - WorkerNode *workerNodeSource = NULL; - WorkerNode *workerNodeDestination = NULL; int taskId = 0; List *splitCopyTaskList = NULL; - forthree_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, - splitShardIntervalList, shardGroupSplitIntervalListList, - workerNodeDestination, workersForPlacementList) + forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitShardIntervalList, shardGroupSplitIntervalListList) { - StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, workersForPlacementList); + StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, destinationWorkerNodesList); Task *task = CreateBasicTask( sourceShardIntervalToCopy->shardId, /* jobId */ @@ -511,7 +521,7 @@ DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, splitCopyUdfCommand->data); ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - SetPlacementNodeMetadata(taskPlacement, workerNodeSource); + SetPlacementNodeMetadata(taskPlacement, sourceShardNode); task->taskPlacementList = list_make1(taskPlacement); @@ -519,12 +529,17 @@ DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, taskId++; } - // TODO(niupre) : Pass appropriate MaxParallelShards value from GUC. 
- ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, 1, NULL /* jobIdList */); + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */); } +/* + * Create Copy command for a given source shard to be copied to its corresponding split children. + * 'sourceShardSplitInterval' : Source shard interval to be copied. + * 'splitChildrenShardIntervalList' : List of shard intervals for split children. + * 'destinationWorkerNodesList' : List of workers for split children placement. + */ static StringInfo -CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList) +CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* destinationWorkerNodesList) { StringInfo splitCopyInfoArray = makeStringInfo(); appendStringInfo(splitCopyInfoArray, "ARRAY["); @@ -532,7 +547,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild ShardInterval *splitChildShardInterval = NULL; bool addComma = false; WorkerNode *destinationWorkerNode = NULL; - forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, workersForPlacementList) + forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, destinationWorkerNodesList) { if(addComma) { @@ -646,7 +661,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, * InsertSplitOffShardMetadata inserts new shard and shard placement data into * catolog tables both the coordinator and mx nodes. */ -static void +void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList) { List *syncedShardList = NIL; @@ -699,7 +714,7 @@ InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList) * DropShardList drops shards and their metadata from both the coordinator and * mx nodes. 
*/ -static void +void DropShardList(List *shardIntervalList) { ListCell *shardIntervalCell = NULL; @@ -769,7 +784,7 @@ DropShardList(List *shardIntervalList) * executed over a single connection to prevent self-deadlocks. The latter * one can be executed in parallel if there are multiple replicas. */ -static void +void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList) { ListCell *splitOffShardCell = NULL; @@ -840,7 +855,7 @@ CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList) * commands in parallel. Finally, it sends commit messages to all connections * and close them. */ -static void +void ExecuteCommandListOnPlacements(List *commandList, List *placementList) { List *workerConnectionList = NIL; diff --git a/src/backend/distributed/utils/array_type.c b/src/backend/distributed/utils/array_type.c index b5f6cf995..aca06de99 100644 --- a/src/backend/distributed/utils/array_type.c +++ b/src/backend/distributed/utils/array_type.c @@ -12,10 +12,13 @@ #include "postgres.h" #include "miscadmin.h" +#include "pg_version_compat.h" +#include "pgtypes.h" #include "catalog/pg_type.h" #include "nodes/pg_list.h" #include "distributed/utils/array_type.h" #include "utils/array.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" @@ -133,14 +136,26 @@ extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId switch (datumTypeId) { case INT2OID: - list = lappend(list, pg_strtoint16(intAsStr)); + { + int16_t *int16Value = palloc0(sizeof(int16_t)); + *int16Value = pg_strtoint16(intAsStr); + list = lappend(list, (void*) int16Value); break; + } case INT4OID: - list = lappend(list, pg_strtoint32(intAsStr)); + { + int32_t *int32Value = palloc0(sizeof(int32_t)); + *int32Value = pg_strtoint32(intAsStr); + list = lappend(list, (void*) int32Value); break; + } case INT8OID: - list = lappend(list, pg_strtoint64(intAsStr)); + { + int64_t *int64Value = palloc0(sizeof(int64_t)); + *int64Value = pg_strtoint64(intAsStr); 
+ list = lappend(list, (void*) int64Value); break; + } default: ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Unsupported datum type for array."))); diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index c1ea8c20f..ad70fbabc 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -97,26 +97,6 @@ typedef struct ListCellAndListWrapper var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \ ) -/* - * forthree_ptr - - * a convenience macro which loops through three lists of pointers at the same - * time, without needing a ListCell. It only needs three declared pointer - * variables to store the pointer of each of the three cells in. - */ -#define forthree_ptr(var1, l1, var2, l2, var3, l3) \ - for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \ - *(var2 ## CellDoNotUse) = list_head(l2), \ - *(var3 ## CellDoNotUse) = list_head(l3); \ - (var1 ## CellDoNotUse) != NULL && \ - (var2 ## CellDoNotUse) != NULL && \ - (var3 ## CellDoNotUse) != NULL && \ - (((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \ - (((var2) = lfirst(var2 ## CellDoNotUse)) || true) && \ - (((var3) = lfirst(var3 ## CellDoNotUse)) || true); \ - var1 ## CellDoNotUse = lnext_compat(l1, var1 ## CellDoNotUse), \ - var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse), \ - var3 ## CellDoNotUse = lnext_compat(l3, var3 ## CellDoNotUse) \ - ) /* * forboth_ptr_oid - diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 27d1ade5a..d67fb6036 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -9,13 +9,13 @@ *------------------------------------------------------------------------- */ -#ifndef SHARDSPLIT_UTILS_H_ -#define SHARDSPLIT_UTILS_H_ +#ifndef SHARDSPLIT_H_ +#define SHARDSPLIT_H_ /* Split Modes supported by Shard Split API */ typedef enum SplitMode { - BLOCKING_SPLIT = 0, + BLOCKING_SPLIT = 0 } SplitMode; 
/* @@ -24,12 +24,13 @@ typedef enum SplitMode */ typedef enum SplitOperation { - SHARD_SPLIT_API = 0 + SHARD_SPLIT_API = 0, + ISOLATE_TENANT_TO_NEW_SHARD } SplitOperation; /* - * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion - * based on specified split points to a set of destination nodes. + * SplitShard API to split a given shard (or shard group) using split mode and + * specified split points to a set of destination nodes. */ extern void SplitShard(SplitMode splitMode, SplitOperation splitOperation, @@ -37,4 +38,11 @@ extern void SplitShard(SplitMode splitMode, List *shardSplitPointsList, List *nodeIdsForPlacementList); -#endif /* SHARDSPLIT_UTILS_H_ */ +/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */ +extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); +extern void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList); +extern void DropShardList(List *shardIntervalList); +extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); +extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList); + +#endif /* SHARDSPLIT_H_ */