diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9a26090f8..4dae254c8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -869,6 +869,23 @@ ExecuteModifyTasksWithoutResults(List *taskList) } +int64 +ExecuteSequentialTasksWithoutResults(List *taskList) +{ + ListCell *taskCell = NULL; + + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + List *singleTask = list_make1(task); + + ExecuteModifyTasks(singleTask, false, NULL, NULL); + } + + return 0; +} + + /* * ExecuteModifyTasks executes a list of tasks on remote nodes, and * optionally retrieves the results and stores them in a tuple store. diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index bb51cc480..95b0be922 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -679,6 +679,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; + ddlJob->preventTransaction = createIndexStatement->concurrent; ddlJob->commandString = createIndexCommand; ddlJob->taskList = IndexTaskList(relationId, createIndexStatement); @@ -772,6 +773,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); ddlJob->targetRelationId = distributedRelationId; + ddlJob->preventTransaction = dropIndexStatement->concurrent; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, dropIndexStatement); @@ -866,6 +868,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; + ddlJob->preventTransaction = false; ddlJob->commandString = alterTableCommand; if (rightRelationId) @@ -1271,13 +1274,6 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) "currently unsupported"))); } - if (createIndexStatement->concurrent) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("creating indexes concurrently on distributed tables is " - "currently unsupported"))); - } - if (createIndexStatement->unique) { RangeVar *relation = createIndexStatement->relation; @@ -1355,13 +1351,6 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement) errhint("Try dropping each object in a separate DROP " "command."))); } - - if (dropIndexStatement->concurrent) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("dropping indexes concurrently on distributed tables is " - "currently unsupported"))); - } } @@ -1994,13 +1983,28 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsureCoordinator(); ShowNoticeIfNotUsing2PC(); + if (ddlJob->preventTransaction) + { + /* save old commit protocol to restore at xact end */ + Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); + SavedMultiShardCommitProtocol = MultiShardCommitProtocol; + MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; + } + if (shouldSyncMetadata) { SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } - ExecuteModifyTasksWithoutResults(ddlJob->taskList); + if (ddlJob->preventTransaction) + { + ExecuteSequentialTasksWithoutResults(ddlJob->taskList); + } + else + { + ExecuteModifyTasksWithoutResults(ddlJob->taskList); + } } @@ -2608,6 +2612,7 @@ PlanGrantStmt(GrantStmt *grantStmt) ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relOid; + ddlJob->preventTransaction = false; ddlJob->commandString = pstrdup(ddlString.data); ddlJob->taskList = DDLTaskList(relOid, ddlString.data); diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 8c9eafb7d..389e868c2 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,6 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); +extern int64 ExecuteSequentialTasksWithoutResults(List *taskList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 143df5d65..00ebc1062 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -23,6 +23,7 @@ extern bool EnableDDLPropagation; typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ + bool preventTransaction; const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob; diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index d8c1f58ed..dadd7d52c 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -91,6 +91,8 @@ CREATE INDEX lineitem_orderkey_index on index_test_hash(a); ERROR: relation "lineitem_orderkey_index" already exists CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a); NOTICE: relation "lineitem_orderkey_index" already exists, skipping +-- Verify that we can create indexes concurrently +CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey); -- Verify that all indexes got created on the master node and one of the workers SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname; schemaname | tablename | indexname | tablespace | indexdef @@ -102,6 +104,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL) public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) + public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey) @@ -109,13 +112,13 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(14 rows) +(15 rows) \c - - - :worker_1_port SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1); count ------- - 8 + 9 (1 row) SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%'; @@ -138,8 +141,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%'; \c - - - :master_port -- Verify that we error out on unsupported statement types -CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey); -ERROR: creating indexes concurrently on distributed tables is currently unsupported CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey); ERROR: creating unique indexes on append-partitioned tables is currently unsupported CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace; @@ -180,6 +181,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL) public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) + public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey) @@ -187,7 +189,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(14 rows) +(15 rows) -- -- DROP INDEX @@ -196,9 +198,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t DROP INDEX lineitem_orderkey_index, lineitem_partial_index; ERROR: cannot drop multiple distributed objects in a single command HINT: Try dropping each object in a separate DROP command. --- Verify that we error out on the CONCURRENTLY clause -DROP INDEX CONCURRENTLY lineitem_orderkey_index; -ERROR: dropping indexes concurrently on distributed tables is currently unsupported -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; NOTICE: using one-phase commit for distributed DDL commands @@ -221,6 +220,8 @@ DROP INDEX index_test_range_index_a_b_partial; DROP INDEX index_test_hash_index_a; DROP INDEX index_test_hash_index_a_b; DROP INDEX index_test_hash_index_a_b_partial; +-- Verify that we can drop indexes concurrently +DROP INDEX CONCURRENTLY lineitem_concurrently_index; -- Verify that all the indexes are dropped from the master and one worker node. -- As there's a primary key, so exclude those from this check. SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%'; diff --git a/src/test/regress/sql/multi_index_statements.sql b/src/test/regress/sql/multi_index_statements.sql index 58e52a20f..178ab92cd 100644 --- a/src/test/regress/sql/multi_index_statements.sql +++ b/src/test/regress/sql/multi_index_statements.sql @@ -64,6 +64,9 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index_new on lineitem(l_orderkey); CREATE INDEX lineitem_orderkey_index on index_test_hash(a); CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a); +-- Verify that we can create indexes concurrently +CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey); + -- Verify that all indexes got created on the master node and one of the workers SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname; \c - - - :worker_1_port @@ -75,7 +78,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%'; -- Verify that we error out on unsupported statement types -CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey); CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey); CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace; @@ -105,9 +107,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t -- Verify that we can't drop multiple indexes in a single command DROP INDEX lineitem_orderkey_index, lineitem_partial_index; --- Verify that we error out on the CONCURRENTLY clause -DROP INDEX CONCURRENTLY lineitem_orderkey_index; - -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_orderkey_index_new; @@ -130,6 +129,9 @@ DROP INDEX index_test_hash_index_a; DROP INDEX index_test_hash_index_a_b; DROP INDEX index_test_hash_index_a_b_partial; +-- Verify that we can drop indexes concurrently +DROP INDEX CONCURRENTLY lineitem_concurrently_index; + -- Verify that all the indexes are dropped from the master and one worker node. -- As there's a primary key, so exclude those from this check. SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';