diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 2a3203a61..db7db578b 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -277,7 +277,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) uint64 sourceShardId = sourceShardInterval->shardId; uint64 newShardId = GetNextShardId(); ListCell *sourceShardPlacementCell = NULL; - int sourceShardIndex = FindShardIntervalIndex(sourceShardInterval); + int sourceShardIndex = ShardIndex(sourceShardInterval); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index ca7a8e003..549c895eb 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -307,7 +307,7 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) /* we will only use shardIndex if there is a foreign constraint */ if (commandList != NIL) { - shardIndex = FindShardIntervalIndex(shardInterval); + shardIndex = ShardIndex(shardInterval); } foreach(commandCell, commandList) diff --git a/src/backend/distributed/test/colocation_utils.c b/src/backend/distributed/test/colocation_utils.c index ad83d5304..416522b1d 100644 --- a/src/backend/distributed/test/colocation_utils.c +++ b/src/backend/distributed/test/colocation_utils.c @@ -153,7 +153,7 @@ find_shard_interval_index(PG_FUNCTION_ARGS) { uint32 shardId = PG_GETARG_UINT32(0); ShardInterval *shardInterval = LoadShardInterval(shardId); - uint32 shardIndex = FindShardIntervalIndex(shardInterval); + uint32 shardIndex = ShardIndex(shardInterval); PG_RETURN_INT32(shardIndex); } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 4a2035dcd..82b25040a 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -348,7 +348,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard * partitioned table's shards. * * We do min/max value check here to decide whether two shards are colocated, - * instead we can simply use FindShardIntervalIndex function on both shards then + * instead we can simply use ShardIndex function on both shards then * but do index check, but we avoid it because this way it is more cheaper. */ static bool @@ -852,10 +852,10 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) return colocatedShardList; } - shardIntervalIndex = FindShardIntervalIndex(shardInterval); + shardIntervalIndex = ShardIndex(shardInterval); colocatedTableList = ColocatedTableList(distributedTableId); - /* FindShardIntervalIndex have to find index of given shard */ + /* ShardIndex have to find index of given shard */ Assert(shardIntervalIndex >= 0); foreach(colocatedTableCell, colocatedTableList) diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index a28749483..f38f218c9 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -23,10 +23,12 @@ #include "utils/memutils.h" -static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, - FmgrInfo *compareFunction); +static int FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, + FmgrInfo *compareFunction, bool useBinarySearch); +static int SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction); /* @@ -171,7 +173,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement) /* - * FindShardIntervalIndex finds index of given shard in sorted shard interval array. + * ShardIndex finds the index of given shard in sorted shard interval array. * * For hash partitioned tables, it calculates hash value of a number in its * range (e.g. min value) and finds which shard should contain the hashed @@ -179,15 +181,30 @@ CompareRelationShards(const void *leftElement, const void *rightElement) * other than hash and reference, the function errors out. */ int -FindShardIntervalIndex(ShardInterval *shardInterval) +ShardIndex(ShardInterval *shardInterval) { + int shardIndex = INVALID_SHARD_INDEX; Oid distributedTableId = shardInterval->relationId; + Datum shardMinValue = shardInterval->minValue; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray; + int shardCount = cacheEntry->shardIntervalArrayLength; char partitionMethod = cacheEntry->partitionMethod; - int shardCount = 0; - int32 shardMinValue = 0; - uint64 hashTokenIncrement = 0; - int shardIndex = -1; + FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction; + bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; + bool useBinarySearch = false; + + /* + * Note that, we can also support append and range distributed tables, but + * currently it is not required. + */ + if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("finding index of a given shard is only supported for " + "hash distributed and reference tables"))); + } /* short-circuit for reference tables */ if (partitionMethod == DISTRIBUTE_BY_NONE) @@ -198,34 +215,15 @@ FindShardIntervalIndex(ShardInterval *shardInterval) return shardIndex; } - /* - * We can support it for other types of partitioned tables with simple binary scan - * but it is not necessary at the moment. If we need that simply check algorithm in - * FindShardInterval and SearchCachedShardInterval. - */ - if (partitionMethod != DISTRIBUTE_BY_HASH) + /* determine whether to use binary search */ + if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("finding index of given shard is not supported for " - "non-hash partitioned tables"))); + useBinarySearch = true; } - shardCount = cacheEntry->shardIntervalArrayLength; - shardMinValue = DatumGetInt32(shardInterval->minValue); - hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - shardIndex = (uint32) (shardMinValue - INT32_MIN) / hashTokenIncrement; - - Assert(shardIndex <= shardCount); - - /* - * If the shard count is not power of 2, the range of the last - * shard becomes larger than others. For that extra piece of range, - * we still need to use the last shard. - */ - if (shardIndex == shardCount) - { - shardIndex = shardCount - 1; - } + shardIndex = FindShardIntervalIndex(shardMinValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, useBinarySearch); return shardIndex; } @@ -242,25 +240,69 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache int shardCount, char partitionMethod, FmgrInfo *compareFunction, FmgrInfo *hashFunction, bool useBinarySearch) { - ShardInterval *shardInterval = NULL; + Datum searchedValue = partitionColumnValue; + int shardIndex = INVALID_SHARD_INDEX; + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + searchedValue = FunctionCall1(hashFunction, partitionColumnValue); + } + + shardIndex = FindShardIntervalIndex(searchedValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, useBinarySearch); + + if (shardIndex == INVALID_SHARD_INDEX) + { + return NULL; + } + + return shardIntervalCache[shardIndex]; +} + + +/* + * FindShardIntervalIndex finds the index of the shard interval which covers + * the searched value. Note that the searched value must be the hashed value + * of the original value if the distribution method is hash. + * + * Note that, if the searched value can not be found for hash partitioned tables, + * we error out. This should only happen if something is terribly wrong, either + * metadata tables are corrupted or we have a bug somewhere. Such as a hash + * function which returns a value not in the range of [INT32_MIN, INT32_MAX] can + * fire this. + */ +static int +FindShardIntervalIndex(Datum searchedValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, FmgrInfo *compareFunction, + bool useBinarySearch) +{ + int shardIndex = INVALID_SHARD_INDEX; if (partitionMethod == DISTRIBUTE_BY_HASH) { - int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, - partitionColumnValue)); if (useBinarySearch) { Assert(compareFunction != NULL); - shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue), - shardIntervalCache, shardCount, - compareFunction); + shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache, + shardCount, compareFunction); + + /* we should always return a valid shard index for hash partitioned tables */ + if (shardIndex == INVALID_SHARD_INDEX) + { + ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), + errmsg("cannot find shard interval"), + errdetail("Hash of the partition column value " + "does not fall into any shards."))); + } } else { + int hashedValue = DatumGetInt32(searchedValue); uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; + shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; Assert(shardIndex <= shardCount); /* @@ -272,36 +314,34 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache { shardIndex = shardCount - 1; } - - shardInterval = shardIntervalCache[shardIndex]; } } else if (partitionMethod == DISTRIBUTE_BY_NONE) { - int shardIndex = 0; - /* reference tables has a single shard, all values mapped to that shard */ Assert(shardCount == 1); - shardInterval = shardIntervalCache[shardIndex]; + + shardIndex = 0; } else { Assert(compareFunction != NULL); - shardInterval = SearchCachedShardInterval(partitionColumnValue, - shardIntervalCache, shardCount, - compareFunction); + shardIndex = SearchCachedShardInterval(searchedValue, shardIntervalCache, + shardCount, compareFunction); } - return shardInterval; + return shardIndex; } /* - * SearchCachedShardInterval performs a binary search for a shard interval matching a - * given partition column value and returns it. + * SearchCachedShardInterval performs a binary search for a shard interval + * matching a given partition column value and returns it's index in the cached + * array. If it can not find any shard interval with the given value, it returns + * INVALID_SHARD_INDEX. */ -static ShardInterval * +static int SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction) { @@ -332,13 +372,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter if (DatumGetInt32(maxValueComparison) <= 0) { - return shardIntervalCache[middleIndex]; + return middleIndex; } lowerBoundIndex = middleIndex + 1; } - return NULL; + return INVALID_SHARD_INDEX; } diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 53882080a..0d3b8fa15 100644 --- a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -15,6 +15,8 @@ #include "distributed/master_metadata_utility.h" #include "nodes/primnodes.h" +#define INVALID_SHARD_INDEX -1 + /* OperatorCacheEntry contains information for each element in OperatorCache */ typedef struct ShardIntervalCompareFunctionCacheEntry { @@ -29,7 +31,7 @@ extern int CompareShardIntervals(const void *leftElement, const void *rightEleme extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement); extern int CompareRelationShards(const void *leftElement, const void *rightElement); -extern int FindShardIntervalIndex(ShardInterval *shardInterval); +extern int ShardIndex(ShardInterval *shardInterval); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, char partitionMethod, diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 5e412d88e..411aedef9 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -107,9 +107,8 @@ INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); -ERROR: distributed modifications must target exactly one shard -DETAIL: This command modifies no shards. -HINT: Make sure the value for partition column "id" falls into a single shard. +ERROR: cannot find shard interval +DETAIL: Hash of the partition column value does not fall into any shards. -- try an insert to a range-partitioned table INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69);