mirror of https://github.com/citusdata/citus.git
Prevent empty placement creation in the coordinator
parent
7e30e717ef
commit
fd5418b91e
|
@ -165,10 +165,11 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
/* generate new and unique shardId from sequence */
|
||||
uint64 shardId = GetNextShardId();
|
||||
|
||||
/* if enough live groups, add an extra candidate node as backup */
|
||||
List *workerNodeList = AppendDistributedTablePlacementNodeList(NoLock);
|
||||
int workerNodeListLength = list_length(workerNodeList);
|
||||
|
||||
if (list_length(workerNodeList) > ShardReplicationFactor)
|
||||
/* if enough live groups, add an extra candidate node as backup */
|
||||
if (workerNodeListLength > ShardReplicationFactor)
|
||||
{
|
||||
attemptableNodeCount = ShardReplicationFactor + 1;
|
||||
}
|
||||
|
@ -177,6 +178,17 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
attemptableNodeCount = ShardReplicationFactor;
|
||||
}
|
||||
|
||||
/* check if we are trying to create an empty placement on the coordinator */
|
||||
if(workerNodeListLength + 1 == ShardReplicationFactor)
|
||||
{
|
||||
List *fullWorkerNodeList = DistributedTablePlacementNodeList(NoLock);
|
||||
if(list_length(fullWorkerNodeList) == ShardReplicationFactor)
|
||||
{
|
||||
ereport(ERROR, (errmsg("Cannot create an empty placement on the coordinator."),
|
||||
errhint("Reduce shard replication factor or add a worker node.")));
|
||||
}
|
||||
}
|
||||
|
||||
/* first retrieve a list of random nodes for shard placements */
|
||||
while (candidateNodeIndex < attemptableNodeCount)
|
||||
{
|
||||
|
|
|
@ -267,7 +267,8 @@ ClientHostAddress(StringInfo clientHostStringInfo)
|
|||
|
||||
/*
|
||||
* WorkerGetNodeWithName finds and returns a node from the membership list that
|
||||
* has the given hostname. The function returns null if no such node exists.
|
||||
* has the given hostname. The node cannot be the coordinator.
|
||||
* The function returns null if no such node exists.
|
||||
*/
|
||||
static WorkerNode *
|
||||
WorkerGetNodeWithName(const char *hostname)
|
||||
|
@ -281,7 +282,7 @@ WorkerGetNodeWithName(const char *hostname)
|
|||
while ((workerNode = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
|
||||
if (nameCompare == 0)
|
||||
if (nameCompare == 0 && (workerNode->groupId != 0))
|
||||
{
|
||||
/* we need to terminate the scan since we break */
|
||||
hash_seq_term(&status);
|
||||
|
@ -579,8 +580,8 @@ NodeIsReadableWorker(WorkerNode *node)
|
|||
|
||||
/*
|
||||
* PrimaryNodesNotInList scans through the worker node hash and returns a list of all
|
||||
* primary nodes which are not in currentList. It runs in O(n*m) but currentList is
|
||||
* quite small.
|
||||
* primary nodes which are not in currentList and are not the coordinator.
|
||||
* It runs in O(n*m) but currentList is quite small.
|
||||
*/
|
||||
static List *
|
||||
PrimaryNodesNotInList(List *currentList)
|
||||
|
@ -599,7 +600,7 @@ PrimaryNodesNotInList(List *currentList)
|
|||
continue;
|
||||
}
|
||||
|
||||
if (NodeIsPrimary(workerNode))
|
||||
if (NodeIsPrimary(workerNode) && (workerNode->groupId != 0))
|
||||
{
|
||||
workerNodeList = lappend(workerNodeList, workerNode);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue