From 0bd4002e5f9e92621de9716544c15316ccc2fd43 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 8 Jul 2020 20:15:34 +0300 Subject: [PATCH] rename TargetWorkerSet enums Rename TargetWorkerSet enums to make them more explicit about what they mean. Ideally it would be good to treat everything as a node without the 'worker' concept because it makes things complicated. Another improvement could be to rename TargetWorkerSet as TargetNodeSet but it goes to renaming many occurrences of Worker, which is probably too big for this PR. (cherry picked from commit de4b9569359e4f10d4ebf3fbcf7159ee6e2328db) --- src/backend/distributed/commands/collation.c | 10 +++++----- src/backend/distributed/commands/extension.c | 8 ++++---- src/backend/distributed/commands/function.c | 14 +++++++------- src/backend/distributed/commands/role.c | 4 ++-- src/backend/distributed/commands/schema.c | 2 +- src/backend/distributed/commands/type.c | 18 +++++++++--------- .../executor/repartition_join_execution.c | 5 +++-- .../transaction/worker_transaction.c | 17 ++++++----------- src/include/distributed/worker_transaction.h | 8 ++++---- 9 files changed, 41 insertions(+), 45 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 3c265d170..e0fc367d8 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -278,7 +278,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString) (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -311,7 +311,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -346,7 +346,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString) (void *) renameStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -379,7 +379,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -604,6 +604,6 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString) MarkObjectDistributed(&collationAddress); - return NodeDDLTaskList(ALL_WORKERS, CreateCollationDDLsIdempotent( + return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent( collationAddress.objectId)); } diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 591a81fa5..7b91877c9 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -190,7 +190,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) MarkObjectDistributed(&extensionAddress); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -306,7 +306,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString) (void *) deparsedStmt, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -421,7 +421,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString) (void *) alterExtensionStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -489,7 +489,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString) (void *) alterExtensionStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 38daa8bee..b835d0617 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -169,7 +169,7 @@ create_distributed_function(PG_FUNCTION_ARGS) const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid); initStringInfo(&ddlCommand); appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL); - SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand.data); + SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data); MarkObjectDistributed(&functionAddress); @@ -1192,7 +1192,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) GetFunctionAlterOwnerCommand(address.objectId), ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1279,7 +1279,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1312,7 +1312,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1343,7 +1343,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1375,7 +1375,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1477,7 +1477,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString) (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 8ecee7680..bf0114620 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -171,7 +171,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString) } List *commands = list_make1((void *) CreateAlterRoleIfExistsCommand(stmt)); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -212,7 +212,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commandList); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commandList); } diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index a0ba540bf..3c1cd6388 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -148,7 +148,7 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString) stmt->objects = originalObjects; - return NodeDDLTaskList(ALL_WORKERS, list_make1(sql)); + return NodeDDLTaskList(NON_COORDINATOR_NODES, list_make1(sql)); } diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 7f848e06f..5d8b341e1 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -162,7 +162,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString) (void *) compositeTypeStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -228,7 +228,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString) (void *) alterTypeStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -271,7 +271,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString) (void *) createEnumStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -362,7 +362,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString) (void *) alterEnumStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -500,7 +500,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -534,7 +534,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString) (void *) renameStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -567,7 +567,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -600,7 +600,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -657,7 +657,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 0c5753213..696c1ad9f 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand, + SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,7 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds), + SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES, + GenerateDeleteJobsCommand(jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index bd0db1d8e..29288d479 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -201,16 +201,10 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *result = NIL; - int32 localGroupId = GetLocalGroupId(); - WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) - { - continue; - } - if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId) + if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata) { continue; } @@ -232,7 +226,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) void SendBareCommandListToMetadataWorkers(List *commandList) { - TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA; + TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); char *nodeUser = CitusExtensionOwnerName(); @@ -271,7 +265,7 @@ SendBareCommandListToMetadataWorkers(List *commandList) int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user) { - TargetWorkerSet targetWorkerSet = ALL_WORKERS; + TargetWorkerSet targetWorkerSet = NON_COORDINATOR_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); int maxError = RESPONSE_OKAY; @@ -318,11 +312,12 @@ SendCommandToMetadataWorkersParams(const char *command, const Oid *parameterTypes, const char *const *parameterValues) { - List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock); + List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + ShareLock); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); - SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user, + SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, parameterCount, parameterTypes, parameterValues); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index f1c243cc9..e3018a599 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -18,13 +18,13 @@ /* * TargetWorkerSet is used for determining the type of workers that a command - * is targeted to. + * is targeted to. Currently it doesn't include coordinator even if it is added + * as a worker. */ typedef enum TargetWorkerSet { - WORKERS_WITH_METADATA, - OTHER_WORKERS, - ALL_WORKERS + NON_COORDINATOR_METADATA_NODES, + NON_COORDINATOR_NODES } TargetWorkerSet;