mirror of https://github.com/citusdata/citus.git
Add binary search capability to ShardIndex()
Renamed FindShardIntervalIndex() to ShardIndex() and added binary search capability. The old function assumed that hash-partitioned tables are always uniformly distributed, which will no longer be true once the upcoming tenant isolation feature is applied. This commit also reduces code duplication. pull/1073/head
parent
29e5e3e715
commit
1ddc70ca55
|
@ -277,7 +277,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
||||||
uint64 sourceShardId = sourceShardInterval->shardId;
|
uint64 sourceShardId = sourceShardInterval->shardId;
|
||||||
uint64 newShardId = GetNextShardId();
|
uint64 newShardId = GetNextShardId();
|
||||||
ListCell *sourceShardPlacementCell = NULL;
|
ListCell *sourceShardPlacementCell = NULL;
|
||||||
int sourceShardIndex = FindShardIntervalIndex(sourceShardInterval);
|
int sourceShardIndex = ShardIndex(sourceShardInterval);
|
||||||
|
|
||||||
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
||||||
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
||||||
|
|
|
@ -307,7 +307,7 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
|
||||||
/* we will only use shardIndex if there is a foreign constraint */
|
/* we will only use shardIndex if there is a foreign constraint */
|
||||||
if (commandList != NIL)
|
if (commandList != NIL)
|
||||||
{
|
{
|
||||||
shardIndex = FindShardIntervalIndex(shardInterval);
|
shardIndex = ShardIndex(shardInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(commandCell, commandList)
|
foreach(commandCell, commandList)
|
||||||
|
|
|
@ -153,7 +153,7 @@ find_shard_interval_index(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
uint32 shardId = PG_GETARG_UINT32(0);
|
uint32 shardId = PG_GETARG_UINT32(0);
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
uint32 shardIndex = FindShardIntervalIndex(shardInterval);
|
uint32 shardIndex = ShardIndex(shardInterval);
|
||||||
|
|
||||||
PG_RETURN_INT32(shardIndex);
|
PG_RETURN_INT32(shardIndex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -348,7 +348,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard
|
||||||
* partitioned table's shards.
|
* partitioned table's shards.
|
||||||
*
|
*
|
||||||
* We do min/max value check here to decide whether two shards are colocated,
|
* We do min/max value check here to decide whether two shards are colocated,
|
||||||
* instead we can simply use FindShardIntervalIndex function on both shards then
|
* instead we can simply use ShardIndex function on both shards then
|
||||||
* but do index check, but we avoid it because this way it is more cheaper.
|
* but do index check, but we avoid it because this way it is more cheaper.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
|
@ -852,10 +852,10 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
return colocatedShardList;
|
return colocatedShardList;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIntervalIndex = FindShardIntervalIndex(shardInterval);
|
shardIntervalIndex = ShardIndex(shardInterval);
|
||||||
colocatedTableList = ColocatedTableList(distributedTableId);
|
colocatedTableList = ColocatedTableList(distributedTableId);
|
||||||
|
|
||||||
/* FindShardIntervalIndex have to find index of given shard */
|
/* ShardIndex have to find index of given shard */
|
||||||
Assert(shardIntervalIndex >= 0);
|
Assert(shardIntervalIndex >= 0);
|
||||||
|
|
||||||
foreach(colocatedTableCell, colocatedTableList)
|
foreach(colocatedTableCell, colocatedTableList)
|
||||||
|
|
|
@ -23,10 +23,12 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
|
static int FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache,
|
||||||
|
int shardCount, char partitionMethod,
|
||||||
|
FmgrInfo *compareFunction, bool useBinarySearch);
|
||||||
|
static int SearchCachedShardInterval(Datum partitionColumnValue,
|
||||||
ShardInterval **shardIntervalCache,
|
ShardInterval **shardIntervalCache,
|
||||||
int shardCount,
|
int shardCount, FmgrInfo *compareFunction);
|
||||||
FmgrInfo *compareFunction);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -171,7 +173,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindShardIntervalIndex finds index of given shard in sorted shard interval array.
|
* ShardIndex finds the index of given shard in sorted shard interval array.
|
||||||
*
|
*
|
||||||
* For hash partitioned tables, it calculates hash value of a number in its
|
* For hash partitioned tables, it calculates hash value of a number in its
|
||||||
* range (e.g. min value) and finds which shard should contain the hashed
|
* range (e.g. min value) and finds which shard should contain the hashed
|
||||||
|
@ -179,15 +181,30 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
|
||||||
* other than hash and reference, the function errors out.
|
* other than hash and reference, the function errors out.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
FindShardIntervalIndex(ShardInterval *shardInterval)
|
ShardIndex(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
|
Datum shardMinValue = shardInterval->minValue;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||||
|
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;
|
||||||
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
int shardCount = 0;
|
FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||||
int32 shardMinValue = 0;
|
bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
|
||||||
uint64 hashTokenIncrement = 0;
|
bool useBinarySearch = false;
|
||||||
int shardIndex = -1;
|
|
||||||
|
/*
|
||||||
|
* Note that, we can also support append and range distributed tables, but
|
||||||
|
* currently it is not required.
|
||||||
|
*/
|
||||||
|
if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("finding index of a given shard is only supported for "
|
||||||
|
"hash distributed and reference tables")));
|
||||||
|
}
|
||||||
|
|
||||||
/* short-circuit for reference tables */
|
/* short-circuit for reference tables */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
|
@ -198,34 +215,15 @@ FindShardIntervalIndex(ShardInterval *shardInterval)
|
||||||
return shardIndex;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/* determine whether to use binary search */
|
||||||
* We can support it for other types of partitioned tables with simple binary scan
|
if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
|
||||||
* but it is not necessary at the moment. If we need that simply check algorithm in
|
|
||||||
* FindShardInterval and SearchCachedShardInterval.
|
|
||||||
*/
|
|
||||||
if (partitionMethod != DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
useBinarySearch = true;
|
||||||
errmsg("finding index of given shard is not supported for "
|
|
||||||
"non-hash partitioned tables")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
shardIndex = FindShardIntervalIndex(shardMinValue, shardIntervalCache,
|
||||||
shardMinValue = DatumGetInt32(shardInterval->minValue);
|
shardCount, partitionMethod,
|
||||||
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
compareFunction, useBinarySearch);
|
||||||
shardIndex = (uint32) (shardMinValue - INT32_MIN) / hashTokenIncrement;
|
|
||||||
|
|
||||||
Assert(shardIndex <= shardCount);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the shard count is not power of 2, the range of the last
|
|
||||||
* shard becomes larger than others. For that extra piece of range,
|
|
||||||
* we still need to use the last shard.
|
|
||||||
*/
|
|
||||||
if (shardIndex == shardCount)
|
|
||||||
{
|
|
||||||
shardIndex = shardCount - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return shardIndex;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
@ -242,25 +240,69 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
||||||
FmgrInfo *hashFunction, bool useBinarySearch)
|
FmgrInfo *hashFunction, bool useBinarySearch)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = NULL;
|
Datum searchedValue = partitionColumnValue;
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
|
|
||||||
|
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
searchedValue = FunctionCall1(hashFunction, partitionColumnValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
shardIndex = FindShardIntervalIndex(searchedValue, shardIntervalCache,
|
||||||
|
shardCount, partitionMethod,
|
||||||
|
compareFunction, useBinarySearch);
|
||||||
|
|
||||||
|
if (shardIndex == INVALID_SHARD_INDEX)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return shardIntervalCache[shardIndex];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindShardIntervalIndex finds the index of the shard interval which covers
|
||||||
|
* the searched value. Note that the searched value must be the hashed value
|
||||||
|
* of the original value if the distribution method is hash.
|
||||||
|
*
|
||||||
|
* Note that, if the searched value can not be found for hash partitioned tables,
|
||||||
|
* we error out. This should only happen if something is terribly wrong, either
|
||||||
|
* metadata tables are corrupted or we have a bug somewhere. Such as a hash
|
||||||
|
* function which returns a value not in the range of [INT32_MIN, INT32_MAX] can
|
||||||
|
* fire this.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache,
|
||||||
|
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
||||||
|
bool useBinarySearch)
|
||||||
|
{
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
int hashedValue = DatumGetInt32(FunctionCall1(hashFunction,
|
|
||||||
partitionColumnValue));
|
|
||||||
if (useBinarySearch)
|
if (useBinarySearch)
|
||||||
{
|
{
|
||||||
Assert(compareFunction != NULL);
|
Assert(compareFunction != NULL);
|
||||||
|
|
||||||
shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue),
|
shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache,
|
||||||
shardIntervalCache, shardCount,
|
shardCount, compareFunction);
|
||||||
compareFunction);
|
|
||||||
|
/* we should always return a valid shard index for hash partitioned tables */
|
||||||
|
if (shardIndex == INVALID_SHARD_INDEX)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
|
||||||
|
errmsg("cannot find shard interval"),
|
||||||
|
errdetail("Hash of the partition column value "
|
||||||
|
"does not fall into any shards.")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
int hashedValue = DatumGetInt32(searchedValue);
|
||||||
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
||||||
int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement;
|
|
||||||
|
|
||||||
|
shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement;
|
||||||
Assert(shardIndex <= shardCount);
|
Assert(shardIndex <= shardCount);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -272,36 +314,34 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
{
|
{
|
||||||
shardIndex = shardCount - 1;
|
shardIndex = shardCount - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardInterval = shardIntervalCache[shardIndex];
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
int shardIndex = 0;
|
|
||||||
|
|
||||||
/* reference tables has a single shard, all values mapped to that shard */
|
/* reference tables has a single shard, all values mapped to that shard */
|
||||||
Assert(shardCount == 1);
|
Assert(shardCount == 1);
|
||||||
shardInterval = shardIntervalCache[shardIndex];
|
|
||||||
|
shardIndex = 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Assert(compareFunction != NULL);
|
Assert(compareFunction != NULL);
|
||||||
|
|
||||||
shardInterval = SearchCachedShardInterval(partitionColumnValue,
|
shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache,
|
||||||
shardIntervalCache, shardCount,
|
shardCount, compareFunction);
|
||||||
compareFunction);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return shardInterval;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SearchCachedShardInterval performs a binary search for a shard interval matching a
|
* SearchCachedShardInterval performs a binary search for a shard interval
|
||||||
* given partition column value and returns it.
|
* matching a given partition column value and returns its index in the cached
|
||||||
|
* array. If it can not find any shard interval with the given value, it returns
|
||||||
|
* INVALID_SHARD_INDEX.
|
||||||
*/
|
*/
|
||||||
static ShardInterval *
|
static int
|
||||||
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
|
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
|
||||||
int shardCount, FmgrInfo *compareFunction)
|
int shardCount, FmgrInfo *compareFunction)
|
||||||
{
|
{
|
||||||
|
@ -332,13 +372,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
||||||
|
|
||||||
if (DatumGetInt32(maxValueComparison) <= 0)
|
if (DatumGetInt32(maxValueComparison) <= 0)
|
||||||
{
|
{
|
||||||
return shardIntervalCache[middleIndex];
|
return middleIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
lowerBoundIndex = middleIndex + 1;
|
lowerBoundIndex = middleIndex + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return INVALID_SHARD_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "nodes/primnodes.h"
|
#include "nodes/primnodes.h"
|
||||||
|
|
||||||
|
#define INVALID_SHARD_INDEX -1
|
||||||
|
|
||||||
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
||||||
typedef struct ShardIntervalCompareFunctionCacheEntry
|
typedef struct ShardIntervalCompareFunctionCacheEntry
|
||||||
{
|
{
|
||||||
|
@ -29,7 +31,7 @@ extern int CompareShardIntervals(const void *leftElement, const void *rightEleme
|
||||||
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
|
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
|
||||||
extern int CompareRelationShards(const void *leftElement,
|
extern int CompareRelationShards(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
extern int FindShardIntervalIndex(ShardInterval *shardInterval);
|
extern int ShardIndex(ShardInterval *shardInterval);
|
||||||
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
||||||
ShardInterval **shardIntervalCache,
|
ShardInterval **shardIntervalCache,
|
||||||
int shardCount, char partitionMethod,
|
int shardCount, char partitionMethod,
|
||||||
|
|
|
@ -107,9 +107,8 @@ INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu
|
||||||
-- try a single-row INSERT with no shard to receive it
|
-- try a single-row INSERT with no shard to receive it
|
||||||
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
||||||
20.69);
|
20.69);
|
||||||
ERROR: distributed modifications must target exactly one shard
|
ERROR: cannot find shard interval
|
||||||
DETAIL: This command modifies no shards.
|
DETAIL: Hash of the partition column value does not fall into any shards.
|
||||||
HINT: Make sure the value for partition column "id" falls into a single shard.
|
|
||||||
-- try an insert to a range-partitioned table
|
-- try an insert to a range-partitioned table
|
||||||
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
||||||
20.69);
|
20.69);
|
||||||
|
|
Loading…
Reference in New Issue