diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 4b0aad2ec..70e1e939f 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -1993,24 +1993,27 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); SavedMultiShardCommitProtocol = MultiShardCommitProtocol; MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; - } - else - { - ShowNoticeIfNotUsing2PC(); - } - if (shouldSyncMetadata) - { - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); - } + if (shouldSyncMetadata) + { + List *commandList = list_make2(DISABLE_DDL_PROPAGATION, + (char *) ddlJob->commandString); + + SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList); + } - if (ddlJob->preventTransaction) - { ExecuteSequentialTasksWithoutResults(ddlJob->taskList); } else { + ShowNoticeIfNotUsing2PC(); + + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); + } + ExecuteModifyTasksWithoutResults(ddlJob->taskList); } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 64f3f6460..cd010523e 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -68,6 +68,51 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) } +void +SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) +{ + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); + ListCell *commandCell = NULL; + + if (XactModificationLevel > XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + + /* run commands serially */ + foreach(workerNodeCell, workerNodeList) + { + MultiConnection *workerConnection = NULL; + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + int connectionFlags = FORCE_NEW_CONNECTION; + + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) + { + continue; + } + + workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, + nodePort, nodeUser, NULL); + + /* iterate over the commands and execute them in the same connection */ + foreach(commandCell, commandList) + { + char *commandString = lfirst(commandCell); + + ExecuteCriticalRemoteCommand(workerConnection, commandString); + } + + CloseConnection(workerConnection); + } +} + + /* * SendCommandToWorkersParams sends a command to all workers in parallel. * Commands are committed on the workers when the local transaction commits. The diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 871c1ac4c..a63f12a78 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -30,6 +30,8 @@ typedef enum TargetWorkerSet extern List * GetWorkerTransactions(void); extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command); extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command); +extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, + List *commandList); extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues); diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index df9d775ae..2572e532f 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -18,6 +18,7 @@ SELECT * FROM mx_ddl_table ORDER BY key; CREATE INDEX ddl_test_index ON mx_ddl_table(value); NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value); -- ADD COLUMN ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER; -- SET DEFAULT @@ -40,6 +41,7 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL; version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) \c - - - :worker_1_port @@ -52,9 +54,21 @@ Indexes: version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 + Table "public.mx_ddl_table_1220088" + Column | Type | Modifiers +---------+---------+-------------------- + key | integer | not null + value | integer | + version | integer | not null default 1 +Indexes: + "mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index_1220088" btree (value) + "ddl_test_index_1220088" btree (value) + \c - - - :worker_2_port \d mx_ddl_table Table "public.mx_ddl_table" @@ -65,9 +79,21 @@ Indexes: version | integer | not null default 1 Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index" btree (value) "ddl_test_index" btree (value) -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 + Table "public.mx_ddl_table_1220089" + Column | Type | Modifiers +---------+---------+-------------------- + key | integer | not null + value | integer | + version | integer | not null default 1 +Indexes: + "mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key) + "ddl_test_concurrent_index_1220089" btree (value) + "ddl_test_index_1220089" btree (value) + INSERT INTO mx_ddl_table VALUES (37, 78, 2); INSERT INTO mx_ddl_table VALUES (38, 78); -- Switch to the coordinator @@ -100,6 +126,7 @@ SELECT * FROM mx_ddl_table ORDER BY key; DROP INDEX ddl_test_index; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +DROP INDEX CONCURRENTLY ddl_test_concurrent_index; -- DROP DEFAULT ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT; -- DROP NOT NULL @@ -126,7 +153,15 @@ Indexes: Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 +Table "public.mx_ddl_table_1220088" + Column | Type | Modifiers +--------+---------+----------- + key | integer | not null + value | integer | +Indexes: + "mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key) + \c - - - :worker_2_port \d mx_ddl_table Table "public.mx_ddl_table" @@ -137,7 +172,15 @@ Indexes: Indexes: "mx_ddl_table_pkey" PRIMARY KEY, btree (key) -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 +Table "public.mx_ddl_table_1220089" + Column | Type | Modifiers +--------+---------+----------- + key | integer | not null + value | integer | +Indexes: + "mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key) + -- Show that DDL commands are done within a two-phase commit transaction \c - - - :master_port SET client_min_messages TO debug2; diff --git a/src/test/regress/sql/multi_mx_ddl.sql b/src/test/regress/sql/multi_mx_ddl.sql index 0afa624fd..f0dd5cacf 100644 --- a/src/test/regress/sql/multi_mx_ddl.sql +++ b/src/test/regress/sql/multi_mx_ddl.sql @@ -8,6 +8,8 @@ SELECT * FROM mx_ddl_table ORDER BY key; -- CREATE INDEX CREATE INDEX ddl_test_index ON mx_ddl_table(value); +CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value); + -- ADD COLUMN ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER; @@ -27,13 +29,13 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL; \d mx_ddl_table -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 \c - - - :worker_2_port \d mx_ddl_table -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 INSERT INTO mx_ddl_table VALUES (37, 78, 2); INSERT INTO mx_ddl_table VALUES (38, 78); @@ -56,6 +58,8 @@ SELECT * FROM mx_ddl_table ORDER BY key; -- DROP INDEX DROP INDEX ddl_test_index; +DROP INDEX CONCURRENTLY ddl_test_concurrent_index; + -- DROP DEFAULT ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT; @@ -73,13 +77,13 @@ ALTER TABLE mx_ddl_table DROP COLUMN version; \d mx_ddl_table -\d mx_ddl_table_1600000 +\d mx_ddl_table_1220088 \c - - - :worker_2_port \d mx_ddl_table -\d mx_ddl_table_1600001 +\d mx_ddl_table_1220089 -- Show that DDL commands are done within a two-phase commit transaction \c - - - :master_port