diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9a26090f8..e546c3311 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -869,6 +869,27 @@ ExecuteModifyTasksWithoutResults(List *taskList) } +/* + * ExecuteTasksSequentiallyWithoutResults calls ExecuteModifyTasksWithoutResults + * in a loop to simulate sequential execution of a list of tasks. Useful + * in cases where issuing commands in parallel before waiting for results could + * result in deadlocks (such as CREATE INDEX CONCURRENTLY). + */ +void +ExecuteTasksSequentiallyWithoutResults(List *taskList) +{ + ListCell *taskCell = NULL; + + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + List *singleTask = list_make1(task); + + ExecuteModifyTasksWithoutResults(singleTask); + } +} + + /* * ExecuteModifyTasks executes a list of tasks on remote nodes, and * optionally retrieves the results and stores them in a tuple store. diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 745b440ef..d70fd1930 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -24,6 +24,7 @@ #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/index.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_attribute.h" #include "catalog/pg_class.h" @@ -134,12 +135,15 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); +static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); +static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, const char *commandString); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); static void CheckCopyPermissions(CopyStmt *copyStatement); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); +static void PostProcessUtility(Node *parsetree); static bool warnedUserAbout2PC = false; @@ -367,6 +371,8 @@ multi_ProcessUtility(Node *parsetree, standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); + PostProcessUtility(parsetree); + if (commandMustRunAsOwner) { SetUserIdAndSecContext(savedUserId, savedSecurityContext); } @@ -677,8 +683,9 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; + ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; ddlJob->commandString = createIndexCommand; - ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement); ddlJobs = list_make1(ddlJob); } @@ -770,8 +777,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); ddlJob->targetRelationId = distributedRelationId; + ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent; ddlJob->commandString = dropIndexCommand; - ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); + ddlJob->taskList = 
DropIndexTaskList(distributedRelationId, distributedIndexId, + dropIndexStatement); ddlJobs = list_make1(ddlJob); } @@ -863,6 +872,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; + ddlJob->concurrentIndexCmd = false; ddlJob->commandString = alterTableCommand; if (rightRelationId) @@ -1268,13 +1278,6 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) "currently unsupported"))); } - if (createIndexStatement->concurrent) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("creating indexes concurrently on distributed tables is " - "currently unsupported"))); - } - if (createIndexStatement->unique) { RangeVar *relation = createIndexStatement->relation; @@ -1352,13 +1355,6 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement) errhint("Try dropping each object in a separate DROP " "command."))); } - - if (dropIndexStatement->concurrent) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("dropping indexes concurrently on distributed tables is " - "currently unsupported"))); - } } @@ -1989,15 +1985,49 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } EnsureCoordinator(); - ShowNoticeIfNotUsing2PC(); - if (shouldSyncMetadata) + if (!ddlJob->concurrentIndexCmd) { - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); - } + ShowNoticeIfNotUsing2PC(); - ExecuteModifyTasksWithoutResults(ddlJob->taskList); + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); + } + + ExecuteModifyTasksWithoutResults(ddlJob->taskList); + } + else + { + /* save old commit protocol to restore at xact end */ + Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); + SavedMultiShardCommitProtocol = MultiShardCommitProtocol; + MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; + + PG_TRY(); + { + ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList); + + if (shouldSyncMetadata) + { + List *commandList = list_make2(DISABLE_DDL_PROPAGATION, + (char *) ddlJob->commandString); + + SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList); + } + } + PG_CATCH(); + { + ereport(ERROR, + (errmsg("CONCURRENTLY-enabled index command failed"), + errdetail("CONCURRENTLY-enabled index commands can fail partially, " + "leaving behind an INVALID index."), + errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the " + "invalid index, then retry the original command."))); + } + PG_END_TRY(); + } } @@ -2071,6 +2101,117 @@ DDLTaskList(Oid relationId, const char *commandString) } +/* + * CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command + * against a specified distributed table. 
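The concurrent branch added to ExecuteDistributedDDLJob above does two things: it switches the commit protocol to COMMIT_PROTOCOL_BARE so every worker command runs on a fresh connection outside any transaction block, and it traps failures so the user gets a recovery hint instead of a raw remote error. A minimal standalone sketch of that save-switch-trap pattern, using setjmp/longjmp in place of PostgreSQL's PG_TRY/PG_CATCH; the enum values, globals, and the deliberately failing task are illustrative stand-ins:

#include <setjmp.h>
#include <stdio.h>

typedef enum { COMMIT_PROTOCOL_BARE, COMMIT_PROTOCOL_1PC, COMMIT_PROTOCOL_2PC } CommitProtocol;

static CommitProtocol MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC;
static CommitProtocol SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;

static jmp_buf errorContext; /* stands in for PG_TRY's sigsetjmp state */

/* stand-in for one sequential task; task 2 fails to exercise the catch path */
static void ExecuteTask(int taskId)
{
	if (taskId == 2)
	{
		longjmp(errorContext, 1); /* models ereport(ERROR, ...) */
	}
	printf("task %d committed immediately (no transaction block)\n", taskId);
}

int main(void)
{
	/* save old commit protocol to restore at transaction end */
	SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
	MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;

	if (setjmp(errorContext) == 0)
	{
		for (int taskId = 1; taskId <= 3; taskId++)
		{
			ExecuteTask(taskId);
		}
	}
	else
	{
		/* models the PG_CATCH block: replace the raw error with a hint */
		fprintf(stderr, "CONCURRENTLY-enabled index command failed; "
				"an INVALID index may remain\n");
	}

	MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
	return 0;
}

The property mirrored here is that the saved protocol is restored regardless of which path runs, just as the real code restores SavedMultiShardCommitProtocol at transaction end.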
+ */ +static List * +CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) +{ + List *taskList = NIL; + List *shardIntervalList = LoadShardIntervalList(relationId); + ListCell *shardIntervalCell = NULL; + Oid schemaId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaId); + StringInfoData ddlString; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + + initStringInfo(&ddlString); + + /* set statement's schema name if it is not set already */ + if (indexStmt->relation->schemaname == NULL) + { + indexStmt->relation->schemaname = schemaName; + } + + /* lock metadata before getting placement lists */ + LockShardListMetadata(shardIntervalList, ShareLock); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + Task *task = NULL; + + deparse_shard_index_statement(indexStmt, relationId, shardId, &ddlString); + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = DDL_TASK; + task->queryString = pstrdup(ddlString.data); + task->replicationModel = REPLICATION_MODEL_INVALID; + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); + + resetStringInfo(&ddlString); + } + + return taskList; +} + + +/* + * DropIndexTaskList builds a list of tasks to execute a DROP INDEX command + * against a specified distributed table. + */ +static List * +DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) +{ + List *taskList = NIL; + List *shardIntervalList = LoadShardIntervalList(relationId); + ListCell *shardIntervalCell = NULL; + char *indexName = get_rel_name(indexId); + Oid schemaId = get_rel_namespace(indexId); + char *schemaName = get_namespace_name(schemaId); + StringInfoData ddlString; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + + initStringInfo(&ddlString); + + /* lock metadata before getting placement lists */ + LockShardListMetadata(shardIntervalList, ShareLock); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + char *shardIndexName = pstrdup(indexName); + Task *task = NULL; + + AppendShardIdToName(&shardIndexName, shardId); + + /* deparse shard-specific DROP INDEX command */ + appendStringInfo(&ddlString, "DROP INDEX %s %s %s %s", + (dropStmt->concurrent ? "CONCURRENTLY" : ""), + (dropStmt->missing_ok ? "IF EXISTS" : ""), + quote_qualified_identifier(schemaName, shardIndexName), + (dropStmt->behavior == DROP_RESTRICT ? "RESTRICT" : "CASCADE")); + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = DDL_TASK; + task->queryString = pstrdup(ddlString.data); + task->replicationModel = REPLICATION_MODEL_INVALID; + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); + + resetStringInfo(&ddlString); + } + + return taskList; +} + + /* * ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a * shards of given list of distributed table. @@ -2357,6 +2498,83 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) } +/* + * PostProcessUtility performs additional tasks after a utility's local portion + * has been completed. 
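Both task-list builders above share one per-shard recipe: lock shard metadata, extend the relation or index name with the shard ID, deparse a shard-specific command string, and attach the shard's finalized placements to a DDL_TASK. A standalone sketch of the name extension and DROP INDEX assembly; the shard IDs, schema name, and fixed option choices below are illustrative, and the helper only mimics Citus' AppendShardIdToName:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* mimics AppendShardIdToName: extend an object name with "_<shardId>" */
static void AppendShardIdToName(char *name, size_t size, int64_t shardId)
{
	size_t used = strlen(name);
	snprintf(name + used, size - used, "_%lld", (long long) shardId);
}

int main(void)
{
	int64_t shardIds[] = { 102089, 102090, 102091 };

	for (int i = 0; i < 3; i++)
	{
		char indexName[64] = "ith_b_idx";
		char command[256];

		AppendShardIdToName(indexName, sizeof(indexName), shardIds[i]);

		/* one shard-specific command per task, as DropIndexTaskList builds */
		snprintf(command, sizeof(command),
				 "DROP INDEX CONCURRENTLY IF EXISTS public.%s RESTRICT",
				 indexName);
		printf("task %d: %s\n", i + 1, command);
	}

	return 0;
}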
Right now, the sole use is marking new indexes invalid + * if they were created using the CONCURRENTLY flag. This (non-transactional) + * change provides the fallback state if an error is raised; otherwise a + * subsequent change to valid will be committed. + */ +static void +PostProcessUtility(Node *parsetree) +{ + IndexStmt *indexStmt = NULL; + Relation relation = NULL; + Oid indexRelationId = InvalidOid; + Relation indexRelation = NULL; + Relation pg_index = NULL; + HeapTuple indexTuple = NULL; + Form_pg_index indexForm = NULL; + + /* only IndexStmts are processed */ + if (!IsA(parsetree, IndexStmt)) + { + return; + } + + /* and even then only if they're CONCURRENT */ + indexStmt = (IndexStmt *) parsetree; + if (!indexStmt->concurrent) + { + return; + } + + /* finally, this logic only applies to the coordinator */ + if (!IsCoordinator()) + { + return; + } + + /* commit the current transaction and start anew */ + CommitTransactionCommand(); + StartTransactionCommand(); + + /* get the affected relation and index */ + relation = heap_openrv(indexStmt->relation, ShareUpdateExclusiveLock); + indexRelationId = get_relname_relid(indexStmt->idxname, + RelationGetNamespace(relation)); + indexRelation = index_open(indexRelationId, RowExclusiveLock); + + /* close relations but retain locks */ + heap_close(relation, NoLock); + index_close(indexRelation, NoLock); + + /* mark index as invalid, in-place (cannot be rolled back) */ + index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID); + + /* re-open a transaction command from here on out */ + CommitTransactionCommand(); + StartTransactionCommand(); + + /* now, update index's validity in a way that can roll back */ + pg_index = heap_open(IndexRelationId, RowExclusiveLock); + + indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(indexRelationId)); + Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */ + + /* mark as valid, save, and update pg_index indexes */ + indexForm = (Form_pg_index) GETSTRUCT(indexTuple); + indexForm->indisvalid = true; + + simple_heap_update(pg_index, &indexTuple->t_self, indexTuple); + CatalogUpdateIndexes(pg_index, indexTuple); + + /* clean up; index now marked valid, but ROLLBACK will mark invalid */ + heap_freetuple(indexTuple); + heap_close(pg_index, RowExclusiveLock); +} + + /* * PlanGrantStmt determines whether a given GRANT/REVOKE statement involves * a distributed table. If so, it creates DDLJobs to encapsulate information @@ -2495,6 +2713,7 @@ PlanGrantStmt(GrantStmt *grantStmt) ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relOid; + ddlJob->concurrentIndexCmd = false; ddlJob->commandString = pstrdup(ddlString.data); ddlJob->taskList = DDLTaskList(relOid, ddlString.data); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 64f3f6460..6f54de4c5 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -68,6 +68,51 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) } + +/* + * SendBareCommandListToWorkers sends a list of commands to a set of target + * workers in serial. Commands are committed immediately: new connections are + * always used and no transaction block is used (hence "bare"). The connections + * are made as the extension owner to ensure write access to the Citus metadata + * tables. Primarily useful for INDEX commands using CONCURRENTLY. 
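The two-step validity dance in PostProcessUtility above is what makes partial failure safe: clearing indisvalid via index_set_state_flags is an in-place, non-transactional write that survives any later abort, while the flip back to valid goes through an ordinary catalog update that rolls back. A toy model of that asymmetry; the struct and update function are stand-ins, not PostgreSQL APIs:

#include <stdbool.h>
#include <stdio.h>

/* stand-in for a pg_index row's validity flag */
typedef struct PgIndexRow
{
	bool indisvalid;
} PgIndexRow;

/* models a transactional update: on abort, the old value is restored */
static void TransactionalSetValid(PgIndexRow *row, bool commit)
{
	bool oldValue = row->indisvalid;

	row->indisvalid = true;
	if (!commit)
	{
		row->indisvalid = oldValue; /* rollback restores the fallback state */
	}
}

int main(void)
{
	/* in-place clear, as index_set_state_flags does: cannot be rolled back */
	PgIndexRow shardIndex = { .indisvalid = false };

	TransactionalSetValid(&shardIndex, false); /* aborted transaction */
	printf("after abort: indisvalid = %d (index stays INVALID)\n",
		   shardIndex.indisvalid);

	TransactionalSetValid(&shardIndex, true); /* committed transaction */
	printf("after commit: indisvalid = %d (index becomes valid)\n",
		   shardIndex.indisvalid);

	return 0;
}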
+ */ +void +SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) +{ + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); + ListCell *commandCell = NULL; + + /* run commands serially */ + foreach(workerNodeCell, workerNodeList) + { + MultiConnection *workerConnection = NULL; + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + int connectionFlags = FORCE_NEW_CONNECTION; + + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) + { + continue; + } + + workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, + nodePort, nodeUser, NULL); + + /* iterate over the commands and execute them in the same connection */ + foreach(commandCell, commandList) + { + char *commandString = lfirst(commandCell); + + ExecuteCriticalRemoteCommand(workerConnection, commandString); + } + + CloseConnection(workerConnection); + } +} + + /* * SendCommandToWorkersParams sends a command to all workers in parallel. * Commands are committed on the workers when the local transaction commits. The diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index 3342d3394..0da28da53 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -24,6 +24,7 @@ #include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" #include "catalog/pg_class.h" @@ -33,12 +34,14 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "distributed/citus_ruleutils.h" +#include "distributed/relay_utility.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "parser/parse_utilcmd.h" #include "storage/lock.h" #include "utils/acl.h" #include "utils/array.h" @@ -587,6 +590,104 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) } +/* + * deparse_shard_index_statement uses the provided CREATE INDEX node, dist. + * relation, and shard identifier to populate a provided buffer with a string + * representation of a shard-extended version of that command. + */ +void +deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, + StringInfo buffer) +{ + IndexStmt *indexStmt = copyObject(origStmt); /* copy to avoid modifications */ + char *relationName = indexStmt->relation->relname; + char *indexName = indexStmt->idxname; + ListCell *indexParameterCell = NULL; + List *deparseContext = NULL; + + /* extend relation and index name using shard identifier */ + AppendShardIdToName(&relationName, shardid); + AppendShardIdToName(&indexName, shardid); + + /* use extended shard name and transformed stmt for deparsing */ + deparseContext = deparse_context_for(relationName, distrelid); + indexStmt = transformIndexStmt(distrelid, indexStmt, NULL); + + appendStringInfo(buffer, "CREATE %s INDEX %s %s %s ON %s USING %s ", + (indexStmt->unique ? "UNIQUE" : ""), + (indexStmt->concurrent ? "CONCURRENTLY" : ""), + (indexStmt->if_not_exists ? 
"IF NOT EXISTS" : ""), + quote_identifier(indexName), + quote_qualified_identifier(indexStmt->relation->schemaname, + relationName), + indexStmt->accessMethod); + + /* index column or expression list begins here */ + appendStringInfoChar(buffer, '('); + + foreach(indexParameterCell, indexStmt->indexParams) + { + IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell); + + /* use commas to separate subsequent elements */ + if (indexParameterCell != list_head(indexStmt->indexParams)) + { + appendStringInfoChar(buffer, ','); + } + + if (indexElement->name) + { + appendStringInfo(buffer, "%s ", quote_identifier(indexElement->name)); + } + else if (indexElement->expr) + { + appendStringInfo(buffer, "(%s)", deparse_expression(indexElement->expr, + deparseContext, false, + false)); + } + + if (indexElement->collation != NIL) + { + appendStringInfo(buffer, "COLLATE %s ", + NameListToQuotedString(indexElement->collation)); + } + + if (indexElement->opclass != NIL) + { + appendStringInfo(buffer, "%s ", + NameListToQuotedString(indexElement->opclass)); + } + + if (indexElement->ordering != SORTBY_DEFAULT) + { + bool sortAsc = (indexElement->ordering == SORTBY_ASC); + appendStringInfo(buffer, "%s ", (sortAsc ? "ASC" : "DESC")); + } + + if (indexElement->nulls_ordering != SORTBY_NULLS_DEFAULT) + { + bool nullsFirst = (indexElement->nulls_ordering == SORTBY_NULLS_FIRST); + appendStringInfo(buffer, "NULLS %s ", (nullsFirst ? "FIRST" : "LAST")); + } + } + + appendStringInfoString(buffer, ") "); + + if (indexStmt->options != NIL) + { + appendStringInfoString(buffer, "WITH "); + AppendOptionListToString(buffer, indexStmt->options); + } + + if (indexStmt->whereClause != NULL) + { + appendStringInfo(buffer, "WHERE %s", deparse_expression(indexStmt->whereClause, + deparseContext, false, + false)); + } +} + + /* * pg_get_indexclusterdef_string returns the definition of a cluster statement * for given index. 
The function returns null if the table is not clustered on diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index b64e1a0e8..e7eaf9204 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -32,13 +32,15 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); +extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, + int64 shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); -extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo - buffer); +extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, + StringInfo buffer); extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_qualified_relation_name(Oid relid); diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 8c9eafb7d..b5db9ddf9 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,6 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); +extern void ExecuteTasksSequentiallyWithoutResults(List *taskList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 143df5d65..7444c10a1 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -23,6 +23,7 @@ extern bool EnableDDLPropagation; typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ + bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? 
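The concurrentIndexCmd flag above is the single switch that routes a DDL job: PlanIndexStmt and PlanDropIndexStmt copy it from the statement's concurrent field, and every other DDL path pins it to false. A hypothetical mirror of the dispatch, with the struct trimmed to the fields shown in this header and the messages purely illustrative:

#include <stdbool.h>
#include <stdio.h>

/* trimmed mirror of the DDLJob struct declared above */
typedef struct DDLJobSketch
{
	unsigned int targetRelationId;
	bool concurrentIndexCmd;
	const char *commandString;
} DDLJobSketch;

static void ExecuteDistributedDDLJobSketch(const DDLJobSketch *job)
{
	if (job->concurrentIndexCmd)
	{
		printf("sequential, bare connections: %s\n", job->commandString);
	}
	else
	{
		printf("parallel, inside 1PC/2PC transaction: %s\n", job->commandString);
	}
}

int main(void)
{
	DDLJobSketch concurrent = { 16384, true,
								"CREATE INDEX CONCURRENTLY i ON t (a)" };
	DDLJobSketch regular = { 16384, false, "CREATE INDEX i ON t (a)" };

	ExecuteDistributedDDLJobSketch(&concurrent);
	ExecuteDistributedDDLJobSketch(&regular);
	return 0;
}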
*/ const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 871c1ac4c..a63f12a78 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -30,6 +30,8 @@ typedef enum TargetWorkerSet extern List * GetWorkerTransactions(void); extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command); extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command); +extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, + List *commandList); extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues); diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index d8c1f58ed..b92999c8b 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -91,6 +91,8 @@ CREATE INDEX lineitem_orderkey_index on index_test_hash(a); ERROR: relation "lineitem_orderkey_index" already exists CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a); NOTICE: relation "lineitem_orderkey_index" already exists, skipping +-- Verify that we can create indexes concurrently +CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey); -- Verify that all indexes got created on the master node and one of the workers SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname; schemaname | tablename | indexname | tablespace | indexdef @@ -102,6 +104,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL) public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) + public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey) @@ -109,13 +112,13 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(14 rows) +(15 rows) \c - - - :worker_1_port SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY 
relname LIMIT 1); count ------- - 8 + 9 (1 row) SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%'; @@ -138,8 +141,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%'; \c - - - :master_port -- Verify that we error out on unsupported statement types -CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey); -ERROR: creating indexes concurrently on distributed tables is currently unsupported CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey); ERROR: creating unique indexes on append-partitioned tables is currently unsupported CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace; @@ -180,6 +181,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL) public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) + public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey) @@ -187,7 +189,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(14 rows) +(15 rows) -- -- DROP INDEX @@ -196,9 +198,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t DROP INDEX lineitem_orderkey_index, lineitem_partial_index; ERROR: cannot drop multiple distributed objects in a single command HINT: Try dropping each object in a separate DROP command. --- Verify that we error out on the CONCURRENTLY clause -DROP INDEX CONCURRENTLY lineitem_orderkey_index; -ERROR: dropping indexes concurrently on distributed tables is currently unsupported -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; NOTICE: using one-phase commit for distributed DDL commands @@ -221,6 +220,8 @@ DROP INDEX index_test_range_index_a_b_partial; DROP INDEX index_test_hash_index_a; DROP INDEX index_test_hash_index_a_b; DROP INDEX index_test_hash_index_a_b_partial; +-- Verify that we can drop indexes concurrently +DROP INDEX CONCURRENTLY lineitem_concurrently_index; -- Verify that all the indexes are dropped from the master and one worker node. -- As there's a primary key, so exclude those from this check. 
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%'; @@ -244,7 +245,47 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; ------------+-----------+-----------+------------+---------- (0 rows) +-- create index that will conflict with master operations +CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b); \c - - - :master_port +-- should fail because worker index already exists +CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b); +ERROR: CONCURRENTLY-enabled index command failed +DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. +HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command. +-- the failure results in an INVALID index +SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + Index Valid? +-------------- + f +(1 row) + +-- we can clean it up and recreate with a DROP IF EXISTS +DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx; +CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b); +SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + Index Valid? +-------------- + t +(1 row) + +\c - - - :worker_1_port +-- now drop shard index to test partial master DROP failure +DROP INDEX CONCURRENTLY ith_b_idx_102089; +\c - - - :master_port +DROP INDEX CONCURRENTLY ith_b_idx; +ERROR: CONCURRENTLY-enabled index command failed +DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. +HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command. +-- the failure results in an INVALID index +SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + Index Valid? 
+-------------- + f +(1 row) + +-- final clean up +DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx; -- Drop created tables DROP TABLE index_test_range; DROP TABLE index_test_hash; diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index df9d775ae..2572e532f 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -18,6 +18,7 @@ SELECT * FROM mx_ddl_table ORDER BY key; CREATE INDEX ddl_test_index ON mx_ddl_table(value); NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value); -- ADD COLUMN ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER; -- SET DEFAULT @@ -40,6 +41,7 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL; version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) \c - - - :worker_1_port @@ -52,9 +54,21 @@ Indexes: version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 + Table "public.mx_ddl_table_1220088" + Column | Type | Modifiers +---------+---------+-------------------- + key | integer | not null + value | integer | + version | integer | not null default 1 +Indexes: + "mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index_1220088" btree (value) + "ddl_test_index_1220088" btree (value) + \c - - - :worker_2_port \d mx_ddl_table Table "public.mx_ddl_table" @@ -65,9 +79,21 @@ Indexes: version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 + Table "public.mx_ddl_table_1220089" + Column | Type | Modifiers +---------+---------+-------------------- + key | integer | not null + value | integer | + version | integer | not null default 1 +Indexes: + "mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index_1220089" btree (value) + "ddl_test_index_1220089" btree (value) + INSERT INTO mx_ddl_table VALUES (37, 78, 2); INSERT INTO mx_ddl_table VALUES (38, 78); -- Switch to the coordinator @@ -100,6 +126,7 @@ SELECT * FROM mx_ddl_table ORDER BY key; DROP INDEX ddl_test_index; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +DROP INDEX CONCURRENTLY ddl_test_concurrent_index; -- DROP DEFAULT ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT; -- DROP NOT NULL @@ -126,7 +153,15 @@ Indexes: Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 +Table "public.mx_ddl_table_1220088" + Column | Type | Modifiers +--------+---------+----------- + key | integer | not null + value | integer | +Indexes: + "mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key) + \c - - - :worker_2_port \d mx_ddl_table Table "public.mx_ddl_table" @@ -137,7 +172,15 @@ Indexes: Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 +Table "public.mx_ddl_table_1220089" + Column | Type | Modifiers +--------+---------+----------- + 
key | integer | not null + value | integer | +Indexes: + "mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key) + -- Show that DDL commands are done within a two-phase commit transaction \c - - - :master_port SET client_min_messages TO debug2; diff --git a/src/test/regress/sql/multi_index_statements.sql b/src/test/regress/sql/multi_index_statements.sql index 58e52a20f..784a7a988 100644 --- a/src/test/regress/sql/multi_index_statements.sql +++ b/src/test/regress/sql/multi_index_statements.sql @@ -64,6 +64,9 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index_new on lineitem(l_orderkey); CREATE INDEX lineitem_orderkey_index on index_test_hash(a); CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a); +-- Verify that we can create indexes concurrently +CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey); + -- Verify that all indexes got created on the master node and one of the workers SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname; \c - - - :worker_1_port @@ -75,7 +78,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%'; -- Verify that we error out on unsupported statement types -CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey); CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey); CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace; @@ -105,9 +107,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t -- Verify that we can't drop multiple indexes in a single command DROP INDEX lineitem_orderkey_index, lineitem_partial_index; --- Verify that we error out on the CONCURRENTLY clause -DROP INDEX CONCURRENTLY lineitem_orderkey_index; - -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_orderkey_index_new; @@ -130,6 +129,9 @@ DROP INDEX index_test_hash_index_a; DROP INDEX index_test_hash_index_a_b; DROP INDEX index_test_hash_index_a_b_partial; +-- Verify that we can drop indexes concurrently +DROP INDEX CONCURRENTLY lineitem_concurrently_index; + -- Verify that all the indexes are dropped from the master and one worker node. -- As there's a primary key, so exclude those from this check. SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%'; @@ -137,8 +139,37 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; \c - - - :worker_1_port SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%'; SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; + +-- create index that will conflict with master operations +CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b); + \c - - - :master_port +-- should fail because worker index already exists +CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b); + +-- the failure results in an INVALID index +SELECT indisvalid AS "Index Valid?" 
FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + +-- we can clean it up and recreate with a DROP IF EXISTS +DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx; +CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b); +SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + +\c - - - :worker_1_port + +-- now drop shard index to test partial master DROP failure +DROP INDEX CONCURRENTLY ith_b_idx_102089; + +\c - - - :master_port +DROP INDEX CONCURRENTLY ith_b_idx; + +-- the failure results in an INVALID index +SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; + +-- final clean up +DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx; + -- Drop created tables DROP TABLE index_test_range; DROP TABLE index_test_hash; diff --git a/src/test/regress/sql/multi_mx_ddl.sql b/src/test/regress/sql/multi_mx_ddl.sql index 0afa624fd..f0dd5cacf 100644 --- a/src/test/regress/sql/multi_mx_ddl.sql +++ b/src/test/regress/sql/multi_mx_ddl.sql @@ -8,6 +8,8 @@ SELECT * FROM mx_ddl_table ORDER BY key; -- CREATE INDEX CREATE INDEX ddl_test_index ON mx_ddl_table(value); +CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value); + -- ADD COLUMN ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER; @@ -27,13 +29,13 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL; \d mx_ddl_table -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 \c - - - :worker_2_port \d mx_ddl_table -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 INSERT INTO mx_ddl_table VALUES (37, 78, 2); INSERT INTO mx_ddl_table VALUES (38, 78); @@ -56,6 +58,8 @@ SELECT * FROM mx_ddl_table ORDER BY key; -- DROP INDEX DROP INDEX ddl_test_index; +DROP INDEX CONCURRENTLY ddl_test_concurrent_index; + -- DROP DEFAULT ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT; @@ -73,13 +77,13 @@ ALTER TABLE mx_ddl_table DROP COLUMN version; \d mx_ddl_table -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 \c - - - :worker_2_port \d mx_ddl_table -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 -- Show that DDL commands are done within a two-phase commit transaction \c - - - :master_port
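Stepping back to the executor change that underpins the tests above: ExecuteTasksSequentiallyWithoutResults wraps each task in a singleton list so the existing executor finishes one shard's command before the next begins, and SendBareCommandListToWorkers serializes across workers the same way. A minimal standalone sketch of that one-at-a-time dispatch; the linked list and executor stub are simplified stand-ins for PostgreSQL's List and the Citus executor:

#include <stdio.h>

/* simplified stand-in for PostgreSQL's List/Task machinery */
typedef struct Task
{
	int taskId;
	const char *queryString;
	struct Task *next;
} Task;

/* stub for ExecuteModifyTasksWithoutResults run on a singleton list */
static void ExecuteSingleTask(const Task *task)
{
	printf("executing task %d: %s\n", task->taskId, task->queryString);
}

/* mirrors ExecuteTasksSequentiallyWithoutResults: each task is dispatched
 * and completed before the next begins, so no two shard commands overlap */
static void ExecuteTasksSequentially(const Task *taskList)
{
	for (const Task *task = taskList; task != NULL; task = task->next)
	{
		ExecuteSingleTask(task);
	}
}

int main(void)
{
	Task second = { 2, "CREATE INDEX CONCURRENTLY i_102090 ON t_102090 (a)", NULL };
	Task first = { 1, "CREATE INDEX CONCURRENTLY i_102089 ON t_102089 (a)", &second };

	ExecuteTasksSequentially(&first);
	return 0;
}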