From 914137ab4a3d55789e1879a621eaf07524699d52 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 22 Jun 2022 16:37:09 +0200 Subject: [PATCH] Prototype of create/drop index via worker --- src/backend/distributed/commands/index.c | 48 ++++++++++++++++--- src/backend/distributed/commands/sequence.c | 10 ++-- .../distributed/commands/utility_hook.c | 5 +- .../distributed/metadata/node_metadata.c | 2 +- .../transaction/worker_transaction.c | 36 ++++++++------ .../distributed/commands/utility_hook.h | 3 ++ src/include/distributed/worker_transaction.h | 5 +- 7 files changed, 78 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index ac04d9701..476ccc55f 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -151,6 +151,17 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand, * index statements. */ LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement); + + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for CREATE INDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!createIndexStatement->concurrent) + { + AcquireDistributedLockOnRelations(list_make1(relationRangeVar), lockMode, 0); + } + Relation relation = table_openrv(relationRangeVar, lockMode); /* @@ -231,6 +242,10 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand, SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement); DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand); + + /* we have taken appropriate locks */ + ddlJob->allowedOnWorkerNode = true; + return list_make1(ddlJob); } @@ -555,6 +570,17 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, "concurrently"); state.locked_table_oid = InvalidOid; + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for REINDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!state.concurrent) + { + AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation), + lockmode, 0); + } + Oid indOid = RangeVarGetRelidExtended(reindexStatement->relation, lockmode, 0, RangeVarCallbackForReindexIndex, @@ -564,6 +590,9 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, } else { + AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation), + lockmode, 0); + RangeVarGetRelidExtended(reindexStatement->relation, lockmode, 0, RangeVarCallbackOwnsTable, NULL); @@ -614,6 +643,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, "concurrently"); ddlJob->metadataSyncCommand = reindexCommand; ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement); + ddlJob->allowedOnWorkerNode = true; ddlJobs = list_make1(ddlJob); } @@ -662,6 +692,16 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, lockmode = ShareUpdateExclusiveLock; } + /* + * Acquire global lock to prevent concurrent writes or DDL. However, + * we do not bother for DROP INDEX CONCURRENTLY, since we'll have + * to release the lock. + */ + if (!dropIndexStatement->concurrent) + { + AcquireDistributedLockOnRelations(list_make1(rangeVar), lockmode, 0); + } + /* * The next few statements are based on RemoveRelations() in * commands/tablecmds.c in Postgres source. @@ -718,6 +758,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, ddlJob->metadataSyncCommand = dropIndexCommand; ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, dropIndexStatement); + ddlJob->allowedOnWorkerNode = true; ddlJobs = list_make1(ddlJob); } @@ -737,12 +778,6 @@ PostprocessIndexStmt(Node *node, const char *queryString) { IndexStmt *indexStmt = castNode(IndexStmt, node); - /* this logic only applies to the coordinator */ - if (!IsCoordinator()) - { - return NIL; - } - /* * We make sure schema name is not null in the PreprocessIndexStmt */ @@ -1265,7 +1300,6 @@ void MarkIndexValid(IndexStmt *indexStmt) { Assert(indexStmt->concurrent); - Assert(IsCoordinator()); /* * We make sure schema name is not null in the PreprocessIndexStmt diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index fcf47deac..218fd0ac3 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -315,7 +315,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString, (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -350,7 +350,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -513,7 +513,7 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -615,7 +615,7 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } @@ -718,7 +718,7 @@ PreprocessGrantOnSequenceStmt(Node *node, const char *queryString, List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); + return NodeDDLTaskList(OTHER_METADATA_NODES, commands); } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index fa78e70aa..a7694acd2 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1130,7 +1130,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { bool shouldSyncMetadata = false; - EnsureCoordinator(); + if (!ddlJob->allowedOnWorkerNode) + { + EnsureCoordinator(); + } ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 23d4d3f4d..f631ca255 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2216,7 +2216,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value) columnIndex, value); - List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES, ShareLock); /* open connections in parallel */ diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 6b4b1a351..f4a968dfd 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -148,24 +148,32 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) - { - workerNodeList = ActivePrimaryNodeList(lockMode); - } - else - { - workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); - } + List *workerNodeList = ActivePrimaryNodeList(lockMode); List *result = NIL; + int localGroupId = GetLocalGroupId(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet == - METADATA_NODES) && + if ((targetWorkerSet == OTHER_METADATA_NODES || + targetWorkerSet == METADATA_NODES) && !workerNode->hasMetadata) { + /* only interested in metadata nodes */ + continue; + } + + if (targetWorkerSet == OTHER_METADATA_NODES && + workerNode->groupId == localGroupId) + { + /* only interested in other metadata nodes */ + continue; + } + + if (targetWorkerSet == NON_COORDINATOR_NODES && + workerNode->groupId == COORDINATOR_GROUP_ID) + { + /* only interested in worker nodes */ continue; } @@ -186,7 +194,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) void SendBareCommandListToMetadataWorkers(List *commandList) { - TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES; + TargetWorkerSet targetWorkerSet = OTHER_METADATA_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock); char *nodeUser = CurrentUserName(); @@ -227,12 +235,12 @@ SendCommandToMetadataWorkersParams(const char *command, const Oid *parameterTypes, const char *const *parameterValues) { - List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES, RowShareLock); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); - SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, + SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user, parameterCount, parameterTypes, parameterValues); } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 1dd37ce1e..20251d537 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -74,6 +74,9 @@ typedef struct DDLJob const char *metadataSyncCommand; List *taskList; /* worker DDL tasks to execute */ + + /* whether the DDL can be executed from a worker */ + bool allowedOnWorkerNode; } DDLJob; extern ProcessUtility_hook_type PrevProcessUtility; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 72b16acd5..2562a4a88 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -23,10 +23,9 @@ typedef enum TargetWorkerSet { /* - * All the active primary nodes in the metadata which have metadata - * except the coordinator + * All other active primary nodes in the metadata which have metadata. */ - NON_COORDINATOR_METADATA_NODES, + OTHER_METADATA_NODES, /* * All the active primary nodes in the metadata except the coordinator