mirror of https://github.com/citusdata/citus.git
Add nodeId to shardPlacements and use it for shard placement comparisons
Before this commit, shardPlacements were identified with shardId, nodeName and nodeport. Instead of using nodeName and nodePort, we now use nodeId since it apparently has performance benefits in several places in the code.pull/2633/head
parent
32ee0217d5
commit
0ea4e52df5
|
@ -1051,6 +1051,13 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
|
||||||
shardPlacement->nodeName = nodeName;
|
shardPlacement->nodeName = nodeName;
|
||||||
shardPlacement->nodePort = nodePort;
|
shardPlacement->nodePort = nodePort;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We cannot know the nodeId, but it is not necessary at this point either.
|
||||||
|
* This is only used to to look up the connection for a group of co-located
|
||||||
|
* placements, but append-distributed tables are never co-located.
|
||||||
|
*/
|
||||||
|
shardPlacement->nodeId = -1;
|
||||||
|
|
||||||
finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
|
finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,8 +133,7 @@ static HTAB *ConnectionPlacementHash;
|
||||||
typedef struct ColocatedPlacementsHashKey
|
typedef struct ColocatedPlacementsHashKey
|
||||||
{
|
{
|
||||||
/* to identify host - database can't differ */
|
/* to identify host - database can't differ */
|
||||||
char nodeName[MAX_NODE_LENGTH];
|
int nodeId;
|
||||||
uint32 nodePort;
|
|
||||||
|
|
||||||
/* colocation group, or invalid */
|
/* colocation group, or invalid */
|
||||||
uint32 colocationGroupId;
|
uint32 colocationGroupId;
|
||||||
|
@ -689,8 +688,7 @@ FindOrCreatePlacementEntry(ShardPlacement *placement)
|
||||||
ColocatedPlacementsHashKey key;
|
ColocatedPlacementsHashKey key;
|
||||||
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
||||||
|
|
||||||
strlcpy(key.nodeName, placement->nodeName, MAX_NODE_LENGTH);
|
key.nodeId = placement->nodeId;
|
||||||
key.nodePort = placement->nodePort;
|
|
||||||
key.colocationGroupId = placement->colocationGroupId;
|
key.colocationGroupId = placement->colocationGroupId;
|
||||||
key.representativeValue = placement->representativeValue;
|
key.representativeValue = placement->representativeValue;
|
||||||
|
|
||||||
|
@ -1149,8 +1147,7 @@ ColocatedPlacementsHashHash(const void *key, Size keysize)
|
||||||
ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
|
ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
|
||||||
uint32 hash = 0;
|
uint32 hash = 0;
|
||||||
|
|
||||||
hash = string_hash(entry->nodeName, NAMEDATALEN);
|
hash = hash_uint32(entry->nodeId);
|
||||||
hash = hash_combine(hash, hash_uint32(entry->nodePort));
|
|
||||||
hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
|
hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
|
||||||
hash = hash_combine(hash, hash_uint32(entry->representativeValue));
|
hash = hash_combine(hash, hash_uint32(entry->representativeValue));
|
||||||
|
|
||||||
|
@ -1164,8 +1161,7 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
|
||||||
ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
|
ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
|
||||||
ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;
|
ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;
|
||||||
|
|
||||||
if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 ||
|
if (ca->nodeId != cb->nodeId ||
|
||||||
ca->nodePort != cb->nodePort ||
|
|
||||||
ca->colocationGroupId != cb->colocationGroupId ||
|
ca->colocationGroupId != cb->colocationGroupId ||
|
||||||
ca->representativeValue != cb->representativeValue)
|
ca->representativeValue != cb->representativeValue)
|
||||||
{
|
{
|
||||||
|
|
|
@ -2552,8 +2552,7 @@ CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterv
|
||||||
ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst(
|
ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst(
|
||||||
secondShardPlacementCell);
|
secondShardPlacementCell);
|
||||||
|
|
||||||
if (strcmp(firstShardPlacement->nodeName, secondShardPlacement->nodeName) != 0 ||
|
if (firstShardPlacement->nodeId != secondShardPlacement->nodeId)
|
||||||
firstShardPlacement->nodePort != secondShardPlacement->nodePort)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -5375,6 +5374,7 @@ AssignDualHashTaskList(List *taskList)
|
||||||
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
|
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
|
||||||
taskPlacement->nodeName = pstrdup(workerNode->workerName);
|
taskPlacement->nodeName = pstrdup(workerNode->workerName);
|
||||||
taskPlacement->nodePort = workerNode->workerPort;
|
taskPlacement->nodePort = workerNode->workerPort;
|
||||||
|
taskPlacement->nodeId = workerNode->nodeId;
|
||||||
|
|
||||||
taskPlacementList = lappend(taskPlacementList, taskPlacement);
|
taskPlacementList = lappend(taskPlacementList, taskPlacement);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2027,6 +2027,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
(ShardPlacement *) CitusMakeNode(ShardPlacement);
|
(ShardPlacement *) CitusMakeNode(ShardPlacement);
|
||||||
dummyPlacement->nodeName = workerNode->workerName;
|
dummyPlacement->nodeName = workerNode->workerName;
|
||||||
dummyPlacement->nodePort = workerNode->workerPort;
|
dummyPlacement->nodePort = workerNode->workerPort;
|
||||||
|
dummyPlacement->nodeId = workerNode->nodeId;
|
||||||
dummyPlacement->groupId = workerNode->groupId;
|
dummyPlacement->groupId = workerNode->groupId;
|
||||||
|
|
||||||
workerList = lappend(workerList, dummyPlacement);
|
workerList = lappend(workerList, dummyPlacement);
|
||||||
|
|
|
@ -200,6 +200,7 @@ CopyNodeShardPlacement(COPYFUNC_ARGS)
|
||||||
COPY_SCALAR_FIELD(groupId);
|
COPY_SCALAR_FIELD(groupId);
|
||||||
COPY_STRING_FIELD(nodeName);
|
COPY_STRING_FIELD(nodeName);
|
||||||
COPY_SCALAR_FIELD(nodePort);
|
COPY_SCALAR_FIELD(nodePort);
|
||||||
|
COPY_SCALAR_FIELD(nodeId);
|
||||||
COPY_SCALAR_FIELD(partitionMethod);
|
COPY_SCALAR_FIELD(partitionMethod);
|
||||||
COPY_SCALAR_FIELD(colocationGroupId);
|
COPY_SCALAR_FIELD(colocationGroupId);
|
||||||
COPY_SCALAR_FIELD(representativeValue);
|
COPY_SCALAR_FIELD(representativeValue);
|
||||||
|
|
|
@ -405,6 +405,7 @@ OutShardPlacement(OUTFUNC_ARGS)
|
||||||
WRITE_INT_FIELD(groupId);
|
WRITE_INT_FIELD(groupId);
|
||||||
WRITE_STRING_FIELD(nodeName);
|
WRITE_STRING_FIELD(nodeName);
|
||||||
WRITE_UINT_FIELD(nodePort);
|
WRITE_UINT_FIELD(nodePort);
|
||||||
|
WRITE_INT_FIELD(nodeId);
|
||||||
/* so we can deal with 0 */
|
/* so we can deal with 0 */
|
||||||
WRITE_INT_FIELD(partitionMethod);
|
WRITE_INT_FIELD(partitionMethod);
|
||||||
WRITE_UINT_FIELD(colocationGroupId);
|
WRITE_UINT_FIELD(colocationGroupId);
|
||||||
|
|
|
@ -315,6 +315,7 @@ ReadShardPlacement(READFUNC_ARGS)
|
||||||
READ_INT_FIELD(groupId);
|
READ_INT_FIELD(groupId);
|
||||||
READ_STRING_FIELD(nodeName);
|
READ_STRING_FIELD(nodeName);
|
||||||
READ_UINT_FIELD(nodePort);
|
READ_UINT_FIELD(nodePort);
|
||||||
|
READ_INT_FIELD(nodeId);
|
||||||
/* so we can deal with 0 */
|
/* so we can deal with 0 */
|
||||||
READ_INT_FIELD(partitionMethod);
|
READ_INT_FIELD(partitionMethod);
|
||||||
READ_UINT_FIELD(colocationGroupId);
|
READ_UINT_FIELD(colocationGroupId);
|
||||||
|
|
|
@ -420,25 +420,12 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
|
||||||
const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
|
const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
|
||||||
const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
|
const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
|
||||||
|
|
||||||
char *leftNodeName = leftPlacement->nodeName;
|
|
||||||
char *rightNodeName = rightPlacement->nodeName;
|
|
||||||
|
|
||||||
uint32 leftNodePort = leftPlacement->nodePort;
|
|
||||||
uint32 rightNodePort = rightPlacement->nodePort;
|
|
||||||
|
|
||||||
/* first compare node names */
|
|
||||||
int nodeNameCompare = strncmp(leftNodeName, rightNodeName, WORKER_LENGTH);
|
|
||||||
if (nodeNameCompare != 0)
|
|
||||||
{
|
|
||||||
return nodeNameCompare;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if node names are same, check node ports */
|
/* if node names are same, check node ports */
|
||||||
if (leftNodePort < rightNodePort)
|
if (leftPlacement->nodeId < rightPlacement->nodeId)
|
||||||
{
|
{
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
else if (leftNodePort > rightNodePort)
|
else if (leftPlacement->nodeId > rightPlacement->nodeId)
|
||||||
{
|
{
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -526,6 +526,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
|
||||||
|
|
||||||
shardPlacement->nodeName = pstrdup(workerNode->workerName);
|
shardPlacement->nodeName = pstrdup(workerNode->workerName);
|
||||||
shardPlacement->nodePort = workerNode->workerPort;
|
shardPlacement->nodePort = workerNode->workerPort;
|
||||||
|
shardPlacement->nodeId = workerNode->nodeId;
|
||||||
|
|
||||||
/* fill in remaining fields */
|
/* fill in remaining fields */
|
||||||
Assert(tableEntry->partitionMethod != 0);
|
Assert(tableEntry->partitionMethod != 0);
|
||||||
|
|
|
@ -99,6 +99,7 @@ typedef struct ShardPlacement
|
||||||
/* the rest of the fields aren't from pg_dist_placement */
|
/* the rest of the fields aren't from pg_dist_placement */
|
||||||
char *nodeName;
|
char *nodeName;
|
||||||
uint32 nodePort;
|
uint32 nodePort;
|
||||||
|
int nodeId;
|
||||||
char partitionMethod;
|
char partitionMethod;
|
||||||
uint32 colocationGroupId;
|
uint32 colocationGroupId;
|
||||||
uint32 representativeValue;
|
uint32 representativeValue;
|
||||||
|
|
Loading…
Reference in New Issue