mirror of https://github.com/citusdata/citus.git
Merge pull request #1073 from citusdata/refactor_shard_index
Add binary search capability to ShardIndex()pull/1063/head
commit
7f10d8562b
|
@ -277,7 +277,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
||||||
uint64 sourceShardId = sourceShardInterval->shardId;
|
uint64 sourceShardId = sourceShardInterval->shardId;
|
||||||
uint64 newShardId = GetNextShardId();
|
uint64 newShardId = GetNextShardId();
|
||||||
ListCell *sourceShardPlacementCell = NULL;
|
ListCell *sourceShardPlacementCell = NULL;
|
||||||
int sourceShardIndex = FindShardIntervalIndex(sourceShardInterval);
|
int sourceShardIndex = ShardIndex(sourceShardInterval);
|
||||||
|
|
||||||
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
||||||
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
||||||
|
|
|
@ -307,7 +307,7 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
|
||||||
/* we will only use shardIndex if there is a foreign constraint */
|
/* we will only use shardIndex if there is a foreign constraint */
|
||||||
if (commandList != NIL)
|
if (commandList != NIL)
|
||||||
{
|
{
|
||||||
shardIndex = FindShardIntervalIndex(shardInterval);
|
shardIndex = ShardIndex(shardInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(commandCell, commandList)
|
foreach(commandCell, commandList)
|
||||||
|
|
|
@ -153,7 +153,7 @@ find_shard_interval_index(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
uint32 shardId = PG_GETARG_UINT32(0);
|
uint32 shardId = PG_GETARG_UINT32(0);
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
uint32 shardIndex = FindShardIntervalIndex(shardInterval);
|
uint32 shardIndex = ShardIndex(shardInterval);
|
||||||
|
|
||||||
PG_RETURN_INT32(shardIndex);
|
PG_RETURN_INT32(shardIndex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -348,7 +348,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard
|
||||||
* partitioned table's shards.
|
* partitioned table's shards.
|
||||||
*
|
*
|
||||||
* We do min/max value check here to decide whether two shards are colocated,
|
* We do min/max value check here to decide whether two shards are colocated,
|
||||||
* instead we can simply use FindShardIntervalIndex function on both shards then
|
* instead we can simply use ShardIndex function on both shards then
|
||||||
* but do index check, but we avoid it because this way it is more cheaper.
|
* but do index check, but we avoid it because this way it is more cheaper.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
|
@ -852,10 +852,10 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
return colocatedShardList;
|
return colocatedShardList;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIntervalIndex = FindShardIntervalIndex(shardInterval);
|
shardIntervalIndex = ShardIndex(shardInterval);
|
||||||
colocatedTableList = ColocatedTableList(distributedTableId);
|
colocatedTableList = ColocatedTableList(distributedTableId);
|
||||||
|
|
||||||
/* FindShardIntervalIndex have to find index of given shard */
|
/* ShardIndex have to find index of given shard */
|
||||||
Assert(shardIntervalIndex >= 0);
|
Assert(shardIntervalIndex >= 0);
|
||||||
|
|
||||||
foreach(colocatedTableCell, colocatedTableList)
|
foreach(colocatedTableCell, colocatedTableList)
|
||||||
|
|
|
@ -23,10 +23,12 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
|
static int FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache,
|
||||||
ShardInterval **shardIntervalCache,
|
int shardCount, char partitionMethod,
|
||||||
int shardCount,
|
FmgrInfo *compareFunction, bool useBinarySearch);
|
||||||
FmgrInfo *compareFunction);
|
static int SearchCachedShardInterval(Datum partitionColumnValue,
|
||||||
|
ShardInterval **shardIntervalCache,
|
||||||
|
int shardCount, FmgrInfo *compareFunction);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -171,7 +173,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindShardIntervalIndex finds index of given shard in sorted shard interval array.
|
* ShardIndex finds the index of given shard in sorted shard interval array.
|
||||||
*
|
*
|
||||||
* For hash partitioned tables, it calculates hash value of a number in its
|
* For hash partitioned tables, it calculates hash value of a number in its
|
||||||
* range (e.g. min value) and finds which shard should contain the hashed
|
* range (e.g. min value) and finds which shard should contain the hashed
|
||||||
|
@ -179,15 +181,30 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
|
||||||
* other than hash and reference, the function errors out.
|
* other than hash and reference, the function errors out.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
FindShardIntervalIndex(ShardInterval *shardInterval)
|
ShardIndex(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
|
Datum shardMinValue = shardInterval->minValue;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||||
|
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;
|
||||||
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
int shardCount = 0;
|
FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||||
int32 shardMinValue = 0;
|
bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
|
||||||
uint64 hashTokenIncrement = 0;
|
bool useBinarySearch = false;
|
||||||
int shardIndex = -1;
|
|
||||||
|
/*
|
||||||
|
* Note that, we can also support append and range distributed tables, but
|
||||||
|
* currently it is not required.
|
||||||
|
*/
|
||||||
|
if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("finding index of a given shard is only supported for "
|
||||||
|
"hash distributed and reference tables")));
|
||||||
|
}
|
||||||
|
|
||||||
/* short-circuit for reference tables */
|
/* short-circuit for reference tables */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
|
@ -198,34 +215,15 @@ FindShardIntervalIndex(ShardInterval *shardInterval)
|
||||||
return shardIndex;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/* determine whether to use binary search */
|
||||||
* We can support it for other types of partitioned tables with simple binary scan
|
if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
|
||||||
* but it is not necessary at the moment. If we need that simply check algorithm in
|
|
||||||
* FindShardInterval and SearchCachedShardInterval.
|
|
||||||
*/
|
|
||||||
if (partitionMethod != DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
useBinarySearch = true;
|
||||||
errmsg("finding index of given shard is not supported for "
|
|
||||||
"non-hash partitioned tables")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
shardIndex = FindShardIntervalIndex(shardMinValue, shardIntervalCache,
|
||||||
shardMinValue = DatumGetInt32(shardInterval->minValue);
|
shardCount, partitionMethod,
|
||||||
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
compareFunction, useBinarySearch);
|
||||||
shardIndex = (uint32) (shardMinValue - INT32_MIN) / hashTokenIncrement;
|
|
||||||
|
|
||||||
Assert(shardIndex <= shardCount);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the shard count is not power of 2, the range of the last
|
|
||||||
* shard becomes larger than others. For that extra piece of range,
|
|
||||||
* we still need to use the last shard.
|
|
||||||
*/
|
|
||||||
if (shardIndex == shardCount)
|
|
||||||
{
|
|
||||||
shardIndex = shardCount - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return shardIndex;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
@ -242,25 +240,69 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
||||||
FmgrInfo *hashFunction, bool useBinarySearch)
|
FmgrInfo *hashFunction, bool useBinarySearch)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = NULL;
|
Datum searchedValue = partitionColumnValue;
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
|
|
||||||
|
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
searchedValue = FunctionCall1(hashFunction, partitionColumnValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
shardIndex = FindShardIntervalIndex(searchedValue, shardIntervalCache,
|
||||||
|
shardCount, partitionMethod,
|
||||||
|
compareFunction, useBinarySearch);
|
||||||
|
|
||||||
|
if (shardIndex == INVALID_SHARD_INDEX)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return shardIntervalCache[shardIndex];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindShardIntervalIndex finds the index of the shard interval which covers
|
||||||
|
* the searched value. Note that the searched value must be the hashed value
|
||||||
|
* of the original value if the distribution method is hash.
|
||||||
|
*
|
||||||
|
* Note that, if the searched value can not be found for hash partitioned tables,
|
||||||
|
* we error out. This should only happen if something is terribly wrong, either
|
||||||
|
* metadata tables are corrupted or we have a bug somewhere. Such as a hash
|
||||||
|
* function which returns a value not in the range of [INT32_MIN, INT32_MAX] can
|
||||||
|
* fire this.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache,
|
||||||
|
int shardCount, char partitionMethod, FmgrInfo *compareFunction,
|
||||||
|
bool useBinarySearch)
|
||||||
|
{
|
||||||
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
int hashedValue = DatumGetInt32(FunctionCall1(hashFunction,
|
|
||||||
partitionColumnValue));
|
|
||||||
if (useBinarySearch)
|
if (useBinarySearch)
|
||||||
{
|
{
|
||||||
Assert(compareFunction != NULL);
|
Assert(compareFunction != NULL);
|
||||||
|
|
||||||
shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue),
|
shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache,
|
||||||
shardIntervalCache, shardCount,
|
shardCount, compareFunction);
|
||||||
compareFunction);
|
|
||||||
|
/* we should always return a valid shard index for hash partitioned tables */
|
||||||
|
if (shardIndex == INVALID_SHARD_INDEX)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
|
||||||
|
errmsg("cannot find shard interval"),
|
||||||
|
errdetail("Hash of the partition column value "
|
||||||
|
"does not fall into any shards.")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
int hashedValue = DatumGetInt32(searchedValue);
|
||||||
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
||||||
int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement;
|
|
||||||
|
|
||||||
|
shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement;
|
||||||
Assert(shardIndex <= shardCount);
|
Assert(shardIndex <= shardCount);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -272,36 +314,34 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
{
|
{
|
||||||
shardIndex = shardCount - 1;
|
shardIndex = shardCount - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardInterval = shardIntervalCache[shardIndex];
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
int shardIndex = 0;
|
|
||||||
|
|
||||||
/* reference tables has a single shard, all values mapped to that shard */
|
/* reference tables has a single shard, all values mapped to that shard */
|
||||||
Assert(shardCount == 1);
|
Assert(shardCount == 1);
|
||||||
shardInterval = shardIntervalCache[shardIndex];
|
|
||||||
|
shardIndex = 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Assert(compareFunction != NULL);
|
Assert(compareFunction != NULL);
|
||||||
|
|
||||||
shardInterval = SearchCachedShardInterval(partitionColumnValue,
|
shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache,
|
||||||
shardIntervalCache, shardCount,
|
shardCount, compareFunction);
|
||||||
compareFunction);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return shardInterval;
|
return shardIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SearchCachedShardInterval performs a binary search for a shard interval matching a
|
* SearchCachedShardInterval performs a binary search for a shard interval
|
||||||
* given partition column value and returns it.
|
* matching a given partition column value and returns it's index in the cached
|
||||||
|
* array. If it can not find any shard interval with the given value, it returns
|
||||||
|
* INVALID_SHARD_INDEX.
|
||||||
*/
|
*/
|
||||||
static ShardInterval *
|
static int
|
||||||
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
|
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
|
||||||
int shardCount, FmgrInfo *compareFunction)
|
int shardCount, FmgrInfo *compareFunction)
|
||||||
{
|
{
|
||||||
|
@ -332,13 +372,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
||||||
|
|
||||||
if (DatumGetInt32(maxValueComparison) <= 0)
|
if (DatumGetInt32(maxValueComparison) <= 0)
|
||||||
{
|
{
|
||||||
return shardIntervalCache[middleIndex];
|
return middleIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
lowerBoundIndex = middleIndex + 1;
|
lowerBoundIndex = middleIndex + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return INVALID_SHARD_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "nodes/primnodes.h"
|
#include "nodes/primnodes.h"
|
||||||
|
|
||||||
|
#define INVALID_SHARD_INDEX -1
|
||||||
|
|
||||||
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
||||||
typedef struct ShardIntervalCompareFunctionCacheEntry
|
typedef struct ShardIntervalCompareFunctionCacheEntry
|
||||||
{
|
{
|
||||||
|
@ -29,7 +31,7 @@ extern int CompareShardIntervals(const void *leftElement, const void *rightEleme
|
||||||
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
|
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
|
||||||
extern int CompareRelationShards(const void *leftElement,
|
extern int CompareRelationShards(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
extern int FindShardIntervalIndex(ShardInterval *shardInterval);
|
extern int ShardIndex(ShardInterval *shardInterval);
|
||||||
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
||||||
ShardInterval **shardIntervalCache,
|
ShardInterval **shardIntervalCache,
|
||||||
int shardCount, char partitionMethod,
|
int shardCount, char partitionMethod,
|
||||||
|
|
|
@ -107,9 +107,8 @@ INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu
|
||||||
-- try a single-row INSERT with no shard to receive it
|
-- try a single-row INSERT with no shard to receive it
|
||||||
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
||||||
20.69);
|
20.69);
|
||||||
ERROR: distributed modifications must target exactly one shard
|
ERROR: cannot find shard interval
|
||||||
DETAIL: This command modifies no shards.
|
DETAIL: Hash of the partition column value does not fall into any shards.
|
||||||
HINT: Make sure the value for partition column "id" falls into a single shard.
|
|
||||||
-- try an insert to a range-partitioned table
|
-- try an insert to a range-partitioned table
|
||||||
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
|
||||||
20.69);
|
20.69);
|
||||||
|
|
Loading…
Reference in New Issue