Rename per worker

pull/7555/head
EmelSimsek 2024-03-08 14:51:44 +03:00
parent 56e014e64e
commit c48bf5ebaa
No known key found for this signature in database
GPG Key ID: EB13DFB77C32D7D8
3 changed files with 48 additions and 28 deletions

View File

@ -435,7 +435,7 @@ BigIntArrayDatumContains(Datum *array, int arrayLength, uint64 toFind)
* FullShardPlacementList returns a List containing all the shard placements of * FullShardPlacementList returns a List containing all the shard placements of
* a specific table (excluding the excludedShardArray) * a specific table (excluding the excludedShardArray)
*/ */
static List * List *
FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray) FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
{ {
List *shardPlacementList = NIL; List *shardPlacementList = NIL;

View File

@ -50,6 +50,7 @@
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
extern List *FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray);
static char * PartitionBound(Oid partitionId); static char * PartitionBound(Oid partitionId);
static Relation try_relation_open_nolock(Oid relationId); static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId); static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
@ -555,43 +556,60 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
/* lock metadata before getting placement lists */ /* lock metadata before getting placement lists */
LockShardListMetadata(parentShardIntervalList, ShareLock); LockShardListMetadata(parentShardIntervalList, ShareLock);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CreateFixPartitionShardIndexNames",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
int taskId = 1; int taskId = 1;
ShardInterval *parentShardInterval = NULL; List *shardPlacementList = FullShardPlacementList(parentRelationId, construct_empty_array(INT4OID));
foreach_ptr(parentShardInterval, parentShardIntervalList)
{
uint64 parentShardId = parentShardInterval->shardId;
List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId, List *workerNodeList = ReadDistNode(true);
/* make sure we have deterministic output for our tests */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateFixPartitionShardIndexNames",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
List *shardsOnNode = FilterActiveShardPlacementListByNode(
shardPlacementList, workerNode);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardsOnNode)
{
uint64 parentShardId = shardPlacement->shardId;
List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList, parentIndexIdList,
partitionRelationId); partitionRelationId);
if (queryStringList != NIL) if (queryStringList != NIL)
{ {
Task *task = CitusMakeNode(Task); Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID; task->jobId = INVALID_JOB_ID;
task->taskId = taskId++; task->taskId = taskId++;
task->taskType = DDL_TASK; task->taskType = DDL_TASK;
char *prefix = "SELECT pg_catalog.citus_run_local_command($$"; char *prefix = "SELECT pg_catalog.citus_run_local_command($$";
char *postfix = "$$)"; char *postfix = "$$)";
char *string = StringJoinParams(queryStringList, ';', prefix, postfix); char *string = StringJoinParams(queryStringList, ';', prefix, postfix);
SetTaskQueryString(task, string); SetTaskQueryString(task, string);
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = parentShardId; task->anchorShardId = parentShardId;
task->taskPlacementList = ActiveShardPlacementList(parentShardId); task->taskPlacementList = ActiveShardPlacementList(parentShardId);
bool localExecutionSupported = true; bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
}
break;
} }
/* after every iteration, clean-up all the memory associated with it */ /* after every iteration, clean-up all the memory associated with it */

View File

@ -222,4 +222,6 @@ extern void SetupRebalanceMonitor(List *placementUpdateList,
uint64 initialProgressState, uint64 initialProgressState,
PlacementUpdateStatus initialStatus); PlacementUpdateStatus initialStatus);
extern List *FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray);
#endif /* SHARD_REBALANCER_H */ #endif /* SHARD_REBALANCER_H */