diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 3c265d170..e0fc367d8 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -278,7 +278,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString) (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -311,7 +311,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -346,7 +346,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString) (void *) renameStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -379,7 +379,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -604,6 +604,6 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString) MarkObjectDistributed(&collationAddress); - return NodeDDLTaskList(ALL_WORKERS, CreateCollationDDLsIdempotent( + return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent( collationAddress.objectId)); } diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 591a81fa5..7b91877c9 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -190,7 +190,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) MarkObjectDistributed(&extensionAddress); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -306,7 +306,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString) (void *) deparsedStmt, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -421,7 +421,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString) (void *) alterExtensionStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -489,7 +489,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString) (void *) alterExtensionStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 9ff2159db..1164114a1 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -169,7 +169,7 @@ create_distributed_function(PG_FUNCTION_ARGS) const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid); initStringInfo(&ddlCommand); appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL); - SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand.data); + SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data); MarkObjectDistributed(&functionAddress); @@ -1192,7 +1192,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) GetFunctionAlterOwnerCommand(address.objectId), ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1279,7 +1279,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1312,7 +1312,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1343,7 +1343,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1375,7 +1375,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -1477,7 +1477,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString) (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index c6ec3d473..cb4b9d9dd 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -169,7 +169,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString) } List *commands = list_make1((void *) CreateAlterRoleIfExistsCommand(stmt)); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -210,7 +210,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commandList); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commandList); } diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index a0ba540bf..3c1cd6388 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -148,7 +148,7 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString) stmt->objects = originalObjects; - return NodeDDLTaskList(ALL_WORKERS, list_make1(sql)); + return NodeDDLTaskList(NON_COORDINATOR_NODES, list_make1(sql)); } diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 7f848e06f..5d8b341e1 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -162,7 +162,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString) (void *) compositeTypeStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -228,7 +228,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString) (void *) alterTypeStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -271,7 +271,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString) (void *) createEnumStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -362,7 +362,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString) (void *) alterEnumStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -500,7 +500,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -534,7 +534,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString) (void *) renameStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -567,7 +567,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -600,7 +600,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -657,7 +657,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString) (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(ALL_WORKERS, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 0c5753213..696c1ad9f 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand, + SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,7 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds), + SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES, + GenerateDeleteJobsCommand(jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 7e010f8c9..0c0dad87d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -201,16 +201,10 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); List *result = NIL; - int32 localGroupId = GetLocalGroupId(); - WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) - { - continue; - } - if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId) + if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata) { continue; } @@ -232,7 +226,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) void SendBareCommandListToMetadataWorkers(List *commandList) { - TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA; + TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); char *nodeUser = CitusExtensionOwnerName(); @@ -271,7 +265,7 @@ SendBareCommandListToMetadataWorkers(List *commandList) int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user) { - TargetWorkerSet targetWorkerSet = ALL_WORKERS; + TargetWorkerSet targetWorkerSet = NON_COORDINATOR_NODES; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); int maxError = RESPONSE_OKAY; @@ -318,11 +312,12 @@ SendCommandToMetadataWorkersParams(const char *command, const Oid *parameterTypes, const char *const *parameterValues) { - List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock); + List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + ShareLock); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); - SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user, + SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, parameterCount, parameterTypes, parameterValues); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index f1c243cc9..e3018a599 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -18,13 +18,13 @@ /* * TargetWorkerSet is used for determining the type of workers that a command - * is targeted to. + * is targeted to. Currently it doesn't include coordinator even if it is added + * as a worker. */ typedef enum TargetWorkerSet { - WORKERS_WITH_METADATA, - OTHER_WORKERS, - ALL_WORKERS + NON_COORDINATOR_METADATA_NODES, + NON_COORDINATOR_NODES } TargetWorkerSet;