mirror of https://github.com/citusdata/citus.git
Fix int32 overflow and use PG macros for INT32_XX (#4061)
* Use CalculateUniformHashRangeIndex in HashPartitionId
The definition of INT32_MIN can differ between platforms, so it is
possible to get an overflow; we have seen crashes because of this on
Debian distros. We already solved a similar problem by introducing the
CalculateUniformHashRangeIndex method, so we can use the same method
here. This also removes some duplication and leaves a single place
that decides the hash range index.
* Use PG_INT32_XX instead of INT32_XX to be safer
(cherry picked from commit ef841115de)
Conflicts:
src/backend/distributed/commands/multi_copy.c
release-9.2
parent
b504b749a6
commit
c6c31e0f1f
|
@ -198,14 +198,14 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
|
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
|
||||||
|
|
||||||
/* initialize the hash token space for this shard */
|
/* initialize the hash token space for this shard */
|
||||||
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
|
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
|
||||||
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
||||||
uint64 shardId = GetNextShardId();
|
uint64 shardId = GetNextShardId();
|
||||||
|
|
||||||
/* if we are at the last shard, make sure the max token value is INT_MAX */
|
/* if we are at the last shard, make sure the max token value is INT_MAX */
|
||||||
if (shardIndex == (shardCount - 1))
|
if (shardIndex == (shardCount - 1))
|
||||||
{
|
{
|
||||||
shardMaxHashToken = INT32_MAX;
|
shardMaxHashToken = PG_INT32_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* insert the shard metadata row along with its min/max values */
|
/* insert the shard metadata row along with its min/max values */
|
||||||
|
|
|
@ -1391,12 +1391,12 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||||
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
|
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = shardIntervalArray[shardIndex];
|
ShardInterval *shardInterval = shardIntervalArray[shardIndex];
|
||||||
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
|
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
|
||||||
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
||||||
|
|
||||||
if (shardIndex == (shardIntervalArrayLength - 1))
|
if (shardIndex == (shardIntervalArrayLength - 1))
|
||||||
{
|
{
|
||||||
shardMaxHashToken = INT32_MAX;
|
shardMaxHashToken = PG_INT32_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken ||
|
if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken ||
|
||||||
|
|
|
@ -4411,7 +4411,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
|
||||||
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
|
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
|
||||||
|
|
||||||
/* calculate the split of the hash space */
|
/* calculate the split of the hash space */
|
||||||
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
|
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
|
||||||
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
|
||||||
|
|
||||||
shardInterval->relationId = InvalidOid;
|
shardInterval->relationId = InvalidOid;
|
||||||
|
|
|
@ -219,8 +219,8 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
|
||||||
StringInfo maxInfo = makeStringInfo();
|
StringInfo maxInfo = makeStringInfo();
|
||||||
uint64 newShardId = GetNextShardId();
|
uint64 newShardId = GetNextShardId();
|
||||||
|
|
||||||
appendStringInfo(minInfo, "%d", INT32_MIN);
|
appendStringInfo(minInfo, "%d", PG_INT32_MIN);
|
||||||
appendStringInfo(maxInfo, "%d", INT32_MAX);
|
appendStringInfo(maxInfo, "%d", PG_INT32_MAX);
|
||||||
|
|
||||||
text *minInfoText = cstring_to_text(minInfo->data);
|
text *minInfoText = cstring_to_text(minInfo->data);
|
||||||
text *maxInfoText = cstring_to_text(maxInfo->data);
|
text *maxInfoText = cstring_to_text(maxInfo->data);
|
||||||
|
|
|
@ -26,9 +26,6 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
static int CalculateUniformHashRangeIndex(int hashedValue, int shardCount);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LowestShardIntervalById returns the shard interval with the lowest shard
|
* LowestShardIntervalById returns the shard interval with the lowest shard
|
||||||
* ID from a list of shard intervals.
|
* ID from a list of shard intervals.
|
||||||
|
@ -311,7 +308,7 @@ FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry)
|
||||||
* INVALID_SHARD_INDEX is returned). This should only happen if something is
|
* INVALID_SHARD_INDEX is returned). This should only happen if something is
|
||||||
* terribly wrong, either metadata tables are corrupted or we have a bug
|
* terribly wrong, either metadata tables are corrupted or we have a bug
|
||||||
* somewhere. Such as a hash function which returns a value not in the range
|
* somewhere. Such as a hash function which returns a value not in the range
|
||||||
* of [INT32_MIN, INT32_MAX] can fire this.
|
* of [PG_INT32_MIN, PG_INT32_MAX] can fire this.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
|
FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
|
||||||
|
@ -443,13 +440,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
||||||
* NOTE: This function is ONLY for hash-distributed tables with uniform
|
* NOTE: This function is ONLY for hash-distributed tables with uniform
|
||||||
* hash ranges.
|
* hash ranges.
|
||||||
*/
|
*/
|
||||||
static int
|
int
|
||||||
CalculateUniformHashRangeIndex(int hashedValue, int shardCount)
|
CalculateUniformHashRangeIndex(int hashedValue, int shardCount)
|
||||||
{
|
{
|
||||||
int64 hashedValue64 = (int64) hashedValue;
|
int64 hashedValue64 = (int64) hashedValue;
|
||||||
|
|
||||||
/* normalize to the 0-UINT32_MAX range */
|
/* normalize to the 0-UINT32_MAX range */
|
||||||
int64 normalizedHashValue = hashedValue64 - INT32_MIN;
|
int64 normalizedHashValue = hashedValue64 - PG_INT32_MIN;
|
||||||
|
|
||||||
/* size of each hash range */
|
/* size of each hash range */
|
||||||
int64 hashRangeSize = HASH_TOKEN_COUNT / shardCount;
|
int64 hashRangeSize = HASH_TOKEN_COUNT / shardCount;
|
||||||
|
|
|
@ -250,7 +250,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
|
||||||
static ShardInterval **
|
static ShardInterval **
|
||||||
SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount)
|
SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount)
|
||||||
{
|
{
|
||||||
Datum nextShardMaxValue = Int32GetDatum(INT32_MAX);
|
Datum nextShardMaxValue = Int32GetDatum(PG_INT32_MAX);
|
||||||
ShardInterval **syntheticShardIntervalArray =
|
ShardInterval **syntheticShardIntervalArray =
|
||||||
palloc(sizeof(ShardInterval *) * shardCount);
|
palloc(sizeof(ShardInterval *) * shardCount);
|
||||||
|
|
||||||
|
@ -1240,7 +1240,6 @@ HashPartitionId(Datum partitionValue, Oid partitionCollation, const void *contex
|
||||||
FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction;
|
FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction;
|
||||||
Datum hashDatum = FunctionCall1Coll(hashFunction, DEFAULT_COLLATION_OID,
|
Datum hashDatum = FunctionCall1Coll(hashFunction, DEFAULT_COLLATION_OID,
|
||||||
partitionValue);
|
partitionValue);
|
||||||
int32 hashResult = 0;
|
|
||||||
uint32 hashPartitionId = 0;
|
uint32 hashPartitionId = 0;
|
||||||
|
|
||||||
if (hashDatum == 0)
|
if (hashDatum == 0)
|
||||||
|
@ -1250,10 +1249,8 @@ HashPartitionId(Datum partitionValue, Oid partitionCollation, const void *contex
|
||||||
|
|
||||||
if (hashPartitionContext->hasUniformHashDistribution)
|
if (hashPartitionContext->hasUniformHashDistribution)
|
||||||
{
|
{
|
||||||
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount;
|
int hashValue = DatumGetInt32(hashDatum);
|
||||||
|
hashPartitionId = CalculateUniformHashRangeIndex(hashValue, partitionCount);
|
||||||
hashResult = DatumGetInt32(hashDatum);
|
|
||||||
hashPartitionId = (uint32) (hashResult - INT32_MIN) / hashTokenIncrement;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -47,6 +47,7 @@ extern int CompareShardPlacementsByShardId(const void *leftElement,
|
||||||
extern int CompareRelationShards(const void *leftElement,
|
extern int CompareRelationShards(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
extern int ShardIndex(ShardInterval *shardInterval);
|
extern int ShardIndex(ShardInterval *shardInterval);
|
||||||
|
extern int CalculateUniformHashRangeIndex(int hashedValue, int shardCount);
|
||||||
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
||||||
DistTableCacheEntry *cacheEntry);
|
DistTableCacheEntry *cacheEntry);
|
||||||
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
|
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
|
||||||
|
|
Loading…
Reference in New Issue