From 1ddc70ca5572e451516b0994fea60967546b526d Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Thu, 29 Dec 2016 14:43:01 +0200 Subject: [PATCH] Add binary search capability to ShardIndex() Renamed FindShardIntervalIndex() to ShardIndex() and added binary search capability. It used to assume that hash partition tables are always uniformly distributed which is not true if upcoming tenant isolation feature is applied. This commit also reduces code duplication. --- .../distributed/master/master_create_shards.c | 2 +- .../distributed/master/master_repair_shards.c | 2 +- .../distributed/test/colocation_utils.c | 2 +- .../distributed/utils/colocation_utils.c | 6 +- .../distributed/utils/shardinterval_utils.c | 152 +++++++++++------- src/include/distributed/shardinterval_utils.h | 4 +- .../regress/expected/multi_modifications.out | 5 +- 7 files changed, 107 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 2a3203a61..db7db578b 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -277,7 +277,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) uint64 sourceShardId = sourceShardInterval->shardId; uint64 newShardId = GetNextShardId(); ListCell *sourceShardPlacementCell = NULL; - int sourceShardIndex = FindShardIntervalIndex(sourceShardInterval); + int sourceShardIndex = ShardIndex(sourceShardInterval); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index ca7a8e003..549c895eb 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -307,7 +307,7 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) /* we will only use shardIndex if there is a foreign constraint */ if (commandList != NIL) { - shardIndex = FindShardIntervalIndex(shardInterval); + shardIndex = ShardIndex(shardInterval); } foreach(commandCell, commandList) diff --git a/src/backend/distributed/test/colocation_utils.c b/src/backend/distributed/test/colocation_utils.c index ad83d5304..416522b1d 100644 --- a/src/backend/distributed/test/colocation_utils.c +++ b/src/backend/distributed/test/colocation_utils.c @@ -153,7 +153,7 @@ find_shard_interval_index(PG_FUNCTION_ARGS) { uint32 shardId = PG_GETARG_UINT32(0); ShardInterval *shardInterval = LoadShardInterval(shardId); - uint32 shardIndex = FindShardIntervalIndex(shardInterval); + uint32 shardIndex = ShardIndex(shardInterval); PG_RETURN_INT32(shardIndex); } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 4a2035dcd..82b25040a 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -348,7 +348,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard * partitioned table's shards. * * We do min/max value check here to decide whether two shards are colocated, - * instead we can simply use FindShardIntervalIndex function on both shards then + * instead we can simply use ShardIndex function on both shards then * but do index check, but we avoid it because this way it is more cheaper. */ static bool @@ -852,10 +852,10 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) return colocatedShardList; } - shardIntervalIndex = FindShardIntervalIndex(shardInterval); + shardIntervalIndex = ShardIndex(shardInterval); colocatedTableList = ColocatedTableList(distributedTableId); - /* FindShardIntervalIndex have to find index of given shard */ + /* ShardIndex have to find index of given shard */ Assert(shardIntervalIndex >= 0); foreach(colocatedTableCell, colocatedTableList) diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index a28749483..f38f218c9 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -23,10 +23,12 @@ #include "utils/memutils.h" -static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, - FmgrInfo *compareFunction); +static int FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, + FmgrInfo *compareFunction, bool useBinarySearch); +static int SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction); /* @@ -171,7 +173,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement) /* - * FindShardIntervalIndex finds index of given shard in sorted shard interval array. + * ShardIndex finds the index of given shard in sorted shard interval array. * * For hash partitioned tables, it calculates hash value of a number in its * range (e.g. min value) and finds which shard should contain the hashed @@ -179,15 +181,30 @@ CompareRelationShards(const void *leftElement, const void *rightElement) * other than hash and reference, the function errors out. */ int -FindShardIntervalIndex(ShardInterval *shardInterval) +ShardIndex(ShardInterval *shardInterval) { + int shardIndex = INVALID_SHARD_INDEX; Oid distributedTableId = shardInterval->relationId; + Datum shardMinValue = shardInterval->minValue; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray; + int shardCount = cacheEntry->shardIntervalArrayLength; char partitionMethod = cacheEntry->partitionMethod; - int shardCount = 0; - int32 shardMinValue = 0; - uint64 hashTokenIncrement = 0; - int shardIndex = -1; + FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction; + bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; + bool useBinarySearch = false; + + /* + * Note that, we can also support append and range distributed tables, but + * currently it is not required. + */ + if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("finding index of a given shard is only supported for " + "hash distributed and reference tables"))); + } /* short-circuit for reference tables */ if (partitionMethod == DISTRIBUTE_BY_NONE) @@ -198,34 +215,15 @@ FindShardIntervalIndex(ShardInterval *shardInterval) return shardIndex; } - /* - * We can support it for other types of partitioned tables with simple binary scan - * but it is not necessary at the moment. If we need that simply check algorithm in - * FindShardInterval and SearchCachedShardInterval. - */ - if (partitionMethod != DISTRIBUTE_BY_HASH) + /* determine whether to use binary search */ + if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("finding index of given shard is not supported for " - "non-hash partitioned tables"))); + useBinarySearch = true; } - shardCount = cacheEntry->shardIntervalArrayLength; - shardMinValue = DatumGetInt32(shardInterval->minValue); - hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - shardIndex = (uint32) (shardMinValue - INT32_MIN) / hashTokenIncrement; - - Assert(shardIndex <= shardCount); - - /* - * If the shard count is not power of 2, the range of the last - * shard becomes larger than others. For that extra piece of range, - * we still need to use the last shard. - */ - if (shardIndex == shardCount) - { - shardIndex = shardCount - 1; - } + shardIndex = FindShardIntervalIndex(shardMinValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, useBinarySearch); return shardIndex; } @@ -242,25 +240,69 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache int shardCount, char partitionMethod, FmgrInfo *compareFunction, FmgrInfo *hashFunction, bool useBinarySearch) { - ShardInterval *shardInterval = NULL; + Datum searchedValue = partitionColumnValue; + int shardIndex = INVALID_SHARD_INDEX; + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + searchedValue = FunctionCall1(hashFunction, partitionColumnValue); + } + + shardIndex = FindShardIntervalIndex(searchedValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, useBinarySearch); + + if (shardIndex == INVALID_SHARD_INDEX) + { + return NULL; + } + + return shardIntervalCache[shardIndex]; +} + + +/* + * FindShardIntervalIndex finds the index of the shard interval which covers + * the searched value. Note that the searched value must be the hashed value + * of the original value if the distribution method is hash. + * + * Note that, if the searched value can not be found for hash partitioned tables, + * we error out. This should only happen if something is terribly wrong, either + * metadata tables are corrupted or we have a bug somewhere. Such as a hash + * function which returns a value not in the range of [INT32_MIN, INT32_MAX] can + * fire this. + */ +static int +FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, FmgrInfo *compareFunction, + bool useBinarySearch) +{ + int shardIndex = INVALID_SHARD_INDEX; if (partitionMethod == DISTRIBUTE_BY_HASH) { - int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, - partitionColumnValue)); if (useBinarySearch) { Assert(compareFunction != NULL); - shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue), - shardIntervalCache, shardCount, - compareFunction); + shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache, + shardCount, compareFunction); + + /* we should always return a valid shard index for hash partitioned tables */ + if (shardIndex == INVALID_SHARD_INDEX) + { + ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), + errmsg("cannot find shard interval"), + errdetail("Hash of the partition column value " + "does not fall into any shards."))); + } } else { + int hashedValue = DatumGetInt32(searchedValue); uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; + shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; Assert(shardIndex <= shardCount); /* @@ -272,36 +314,34 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache { shardIndex = shardCount - 1; } - - shardInterval = shardIntervalCache[shardIndex]; } } else if (partitionMethod == DISTRIBUTE_BY_NONE) { - int shardIndex = 0; - /* reference tables has a single shard, all values mapped to that shard */ Assert(shardCount == 1); - shardInterval = shardIntervalCache[shardIndex]; + + shardIndex = 0; } else { Assert(compareFunction != NULL); - shardInterval = SearchCachedShardInterval(partitionColumnValue, - shardIntervalCache, shardCount, - compareFunction); + shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache, + shardCount, compareFunction); } - return shardInterval; + return shardIndex; } /* - * SearchCachedShardInterval performs a binary search for a shard interval matching a - * given partition column value and returns it. + * SearchCachedShardInterval performs a binary search for a shard interval + * matching a given partition column value and returns it's index in the cached + * array. If it can not find any shard interval with the given value, it returns + * INVALID_SHARD_INDEX. */ -static ShardInterval * +static int SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction) { @@ -332,13 +372,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter if (DatumGetInt32(maxValueComparison) <= 0) { - return shardIntervalCache[middleIndex]; + return middleIndex; } lowerBoundIndex = middleIndex + 1; } - return NULL; + return INVALID_SHARD_INDEX; } diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 53882080a..0d3b8fa15 100644 --- a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -15,6 +15,8 @@ #include "distributed/master_metadata_utility.h" #include "nodes/primnodes.h" +#define INVALID_SHARD_INDEX -1 + /* OperatorCacheEntry contains information for each element in OperatorCache */ typedef struct ShardIntervalCompareFunctionCacheEntry { @@ -29,7 +31,7 @@ extern int CompareShardIntervals(const void *leftElement, const void *rightEleme extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement); extern int CompareRelationShards(const void *leftElement, const void *rightElement); -extern int FindShardIntervalIndex(ShardInterval *shardInterval); +extern int ShardIndex(ShardInterval *shardInterval); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, char partitionMethod, diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 5e412d88e..411aedef9 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -107,9 +107,8 @@ INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); -ERROR: distributed modifications must target exactly one shard -DETAIL: This command modifies no shards. -HINT: Make sure the value for partition column "id" falls into a single shard. +ERROR: cannot find shard interval +DETAIL: Hash of the partition column value does not fall into any shards. -- try an insert to a range-partitioned table INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69);