rename TargetWorkerSet enums

Rename TargetWorkerSet enums to make them more explicit about what they
mean. Ideally it would be good to treat everything as a node without the
'worker' concept because it makes things complicated. Another
improvement could be to rename TargetWorkerSet as TargetNodeSet but it
goes to renaming many occurrences of Worker, which is probably too big
for this PR.

(cherry picked from commit de4b9569359e4f10d4ebf3fbcf7159ee6e2328db)
pull/4206/head
Sait Talha Nisanci 2020-07-08 20:15:34 +03:00
parent 23f24a9668
commit 0bd4002e5f
9 changed files with 41 additions and 45 deletions

View File

@ -278,7 +278,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString)
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -311,7 +311,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -346,7 +346,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString)
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -379,7 +379,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -604,6 +604,6 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
MarkObjectDistributed(&collationAddress);
return NodeDDLTaskList(ALL_WORKERS, CreateCollationDDLsIdempotent(
return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent(
collationAddress.objectId));
}

View File

@ -190,7 +190,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString)
MarkObjectDistributed(&extensionAddress);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -306,7 +306,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString)
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -421,7 +421,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -489,7 +489,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString)
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -169,7 +169,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
initStringInfo(&ddlCommand);
appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand.data);
SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data);
MarkObjectDistributed(&functionAddress);
@ -1192,7 +1192,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
GetFunctionAlterOwnerCommand(address.objectId),
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1279,7 +1279,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1312,7 +1312,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1343,7 +1343,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1375,7 +1375,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1477,7 +1477,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString)
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -171,7 +171,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
}
List *commands = list_make1((void *) CreateAlterRoleIfExistsCommand(stmt));
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -212,7 +212,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commandList);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commandList);
}

View File

@ -148,7 +148,7 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString)
stmt->objects = originalObjects;
return NodeDDLTaskList(ALL_WORKERS, list_make1(sql));
return NodeDDLTaskList(NON_COORDINATOR_NODES, list_make1(sql));
}

View File

@ -162,7 +162,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString)
(void *) compositeTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -228,7 +228,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString)
(void *) alterTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -271,7 +271,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString)
(void *) createEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -362,7 +362,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString)
(void *) alterEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -500,7 +500,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString)
dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -534,7 +534,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString)
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -567,7 +567,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -600,7 +600,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -657,7 +657,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
{
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand,
SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand,
CitusExtensionOwnerName());
return jobIds;
}
@ -191,7 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
void
DoRepartitionCleanup(List *jobIds)
{
SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds),
SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES,
GenerateDeleteJobsCommand(jobIds),
CitusExtensionOwnerName());
}

View File

@ -201,16 +201,10 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode);
List *result = NIL;
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
{
continue;
}
if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId)
if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata)
{
continue;
}
@ -232,7 +226,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
void
SendBareCommandListToMetadataWorkers(List *commandList)
{
TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA;
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
char *nodeUser = CitusExtensionOwnerName();
@ -271,7 +265,7 @@ SendBareCommandListToMetadataWorkers(List *commandList)
int
SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user)
{
TargetWorkerSet targetWorkerSet = ALL_WORKERS;
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_NODES;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
int maxError = RESPONSE_OKAY;
@ -318,11 +312,12 @@ SendCommandToMetadataWorkersParams(const char *command,
const Oid *parameterTypes,
const char *const *parameterValues)
{
List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock);
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
ShareLock);
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user,
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
parameterCount, parameterTypes,
parameterValues);
}

View File

@ -18,13 +18,13 @@
/*
* TargetWorkerSet is used for determining the type of workers that a command
* is targeted to.
* is targeted to. Currently it doesn't include coordinator even if it is added
* as a worker.
*/
typedef enum TargetWorkerSet
{
WORKERS_WITH_METADATA,
OTHER_WORKERS,
ALL_WORKERS
NON_COORDINATOR_METADATA_NODES,
NON_COORDINATOR_NODES
} TargetWorkerSet;