Add queries for the shards on the worker

pull/7557/head
EmelSimsek 2024-03-11 17:37:44 +03:00
parent 8afa2d0386
commit 4398c123b1
No known key found for this signature in database
GPG Key ID: EB13DFB77C32D7D8
1 changed files with 37 additions and 10 deletions

View File

@ -60,15 +60,20 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
Oid parentIndexOid); Oid parentIndexOid);
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *indexIdList, List *indexIdList,
Oid partitionRelationId); Oid partitionRelationId,
List *shardPlacements);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId); char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId,
List *
shardPlacements);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
partitionIndexId, partitionIndexId,
char * char *
qualifiedParentShardIndexName, qualifiedParentShardIndexName,
Oid Oid
partitionId); partitionId,
List *
shardPlacements);
static List * CheckConstraintNameListForRelation(Oid relationId); static List * CheckConstraintNameListForRelation(Oid relationId);
static bool RelationHasConstraint(Oid relationId, char *constraintName); static bool RelationHasConstraint(Oid relationId, char *constraintName);
static char * RenameConstraintCommand(Oid relationId, char *constraintName, static char * RenameConstraintCommand(Oid relationId, char *constraintName,
@ -567,10 +572,14 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
{ {
uint64 parentShardId = parentShardInterval->shardId; uint64 parentShardId = parentShardInterval->shardId;
List *shardPlacementList = ActiveShardPlacementList(parentShardId);
List *queryStringList = List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId, WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList, parentIndexIdList,
partitionRelationId); partitionRelationId,
shardPlacementList);
if (queryStringList != NIL) if (queryStringList != NIL)
{ {
Task *task = CitusMakeNode(Task); Task *task = CitusMakeNode(Task);
@ -591,7 +600,7 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
task->taskPlacementList = ActiveShardPlacementList(parentShardId); task->taskPlacementList = ActiveShardPlacementList(parentShardId);
bool localExecutionSupported = true; bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); ExecuteUtilityTaskList(task, localExecutionSupported);
} }
/* after every iteration, clean-up all the memory associated with it */ /* after every iteration, clean-up all the memory associated with it */
@ -611,7 +620,8 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
static List * static List *
WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *parentIndexIdList, List *parentIndexIdList,
Oid partitionRelationId) Oid partitionRelationId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
Oid parentIndexId = InvalidOid; Oid parentIndexId = InvalidOid;
@ -634,7 +644,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName, char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName,
parentShardIndexName); parentShardIndexName);
List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
qualifiedParentShardIndexName, parentIndexId, partitionRelationId); qualifiedParentShardIndexName, parentIndexId, partitionRelationId,
shardPlacements);
commandList = list_concat(commandList, commands); commandList = list_concat(commandList, commands);
} }
@ -653,7 +664,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
*/ */
static List * static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId) char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
@ -673,7 +685,8 @@ WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
{ {
List *commands = List *commands =
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex( WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(
partitionIndexId, qualifiedParentShardIndexName, partitionId); partitionIndexId, qualifiedParentShardIndexName, partitionId,
shardPlacements);
commandList = list_concat(commandList, commands); commandList = list_concat(commandList, commands);
} }
} }
@ -690,7 +703,8 @@ static List *
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,
char * char *
qualifiedParentShardIndexName, qualifiedParentShardIndexName,
Oid partitionId) Oid partitionId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
@ -710,6 +724,19 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex
*/ */
uint64 partitionShardId = partitionShardInterval->shardId; uint64 partitionShardId = partitionShardInterval->shardId;
List *activePlacementList = ActiveShardPlacementList(partitionShardId);
if ((list_length(activePlacementList) > 0) && (list_length(shardPlacements) > 0))
{
ShardPlacement *left = (ShardPlacement *) linitial(activePlacementList);
ShardPlacement *right = (ShardPlacement *) linitial(shardPlacements);
if (left->nodeId != right->nodeId)
{
continue;
}
}
/* get qualified partition shard name */ /* get qualified partition shard name */
char *partitionShardName = pstrdup(partitionName); char *partitionShardName = pstrdup(partitionName);
AppendShardIdToName(&partitionShardName, partitionShardId); AppendShardIdToName(&partitionShardName, partitionShardId);