From 4398c123b140eef585205db43cf4a90a29b796f6 Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Mon, 11 Mar 2024 17:37:44 +0300 Subject: [PATCH] Add queries for the shards on the worker --- .../utils/multi_partitioning_utils.c | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index ede2008ca..ab9c10d0b 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -60,15 +60,20 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid parentIndexOid); static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, List *indexIdList, - Oid partitionRelationId); + Oid partitionRelationId, + List *shardPlacements); static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId); + char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId, + List * + shardPlacements); static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, char * qualifiedParentShardIndexName, Oid - partitionId); + partitionId, + List * + shardPlacements); static List * CheckConstraintNameListForRelation(Oid relationId); static bool RelationHasConstraint(Oid relationId, char *constraintName); static char * RenameConstraintCommand(Oid relationId, char *constraintName, @@ -567,10 +572,14 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, { uint64 parentShardId = parentShardInterval->shardId; + List *shardPlacementList = ActiveShardPlacementList(parentShardId); + List *queryStringList = WorkerFixPartitionShardIndexNamesCommandList(parentShardId, parentIndexIdList, - partitionRelationId); + partitionRelationId, + shardPlacementList); + if (queryStringList != NIL) { Task *task = CitusMakeNode(Task); @@ -591,7 +600,7 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, task->taskPlacementList = ActiveShardPlacementList(parentShardId); bool localExecutionSupported = true; - ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); + ExecuteUtilityTaskList(task, localExecutionSupported); } /* after every iteration, clean-up all the memory associated with it */ @@ -611,7 +620,8 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, List *parentIndexIdList, - Oid partitionRelationId) + Oid partitionRelationId, + List *shardPlacements) { List *commandList = NIL; Oid parentIndexId = InvalidOid; @@ -634,7 +644,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName, parentShardIndexName); List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - qualifiedParentShardIndexName, parentIndexId, partitionRelationId); + qualifiedParentShardIndexName, parentIndexId, partitionRelationId, + shardPlacements); commandList = list_concat(commandList, commands); } @@ -653,7 +664,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, */ static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId) + char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId, + List *shardPlacements) { List *commandList = NIL; @@ -673,7 +685,8 @@ WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( { List *commands = WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex( - partitionIndexId, qualifiedParentShardIndexName, partitionId); + partitionIndexId, qualifiedParentShardIndexName, partitionId, + shardPlacements); commandList = list_concat(commandList, commands); } } @@ -690,7 +703,8 @@ static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, char * qualifiedParentShardIndexName, - Oid partitionId) + Oid partitionId, + List *shardPlacements) { List *commandList = NIL; @@ -710,6 +724,19 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex */ uint64 partitionShardId = partitionShardInterval->shardId; + List *activePlacementList = ActiveShardPlacementList(partitionShardId); + + if ((list_length(activePlacementList) > 0) && (list_length(shardPlacements) > 0)) + { + ShardPlacement *left = (ShardPlacement *) linitial(activePlacementList); + ShardPlacement *right = (ShardPlacement *) linitial(shardPlacements); + + if (left->nodeId != right->nodeId) + { + continue; + } + } + /* get qualified partition shard name */ char *partitionShardName = pstrdup(partitionName); AppendShardIdToName(&partitionShardName, partitionShardId);