diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c
index aa0715372..1d7dbb448 100644
--- a/src/backend/distributed/commands/index.c
+++ b/src/backend/distributed/commands/index.c
@@ -151,6 +151,17 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
 	 * index statements.
 	 */
 	LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement);
+
+	/*
+	 * Acquire global lock to prevent concurrent writes or DDL. However,
+	 * we do not bother for CREATE INDEX CONCURRENTLY, since we'll have
+	 * to release the lock.
+	 */
+	if (!createIndexStatement->concurrent)
+	{
+		AcquireDistributedLockOnRelations(list_make1(relationRangeVar), lockMode, 0);
+	}
+
 	Relation relation = table_openrv(relationRangeVar, lockMode);
 
 	/*
@@ -231,6 +242,10 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
 	SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement);
 
 	DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
+
+	/* we have taken appropriate locks */
+	ddlJob->allowedOnWorkerNode = true;
+
 	return list_make1(ddlJob);
 }
 
@@ -590,13 +605,30 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
 	Oid relationId = ReindexStmtFindRelationOid(reindexStatement, false);
 	MemoryContext relationContext = NULL;
 	Relation relation = NULL;
+	LOCKMODE lockmode = IsReindexWithParam_compat(reindexStatement, "concurrently") ?
+						ShareUpdateExclusiveLock : AccessExclusiveLock;
+
 	if (reindexStatement->kind == REINDEX_OBJECT_INDEX)
 	{
+		/*
+		 * Acquire global lock to prevent concurrent writes or DDL. However,
+		 * we do not bother for REINDEX CONCURRENTLY, since we'll have
+		 * to release the lock.
+		 */
+		if (!IsReindexWithParam_compat(reindexStatement, "concurrently"))
+		{
+			AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
+											  lockmode, 0);
+		}
+
 		Oid indOid = RangeVarGetRelid(reindexStatement->relation, NoLock, 0);
 		relation = index_open(indOid, NoLock);
 	}
 	else
 	{
+		AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
+										  lockmode, 0);
+
 		relation = table_openrv(reindexStatement->relation, NoLock);
 	}
 
@@ -643,6 +675,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
 															  "concurrently");
 		ddlJob->metadataSyncCommand = reindexCommand;
 		ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
+		ddlJob->allowedOnWorkerNode = true;
 
 		ddlJobs = list_make1(ddlJob);
 	}
@@ -715,6 +748,16 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
 		lockmode = ShareUpdateExclusiveLock;
 	}
 
+	/*
+	 * Acquire global lock to prevent concurrent writes or DDL. However,
+	 * we do not bother for DROP INDEX CONCURRENTLY, since we'll have
+	 * to release the lock.
+	 */
+	if (!dropIndexStatement->concurrent)
+	{
+		AcquireDistributedLockOnRelations(list_make1(rangeVar), lockmode, 0);
+	}
+
 	/*
 	 * The next few statements are based on RemoveRelations() in
 	 * commands/tablecmds.c in Postgres source.
@@ -771,6 +814,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
 		ddlJob->metadataSyncCommand = dropIndexCommand;
 		ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
 											 dropIndexStatement);
+		ddlJob->allowedOnWorkerNode = true;
 
 		ddlJobs = list_make1(ddlJob);
 	}
@@ -790,12 +834,6 @@ PostprocessIndexStmt(Node *node, const char *queryString)
 {
 	IndexStmt *indexStmt = castNode(IndexStmt, node);
 
-	/* this logic only applies to the coordinator */
-	if (!IsCoordinator())
-	{
-		return NIL;
-	}
-
 	/*
 	 * We make sure schema name is not null in the PreprocessIndexStmt
 	 */
@@ -1327,7 +1365,6 @@ void
 MarkIndexValid(IndexStmt *indexStmt)
 {
 	Assert(indexStmt->concurrent);
-	Assert(IsCoordinator());
 
 	/*
 	 * We make sure schema name is not null in the PreprocessIndexStmt
diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c
index 9289dcd58..0180a615f 100644
--- a/src/backend/distributed/commands/sequence.c
+++ b/src/backend/distributed/commands/sequence.c
@@ -337,7 +337,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
 									(void *) dropStmtSql,
 									ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
@@ -403,7 +403,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
 	List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
 								ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
@@ -579,7 +579,7 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
 	List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
 								ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
@@ -689,7 +689,7 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
 	List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
 								ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
@@ -776,7 +776,7 @@ PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString,
 	List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
 								ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
@@ -912,7 +912,7 @@ PreprocessGrantOnSequenceStmt(Node *node, const char *queryString,
 	List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
 								ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
+	return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
 }
 
 
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c
index 6393fdd71..aba1c5db1 100644
--- a/src/backend/distributed/commands/utility_hook.c
+++ b/src/backend/distributed/commands/utility_hook.c
@@ -1121,7 +1121,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 {
 	bool shouldSyncMetadata = false;
 
-	EnsureCoordinator();
+	if (!ddlJob->allowedOnWorkerNode)
+	{
+		EnsureCoordinator();
+	}
 
 	ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
 
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 5f8f76bd6..a69c4126e 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -4273,7 +4273,7 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm
 
 	if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
 	{
-		List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
+		List *metadataNodes = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
 													  RowShareLock);
 		SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
 																	CurrentUserName(),
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 60a5ab92b..376426f1a 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -2318,7 +2318,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
 													columnIndex, value);
 
-	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
+	List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
 												   ShareLock);
 
 	/* open connections in parallel */
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index b4a497647..3ae12f33a 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -142,24 +142,32 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
 List *
 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 {
-	List *workerNodeList = NIL;
-	if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
-	{
-		workerNodeList = ActivePrimaryNodeList(lockMode);
-	}
-	else
-	{
-		workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
-	}
+	List *workerNodeList = ActivePrimaryNodeList(lockMode);
 
 	List *result = NIL;
+	int localGroupId = GetLocalGroupId();
 	WorkerNode *workerNode = NULL;
 	foreach_ptr(workerNode, workerNodeList)
 	{
-		if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
-			 METADATA_NODES) &&
+		if ((targetWorkerSet == OTHER_METADATA_NODES ||
+			 targetWorkerSet == METADATA_NODES) &&
 			!workerNode->hasMetadata)
 		{
+			/* only interested in metadata nodes */
+			continue;
+		}
+
+		if (targetWorkerSet == OTHER_METADATA_NODES &&
+			workerNode->groupId == localGroupId)
+		{
+			/* only interested in other metadata nodes */
+			continue;
+		}
+
+		if (targetWorkerSet == NON_COORDINATOR_NODES &&
+			workerNode->groupId == COORDINATOR_GROUP_ID)
+		{
+			/* only interested in worker nodes */
 			continue;
 		}
 
@@ -180,7 +188,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 void
 SendBareCommandListToMetadataWorkers(List *commandList)
 {
-	TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
+	TargetWorkerSet targetWorkerSet = OTHER_METADATA_NODES;
 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
 	char *nodeUser = CurrentUserName();
 
@@ -221,12 +229,12 @@ SendCommandToMetadataWorkersParams(const char *command,
 								   const Oid *parameterTypes,
 								   const char *const *parameterValues)
 {
-	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
+	List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
 												   RowShareLock);
 
 	ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
 
-	SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
+	SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user,
 									   parameterCount, parameterTypes,
 									   parameterValues);
 }
diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h
index 7229f7c72..e336ee929 100644
--- a/src/include/distributed/commands/utility_hook.h
+++ b/src/include/distributed/commands/utility_hook.h
@@ -74,6 +74,9 @@ typedef struct DDLJob
 	const char *metadataSyncCommand;
 
 	List *taskList; /* worker DDL tasks to execute */
+
+	/* whether the DDL can be executed from a worker */
+	bool allowedOnWorkerNode;
 } DDLJob;
 
 extern ProcessUtility_hook_type PrevProcessUtility;
diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h
index be8fe5ed6..d33ba7402 100644
--- a/src/include/distributed/worker_transaction.h
+++ b/src/include/distributed/worker_transaction.h
@@ -24,10 +24,9 @@ typedef enum TargetWorkerSet
 {
 	/*
-	 * All the active primary nodes in the metadata which have metadata
-	 * except the coordinator
+	 * All other active primary nodes in the metadata which have metadata.
 	 */
-	NON_COORDINATOR_METADATA_NODES,
+	OTHER_METADATA_NODES,
 
 	/*
 	 * All the active primary nodes in the metadata except the coordinator
diff --git a/src/test/regress/expected/multi_fix_partition_shard_index_names.out b/src/test/regress/expected/multi_fix_partition_shard_index_names.out
index e6795317c..46f81dcd7 100644
--- a/src/test/regress/expected/multi_fix_partition_shard_index_names.out
+++ b/src/test/regress/expected/multi_fix_partition_shard_index_names.out
@@ -542,8 +542,16 @@ SET citus.log_remote_commands TO ON;
 CREATE INDEX i4 ON parent_table(dist_col);
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off';
+LOCK fix_idx_names.parent_table IN SHARE MODE;
+SET citus.enable_ddl_propagation TO 'on'
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off';
+LOCK fix_idx_names.parent_table IN SHARE MODE;
+SET citus.enable_ddl_propagation TO 'on'
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out
index f8c3c8399..bd74a4f4a 100644
--- a/src/test/regress/expected/multi_unsupported_worker_operations.out
+++ b/src/test/regress/expected/multi_unsupported_worker_operations.out
@@ -97,9 +97,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl
  col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
 (3 rows)
 
-CREATE INDEX mx_test_index ON mx_table(col_2);
-ERROR: operation is not allowed on this node
-HINT: Connect to the coordinator and run it again.
 ALTER TABLE mx_table ADD COLUMN col_4 int;
 ERROR: operation is not allowed on this node
 HINT: Connect to the coordinator and run it again.
@@ -114,7 +111,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl
  col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
 (3 rows)
 
-\d mx_test_index
 -- citus_drop_all_shards
 SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
 ERROR: operation is not allowed on this node
diff --git a/src/test/regress/expected/run_command_on_all_nodes.out b/src/test/regress/expected/run_command_on_all_nodes.out
index e95989d84..a3029f069 100644
--- a/src/test/regress/expected/run_command_on_all_nodes.out
+++ b/src/test/regress/expected/run_command_on_all_nodes.out
@@ -199,12 +199,12 @@ SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_
 (3 rows)
 
 -- ddl commands are only allowed from the coordinator
-SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
+SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$);
  success | result
---------------------------------------------------------------------
 f | ERROR: operation is not allowed on this node
 f | ERROR: operation is not allowed on this node
- t | CREATE INDEX
+ t | ALTER TABLE
 (3 rows)
 
 DROP SCHEMA run_command_on_all_nodes CASCADE;
diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql
index 6012ddbd5..3e41a5e77 100644
--- a/src/test/regress/sql/multi_unsupported_worker_operations.sql
+++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql
@@ -72,11 +72,9 @@ SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', tr
 
 -- DDL commands
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
-CREATE INDEX mx_test_index ON mx_table(col_2);
 ALTER TABLE mx_table ADD COLUMN col_4 int;
 ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1);
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
-\d mx_test_index
 
 -- citus_drop_all_shards
 SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
diff --git a/src/test/regress/sql/run_command_on_all_nodes.sql b/src/test/regress/sql/run_command_on_all_nodes.sql
index 9b7b083af..339e8b630 100644
--- a/src/test/regress/sql/run_command_on_all_nodes.sql
+++ b/src/test/regress/sql/run_command_on_all_nodes.sql
@@ -84,7 +84,7 @@ SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_o
 SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$);
 
 -- ddl commands are only allowed from the coordinator
-SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
+SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$);
 
 DROP SCHEMA run_command_on_all_nodes CASCADE;