pull/7009/merge
Hanefi Onaldi 2023-11-09 17:30:53 -08:00 committed by GitHub
commit 32e00ee307
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 754 additions and 48 deletions

View File

@ -154,6 +154,17 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
* index statements.
*/
LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement);
/*
* Acquire global lock to prevent concurrent writes or DDL. However,
* we do not bother for CREATE INDEX CONCURRENTLY, since we'll have
* to release the lock.
*/
if (!createIndexStatement->concurrent)
{
AcquireDistributedLockOnRelations(list_make1(relationRangeVar), lockMode, 0);
}
Relation relation = table_openrv(relationRangeVar, lockMode);
/*
@ -234,6 +245,10 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement);
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
/* we have taken appropriate locks */
ddlJob->allowedOnWorkerNode = true;
return list_make1(ddlJob);
}
@ -541,11 +556,9 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk)
Oid relationId = InvalidOid;
LOCKMODE lockmode = IsReindexWithParam_compat(reindexStmt, "concurrently") ?
ShareUpdateExclusiveLock : AccessExclusiveLock;
if (reindexStmt->kind == REINDEX_OBJECT_INDEX)
{
LOCKMODE lockmode = GetReindexIndexRelationLockMode(reindexStmt);
struct ReindexIndexCallbackState state;
state.concurrent = IsReindexWithParam_compat(reindexStmt,
"concurrently");
@ -559,6 +572,7 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk)
}
else
{
LOCKMODE lockmode = GetReindexTableRelationLockMode(reindexStmt);
relationId = RangeVarGetRelidExtended(reindexStmt->relation, lockmode,
(missingOk) ? RVR_MISSING_OK : 0,
RangeVarCallbackOwnsTable, NULL);
@ -568,6 +582,35 @@ ReindexStmtFindRelationOid(ReindexStmt *reindexStmt, bool missingOk)
}
/*
* GetReindexRelationLockMode returns required lock mode to open the
* index that given REINDEX INDEX command operates on.
*/
LOCKMODE
GetReindexIndexRelationLockMode(ReindexStmt *reindexStmt)
{
if (IsReindexWithParam_compat(reindexStmt, "concurrently"))
{
return ShareUpdateExclusiveLock;
}
else
{
return AccessExclusiveLock;
}
}
/*
* GetReindexTableLockMode returns required lock mode to open the
* relation that given REINDEX TABLE command operates on.
*/
LOCKMODE
GetReindexTableRelationLockMode(ReindexStmt *reindexStmt)
{
return ShareLock;
}
/*
* PreprocessReindexStmt determines whether a given REINDEX statement involves
* a distributed table. If so (and if the statement does not use unsupported
@ -593,13 +636,32 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
Oid relationId = ReindexStmtFindRelationOid(reindexStatement, false);
MemoryContext relationContext = NULL;
Relation relation = NULL;
if (reindexStatement->kind == REINDEX_OBJECT_INDEX)
{
LOCKMODE lockmode = GetReindexIndexRelationLockMode(reindexStatement);
/*
* Acquire global lock to prevent concurrent writes or DDL. However,
* we do not bother for REINDEX CONCURRENTLY, since we'll have
* to release the lock.
*/
if (!IsReindexWithParam_compat(reindexStatement, "concurrently"))
{
AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
lockmode, 0);
}
Oid indOid = RangeVarGetRelid(reindexStatement->relation, NoLock, 0);
relation = index_open(indOid, NoLock);
}
else
{
LOCKMODE lockmode = GetReindexTableRelationLockMode(reindexStatement);
AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
lockmode, 0);
relation = table_openrv(reindexStatement->relation, NoLock);
}
@ -646,6 +708,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
"concurrently");
ddlJob->metadataSyncCommand = reindexCommand;
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
ddlJob->allowedOnWorkerNode = true;
ddlJobs = list_make1(ddlJob);
}
@ -741,6 +804,16 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
continue;
}
/*
* Acquire global lock to prevent concurrent writes or DDL. However,
* we do not bother for DROP INDEX CONCURRENTLY, since we'll have
* to release the lock.
*/
if (!dropIndexStatement->concurrent)
{
AcquireDistributedLockOnRelations(list_make1(rangeVar), lockmode, 0);
}
Oid relationId = IndexGetRelation(indexId, false);
bool isCitusRelation = IsCitusTable(relationId);
if (isCitusRelation)
@ -774,6 +847,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
ddlJob->metadataSyncCommand = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJob->allowedOnWorkerNode = true;
ddlJobs = list_make1(ddlJob);
}
@ -793,12 +867,6 @@ PostprocessIndexStmt(Node *node, const char *queryString)
{
IndexStmt *indexStmt = castNode(IndexStmt, node);
/* this logic only applies to the coordinator */
if (!IsCoordinator())
{
return NIL;
}
/*
* We make sure schema name is not null in the PreprocessIndexStmt
*/
@ -1330,7 +1398,6 @@ void
MarkIndexValid(IndexStmt *indexStmt)
{
Assert(indexStmt->concurrent);
Assert(IsCoordinator());
/*
* We make sure schema name is not null in the PreprocessIndexStmt

View File

@ -337,7 +337,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}
@ -403,7 +403,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}
@ -579,7 +579,7 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}
@ -689,7 +689,7 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}
@ -776,7 +776,7 @@ PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString,
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}
@ -912,7 +912,7 @@ PreprocessGrantOnSequenceStmt(Node *node, const char *queryString,
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
}

View File

@ -1115,7 +1115,24 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{
bool shouldSyncMetadata = false;
if (!ddlJob->allowedOnWorkerNode)
{
EnsureCoordinator();
}
if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode() &&
!EnableAcquiringUnsafeLockFromWorkers)
{
ereport(ERROR,
(errmsg(
"Cannot execute DDL command from a worker node since the "
"coordinator is not in the metadata."),
errhint(
"Either run this command on the coordinator or add the coordinator "
"in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);\n"
"Alternatively, though it is not recommended, you can allow this command by running: "
"SET citus.allow_unsafe_locks_from_workers TO 'on';")));
}
ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;

View File

@ -4391,7 +4391,7 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{
List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
List *metadataNodes = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
RowShareLock);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
CurrentUserName(),

View File

@ -2390,7 +2390,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
columnIndex,
value);
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
ShareLock);
/* open connections in parallel */

View File

@ -157,24 +157,32 @@ SendCommandListToWorkersWithMetadata(List *commands)
List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
List *workerNodeList = NIL;
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
{
workerNodeList = ActivePrimaryNodeList(lockMode);
}
else
{
workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
}
List *workerNodeList = ActivePrimaryNodeList(lockMode);
List *result = NIL;
int localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
METADATA_NODES) &&
if ((targetWorkerSet == OTHER_METADATA_NODES ||
targetWorkerSet == METADATA_NODES) &&
!workerNode->hasMetadata)
{
/* only interested in metadata nodes */
continue;
}
if (targetWorkerSet == OTHER_METADATA_NODES &&
workerNode->groupId == localGroupId)
{
/* only interested in other metadata nodes */
continue;
}
if (targetWorkerSet == NON_COORDINATOR_NODES &&
workerNode->groupId == COORDINATOR_GROUP_ID)
{
/* only interested in worker nodes */
continue;
}
@ -195,7 +203,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
void
SendBareCommandListToMetadataWorkers(List *commandList)
{
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
TargetWorkerSet targetWorkerSet = OTHER_METADATA_NODES;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
char *nodeUser = CurrentUserName();
@ -236,12 +244,12 @@ SendCommandToMetadataWorkersParams(const char *command,
const Oid *parameterTypes,
const char *const *parameterValues)
{
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
RowShareLock);
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user,
parameterCount, parameterTypes,
parameterValues);
}

View File

@ -405,6 +405,8 @@ extern char * ChooseIndexName(const char *tabname, Oid namespaceId,
extern char * ChooseIndexNameAddition(List *colnames);
extern List * ChooseIndexColumnNames(List *indexElems);
extern LOCKMODE GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement);
extern LOCKMODE GetReindexTableRelationLockMode(ReindexStmt *reindexStmt);
extern LOCKMODE GetReindexIndexRelationLockMode(ReindexStmt *reindexStmt);
extern List * PreprocessReindexStmt(Node *ReindexStatement,
const char *ReindexCommand,
ProcessUtilityContext processUtilityContext);

View File

@ -74,6 +74,9 @@ typedef struct DDLJob
const char *metadataSyncCommand;
List *taskList; /* worker DDL tasks to execute */
/* whether the DDL can be executed from a worker */
bool allowedOnWorkerNode;
} DDLJob;
extern ProcessUtility_hook_type PrevProcessUtility;

View File

@ -24,10 +24,9 @@
typedef enum TargetWorkerSet
{
/*
* All the active primary nodes in the metadata which have metadata
* except the coordinator
* All other active primary nodes in the metadata which have metadata.
*/
NON_COORDINATOR_METADATA_NODES,
OTHER_METADATA_NODES,
/*
* All the active primary nodes in the metadata except the coordinator

View File

@ -290,3 +290,78 @@ stop_session_level_connection_to_node
(1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy s2-start-session-level-connection s2-worker-create-index-concurrently s1-commit-worker s2-stop-connection s3-select-count s1-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-copy:
SELECT run_commands_on_session_level_connection_to_node('COPY copy_table FROM PROGRAM ''echo 5, 50 && echo 6, 60 && echo 7, 70''WITH CSV');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-worker-create-index-concurrently:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id)');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-worker-create-index-concurrently: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select-count:
SELECT COUNT(*) FROM copy_table;
count
---------------------------------------------------------------------
8
(1 row)
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)

View File

@ -137,6 +137,72 @@ stop_session_level_connection_to_node
(1 row)
starting permutation: s1-start-session-level-connection s1-index-worker s2-start-session-level-connection s2-begin-on-worker s2-select-for-update s1-commit-worker s2-commit-worker s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-index-worker:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table (id)');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-select-for-update:
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table WHERE id = 5 FOR UPDATE');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-select-for-update s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);

View File

@ -68,8 +68,9 @@ step s2-begin: BEGIN;
step s1-drop: DROP TABLE drop_hash;
step s2-ddl-create-index: CREATE INDEX drop_hash_index ON drop_hash(id); <waiting ...>
step s1-commit: COMMIT;
s2: WARNING: relation "drop_tests.drop_hash" does not exist
step s2-ddl-create-index: <... completed>
ERROR: relation "drop_hash" does not exist
ERROR: failure on connection marked as essential: localhost:xxxxx
step s2-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM drop_hash;
ERROR: relation "drop_hash" does not exist

View File

@ -0,0 +1,294 @@
Parsed test spec with 4 sessions
starting permutation: w1-start-session-level-connection w1-begin-on-worker w2-start-session-level-connection w2-begin-on-worker w1-create-named-index w2-create-named-index w1-commit-worker w2-commit-worker w1-stop-connection w2-stop-connection coord-print-index-count
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step w1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-create-named-index:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-create-named-index:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index_2 ON dist_table(id)');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step coord-print-index-count:
SELECT
result
FROM
run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
result
---------------------------------------------------------------------
2
2
2
2
(4 rows)
citus_remove_node
---------------------------------------------------------------------
(1 row)
starting permutation: w1-start-session-level-connection w1-begin-on-worker w2-start-session-level-connection w2-begin-on-worker w1-create-unnamed-index w2-create-unnamed-index w1-commit-worker w1-stop-connection w2-stop-connection coord-print-index-count
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step w1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-create-unnamed-index:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-create-unnamed-index:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)');
<waiting ...>
step w1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-create-unnamed-index: <... completed>
ERROR: duplicate key value violates unique constraint "pg_class_relname_nsp_index"
step w1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step coord-print-index-count:
SELECT
result
FROM
run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
result
---------------------------------------------------------------------
1
1
1
1
(4 rows)
citus_remove_node
---------------------------------------------------------------------
(1 row)
starting permutation: w1-start-session-level-connection w1-create-named-index w1-begin-on-worker w1-delete coord-begin coord-take-lock w1-reindex deadlock-checker-call coord-rollback w1-commit-worker w1-stop-connection
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step w1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-create-named-index:
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-delete:
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM dist_table');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step coord-begin:
BEGIN;
step coord-take-lock:
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
<waiting ...>
step w1-reindex:
SELECT run_commands_on_session_level_connection_to_node('REINDEX INDEX dist_table_index');
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
---------------------------------------------------------------------
t
(1 row)
step coord-take-lock: <... completed>
ERROR: canceling the transaction since it was involved in a distributed deadlock
step w1-reindex: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step coord-rollback:
ROLLBACK;
step w1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step w1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
citus_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -140,7 +140,7 @@ step s2-begin:
step s2-index:
CREATE INDEX test_index ON test_table(column1);
ERROR: must be owner of table test_table
ERROR: permission denied for table test_table
step s1-insert:
UPDATE test_table SET column2 = 1;

View File

@ -542,8 +542,16 @@ SET citus.log_remote_commands TO ON;
CREATE INDEX i4 ON parent_table(dist_col);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off';
LOCK fix_idx_names.parent_table IN SHARE MODE;
SET citus.enable_ddl_propagation TO 'on'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off';
LOCK fix_idx_names.parent_table IN SHARE MODE;
SET citus.enable_ddl_propagation TO 'on'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'

View File

@ -97,9 +97,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl
col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
(3 rows)
CREATE INDEX mx_test_index ON mx_table(col_2);
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
ALTER TABLE mx_table ADD COLUMN col_4 int;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
@ -114,7 +111,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_tabl
col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
(3 rows)
\d mx_test_index
-- citus_drop_all_shards
SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
ERROR: operation is not allowed on this node
@ -161,6 +157,13 @@ SELECT master_remove_node('localhost', 5432);
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;
SELECT update_distributed_table_colocation('mx_table', colocate_with => 'mx_table_2');
@ -185,7 +188,7 @@ HINT: Connect to the coordinator and run it again.
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
hasmetadata
---------------------------------------------------------------------
t
f
(1 row)
-- stop_metadata_sync_to_node

View File

@ -199,12 +199,12 @@ SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_
(3 rows)
-- ddl commands are only allowed from the coordinator
SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$);
success | result
---------------------------------------------------------------------
f | ERROR: operation is not allowed on this node
f | ERROR: operation is not allowed on this node
t | CREATE INDEX
t | ALTER TABLE
(3 rows)
DROP SCHEMA run_command_on_all_nodes CASCADE;

View File

@ -103,6 +103,7 @@ test: isolation_insert_select_vs_all_on_mx
test: isolation_ref_select_for_update_vs_all_on_mx
test: isolation_ref_update_delete_upsert_vs_all_on_mx
test: isolation_dis2ref_foreign_keys_on_mx
test: isolation_index_vs_all_on_mx
test: isolation_metadata_sync_deadlock
test: isolation_replicated_dist_on_mx
test: isolation_replicate_reference_tables_to_coordinator

View File

@ -83,6 +83,11 @@ step "s2-coordinator-create-index-concurrently"
CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id);
}
step "s2-worker-create-index-concurrently"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX CONCURRENTLY copy_table_index ON copy_table(id)');
}
step "s2-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -116,3 +121,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-begin" "s2-coordinator-drop" "s1-commit-worker" "s2-commit" "s1-stop-connection" "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-coordinator-create-index-concurrently" "s1-commit-worker" "s2-empty" "s3-select-count" "s1-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-worker-create-index-concurrently" "s1-commit-worker" "s2-stop-connection" "s3-select-count" "s1-stop-connection"

View File

@ -43,6 +43,11 @@ step "s1-index"
CREATE INDEX dist_table_index ON dist_table (id);
}
step "s1-index-worker"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table (id)');
}
step "s1-select-for-update"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table WHERE id = 5 FOR UPDATE');
@ -122,5 +127,6 @@ step "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-alter" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
permutation "s1-begin" "s1-index" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit" "s2-commit-worker" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-index-worker" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-coordinator-create-index-concurrently"(*) "s2-empty" "s1-commit-worker" "s1-stop-connection"

View File

@ -0,0 +1,151 @@
#include "isolation_mx_common.include.spec"
setup
{
CREATE TABLE dist_table(id int, data int);
SELECT create_distributed_table('dist_table', 'id');
COPY dist_table FROM PROGRAM 'echo 1, 10 && echo 2, 20 && echo 3, 30 && echo 4, 40 && echo 5, 50' WITH CSV;
SELECT citus_set_coordinator_host('localhost', 57636);
}
teardown
{
DROP TABLE IF EXISTS dist_table CASCADE;
SELECT citus_remove_node('localhost', 57636);
}
session "w1"
step "w1-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "w1-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w1-create-named-index"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index ON dist_table(id)');
}
step "w1-create-unnamed-index"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)');
}
step "w1-reindex"
{
SELECT run_commands_on_session_level_connection_to_node('REINDEX INDEX dist_table_index');
}
step "w1-delete"
{
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM dist_table');
}
step "w1-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
step "w1-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
session "w2"
step "w2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "w2-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w2-create-named-index"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX dist_table_index_2 ON dist_table(id)');
}
step "w2-create-unnamed-index"
{
SELECT run_commands_on_session_level_connection_to_node('CREATE INDEX ON dist_table(id,data)');
}
step "w2-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
step "w2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
session "coord"
step "coord-begin"
{
BEGIN;
}
step "coord-rollback"
{
ROLLBACK;
}
step "coord-print-index-count"
{
SELECT
result
FROM
run_command_on_placements('dist_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
}
step "coord-take-lock"
{
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
}
session "deadlock-checker"
step "deadlock-checker-call"
{
SELECT check_distributed_deadlocks();
}
permutation "w1-start-session-level-connection" "w1-begin-on-worker" // open transaction on worker 1
"w2-start-session-level-connection" "w2-begin-on-worker" // open transaction on worker 2
"w1-create-named-index" "w2-create-named-index" // create indexes with unique names on workers
"w1-commit-worker" "w2-commit-worker" // commit transactions on workers
"w1-stop-connection" "w2-stop-connection" // close connections to workers
"coord-print-index-count" // show indexes on coordinator
permutation "w1-start-session-level-connection" "w1-begin-on-worker" // open transaction on worker 1
"w2-start-session-level-connection" "w2-begin-on-worker" // open transaction on worker 2
"w1-create-unnamed-index" "w2-create-unnamed-index" // create unnamed indexes on workers
"w1-commit-worker" // commit transactions on worker 1 and see error on worker 2 due to name clash
"w1-stop-connection" "w2-stop-connection" // close connections to workers
"coord-print-index-count" // show indexes on coordinator
// the following permutation is expected to fail with a distributed deadlock
permutation "w1-start-session-level-connection" // start session on worker 1 only
"w1-create-named-index" // create index on worker 1
"w1-begin-on-worker" // open transaction block on worker 1
"w1-delete" // delete from table on worker 1
"coord-begin" // open transaction on coordinator to test distributed deadlock
"coord-take-lock" // take ACCESS EXCLUSIVE lock on table on coordinator, get blocked on worker 1
"w1-reindex" // reindex on worker that will acquire ACCESS EXCLUSIVE lock on table, create distributed deadlock
"deadlock-checker-call" // check that distributed deadlock is detected properly
"coord-rollback" // rollback transaction on coordinator to unblock
"w1-commit-worker" // commit transaction on worker 1
"w1-stop-connection" // close connection to worker 1

View File

@ -72,11 +72,9 @@ SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', tr
-- DDL commands
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
CREATE INDEX mx_test_index ON mx_table(col_2);
ALTER TABLE mx_table ADD COLUMN col_4 int;
ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1);
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
\d mx_test_index
-- citus_drop_all_shards
SELECT citus_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
@ -98,6 +96,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
\c - - - :master_port
SELECT master_remove_node('localhost', 5432);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
\c - - - :worker_1_port

View File

@ -84,7 +84,7 @@ SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_o
SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$);
-- ddl commands are only allowed from the coordinator
SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
SELECT success, result FROM run_command_on_all_nodes($$ALTER TABLE run_command_on_all_nodes.test ADD COLUMN z int$$);
DROP SCHEMA run_command_on_all_nodes CASCADE;