pull/7557/merge
Emel Şimşek 2025-03-05 10:21:05 -08:00 committed by GitHub
commit 7d94095bb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 70 additions and 55 deletions

View File

@ -13,6 +13,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h" #include "catalog/partition.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
@ -26,6 +27,7 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/varlena.h" #include "utils/varlena.h"
@ -60,15 +62,20 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
Oid parentIndexOid); Oid parentIndexOid);
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *indexIdList, List *indexIdList,
Oid partitionRelationId); Oid partitionRelationId,
List *shardPlacements);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId); char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId,
List *
shardPlacements);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
partitionIndexId, partitionIndexId,
char * char *
qualifiedParentShardIndexName, qualifiedParentShardIndexName,
Oid Oid
partitionId); partitionId,
List *
shardPlacements);
static List * CheckConstraintNameListForRelation(Oid relationId); static List * CheckConstraintNameListForRelation(Oid relationId);
static bool RelationHasConstraint(Oid relationId, char *constraintName); static bool RelationHasConstraint(Oid relationId, char *constraintName);
static char * RenameConstraintCommand(Oid relationId, char *constraintName, static char * RenameConstraintCommand(Oid relationId, char *constraintName,
@ -228,27 +235,20 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
{ {
Oid parentShardIndexId = PG_GETARG_OID(0); Oid parentShardIndexId = PG_GETARG_OID(0);
text *partitionShardName = PG_GETARG_TEXT_P(1); text *partitionIndexName = PG_GETARG_TEXT_P(1);
text *partitionName = PG_GETARG_TEXT_P(2);
/* resolve partitionShardId from passed in schema and partition shard name */ /* resolve partitionShardId from passed in schema and partition shard name */
List *partitionShardNameList = textToQualifiedNameList(partitionShardName); List *partitionNameList = textToQualifiedNameList(partitionName);
RangeVar *partitionShard = makeRangeVarFromNameList(partitionShardNameList); RangeVar *partition = makeRangeVarFromNameList(partitionNameList);
/* lock the relation with the lock mode */ /* lock the relation with the lock mode */
bool missing_ok = true; bool missing_ok = true;
Oid partitionShardId = RangeVarGetRelid(partitionShard, NoLock, missing_ok); Oid partitionId = RangeVarGetRelid(partition, NoLock, missing_ok);
if (!OidIsValid(partitionShardId))
{
PG_RETURN_VOID();
}
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureTableOwner(partitionShardId); EnsureTableOwner(partitionId);
text *newPartitionShardIndexNameText = PG_GETARG_TEXT_P(2);
char *newPartitionShardIndexName = text_to_cstring(
newPartitionShardIndexNameText);
if (!has_subclass(parentShardIndexId)) if (!has_subclass(parentShardIndexId))
{ {
@ -258,12 +258,31 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
List *partitionShardIndexIds = find_inheritance_children(parentShardIndexId, List *partitionShardIndexIds = find_inheritance_children(parentShardIndexId,
ShareRowExclusiveLock); ShareRowExclusiveLock);
List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
ShardInterval *partitionShardInterval = NULL;
foreach_ptr(partitionShardInterval, partitionShardIntervalList)
{
uint64 partitionShardId = partitionShardInterval->shardId;
char *shardName = ConstructQualifiedShardName(partitionShardInterval);
List *names = stringToQualifiedNameList_compat(shardName);
RangeVar *partitionShardRangeVar = makeRangeVarFromNameList(names);
Oid partitionShardOid = RangeVarGetRelid(partitionShardRangeVar, NoLock,
missing_ok);
Oid partitionShardIndexId = InvalidOid; Oid partitionShardIndexId = InvalidOid;
foreach_oid(partitionShardIndexId, partitionShardIndexIds) foreach_oid(partitionShardIndexId, partitionShardIndexIds)
{ {
if (IndexGetRelation(partitionShardIndexId, false) == partitionShardId) if (IndexGetRelation(partitionShardIndexId, false) != partitionShardOid)
{ {
continue;
}
char *partitionShardIndexName = get_rel_name(partitionShardIndexId); char *partitionShardIndexName = get_rel_name(partitionShardIndexId);
if (ExtractShardIdFromTableName(partitionShardIndexName, missing_ok) == if (ExtractShardIdFromTableName(partitionShardIndexName, missing_ok) ==
INVALID_SHARD_ID) INVALID_SHARD_ID)
{ {
@ -274,6 +293,11 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
* which ends in _shardid, hence we maintain naming consistency: * which ends in _shardid, hence we maintain naming consistency:
* we can reach this partition shard index by conventional Citus naming * we can reach this partition shard index by conventional Citus naming
*/ */
char *newPartitionShardIndexName = pstrdup(text_to_cstring(
partitionIndexName));
AppendShardIdToName(&newPartitionShardIndexName, partitionShardId);
RenameStmt *stmt = makeNode(RenameStmt); RenameStmt *stmt = makeNode(RenameStmt);
stmt->renameType = OBJECT_INDEX; stmt->renameType = OBJECT_INDEX;
@ -285,7 +309,6 @@ worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
RenameRelation(stmt); RenameRelation(stmt);
} }
break;
} }
} }
@ -567,10 +590,14 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
{ {
uint64 parentShardId = parentShardInterval->shardId; uint64 parentShardId = parentShardInterval->shardId;
List *shardPlacementList = ActiveShardPlacementList(parentShardId);
List *queryStringList = List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId, WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList, parentIndexIdList,
partitionRelationId); partitionRelationId,
shardPlacementList);
if (queryStringList != NIL) if (queryStringList != NIL)
{ {
Task *task = CitusMakeNode(Task); Task *task = CitusMakeNode(Task);
@ -611,7 +638,8 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
static List * static List *
WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *parentIndexIdList, List *parentIndexIdList,
Oid partitionRelationId) Oid partitionRelationId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
Oid parentIndexId = InvalidOid; Oid parentIndexId = InvalidOid;
@ -634,7 +662,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName, char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName,
parentShardIndexName); parentShardIndexName);
List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
qualifiedParentShardIndexName, parentIndexId, partitionRelationId); qualifiedParentShardIndexName, parentIndexId, partitionRelationId,
shardPlacements);
commandList = list_concat(commandList, commands); commandList = list_concat(commandList, commands);
} }
@ -653,7 +682,8 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
*/ */
static List * static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId) char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
@ -673,7 +703,8 @@ WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
{ {
List *commands = List *commands =
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex( WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(
partitionIndexId, qualifiedParentShardIndexName, partitionId); partitionIndexId, qualifiedParentShardIndexName, partitionId,
shardPlacements);
commandList = list_concat(commandList, commands); commandList = list_concat(commandList, commands);
} }
} }
@ -690,7 +721,8 @@ static List *
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId, WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,
char * char *
qualifiedParentShardIndexName, qualifiedParentShardIndexName,
Oid partitionId) Oid partitionId,
List *shardPlacements)
{ {
List *commandList = NIL; List *commandList = NIL;
@ -698,38 +730,21 @@ WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndex
char *partitionIndexName = get_rel_name(partitionIndexId); char *partitionIndexName = get_rel_name(partitionIndexId);
char *partitionName = get_rel_name(partitionId); char *partitionName = get_rel_name(partitionId);
char *partitionSchemaName = get_namespace_name(get_rel_namespace(partitionId)); char *partitionSchemaName = get_namespace_name(get_rel_namespace(partitionId));
List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
ShardInterval *partitionShardInterval = NULL;
foreach_ptr(partitionShardInterval, partitionShardIntervalList)
{
/*
* Prepare commands for each shard of current partition
* to fix the index name that corresponds to the
* current parent index name
*/
uint64 partitionShardId = partitionShardInterval->shardId;
/* get qualified partition shard name */ char *qualifiedPartitionName = quote_qualified_identifier(
char *partitionShardName = pstrdup(partitionName);
AppendShardIdToName(&partitionShardName, partitionShardId);
char *qualifiedPartitionShardName = quote_qualified_identifier(
partitionSchemaName, partitionSchemaName,
partitionShardName); partitionName);
/* generate the new correct index name */
char *newPartitionShardIndexName = pstrdup(partitionIndexName);
AppendShardIdToName(&newPartitionShardIndexName, partitionShardId);
/* create worker_fix_partition_shard_index_names command */ /* create worker_fix_partition_shard_index_names command */
StringInfo shardQueryString = makeStringInfo(); StringInfo shardQueryString = makeStringInfo();
appendStringInfo(shardQueryString, appendStringInfo(shardQueryString,
"SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)", "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)",
quote_literal_cstr(qualifiedParentShardIndexName), quote_literal_cstr(qualifiedParentShardIndexName),
quote_literal_cstr(qualifiedPartitionShardName), quote_literal_cstr(partitionIndexName),
quote_literal_cstr(newPartitionShardIndexName)); quote_literal_cstr(qualifiedPartitionName));
commandList = lappend(commandList, shardQueryString->data); commandList = lappend(commandList, shardQueryString->data);
}
return commandList; return commandList;
} }