diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 761c9bd22..e0ef67d7d 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -34,6 +34,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/resource_lock.h" #include "distributed/relation_access_tracking.h" +#include "distributed/relation_utils.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "miscadmin.h" @@ -46,13 +47,19 @@ /* Local functions forward declarations for helper functions */ -static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); -static void SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid - namespaceId, - IndexStmt *createIndexStatement); -static char * GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId, - IndexStmt *createIndexStatement); - +static bool IndexAlreadyExists(IndexStmt *createIndexStatement); +static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement); +static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement); +static void SwitchToSequentialExecutionIfIndexNameTooLong( + IndexStmt *createIndexStatement); +static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement); +static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement); +static List * GenerateIndexParameters(IndexStmt *createIndexStatement); +static DDLJob * GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, + const char *createIndexCommand); +static Oid CreateIndexStmtGetRelationId(IndexStmt *createIndexStatement); +static LOCKMODE GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement); +static List * CreateIndexTaskList(IndexStmt *indexStmt); static List * CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); @@ -120,103 +127,124 @@ 
List * PreprocessIndexStmt(Node *node, const char *createIndexCommand) { IndexStmt *createIndexStatement = castNode(IndexStmt, node); - List *ddlJobs = NIL; - /* - * We first check whether a distributed relation is affected. For that, we need to - * open the relation. To prevent race conditions with later lookups, lock the table, - * and modify the rangevar to include the schema. - */ - if (createIndexStatement->relation != NULL) + RangeVar *relationRangeVar = createIndexStatement->relation; + if (relationRangeVar == NULL) { - LOCKMODE lockmode = ShareLock; - MemoryContext relationContext = NULL; - - /* - * We don't support concurrently creating indexes for distributed - * tables, but till this point, we don't know if it is a regular or a - * distributed table. - */ - if (createIndexStatement->concurrent) - { - lockmode = ShareUpdateExclusiveLock; - } - - /* - * XXX: Consider using RangeVarGetRelidExtended with a permission - * checking callback. Right now we'll acquire the lock before having - * checked permissions, and will only fail when executing the actual - * index statements. - */ - Relation relation = table_openrv(createIndexStatement->relation, lockmode); - Oid relationId = RelationGetRelid(relation); - - bool isCitusRelation = IsCitusTable(relationId); - - if (createIndexStatement->relation->schemaname == NULL) - { - /* - * Before we do any further processing, fix the schema name to make sure - * that a (distributed) table with the same name does not appear on the - * search path in front of the current schema. We do this even if the - * table is not distributed, since a distributed table may appear on the - * search path by the time postgres starts processing this statement. 
- */ - char *namespaceName = get_namespace_name(RelationGetNamespace(relation)); - - /* ensure we copy string into proper context */ - relationContext = GetMemoryChunkContext(createIndexStatement->relation); - createIndexStatement->relation->schemaname = MemoryContextStrdup( - relationContext, namespaceName); - } - - table_close(relation, NoLock); - - if (isCitusRelation) - { - char *indexName = createIndexStatement->idxname; - char *namespaceName = createIndexStatement->relation->schemaname; - - ErrorIfUnsupportedIndexStmt(createIndexStatement); - - Oid namespaceId = get_namespace_oid(namespaceName, false); - Oid indexRelationId = get_relname_relid(indexName, namespaceId); - - /* if index does not exist, send the command to workers */ - if (!OidIsValid(indexRelationId)) - { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; - ddlJob->startNewTransaction = createIndexStatement->concurrent; - ddlJob->commandString = createIndexCommand; - ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement); - - ddlJobs = list_make1(ddlJob); - - /* - * Citus has the logic to truncate the long shard names to prevent - * various issues, including self-deadlocks. However, for partitioned - * tables, when index is created on the parent table, the index names - * on the partitions are auto-generated by Postgres. We use the same - * Postgres function to generate the index names on the shards of the - * partitions. If the length exceeds the limit, we switch to sequential - * execution mode. - * - * The root cause of the problem is that postgres truncates the - * table/index names if they are longer than "NAMEDATALEN - 1". 
- * From Citus' perspective, running commands in parallel on the - * shards could mean these table/index names are truncated to be - * the same, and thus forming a self-deadlock as these tables/ - * indexes are inserted into postgres' metadata tables, like pg_class. - */ - SwitchToSequentialExecutionIfIndexNameTooLong(relationId, namespaceId, - createIndexStatement); - } - } + /* defensive check: without a target relation there are no DDL jobs to plan */ + return NIL; } - return ddlJobs; + /* + * We first check whether a distributed relation is affected. For that, + * we need to open the relation. To prevent race conditions with later + * lookups, lock the table. + * + * XXX: Consider using RangeVarGetRelidExtended with a permission + * checking callback. Right now we'll acquire the lock before having + * checked permissions, and will only fail when executing the actual + * index statements. + */ + LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement); + Relation relation = table_openrv(relationRangeVar, lockMode); + + /* + * Before we do any further processing, fix the schema name to make sure + * that a (distributed) table with the same name does not appear on the + * search_path in front of the current schema. We do this even if the + * table is not distributed, since a distributed table may appear on the + * search_path by the time postgres starts processing this command. 
+ */ + if (relationRangeVar->schemaname == NULL) + { + /* copy the schema name into the memory context that holds the RangeVar */ + MemoryContext relationContext = GetMemoryChunkContext(relationRangeVar); + char *namespaceName = RelationGetNamespaceName(relation); + relationRangeVar->schemaname = MemoryContextStrdup(relationContext, + namespaceName); + } + + table_close(relation, NoLock); + + Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement); + if (!IsCitusTable(relationId)) + { + return NIL; + } + + ErrorIfUnsupportedIndexStmt(createIndexStatement); + + if (IndexAlreadyExists(createIndexStatement)) + { + /* + * Return no DDL jobs and let standard_ProcessUtility error out, or skip + * the command if it has IF NOT EXISTS. + */ + return NIL; + } + + /* + * Citus has the logic to truncate the long shard names to prevent + * various issues, including self-deadlocks. However, for partitioned + * tables, when index is created on the parent table, the index names + * on the partitions are auto-generated by Postgres. We use the same + * Postgres function to generate the index names on the shards of the + * partitions. If the length exceeds the limit, we switch to sequential + * execution mode. + * + * The root cause of the problem is that postgres truncates the + * table/index names if they are longer than "NAMEDATALEN - 1". + * From Citus' perspective, running commands in parallel on the + * shards could mean these table/index names are truncated to be + * the same, and thus forming a self-deadlock as these tables/ + * indexes are inserted into postgres' metadata tables, like pg_class. + */ + SwitchToSequentialExecutionIfIndexNameTooLong(createIndexStatement); + + DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand); + return list_make1(ddlJob); +} + + +/* + * IndexAlreadyExists returns true if the index that the given CREATE INDEX + * command would create already exists. 
+ */ +static bool +IndexAlreadyExists(IndexStmt *createIndexStatement) +{ + Oid indexRelationId = CreateIndexStmtGetIndexId(createIndexStatement); + return OidIsValid(indexRelationId); +} + + +/* + * CreateIndexStmtGetIndexId looks up the index name of the given CREATE INDEX + * command in the schema of its relation and returns the OID that is found, or + * InvalidOid if no relation has that name there. + */ +static Oid +CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement) +{ + char *indexName = createIndexStatement->idxname; + Oid namespaceId = CreateIndexStmtGetSchemaId(createIndexStatement); + Oid indexRelationId = get_relname_relid(indexName, namespaceId); + return indexRelationId; +} + + +/* + * CreateIndexStmtGetSchemaId returns the OID of the schema named by the relation + * RangeVar of the given CREATE INDEX command (looked up with missingOk = false). + */ +static Oid +CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement) +{ + RangeVar *relationRangeVar = createIndexStatement->relation; + char *schemaName = relationRangeVar->schemaname; + bool missingOk = false; + Oid namespaceId = get_namespace_oid(schemaName, missingOk); + return namespaceId; } @@ -226,9 +254,9 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand) * sequential execution to prevent self-deadlocks. 
*/ static void -SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId, - IndexStmt *createIndexStatement) +SwitchToSequentialExecutionIfIndexNameTooLong(IndexStmt *createIndexStatement) { + Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement); if (!PartitionedTable(relationId)) { /* Citus already handles long names for regular tables */ @@ -244,10 +272,7 @@ SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId, return; } - char *indexName = - GenerateLongestShardPartitionIndexName(relationId, namespaceId, - createIndexStatement); - + char *indexName = GenerateLongestShardPartitionIndexName(createIndexStatement); if (indexName && strnlen(indexName, NAMEDATALEN) >= NAMEDATALEN - 1) { @@ -282,9 +307,9 @@ SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId, * possible index name. */ static char * -GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId, - IndexStmt *createIndexStatement) +GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement) { + Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement); char *longestPartitionName = LongestPartitionName(relationId); if (longestPartitionName == NULL) { @@ -294,37 +319,106 @@ GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId, char *longestPartitionShardName = pstrdup(longestPartitionName); ShardInterval *shardInterval = LoadShardIntervalWithLongestShardName(relationId); - AppendShardIdToName(&longestPartitionShardName, shardInterval->shardId); - /* - * The rest of the code is copy & paste from DefineIndex() - * postgres/src/backend/commands/indexcmds.c - */ + IndexStmt *createLongestShardIndexStmt = copyObject(createIndexStatement); + createLongestShardIndexStmt->relation->relname = longestPartitionShardName; - /* - * Calculate the new list of index columns including both key columns and - * INCLUDE columns. 
Later we can determine which of these are key - * columns, and which are just part of the INCLUDE list by checking the - * list position. A list item in a position less than ii_NumIndexKeyAttrs - * is part of the key columns, and anything equal to and over is part of - * the INCLUDE columns. - */ - List *allIndexParams = - list_concat(list_copy(createIndexStatement->indexParams), - list_copy(createIndexStatement->indexIncludingParams)); - - List *indexColNames = ChooseIndexColumnNames(allIndexParams); - - char *choosenIndexName = ChooseIndexName(longestPartitionShardName, namespaceId, - indexColNames, - createIndexStatement->excludeOpNames, - createIndexStatement->primary, - createIndexStatement->isconstraint); + char *choosenIndexName = GenerateDefaultIndexName(createLongestShardIndexStmt); return choosenIndexName; } +/* + * GenerateDefaultIndexName is a wrapper around the postgres function ChooseIndexName + * that generates the default name for the index to be created by the given CREATE + * INDEX statement, as postgres would do. + * + * (See DefineIndex at postgres/src/backend/commands/indexcmds.c) + */ +static char * +GenerateDefaultIndexName(IndexStmt *createIndexStatement) +{ + char *relationName = createIndexStatement->relation->relname; + Oid namespaceId = CreateIndexStmtGetSchemaId(createIndexStatement); + List *indexParams = GenerateIndexParameters(createIndexStatement); + List *indexColNames = ChooseIndexColumnNames(indexParams); + char *indexName = ChooseIndexName(relationName, namespaceId, indexColNames, + createIndexStatement->excludeOpNames, + createIndexStatement->primary, + createIndexStatement->isconstraint); + + return indexName; +} + + +/* + * GenerateIndexParameters returns a new list that holds the indexParams of the + * given CREATE INDEX command followed by its indexIncludingParams; this list is + * what the default index name is chosen from. 
+ */ +static List * +GenerateIndexParameters(IndexStmt *createIndexStatement) +{ + List *indexParams = createIndexStatement->indexParams; + List *indexIncludingParams = createIndexStatement->indexIncludingParams; + List *allIndexParams = list_concat(list_copy(indexParams), + list_copy(indexIncludingParams)); + return allIndexParams; +} + + +/* + * GenerateCreateIndexDDLJob builds the DDLJob for the given CREATE INDEX command. + */ +static DDLJob * +GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createIndexCommand) +{ + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + + ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); + ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; + ddlJob->startNewTransaction = createIndexStatement->concurrent; + ddlJob->commandString = createIndexCommand; + ddlJob->taskList = CreateIndexTaskList(createIndexStatement); + + return ddlJob; +} + + +/* + * CreateIndexStmtGetRelationId resolves the relation RangeVar of the given CREATE + * INDEX command to its OID, requesting the lock mode the command requires. + */ +static Oid +CreateIndexStmtGetRelationId(IndexStmt *createIndexStatement) +{ + RangeVar *relationRangeVar = createIndexStatement->relation; + LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement); + bool missingOk = false; + Oid relationId = RangeVarGetRelid(relationRangeVar, lockMode, missingOk); + return relationId; +} + + +/* + * GetCreateIndexRelationLockMode returns the lock mode to open the relation with: + * ShareUpdateExclusiveLock if the command is concurrent, ShareLock otherwise. + */ +static LOCKMODE +GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement) +{ + if (createIndexStatement->concurrent) + { + return ShareUpdateExclusiveLock; + } + else + { + return ShareLock; + } +} + + /* * PreprocessReindexStmt determines whether a given REINDEX statement involves * a distributed table. 
If so (and if the statement does not use unsupported @@ -635,13 +729,13 @@ ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement) /* - * CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command - * against a specified distributed table. + * CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command. */ static List * -CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) +CreateIndexTaskList(IndexStmt *indexStmt) { List *taskList = NIL; + Oid relationId = CreateIndexStmtGetRelationId(indexStmt); List *shardIntervalList = LoadShardIntervalList(relationId); StringInfoData ddlString; uint64 jobId = INVALID_JOB_ID; diff --git a/src/backend/distributed/utils/relation_utils.c b/src/backend/distributed/utils/relation_utils.c new file mode 100644 index 000000000..12c76a4ab --- /dev/null +++ b/src/backend/distributed/utils/relation_utils.c @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * + * relation_utils.c + * + * This file contains helpers, similar to the ones in utils/rel.h, that + * perform useful operations on Relation objects. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/relation_utils.h" + +#include "utils/lsyscache.h" +#include "utils/rel.h" + + +/* + * RelationGetNamespaceName returns the name of the namespace (schema) of the relation. 
+ */ +char * +RelationGetNamespaceName(Relation relation) +{ + Oid namespaceId = RelationGetNamespace(relation); + char *namespaceName = get_namespace_name(namespaceId); + return namespaceName; +} diff --git a/src/include/distributed/relation_utils.h b/src/include/distributed/relation_utils.h new file mode 100644 index 000000000..873398f00 --- /dev/null +++ b/src/include/distributed/relation_utils.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * relation_utils.h + * Utilities related to Relation objects. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef RELATION_UTILS_H +#define RELATION_UTILS_H + +#include "postgres.h" + +#include "utils/relcache.h" + +extern char * RelationGetNamespaceName(Relation relation); + +#endif /* RELATION_UTILS_H */