diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 2c3970078..c87150a8c 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -154,6 +154,17 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand, * index statements. */ LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement); + + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for CREATE INDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!createIndexStatement->concurrent) + { + AcquireDistributedLockOnRelations(list_make1(relationRangeVar), lockMode, 0); + } + Relation relation = table_openrv(relationRangeVar, lockMode); /* @@ -234,6 +245,10 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand, SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement); DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand); + + /* we have taken appropriate locks */ + ddlJob->allowedOnWorkerNode = true; + return list_make1(ddlJob); } @@ -541,11 +556,9 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk) Oid relationId = InvalidOid; - LOCKMODE lockmode = IsReindexWithParam_compat(reindexStmt, "concurrently") ? - ShareUpdateExclusiveLock : AccessExclusiveLock; - if (reindexStmt->kind == REINDEX_OBJECT_INDEX) { + LOCKMODE lockmode = GetReindexIndexRelationLockMode(reindexStmt); struct ReindexIndexCallbackState state; state.concurrent = IsReindexWithParam_compat(reindexStmt, "concurrently"); @@ -559,6 +572,7 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk) } else { + LOCKMODE lockmode = GetReindexTableRelationLockMode(reindexStmt); relationId = RangeVarGetRelidExtended(reindexStmt->relation, lockmode, (missingOk) ? RVR_MISSING_OK : 0, RangeVarCallbackOwnsTable, NULL); @@ -568,6 +582,35 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk) } +/* + * GetReindexRelationLockMode returns required lock mode to open the + * index that given REINDEX INDEX command operates on. + */ +LOCKMODE +GetReindexIndexRelationLockMode(ReindexStmt *reindexStmt) +{ + if (IsReindexWithParam_compat(reindexStmt, "concurrently")) + { + return ShareUpdateExclusiveLock; + } + else + { + return AccessExclusiveLock; + } +} + + +/* + * GetReindexTableLockMode returns required lock mode to open the + * relation that given REINDEX TABLE command operates on. + */ +LOCKMODE +GetReindexTableRelationLockMode(ReindexStmt *reindexStmt) +{ + return ShareLock; +} + + /* * PreprocessReindexStmt determines whether a given REINDEX statement involves * a distributed table. If so (and if the statement does not use unsupported @@ -593,13 +636,32 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, Oid relationId = ReindexStmtFindRelationOid(reindexStatement, false); MemoryContext relationContext = NULL; Relation relation = NULL; + if (reindexStatement->kind == REINDEX_OBJECT_INDEX) { + LOCKMODE lockmode = GetReindexIndexRelationLockMode(reindexStatement); + + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for REINDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!IsReindexWithParam_compat(reindexStatement, "concurrently")) + { + AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation), + lockmode, 0); + } + Oid indOid = RangeVarGetRelid(reindexStatement->relation, NoLock, 0); relation = index_open(indOid, NoLock); } else { + LOCKMODE lockmode = GetReindexTableRelationLockMode(reindexStatement); + + AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation), + lockmode, 0); + relation = table_openrv(reindexStatement->relation, NoLock); } @@ -646,6 +708,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, "concurrently"); ddlJob->metadataSyncCommand = reindexCommand; ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement); + ddlJob->allowedOnWorkerNode = true; ddlJobs = list_make1(ddlJob); } @@ -741,6 +804,16 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, continue; } + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for DROP INDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!dropIndexStatement->concurrent) + { + AcquireDistributedLockOnRelations(list_make1(rangeVar), lockmode, 0); + } + Oid relationId = IndexGetRelation(indexId, false); bool isCitusRelation = IsCitusTable(relationId); if (isCitusRelation) @@ -774,6 +847,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, ddlJob->metadataSyncCommand = dropIndexCommand; ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, dropIndexStatement); + ddlJob->allowedOnWorkerNode = true; ddlJobs = list_make1(ddlJob); } @@ -793,12 +867,6 @@ PostprocessIndexStmt(Node *node, const char *queryString) { IndexStmt *indexStmt = castNode(IndexStmt, node); - /* this logic only applies to the coordinator */ - if (!IsCoordinator()) - { - return NIL; - } - /* * We make sure schema name is not null in the PreprocessIndexStmt */ @@ -1330,7 +1398,6 @@ void MarkIndexValid(IndexStmt *indexStmt) { Assert(indexStmt->concurrent); - Assert(IsCoordinator()); /* * We make sure schema name is not null in the PreprocessIndexStmt diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index 9ff586c8c..d2784129e 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -337,7 +337,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString, (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -403,7 +403,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -579,7 +579,7 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -689,7 +689,7 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -776,7 +776,7 @@ PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -912,7 +912,7 @@ PreprocessGrantOnSequenceStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index e8866773a..d6d0ec065 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1115,7 +1115,24 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { bool shouldSyncMetadata = false; - EnsureCoordinator(); + if (!ddlJob->allowedOnWorkerNode) + { + EnsureCoordinator(); + } + + if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode() && + !EnableAcquiringUnsafeLockFromWorkers) + { + ereport(ERROR, + (errmsg( + "Cannot execute DDL command from a worker node since the " + "coordinator is not in the metadata."), + errhint( + "Either run this command on the coordinator or add the coordinator " + "in the metadata by using: SELECT citus_set_coordinator_host('', );\n" + "Alternatively, though it is not recommended, you can allow this command by running: " + "SET citus.allow_unsafe_locks_from_workers TO 'on';"))); + } ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 40bdae0ea..d6a1a5afc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -4391,7 +4391,7 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) { - List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + List *metadataNodes = TargetWorkerSetNodeList(OTHER_METADATA_NODES, RowShareLock); SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, CurrentUserName(), diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index a73f2e9d2..58acccb35 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2390,7 +2390,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value) columnIndex, value); - List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES, ShareLock); /* open connections in parallel */ diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 03ecbea72..f3fd8a80e 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -157,24 +157,32 @@ SendCommandListToWorkersWithMetadata(List *commands) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) - { - workerNodeList = ActivePrimaryNodeList(lockMode); - } - else - { - workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); - } + List *workerNodeList = ActivePrimaryNodeList(lockMode); List *result = NIL; + int localGroupId = GetLocalGroupId(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet == - METADATA_NODES) && + if ((targetWorkerSet == OTHER_METADATA_NODES || + targetWorkerSet == METADATA_NODES) && !workerNode->hasMetadata) { + /* only interested in metadata nodes */ + continue; + } + + if (targetWorkerSet == OTHER_METADATA_NODES && + workerNode->groupId == localGroupId) + { + /* only interested in other metadata nodes */ + continue; + } + + if (targetWorkerSet == NON_COORDINATOR_NODES && + workerNode->groupId == COORDINATOR_GROUP_ID) + { + /* only interested in worker nodes */ continue; } @@ -195,7 +203,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) void SendBareCommandListToMetadataWorkers(List *commandList) { - TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES; + TargetWorkerSet targetWorkerSet = OTHER_METADATA_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock); char *nodeUser = CurrentUserName(); @@ -236,12 +244,12 @@ SendCommandToMetadataWorkersParams(const char *command, const Oid *parameterTypes, const char *const *parameterValues) { - List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES, RowShareLock); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); - SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, + SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user, parameterCount, parameterTypes, parameterValues); } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 43429278f..f211eb066 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -405,6 +405,8 @@ extern char * ChooseIndexName(const char *tabname, Oid namespaceId, extern char * ChooseIndexNameAddition(List *colnames); extern List * ChooseIndexColumnNames(List *indexElems); extern LOCKMODE GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement); +extern LOCKMODE GetReindexTableRelationLockMode(ReindexStmt *reindexStmt); +extern LOCKMODE GetReindexIndexRelationLockMode(ReindexStmt *reindexStmt); extern List * PreprocessReindexStmt(Node *ReindexStatement, const char *ReindexCommand, ProcessUtilityContext processUtilityContext); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index c474dcc43..31f391620 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -74,6 +74,9 @@ typedef struct DDLJob const char *metadataSyncCommand; List *taskList; /* worker DDL tasks to execute */ + + /* whether the DDL can be executed from a worker */ + bool allowedOnWorkerNode; } DDLJob; extern ProcessUtility_hook_type PrevProcessUtility; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 631940edf..6dde1a7ee 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -24,10 +24,9 @@ typedef enum TargetWorkerSet { /* - * All the active primary nodes in the metadata which have metadata - * except the coordinator + * All other active primary nodes in the metadata which have metadata. */ - NON_COORDINATOR_METADATA_NODES, + OTHER_METADATA_NODES, /* * All the active primary nodes in the metadata except the coordinator diff --git a/src/test/regress/expected/isolation_copy_vs_all_on_mx.out b/src/test/regress/expected/isolation_copy_vs_all_on_mx.out index 76f36a753..df895c250 100644 --- a/src/test/regress/expected/isolation_copy_vs_all_on_mx.out +++ b/src/test/regress/expected/isolation_copy_vs_all_on_mx.out @@ -290,3 +290,78 @@ stop_session_level_connection_to_node (1 row) + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy s2-start-session-level-connection s2-worker-create-index-concurrently s1-commit-worker s2-stop-connection s3-select-count s1-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-copy: + SELECT run_commands_on_session_level_connection_to_node('COPY copy_table FROM PROGRAM ''echo 5, 50 && echo 6, 60 && echo 7, 70''WITH CSV'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-worker-create-index-concurrently: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id)'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-worker-create-index-concurrently: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s3-select-count: + SELECT COUNT(*) FROM copy_table; + +count +--------------------------------------------------------------------- + 8 +(1 row) + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/isolation_drop_alter_index_select_for_update_on_mx.out b/src/test/regress/expected/isolation_drop_alter_index_select_for_update_on_mx.out index ac67b311b..e05a59676 100644 --- a/src/test/regress/expected/isolation_drop_alter_index_select_for_update_on_mx.out +++ b/src/test/regress/expected/isolation_drop_alter_index_select_for_update_on_mx.out @@ -137,6 +137,72 @@ stop_session_level_connection_to_node (1 row) +starting permutation: s1-start-session-level-connection s1-index-worker s2-start-session-level-connection s2-begin-on-worker s2-select-for-update s1-commit-worker s2-commit-worker s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-index-worker: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table (id)'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-select-for-update: + SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table WHERE id = 5 FOR UPDATE'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + + starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-select-for-update s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: SELECT start_session_level_connection_to_node('localhost', 57637); diff --git a/src/test/regress/expected/isolation_drop_vs_all.out b/src/test/regress/expected/isolation_drop_vs_all.out index 2c8912c21..7fec73e5f 100644 --- a/src/test/regress/expected/isolation_drop_vs_all.out +++ b/src/test/regress/expected/isolation_drop_vs_all.out @@ -68,8 +68,9 @@ step s2-begin: BEGIN; step s1-drop: DROP TABLE drop_hash; step s2-ddl-create-index: CREATE INDEX drop_hash_index ON drop_hash(id); step s1-commit: COMMIT; +s2: WARNING: relation "drop_tests.drop_hash" does not exist step s2-ddl-create-index: <... completed> -ERROR: relation "drop_hash" does not exist +ERROR: failure on connection marked as essential: localhost:xxxxx step s2-commit: COMMIT; step s1-select-count: SELECT COUNT(*) FROM drop_hash; ERROR: relation "drop_hash" does not exist diff --git a/src/test/regress/expected/isolation_index_vs_all_on_mx.out b/src/test/regress/expected/isolation_index_vs_all_on_mx.out new file mode 100644 index 000000000..9ce2a0755 --- /dev/null +++ b/src/test/regress/expected/isolation_index_vs_all_on_mx.out @@ -0,0 +1,294 @@ +Parsed test spec with 4 sessions + +starting permutation: w1-start-session-level-connection w1-begin-on-worker w2-start-session-level-connection w2-begin-on-worker w1-create-named-index w2-create-named-index w1-commit-worker w2-commit-worker w1-stop-connection w2-stop-connection coord-print-index-count +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step w1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-create-named-index: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-create-named-index: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index_2 ON dist_table(id)'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step coord-print-index-count: + SELECT + result + FROM + run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; + +result +--------------------------------------------------------------------- + 2 + 2 + 2 + 2 +(4 rows) + +citus_remove_node +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: w1-start-session-level-connection w1-begin-on-worker w2-start-session-level-connection w2-begin-on-worker w1-create-unnamed-index w2-create-unnamed-index w1-commit-worker w1-stop-connection w2-stop-connection coord-print-index-count +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step w1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-create-unnamed-index: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-create-unnamed-index: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)'); + +step w1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-create-unnamed-index: <... completed> +ERROR: duplicate key value violates unique constraint "pg_class_relname_nsp_index" +step w1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step coord-print-index-count: + SELECT + result + FROM + run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; + +result +--------------------------------------------------------------------- + 1 + 1 + 1 + 1 +(4 rows) + +citus_remove_node +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: w1-start-session-level-connection w1-create-named-index w1-begin-on-worker w1-delete coord-begin coord-take-lock w1-reindex deadlock-checker-call coord-rollback w1-commit-worker w1-stop-connection +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step w1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-create-named-index: + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-delete: + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM dist_table'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step coord-begin: + BEGIN; + +step coord-take-lock: + LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE; + +step w1-reindex: + SELECT run_commands_on_session_level_connection_to_node('REINDEX INDEX dist_table_index'); + +step deadlock-checker-call: + SELECT check_distributed_deadlocks(); + +check_distributed_deadlocks +--------------------------------------------------------------------- +t +(1 row) + +step coord-take-lock: <... completed> +ERROR: canceling the transaction since it was involved in a distributed deadlock +step w1-reindex: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step coord-rollback: + ROLLBACK; + +step w1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step w1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/isolation_multiuser_locking.out b/src/test/regress/expected/isolation_multiuser_locking.out index 943d579a4..996fb6c9d 100644 --- a/src/test/regress/expected/isolation_multiuser_locking.out +++ b/src/test/regress/expected/isolation_multiuser_locking.out @@ -140,7 +140,7 @@ step s2-begin: step s2-index: CREATE INDEX test_index ON test_table(column1); -ERROR: must be owner of table test_table +ERROR: permission denied for table test_table step s1-insert: UPDATE test_table SET column2 = 1; diff --git a/src/test/regress/expected/multi_fix_partition_shard_index_names.out b/src/test/regress/expected/multi_fix_partition_shard_index_names.out index e6795317c..46f81dcd7 100644 --- a/src/test/regress/expected/multi_fix_partition_shard_index_names.out +++ b/src/test/regress/expected/multi_fix_partition_shard_index_names.out @@ -542,8 +542,16 @@ SET citus.log_remote_commands TO ON; CREATE INDEX i4 ON parent_table(dist_col); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'; +LOCK fix_idx_names.parent_table IN SHARE MODE; +SET citus.enable_ddl_propagation TO 'on' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'; +LOCK fix_idx_names.parent_table IN SHARE MODE; +SET citus.enable_ddl_propagation TO 'on' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index f8c3c8399..f93e57b1e 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -97,9 +97,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) (3 rows) -CREATE INDEX mx_test_index ON mx_table(col_2); -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. ALTER TABLE mx_table ADD COLUMN col_4 int; ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. @@ -114,7 +111,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) (3 rows) -\d mx_test_index -- citus_drop_all_shards SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table'); ERROR: operation is not allowed on this node @@ -161,6 +157,13 @@ SELECT master_remove_node('localhost', 5432); (1 row) +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +NOTICE: dropping metadata on the node (localhost,57638) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + \c - - - :worker_1_port UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass; SELECT update_distributed_table_colocation('mx_table', colocate_with => 'mx_table_2'); @@ -185,7 +188,7 @@ HINT: Connect to the coordinator and run it again. SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; hasmetadata --------------------------------------------------------------------- - t + f (1 row) -- stop_metadata_sync_to_node diff --git a/src/test/regress/expected/run_command_on_all_nodes.out b/src/test/regress/expected/run_command_on_all_nodes.out index e95989d84..a3029f069 100644 --- a/src/test/regress/expected/run_command_on_all_nodes.out +++ b/src/test/regress/expected/run_command_on_all_nodes.out @@ -199,12 +199,12 @@ SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_ (3 rows) -- ddl commands are only allowed from the coordinator -SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$); +SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$); success | result --------------------------------------------------------------------- f | ERROR: operation is not allowed on this node f | ERROR: operation is not allowed on this node - t | CREATE INDEX + t | ALTER TABLE (3 rows) DROP SCHEMA run_command_on_all_nodes CASCADE; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index d8cc77c73..eaf228af2 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -103,6 +103,7 @@ test: isolation_insert_select_vs_all_on_mx test: isolation_ref_select_for_update_vs_all_on_mx test: isolation_ref_update_delete_upsert_vs_all_on_mx test: isolation_dis2ref_foreign_keys_on_mx +test: isolation_index_vs_all_on_mx test: isolation_metadata_sync_deadlock test: isolation_replicated_dist_on_mx test: isolation_replicate_reference_tables_to_coordinator diff --git a/src/test/regress/spec/isolation_copy_vs_all_on_mx.spec b/src/test/regress/spec/isolation_copy_vs_all_on_mx.spec index 2990bf5cd..2f4b702f6 100644 --- a/src/test/regress/spec/isolation_copy_vs_all_on_mx.spec +++ b/src/test/regress/spec/isolation_copy_vs_all_on_mx.spec @@ -83,6 +83,11 @@ step "s2-coordinator-create-index-concurrently" CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id); } +step "s2-worker-create-index-concurrently" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id)'); +} + step "s2-commit-worker" { SELECT run_commands_on_session_level_connection_to_node('COMMIT'); @@ -116,3 +121,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" " permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-begin" "s2-coordinator-drop" "s1-commit-worker" "s2-commit" "s1-stop-connection" "s3-select-count" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-coordinator-create-index-concurrently" "s1-commit-worker" "s2-empty" "s3-select-count" "s1-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-worker-create-index-concurrently" "s1-commit-worker" "s2-stop-connection" "s3-select-count" "s1-stop-connection" diff --git a/src/test/regress/spec/isolation_drop_alter_index_select_for_update_on_mx.spec b/src/test/regress/spec/isolation_drop_alter_index_select_for_update_on_mx.spec index 9d4086ac1..cc816c693 100644 --- a/src/test/regress/spec/isolation_drop_alter_index_select_for_update_on_mx.spec +++ b/src/test/regress/spec/isolation_drop_alter_index_select_for_update_on_mx.spec @@ -43,6 +43,11 @@ step "s1-index" CREATE INDEX dist_table_index ON dist_table (id); } +step "s1-index-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table (id)'); +} + step "s1-select-for-update" { SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table WHERE id = 5 FOR UPDATE'); @@ -122,5 +127,6 @@ step "s3-select-count" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-alter" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count" permutation "s1-begin" "s1-index" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit" "s2-commit-worker" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-index-worker" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s2-stop-connection" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-coordinator-create-index-concurrently"(*) "s2-empty" "s1-commit-worker" "s1-stop-connection" diff --git a/src/test/regress/spec/isolation_index_vs_all_on_mx.spec b/src/test/regress/spec/isolation_index_vs_all_on_mx.spec new file mode 100644 index 000000000..8c8fd69a2 --- /dev/null +++ b/src/test/regress/spec/isolation_index_vs_all_on_mx.spec @@ -0,0 +1,151 @@ +#include "isolation_mx_common.include.spec" + +setup +{ + CREATE TABLE dist_table(id int, data int); + SELECT create_distributed_table('dist_table', 'id'); + COPY dist_table FROM PROGRAM 'echo 1, 10 && echo 2, 20 && echo 3, 30 && echo 4, 40 && echo 5, 50' WITH CSV; + SELECT citus_set_coordinator_host('localhost', 57636); +} + +teardown +{ + DROP TABLE IF EXISTS dist_table CASCADE; + SELECT citus_remove_node('localhost', 57636); +} + +session "w1" + +step "w1-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57637); +} + +step "w1-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "w1-create-named-index" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)'); +} + +step "w1-create-unnamed-index" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)'); +} + +step "w1-reindex" +{ + SELECT run_commands_on_session_level_connection_to_node('REINDEX INDEX dist_table_index'); +} + +step "w1-delete" +{ + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM dist_table'); +} + +step "w1-commit-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +step "w1-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + + +session "w2" + +step "w2-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57638); +} + +step "w2-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "w2-create-named-index" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index_2 ON dist_table(id)'); +} + +step "w2-create-unnamed-index" +{ + SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)'); +} + +step "w2-commit-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +step "w2-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +session "coord" + +step "coord-begin" +{ + BEGIN; +} + +step "coord-rollback" +{ + ROLLBACK; +} + +step "coord-print-index-count" +{ + SELECT + result + FROM + run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; +} + +step "coord-take-lock" +{ + LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE; +} + +session "deadlock-checker" + +step "deadlock-checker-call" +{ + SELECT check_distributed_deadlocks(); +} + +permutation "w1-start-session-level-connection" "w1-begin-on-worker" // open transaction on worker 1 + "w2-start-session-level-connection" "w2-begin-on-worker" // open transaction on worker 2 + "w1-create-named-index" "w2-create-named-index" // create indexes with unique names on workers + "w1-commit-worker" "w2-commit-worker" // commit transactions on workers + "w1-stop-connection" "w2-stop-connection" // close connections to workers + "coord-print-index-count" // show indexes on coordinator + +permutation "w1-start-session-level-connection" "w1-begin-on-worker" // open transaction on worker 1 + "w2-start-session-level-connection" "w2-begin-on-worker" // open transaction on worker 2 + "w1-create-unnamed-index" "w2-create-unnamed-index" // create unnamed indexes on workers + "w1-commit-worker" // commit transactions on worker 1 and see error on worker 2 due to name clash + "w1-stop-connection" "w2-stop-connection" // close connections to workers + "coord-print-index-count" // show indexes on coordinator + +// the following permutation is expected to fail with a distributed deadlock +permutation "w1-start-session-level-connection" // start session on worker 1 only + "w1-create-named-index" // create index on worker 1 + "w1-begin-on-worker" // open transaction block on worker 1 + "w1-delete" // delete from table on worker 1 + "coord-begin" // open transaction on coordinator to test distributed deadlock + "coord-take-lock" // take ACCESS EXCLUSIVE lock on table on coordinator, get blocked on worker 1 + "w1-reindex" // reindex on worker that will acquire ACCESS EXCLUSIVE lock on table, create distributed deadlock + "deadlock-checker-call" // check that distributed deadlock is detected properly + "coord-rollback" // rollback transaction on coordinator to unblock + "w1-commit-worker" // commit transaction on worker 1 + "w1-stop-connection" // close connection to worker 1 diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 6012ddbd5..1069dbf47 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -72,11 +72,9 @@ SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', tr -- DDL commands SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; -CREATE INDEX mx_test_index ON mx_table(col_2); ALTER TABLE mx_table ADD COLUMN col_4 int; ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1); SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; -\d mx_test_index -- citus_drop_all_shards SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table'); @@ -98,6 +96,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; \c - - - :master_port SELECT master_remove_node('localhost', 5432); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); \c - - - :worker_1_port diff --git a/src/test/regress/sql/run_command_on_all_nodes.sql b/src/test/regress/sql/run_command_on_all_nodes.sql index 9b7b083af..339e8b630 100644 --- a/src/test/regress/sql/run_command_on_all_nodes.sql +++ b/src/test/regress/sql/run_command_on_all_nodes.sql @@ -84,7 +84,7 @@ SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_o SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$); -- ddl commands are only allowed from the coordinator -SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$); +SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$); DROP SCHEMA run_command_on_all_nodes CASCADE;