mirror of https://github.com/citusdata/citus.git
Add ShardPlacement fields required for colocated placement connection mapping.
parent
1d79820b74
commit
6972186652
|
@ -473,6 +473,10 @@ OutShardPlacement(OUTFUNC_ARGS)
|
||||||
WRITE_ENUM_FIELD(shardState, RelayFileState);
|
WRITE_ENUM_FIELD(shardState, RelayFileState);
|
||||||
WRITE_STRING_FIELD(nodeName);
|
WRITE_STRING_FIELD(nodeName);
|
||||||
WRITE_UINT_FIELD(nodePort);
|
WRITE_UINT_FIELD(nodePort);
|
||||||
|
/* so we can deal with 0 */
|
||||||
|
WRITE_INT_FIELD(partitionMethod);
|
||||||
|
WRITE_UINT_FIELD(colocationGroupId);
|
||||||
|
WRITE_UINT_FIELD(representativeValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -268,6 +268,10 @@ ReadShardPlacement(READFUNC_ARGS)
|
||||||
READ_ENUM_FIELD(shardState, RelayFileState);
|
READ_ENUM_FIELD(shardState, RelayFileState);
|
||||||
READ_STRING_FIELD(nodeName);
|
READ_STRING_FIELD(nodeName);
|
||||||
READ_UINT_FIELD(nodePort);
|
READ_UINT_FIELD(nodePort);
|
||||||
|
/* so we can deal with 0 */
|
||||||
|
READ_INT_FIELD(partitionMethod);
|
||||||
|
READ_UINT_FIELD(colocationGroupId);
|
||||||
|
READ_UINT_FIELD(representativeValue);
|
||||||
|
|
||||||
READ_DONE();
|
READ_DONE();
|
||||||
}
|
}
|
||||||
|
|
|
@ -703,9 +703,31 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
||||||
foreach(placementCell, placementList)
|
foreach(placementCell, placementList)
|
||||||
{
|
{
|
||||||
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
|
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
|
||||||
|
ShardPlacement *dstPlacement = &placementArray[placementOffset];
|
||||||
|
|
||||||
CopyShardPlacement(srcPlacement, &placementArray[placementOffset]);
|
CopyShardPlacement(srcPlacement, dstPlacement);
|
||||||
|
|
||||||
|
/* fill in remaining fields */
|
||||||
|
Assert(cacheEntry->partitionMethod != 0);
|
||||||
|
dstPlacement->partitionMethod = cacheEntry->partitionMethod;
|
||||||
|
dstPlacement->colocationGroupId = cacheEntry->colocationId;
|
||||||
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
Assert(shardInterval->minValueExists);
|
||||||
|
Assert(shardInterval->valueTypeId == INT4OID);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Use the lower boundary of the interval's range to identify
|
||||||
|
* it for colocation purposes. That remains meaningful even if
|
||||||
|
* a concurrent session splits a shard.
|
||||||
|
*/
|
||||||
|
dstPlacement->representativeValue =
|
||||||
|
DatumGetInt32(shardInterval->minValue);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dstPlacement->representativeValue = 0;
|
||||||
|
}
|
||||||
placementOffset++;
|
placementOffset++;
|
||||||
}
|
}
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
|
@ -53,6 +53,9 @@ typedef struct ShardPlacement
|
||||||
RelayFileState shardState;
|
RelayFileState shardState;
|
||||||
char *nodeName;
|
char *nodeName;
|
||||||
uint32 nodePort;
|
uint32 nodePort;
|
||||||
|
char partitionMethod;
|
||||||
|
uint32 colocationGroupId;
|
||||||
|
uint32 representativeValue;
|
||||||
} ShardPlacement;
|
} ShardPlacement;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue