From 363409a0c2aec722e3311f032e5d4137de58ec85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 18 Sep 2019 20:22:53 +0000 Subject: [PATCH] Propagate REINDEX TABLE & REINDEX INDEX --- src/backend/distributed/commands/index.c | 320 ++++++++++++++---- .../distributed/commands/utility_hook.c | 2 +- .../distributed/relay/relay_event_utility.c | 9 - .../distributed/utils/citus_ruleutils.c | 77 +++++ src/include/distributed/citus_ruleutils.h | 2 + src/include/distributed/commands.h | 3 +- .../expected/multi_index_statements.out | 6 - .../expected/multi_index_statements_0.out | 6 - .../regress/sql/multi_index_statements.sql | 4 - 9 files changed, 339 insertions(+), 90 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index d49014ec5..d564346f3 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -19,6 +19,7 @@ #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_class.h" +#include "commands/tablecmds.h" #include "distributed/citus_ruleutils.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" @@ -39,8 +40,12 @@ /* Local functions forward declarations for helper functions */ static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); +static List * CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); +static void RangeVarCallbackForReindexIndex(const RangeVar *rel, Oid relOid, Oid + oldRelOid, + void *arg); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); @@ -57,6 +62,18 @@ struct DropRelationCallbackState bool concurrent; }; +/* + * This struct defines the state for the callback for reindex statements. + * It is copied as it is from commands/indexcmds.c in Postgres source. + */ +struct ReindexIndexCallbackState +{ +#if PG_VERSION_NUM >= 120000 + bool concurrent; +#endif + Oid locked_table_oid; +}; + /* * IsIndexRenameStmt returns whether the passed-in RenameStmt is the following @@ -101,7 +118,6 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) Relation relation = NULL; Oid relationId = InvalidOid; bool isDistributedRelation = false; - char *namespaceName = NULL; LOCKMODE lockmode = ShareLock; MemoryContext relationContext = NULL; @@ -126,19 +142,22 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) isDistributedRelation = IsDistributedTable(relationId); - /* - * Before we do any further processing, fix the schema name to make sure - * that a (distributed) table with the same name does not appear on the - * search path in front of the current schema. We do this even if the - * table is not distributed, since a distributed table may appear on the - * search path by the time postgres starts processing this statement. - */ - namespaceName = get_namespace_name(RelationGetNamespace(relation)); + if (createIndexStatement->relation->schemaname == NULL) + { + /* + * Before we do any further processing, fix the schema name to make sure + * that a (distributed) table with the same name does not appear on the + * search path in front of the current schema. We do this even if the + * table is not distributed, since a distributed table may appear on the + * search path by the time postgres starts processing this statement. + */ + char *namespaceName = get_namespace_name(RelationGetNamespace(relation)); - /* ensure we copy string into proper context */ - relationContext = GetMemoryChunkContext(createIndexStatement->relation); - namespaceName = MemoryContextStrdup(relationContext, namespaceName); - createIndexStatement->relation->schemaname = namespaceName; + /* ensure we copy string into proper context */ + relationContext = GetMemoryChunkContext(createIndexStatement->relation); + createIndexStatement->relation->schemaname = MemoryContextStrdup( + relationContext, namespaceName); + } heap_close(relation, NoLock); @@ -147,6 +166,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) Oid namespaceId = InvalidOid; Oid indexRelationId = InvalidOid; char *indexName = createIndexStatement->idxname; + char *namespaceName = createIndexStatement->relation->schemaname; ErrorIfUnsupportedIndexStmt(createIndexStatement); @@ -172,65 +192,109 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) /* - * ErrorIfReindexOnDistributedTable determines whether a given REINDEX - * involves a distributed table, & raises an error if so. + * PlanReindexStmt determines whether a given REINDEX statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. */ -void -ErrorIfReindexOnDistributedTable(ReindexStmt *ReindexStatement) +List * +PlanReindexStmt(ReindexStmt *reindexStatement, const char *reindexCommand) { - Relation relation = NULL; - Oid relationId = InvalidOid; - bool isDistributedRelation = false; - LOCKMODE lockmode = AccessShareLock; + List *ddlJobs = NIL; /* * We first check whether a distributed relation is affected. For that, we need to - * open the relation. + * open the relation. To prevent race conditions with later lookups, lock the table, + * and modify the rangevar to include the schema. */ - if (ReindexStatement->relation == NULL) + if (reindexStatement->relation != NULL) { - /* ignore REINDEX SCHEMA, REINDEX SYSTEM, and REINDEX DATABASE */ - return; + Relation relation = NULL; + Oid relationId = InvalidOid; + bool isDistributedRelation = false; +#if PG_VERSION_NUM >= 120000 + LOCKMODE lockmode = reindexStatement->concurrent ? ShareUpdateExclusiveLock : + AccessExclusiveLock; +#else + LOCKMODE lockmode = AccessExclusiveLock; +#endif + MemoryContext relationContext = NULL; + + Assert(reindexStatement->kind == REINDEX_OBJECT_INDEX || + reindexStatement->kind == REINDEX_OBJECT_TABLE); + + if (reindexStatement->kind == REINDEX_OBJECT_INDEX) + { + Oid indOid; + struct ReindexIndexCallbackState state; +#if PG_VERSION_NUM >= 120000 + state.concurrent = reindexStatement->concurrent; +#endif + state.locked_table_oid = InvalidOid; + + indOid = RangeVarGetRelidInternal(reindexStatement->relation, + lockmode, 0, + RangeVarCallbackForReindexIndex, + &state); + relation = index_open(indOid, NoLock); + relationId = IndexGetRelation(indOid, false); + } + else + { + RangeVarGetRelidInternal(reindexStatement->relation, lockmode, 0, + RangeVarCallbackOwnsTable, NULL); + + relation = heap_openrv(reindexStatement->relation, NoLock); + relationId = RelationGetRelid(relation); + } + + isDistributedRelation = IsDistributedTable(relationId); + + if (reindexStatement->relation->schemaname == NULL) + { + /* + * Before we do any further processing, fix the schema name to make sure + * that a (distributed) table with the same name does not appear on the + * search path in front of the current schema. We do this even if the + * table is not distributed, since a distributed table may appear on the + * search path by the time postgres starts processing this statement. + */ + char *namespaceName = get_namespace_name(RelationGetNamespace(relation)); + + /* ensure we copy string into proper context */ + relationContext = GetMemoryChunkContext(reindexStatement->relation); + reindexStatement->relation->schemaname = MemoryContextStrdup(relationContext, + namespaceName); + } + + if (reindexStatement->kind == REINDEX_OBJECT_INDEX) + { + index_close(relation, NoLock); + } + else + { + heap_close(relation, NoLock); + } + + if (isDistributedRelation) + { + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = relationId; +#if PG_VERSION_NUM >= 120000 + ddlJob->concurrentIndexCmd = reindexStatement->concurrent; +#else + ddlJob->concurrentIndexCmd = false; +#endif + ddlJob->commandString = reindexCommand; + ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement); + + ddlJobs = list_make1(ddlJob); + } } - Assert(ReindexStatement->kind == REINDEX_OBJECT_INDEX || - ReindexStatement->kind == REINDEX_OBJECT_TABLE); - - /* - * XXX: Consider using RangeVarGetRelidExtended with a permission - * checking callback. Right now we'll acquire the lock before having - * checked permissions. - */ - if (ReindexStatement->kind == REINDEX_OBJECT_INDEX) - { - Oid indOid = RangeVarGetRelid(ReindexStatement->relation, - NoLock, false); - relation = index_open(indOid, lockmode); - relationId = IndexGetRelation(indOid, false); - } - else - { - relation = heap_openrv(ReindexStatement->relation, lockmode); - relationId = RelationGetRelid(relation); - } - - isDistributedRelation = IsDistributedTable(relationId); - - if (ReindexStatement->kind == REINDEX_OBJECT_INDEX) - { - index_close(relation, NoLock); - } - else - { - heap_close(relation, NoLock); - } - - if (isDistributedRelation) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "REINDEX is not implemented for distributed relations"))); - } + return ddlJobs; } @@ -487,6 +551,52 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) } +/* + * CreateReindexTaskList builds a list of tasks to execute a REINDEX command + * against a specified distributed table. + */ +static List * +CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) +{ + List *taskList = NIL; + List *shardIntervalList = LoadShardIntervalList(relationId); + ListCell *shardIntervalCell = NULL; + StringInfoData ddlString; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + + initStringInfo(&ddlString); + + /* lock metadata before getting placement lists */ + LockShardListMetadata(shardIntervalList, ShareLock); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + Task *task = NULL; + + deparse_shard_reindex_statement(reindexStmt, relationId, shardId, &ddlString); + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = DDL_TASK; + task->queryString = pstrdup(ddlString.data); + task->replicationModel = REPLICATION_MODEL_INVALID; + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); + + resetStringInfo(&ddlString); + } + + return taskList; +} + + /* * Before acquiring a table lock, check whether we have sufficient rights. * In the case of DROP INDEX, also try to lock the table before the index. @@ -585,6 +695,90 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi } +/* + * Check permissions on table before acquiring relation lock; also lock + * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid + * deadlocks. + * + * This code is borrowed from RangeVarCallbackForReindexIndex() in + * commands/indexcmds.c in Postgres source. We need this to ensure the right + * order of locking while dealing with REINDEX statements. + */ +static void +RangeVarCallbackForReindexIndex(const RangeVar *relation, Oid relId, Oid oldRelId, + void *arg) +{ + /* *INDENT-OFF* */ + char relkind; + struct ReindexIndexCallbackState *state = arg; + LOCKMODE table_lockmode; + + /* + * Lock level here should match table lock in reindex_index() for + * non-concurrent case and table locks used by index_concurrently_*() for + * concurrent case. + */ +#if PG_VERSION_NUM >= 120000 + table_lockmode = state->concurrent ? ShareUpdateExclusiveLock : ShareLock; +#else + table_lockmode = ShareLock; +#endif + + /* + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. + */ + if (relId != oldRelId && OidIsValid(oldRelId)) + { + UnlockRelationOid(state->locked_table_oid, table_lockmode); + state->locked_table_oid = InvalidOid; + } + + /* If the relation does not exist, there's nothing more to do. */ + if (!OidIsValid(relId)) + return; + + /* + * If the relation does exist, check whether it's an index. But note that + * the relation might have been dropped between the time we did the name + * lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_INDEX +#if PG_VERSION_NUM >= 110000 + && relkind != RELKIND_PARTITIONED_INDEX +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index", relation->relname))); + + /* Check permissions */ + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname); + + /* Lock heap before index to avoid deadlock. */ + if (relId != oldRelId) + { + Oid table_oid = IndexGetRelation(relId, true); + + /* + * If the OID isn't valid, it means the index was concurrently + * dropped, which is not a problem for us; just return normally. + */ + if (OidIsValid(table_oid)) + { + LockRelationOid(table_oid, table_lockmode); + state->locked_table_oid = table_oid; + } + } + /* *INDENT-ON* */ +} + + /* * ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is * supported for distributed tables and errors out if it is not. diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 585456669..63986a6a1 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -386,7 +386,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, if (IsA(parsetree, ReindexStmt)) { - ErrorIfReindexOnDistributedTable((ReindexStmt *) parsetree); + ddlJobs = PlanReindexStmt((ReindexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 0fa9e080c..623c64ece 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -406,15 +406,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) AppendShardIdToName(objectName, shardId); } - else if (objectType == REINDEX_OBJECT_DATABASE) - { - ereport(ERROR, (errmsg("cannot extend name for multi-relation reindex"))); - } - else - { - ereport(ERROR, (errmsg("invalid object type in reindex statement"), - errdetail("Object type: %u", (uint32) objectType))); - } break; } diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index 00db6e66d..54ea1f106 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -38,6 +38,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/relay_utility.h" #include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" #include "distributed/version_compat.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" @@ -715,6 +716,82 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, } +/* + * deparse_shard_reindex_statement uses the provided REINDEX node, dist. + * relation, and shard identifier to populate a provided buffer with a string + * representation of a shard-extended version of that command. + */ +void +deparse_shard_reindex_statement(ReindexStmt *origStmt, Oid distrelid, int64 shardid, + StringInfo buffer) +{ + ReindexStmt *reindexStmt = copyObject(origStmt); /* copy to avoid modifications */ + char *relationName = NULL; +#if PG_VERSION_NUM >= 120000 + const char *concurrentlyString = reindexStmt->concurrent ? "CONCURRENTLY " : ""; +#else + const char *concurrentlyString = ""; +#endif + + + if (reindexStmt->kind == REINDEX_OBJECT_INDEX || + reindexStmt->kind == REINDEX_OBJECT_TABLE) + { + relationName = reindexStmt->relation->relname; + + /* extend relation and index name using shard identifier */ + AppendShardIdToName(&relationName, shardid); + } + + appendStringInfoString(buffer, "REINDEX "); + + if (reindexStmt->options == REINDEXOPT_VERBOSE) + { + appendStringInfoString(buffer, "(VERBOSE) "); + } + + switch (reindexStmt->kind) + { + case REINDEX_OBJECT_INDEX: + { + appendStringInfo(buffer, "INDEX %s%s", concurrentlyString, + quote_qualified_identifier(reindexStmt->relation->schemaname, + relationName)); + break; + } + + case REINDEX_OBJECT_TABLE: + { + appendStringInfo(buffer, "TABLE %s%s", concurrentlyString, + quote_qualified_identifier(reindexStmt->relation->schemaname, + relationName)); + break; + } + + case REINDEX_OBJECT_SCHEMA: + { + appendStringInfo(buffer, "SCHEMA %s%s", concurrentlyString, + quote_identifier(reindexStmt->name)); + break; + } + + case REINDEX_OBJECT_SYSTEM: + { + appendStringInfo(buffer, "SYSTEM %s%s", concurrentlyString, + quote_identifier(reindexStmt->name)); + break; + } + + case REINDEX_OBJECT_DATABASE: + { + appendStringInfo(buffer, "DATABASE %s%s", concurrentlyString, + quote_identifier(reindexStmt->name)); + break; + } + } +} + + /* deparse_index_columns appends index or include parameters to the provided buffer */ static void deparse_index_columns(StringInfo buffer, List *indexParameterList, List *deparseContext) diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 204173671..a79381473 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -35,6 +35,8 @@ extern void EnsureRelationKindSupported(Oid relationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); +extern void deparse_shard_reindex_statement(ReindexStmt *origStmt, Oid distrelid, + int64 shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); extern bool contain_nextval_expression_walker(Node *node, void *context); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 94befe461..da4c93b85 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -58,7 +58,8 @@ extern List * PlanGrantStmt(GrantStmt *grantStmt); extern bool IsIndexRenameStmt(RenameStmt *renameStmt); extern List * PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand); -extern void ErrorIfReindexOnDistributedTable(ReindexStmt *ReindexStatement); +extern List * PlanReindexStmt(ReindexStmt *ReindexStatement, + const char *ReindexCommand); extern List * PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand); extern void PostProcessIndexStmt(IndexStmt *indexStmt); diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 9a64dab13..f64d20041 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -207,17 +207,11 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t -- -- REINDEX -- -SET citus.log_remote_commands to on; -SET client_min_messages = LOG; REINDEX INDEX lineitem_orderkey_index; -ERROR: REINDEX is not implemented for distributed relations REINDEX TABLE lineitem; -ERROR: REINDEX is not implemented for distributed relations REINDEX SCHEMA public; REINDEX DATABASE regression; REINDEX SYSTEM regression; -SET citus.log_remote_commands to off; -RESET client_min_messages; -- -- DROP INDEX -- diff --git a/src/test/regress/expected/multi_index_statements_0.out b/src/test/regress/expected/multi_index_statements_0.out index ccd0d1e68..02e4ae61b 100644 --- a/src/test/regress/expected/multi_index_statements_0.out +++ b/src/test/regress/expected/multi_index_statements_0.out @@ -208,17 +208,11 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t -- -- REINDEX -- -SET citus.log_remote_commands to on; -SET client_min_messages = LOG; REINDEX INDEX lineitem_orderkey_index; -ERROR: REINDEX is not implemented for distributed relations REINDEX TABLE lineitem; -ERROR: REINDEX is not implemented for distributed relations REINDEX SCHEMA public; REINDEX DATABASE regression; REINDEX SYSTEM regression; -SET citus.log_remote_commands to off; -RESET client_min_messages; -- -- DROP INDEX -- diff --git a/src/test/regress/sql/multi_index_statements.sql b/src/test/regress/sql/multi_index_statements.sql index d1ea0bf93..9a7036353 100644 --- a/src/test/regress/sql/multi_index_statements.sql +++ b/src/test/regress/sql/multi_index_statements.sql @@ -117,15 +117,11 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t -- REINDEX -- -SET citus.log_remote_commands to on; -SET client_min_messages = LOG; REINDEX INDEX lineitem_orderkey_index; REINDEX TABLE lineitem; REINDEX SCHEMA public; REINDEX DATABASE regression; REINDEX SYSTEM regression; -SET citus.log_remote_commands to off; -RESET client_min_messages; -- -- DROP INDEX