From c48bf5ebaac852b48d5435f14399c183a3c4303a Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Fri, 8 Mar 2024 14:51:44 +0300 Subject: [PATCH] Rename per worker --- .../distributed/operations/shard_rebalancer.c | 2 +- .../utils/multi_partitioning_utils.c | 72 ++++++++++++------- src/include/distributed/shard_rebalancer.h | 2 + 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d1868d3c4..b1780b669 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -435,7 +435,7 @@ BigIntArrayDatumContains(Datum *array, int arrayLength, uint64 toFind) * FullShardPlacementList returns a List containing all the shard placements of * a specific table (excluding the excludedShardArray) */ -static List * +List * FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray) { List *shardPlacementList = NIL; diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index ede2008ca..e034e276b 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -50,6 +50,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" +extern List *FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray); static char * PartitionBound(Oid partitionId); static Relation try_relation_open_nolock(Oid relationId); static List * CreateFixPartitionConstraintsTaskList(Oid relationId); @@ -555,43 +556,60 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, /* lock metadata before getting placement lists */ LockShardListMetadata(parentShardIntervalList, ShareLock); - MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "CreateFixPartitionShardIndexNames", - ALLOCSET_DEFAULT_SIZES); - MemoryContext oldContext = MemoryContextSwitchTo(localContext); - int taskId = 1; - ShardInterval *parentShardInterval = NULL; - foreach_ptr(parentShardInterval, parentShardIntervalList) - { - uint64 parentShardId = parentShardInterval->shardId; + List *shardPlacementList = FullShardPlacementList(parentRelationId, construct_empty_array(INT4OID)); - List *queryStringList = - WorkerFixPartitionShardIndexNamesCommandList(parentShardId, + + List *workerNodeList = ReadDistNode(true); + + /* make sure we have deterministic output for our tests */ + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateFixPartitionShardIndexNames", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + List *shardsOnNode = FilterActiveShardPlacementListByNode( + shardPlacementList, workerNode); + + ShardPlacement *shardPlacement = NULL; + + foreach_ptr(shardPlacement, shardsOnNode) + { + uint64 parentShardId = shardPlacement->shardId; + + List *queryStringList = + WorkerFixPartitionShardIndexNamesCommandList(parentShardId, parentIndexIdList, partitionRelationId); - if (queryStringList != NIL) - { - Task *task = CitusMakeNode(Task); - task->jobId = INVALID_JOB_ID; - task->taskId = taskId++; - task->taskType = DDL_TASK; + if (queryStringList != NIL) + { + Task *task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = taskId++; + task->taskType = DDL_TASK; - char *prefix = "SELECT pg_catalog.citus_run_local_command($$"; - char *postfix = "$$)"; - char *string = StringJoinParams(queryStringList, ';', prefix, postfix); + char *prefix = "SELECT pg_catalog.citus_run_local_command($$"; + char *postfix = "$$)"; + char *string = StringJoinParams(queryStringList, ';', prefix, postfix); - SetTaskQueryString(task, string); + SetTaskQueryString(task, string); - task->dependentTaskList = NULL; - task->replicationModel = REPLICATION_MODEL_INVALID; - task->anchorShardId = parentShardId; - task->taskPlacementList = ActiveShardPlacementList(parentShardId); + task->dependentTaskList = NULL; + task->replicationModel = REPLICATION_MODEL_INVALID; + task->anchorShardId = parentShardId; + task->taskPlacementList = ActiveShardPlacementList(parentShardId); - bool localExecutionSupported = true; - ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); + bool localExecutionSupported = true; + ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); + } + + break; } /* after every iteration, clean-up all the memory associated with it */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 79414eb3c..f6f555677 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -222,4 +222,6 @@ extern void SetupRebalanceMonitor(List *placementUpdateList, uint64 initialProgressState, PlacementUpdateStatus initialStatus); +extern List *FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray); + #endif /* SHARD_REBALANCER_H */