From 4398c123b140eef585205db43cf4a90a29b796f6 Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Mon, 11 Mar 2024 17:37:44 +0300 Subject: [PATCH 1/4] Add queries for the shards on the worker --- .../utils/multi_partitioning_utils.c | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index ede2008ca..ab9c10d0b 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -60,15 +60,20 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid parentIndexOid); static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, List *indexIdList, - Oid partitionRelationId); + Oid partitionRelationId, + List *shardPlacements); static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId); + char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId, + List * + shardPlacements); static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, char * qualifiedParentShardIndexName, Oid - partitionId); + partitionId, + List * + shardPlacements); static List * CheckConstraintNameListForRelation(Oid relationId); static bool RelationHasConstraint(Oid relationId, char *constraintName); static char * RenameConstraintCommand(Oid relationId, char *constraintName, @@ -567,10 +572,14 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, { uint64 parentShardId = parentShardInterval->shardId; + List *shardPlacementList = ActiveShardPlacementList(parentShardId); + List *queryStringList = WorkerFixPartitionShardIndexNamesCommandList(parentShardId, parentIndexIdList, - partitionRelationId); + partitionRelationId, + shardPlacementList); + if (queryStringList != NIL) { Task *task = CitusMakeNode(Task); @@ -591,7 +600,7 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, task->taskPlacementList = ActiveShardPlacementList(parentShardId); bool localExecutionSupported = true; - ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); + ExecuteUtilityTaskList(task, localExecutionSupported); } /* after every iteration, clean-up all the memory associated with it */ @@ -611,7 +620,8 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, List *parentIndexIdList, - Oid partitionRelationId) + Oid partitionRelationId, + List *shardPlacements) { List *commandList = NIL; Oid parentIndexId = InvalidOid; @@ -634,7 +644,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName, parentShardIndexName); List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - qualifiedParentShardIndexName, parentIndexId, partitionRelationId); + qualifiedParentShardIndexName, parentIndexId, partitionRelationId, + shardPlacements); commandList = list_concat(commandList, commands); } @@ -653,7 +664,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, */ static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( - char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId) + char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId, + List *shardPlacements) { List *commandList = NIL; @@ -673,7 +685,8 @@ WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( { List *commands = WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex( - partitionIndexId, qualifiedParentShardIndexName, partitionId); + partitionIndexId, qualifiedParentShardIndexName, partitionId, + shardPlacements); commandList = list_concat(commandList, commands); } } @@ -690,7 +703,8 @@ static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, char * qualifiedParentShardIndexName, - Oid partitionId) + Oid partitionId, + List *shardPlacements) { List *commandList = NIL; @@ -710,6 +724,19 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex */ uint64 partitionShardId = partitionShardInterval->shardId; + List *activePlacementList = ActiveShardPlacementList(partitionShardId); + + if ((list_length(activePlacementList) > 0) && (list_length(shardPlacements) > 0)) + { + ShardPlacement *left = (ShardPlacement *) linitial(activePlacementList); + ShardPlacement *right = (ShardPlacement *) linitial(shardPlacements); + + if (left->nodeId != right->nodeId) + { + continue; + } + } + /* get qualified partition shard name */ char *partitionShardName = pstrdup(partitionName); AppendShardIdToName(&partitionShardName, partitionShardId); From baa5941c508bd179735fea546f8769ca5d0b13e8 Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Mon, 11 Mar 2024 17:43:28 +0300 Subject: [PATCH 2/4] FixBuildError --- src/backend/distributed/utils/multi_partitioning_utils.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index ab9c10d0b..179c905ff 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -600,7 +600,7 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, task->taskPlacementList = ActiveShardPlacementList(parentShardId); bool localExecutionSupported = true; - ExecuteUtilityTaskList(task, localExecutionSupported); + ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); } /* after every iteration, clean-up all the memory associated with it */ From 5d21f8361384b8d9a464aeaf45f1b1c627667dc7 Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Tue, 19 Mar 2024 14:25:25 +0300 Subject: [PATCH 3/4] Refactor --- .../utils/multi_partitioning_utils.c | 64 +++++++++++++------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 179c905ff..88a99241c 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -13,6 +13,7 @@ #include "access/htup_details.h" #include "catalog/index.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_class.h" #include "catalog/pg_constraint.h" @@ -26,6 +27,7 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/regproc.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -233,27 +235,20 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS) { Oid parentShardIndexId = PG_GETARG_OID(0); - text *partitionShardName = PG_GETARG_TEXT_P(1); + text *partitionIndexName = PG_GETARG_TEXT_P(1); + + text *partitionName = PG_GETARG_TEXT_P(2); /* resolve partitionShardId from passed in schema and partition shard name */ - List *partitionShardNameList = textToQualifiedNameList(partitionShardName); - RangeVar *partitionShard = makeRangeVarFromNameList(partitionShardNameList); + List *partitionNameList = textToQualifiedNameList(partitionName); + RangeVar *partition = makeRangeVarFromNameList(partitionNameList); /* lock the relation with the lock mode */ bool missing_ok = true; - Oid partitionShardId = RangeVarGetRelid(partitionShard, NoLock, missing_ok); - - if (!OidIsValid(partitionShardId)) - { - PG_RETURN_VOID(); - } + Oid partitionId = RangeVarGetRelid(partition, NoLock, missing_ok); CheckCitusVersion(ERROR); - EnsureTableOwner(partitionShardId); - - text *newPartitionShardIndexNameText = PG_GETARG_TEXT_P(2); - char *newPartitionShardIndexName = text_to_cstring( - newPartitionShardIndexNameText); + EnsureTableOwner(partitionId); if (!has_subclass(parentShardIndexId)) { @@ -263,12 +258,31 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS) List *partitionShardIndexIds = find_inheritance_children(parentShardIndexId, ShareRowExclusiveLock); - Oid partitionShardIndexId = InvalidOid; - foreach_oid(partitionShardIndexId, partitionShardIndexIds) + + List *partitionShardIntervalList = LoadShardIntervalList(partitionId); + ShardInterval *partitionShardInterval = NULL; + foreach_ptr(partitionShardInterval, partitionShardIntervalList) { - if (IndexGetRelation(partitionShardIndexId, false) == partitionShardId) + uint64 partitionShardId = partitionShardInterval->shardId; + + char *shardName = ConstructQualifiedShardName(partitionShardInterval); + List *names = stringToQualifiedNameList_compat(shardName); + RangeVar *partitionShardRangeVar = makeRangeVarFromNameList(names); + + Oid partitionShardOid = RangeVarGetRelid(partitionShardRangeVar, NoLock, + missing_ok); + + Oid partitionShardIndexId = InvalidOid; + foreach_oid(partitionShardIndexId, partitionShardIndexIds) { + if (IndexGetRelation(partitionShardIndexId, false) != partitionShardOid) + { + continue; + } + char *partitionShardIndexName = get_rel_name(partitionShardIndexId); + + if (ExtractShardIdFromTableName(partitionShardIndexName, missing_ok) == INVALID_SHARD_ID) { @@ -279,6 +293,11 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS) * which ends in _shardid, hence we maintain naming consistency: * we can reach this partition shard index by conventional Citus naming */ + + char *newPartitionShardIndexName = pstrdup(text_to_cstring( + partitionIndexName)); + AppendShardIdToName(&newPartitionShardIndexName, partitionShardId); + RenameStmt *stmt = makeNode(RenameStmt); stmt->renameType = OBJECT_INDEX; @@ -290,7 +309,6 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS) RenameRelation(stmt); } - break; } } @@ -712,6 +730,12 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex char *partitionIndexName = get_rel_name(partitionIndexId); char *partitionName = get_rel_name(partitionId); char *partitionSchemaName = get_namespace_name(get_rel_namespace(partitionId)); + + + char *qualifiedPartitionName = quote_qualified_identifier( + partitionSchemaName, + partitionName); + List *partitionShardIntervalList = LoadShardIntervalList(partitionId); ShardInterval *partitionShardInterval = NULL; @@ -753,8 +777,8 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex appendStringInfo(shardQueryString, "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)", quote_literal_cstr(qualifiedParentShardIndexName), - quote_literal_cstr(qualifiedPartitionShardName), - quote_literal_cstr(newPartitionShardIndexName)); + quote_literal_cstr(partitionIndexName), + quote_literal_cstr(qualifiedPartitionName)); commandList = lappend(commandList, shardQueryString->data); } From 622016c08ecc5f173f742953e22a991e2fddf68a Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Tue, 19 Mar 2024 14:30:56 +0300 Subject: [PATCH 4/4] Refactor --- .../utils/multi_partitioning_utils.c | 52 +++---------------- 1 file changed, 8 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 88a99241c..b435056f2 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -736,51 +736,15 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex partitionSchemaName, partitionName); - List *partitionShardIntervalList = LoadShardIntervalList(partitionId); - ShardInterval *partitionShardInterval = NULL; - foreach_ptr(partitionShardInterval, partitionShardIntervalList) - { - /* - * Prepare commands for each shard of current partition - * to fix the index name that corresponds to the - * current parent index name - */ - uint64 partitionShardId = partitionShardInterval->shardId; - - List *activePlacementList = ActiveShardPlacementList(partitionShardId); - - if ((list_length(activePlacementList) > 0) && (list_length(shardPlacements) > 0)) - { - ShardPlacement *left = (ShardPlacement *) linitial(activePlacementList); - ShardPlacement *right = (ShardPlacement *) linitial(shardPlacements); - - if (left->nodeId != right->nodeId) - { - continue; - } - } - - /* get qualified partition shard name */ - char *partitionShardName = pstrdup(partitionName); - AppendShardIdToName(&partitionShardName, partitionShardId); - char *qualifiedPartitionShardName = quote_qualified_identifier( - partitionSchemaName, - partitionShardName); - - /* generate the new correct index name */ - char *newPartitionShardIndexName = pstrdup(partitionIndexName); - AppendShardIdToName(&newPartitionShardIndexName, partitionShardId); - - /* create worker_fix_partition_shard_index_names command */ - StringInfo shardQueryString = makeStringInfo(); - appendStringInfo(shardQueryString, - "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)", - quote_literal_cstr(qualifiedParentShardIndexName), - quote_literal_cstr(partitionIndexName), - quote_literal_cstr(qualifiedPartitionName)); - commandList = lappend(commandList, shardQueryString->data); - } + /* create worker_fix_partition_shard_index_names command */ + StringInfo shardQueryString = makeStringInfo(); + appendStringInfo(shardQueryString, + "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)", + quote_literal_cstr(qualifiedParentShardIndexName), + quote_literal_cstr(partitionIndexName), + quote_literal_cstr(qualifiedPartitionName)); + commandList = lappend(commandList, shardQueryString->data); return commandList; }