rename TargetWorkerSet enums (#4015)

Rename TargetWorkerSet enums to make them more explicit about what they
mean. Ideally it would be good to treat everything as a node without the
'worker' concept because it makes things complicated. Another
improvement could be to rename TargetWorkerSet as TargetNodeSet but it
goes to renaming many occurrences of Worker, which is probably too big
for this PR.
pull/4017/head
SaitTalhaNisanci 2020-07-10 11:21:27 +03:00 committed by GitHub
parent b642ed10e9
commit 3f50165365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 41 additions and 45 deletions

View File

@ -278,7 +278,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString)
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -311,7 +311,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -346,7 +346,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString)
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -379,7 +379,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -604,6 +604,6 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
MarkObjectDistributed(&collationAddress);
return NodeDDLTaskList(ALL_WORKERS, CreateCollationDDLsIdempotent(
return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent(
collationAddress.objectId));
}

View File

@ -190,7 +190,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString)
MarkObjectDistributed(&extensionAddress);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -306,7 +306,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString)
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -421,7 +421,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -489,7 +489,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString)
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -169,7 +169,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
initStringInfo(&ddlCommand);
appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand.data);
SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data);
MarkObjectDistributed(&functionAddress);
@ -1192,7 +1192,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
GetFunctionAlterOwnerCommand(address.objectId),
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1279,7 +1279,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1312,7 +1312,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1343,7 +1343,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1375,7 +1375,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -1477,7 +1477,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString)
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -169,7 +169,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
}
List *commands = list_make1((void *) CreateAlterRoleIfExistsCommand(stmt));
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -210,7 +210,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commandList);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commandList);
}

View File

@ -148,7 +148,7 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString)
stmt->objects = originalObjects;
return NodeDDLTaskList(ALL_WORKERS, list_make1(sql));
return NodeDDLTaskList(NON_COORDINATOR_NODES, list_make1(sql));
}

View File

@ -162,7 +162,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString)
(void *) compositeTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -228,7 +228,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString)
(void *) alterTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -271,7 +271,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString)
(void *) createEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -362,7 +362,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString)
(void *) alterEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -500,7 +500,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString)
dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -534,7 +534,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString)
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -567,7 +567,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -600,7 +600,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
@ -657,7 +657,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString)
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
{
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand,
SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand,
CitusExtensionOwnerName());
return jobIds;
}
@ -191,7 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
void
DoRepartitionCleanup(List *jobIds)
{
SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds),
SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES,
GenerateDeleteJobsCommand(jobIds),
CitusExtensionOwnerName());
}

View File

@ -201,16 +201,10 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
List *result = NIL;
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
{
continue;
}
if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId)
if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata)
{
continue;
}
@ -232,7 +226,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
void
SendBareCommandListToMetadataWorkers(List *commandList)
{
TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA;
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
char *nodeUser = CitusExtensionOwnerName();
@ -271,7 +265,7 @@ SendBareCommandListToMetadataWorkers(List *commandList)
int
SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user)
{
TargetWorkerSet targetWorkerSet = ALL_WORKERS;
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_NODES;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
int maxError = RESPONSE_OKAY;
@ -318,11 +312,12 @@ SendCommandToMetadataWorkersParams(const char *command,
const Oid *parameterTypes,
const char *const *parameterValues)
{
List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock);
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
ShareLock);
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user,
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
parameterCount, parameterTypes,
parameterValues);
}

View File

@ -18,13 +18,13 @@
/*
* TargetWorkerSet is used for determining the type of workers that a command
* is targeted to.
* is targeted to. Currently it doesn't include coordinator even if it is added
* as a worker.
*/
typedef enum TargetWorkerSet
{
WORKERS_WITH_METADATA,
OTHER_WORKERS,
ALL_WORKERS
NON_COORDINATOR_METADATA_NODES,
NON_COORDINATOR_NODES
} TargetWorkerSet;