mirror of https://github.com/citusdata/citus.git
Add DistTableCacheEntry->hasOverlappingShardInterval.
This determines whether it is possible to perform a binary search on sortedShardIntervalArray. If, for example, two shards have overlapping ranges, a binary search cannot be used. This will be useful in a later commit that introduces faster shard pruning.
support-6.1-faster-pruning
parent
3268ffae48
commit
096a1e3200
|
@ -131,6 +131,9 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||||
int shardIntervalArrayLength);
|
int shardIntervalArrayLength);
|
||||||
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
||||||
int shardCount);
|
int shardCount);
|
||||||
|
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
||||||
|
int shardIntervalArrayLength,
|
||||||
|
FmgrInfo *shardIntervalSortCompareFunction);
|
||||||
static void InitializeDistTableCache(void);
|
static void InitializeDistTableCache(void);
|
||||||
static void InitializeWorkerNodeCache(void);
|
static void InitializeWorkerNodeCache(void);
|
||||||
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
|
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
|
||||||
|
@ -700,6 +703,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
cacheEntry->hasUninitializedShardInterval = true;
|
cacheEntry->hasUninitializedShardInterval = true;
|
||||||
|
cacheEntry->hasOverlappingShardInterval = true;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note that during create_reference_table() call,
|
* Note that during create_reference_table() call,
|
||||||
|
@ -728,6 +732,35 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
||||||
cacheEntry->hasUninitializedShardInterval =
|
cacheEntry->hasUninitializedShardInterval =
|
||||||
HasUninitializedShardInterval(sortedShardIntervalArray,
|
HasUninitializedShardInterval(sortedShardIntervalArray,
|
||||||
shardIntervalArrayLength);
|
shardIntervalArrayLength);
|
||||||
|
|
||||||
|
if (!cacheEntry->hasUninitializedShardInterval)
|
||||||
|
{
|
||||||
|
cacheEntry->hasOverlappingShardInterval =
|
||||||
|
HasOverlappingShardInterval(sortedShardIntervalArray,
|
||||||
|
shardIntervalArrayLength,
|
||||||
|
shardIntervalCompareFunction);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cacheEntry->hasOverlappingShardInterval = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If table is hash-partitioned and has shards, there never should be
|
||||||
|
* any uninitialized shards. Historically we've not prevented that for
|
||||||
|
* range partitioned tables, but it might be a good idea to start
|
||||||
|
* doing so.
|
||||||
|
*/
|
||||||
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
||||||
|
cacheEntry->hasUninitializedShardInterval)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards")));
|
||||||
|
}
|
||||||
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
||||||
|
cacheEntry->hasOverlappingShardInterval)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("hash partitioned table has overlapping shards")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -903,6 +936,52 @@ HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shar
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasOverlappingShardInterval determines whether the given list of sorted
|
||||||
|
* shards has overlapping ranges.
*
* Only neighbouring entries of shardIntervalArray are compared: the maxValue
* of each interval is checked against the minValue of the interval that
* follows it, using shardIntervalSortCompareFunction. Checking neighbours
* alone is only sufficient if the array is already sorted; the sort order
* itself is not established here (presumably by minValue - verify against
* the caller).
*
* Returns true as soon as one such pair compares as >= 0, and false if no
* pair does or if the array holds fewer than two intervals.
*
* Every interval is expected to have both its min and max value set; this
* is only checked through Assert().
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
||||||
|
int shardIntervalArrayLength,
|
||||||
|
FmgrInfo *shardIntervalSortCompareFunction)
|
||||||
|
{
|
||||||
|
int shardIndex = 0;
|
||||||
|
ShardInterval *lastShardInterval = NULL;
|
||||||
|
Datum comparisonDatum = 0;
|
||||||
|
int comparisonResult = 0;
|
||||||
|
|
||||||
|
/* zero/a single shard can't overlap */
|
||||||
|
if (shardIntervalArrayLength < 2)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* start with the first interval and compare each later one to its predecessor */
lastShardInterval = shardIntervalArray[0];
|
||||||
|
for (shardIndex = 1; shardIndex < shardIntervalArrayLength; shardIndex++)
|
||||||
|
{
|
||||||
|
ShardInterval *curShardInterval = shardIntervalArray[shardIndex];
|
||||||
|
|
||||||
|
/* only called if !hasUninitializedShardInterval */
|
||||||
|
Assert(lastShardInterval->minValueExists && lastShardInterval->maxValueExists);
|
||||||
|
Assert(curShardInterval->minValueExists && curShardInterval->maxValueExists);
|
||||||
|
|
||||||
|
/* compare the previous interval's upper bound with this interval's lower bound */
comparisonDatum = CompareCall2(shardIntervalSortCompareFunction,
|
||||||
|
lastShardInterval->maxValue,
|
||||||
|
curShardInterval->minValue);
|
||||||
|
comparisonResult = DatumGetInt32(comparisonDatum);
|
||||||
|
|
||||||
|
/*
 * A result >= 0 is treated as an overlap. Assuming the usual
 * negative/zero/positive comparator convention (TODO confirm), this
 * means previous max >= current min, so intervals that merely share
 * a boundary value also count as overlapping.
 */
if (comparisonResult >= 0)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastShardInterval = curShardInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusHasBeenLoaded returns true if the citus extension has been created
|
* CitusHasBeenLoaded returns true if the citus extension has been created
|
||||||
* in the current database and the extension script has been executed. Otherwise,
|
* in the current database and the extension script has been executed. Otherwise,
|
||||||
|
@ -1832,6 +1911,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
cacheEntry->shardIntervalArrayLength = 0;
|
cacheEntry->shardIntervalArrayLength = 0;
|
||||||
cacheEntry->hasUninitializedShardInterval = false;
|
cacheEntry->hasUninitializedShardInterval = false;
|
||||||
cacheEntry->hasUniformHashDistribution = false;
|
cacheEntry->hasUniformHashDistribution = false;
|
||||||
|
cacheEntry->hasOverlappingShardInterval = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ typedef struct
|
||||||
bool isDistributedTable;
|
bool isDistributedTable;
|
||||||
bool hasUninitializedShardInterval;
|
bool hasUninitializedShardInterval;
|
||||||
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
|
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
|
||||||
|
bool hasOverlappingShardInterval;
|
||||||
|
|
||||||
/* pg_dist_partition metadata for this table */
|
/* pg_dist_partition metadata for this table */
|
||||||
char *partitionKeyString;
|
char *partitionKeyString;
|
||||||
|
|
|
@ -167,7 +167,7 @@ FROM
|
||||||
-- load some more data
|
-- load some more data
|
||||||
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
|
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
|
||||||
|
|
||||||
-- Update shards so that they do not have 1-1 matching. We should error here.
|
-- Update shards so that they do not have 1-1 matching, triggering an error.
|
||||||
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
|
||||||
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
|
||||||
SELECT
|
SELECT
|
||||||
|
|
|
@ -229,15 +229,14 @@ LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_
|
||||||
|
|
||||||
-- load some more data
|
-- load some more data
|
||||||
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
|
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
|
||||||
-- Update shards so that they do not have 1-1 matching. We should error here.
|
-- Update shards so that they do not have 1-1 matching, triggering an error.
|
||||||
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
|
||||||
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
|
||||||
SELECT
|
SELECT
|
||||||
min(l_custkey), max(l_custkey)
|
min(l_custkey), max(l_custkey)
|
||||||
FROM
|
FROM
|
||||||
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
|
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
|
||||||
ERROR: cannot perform distributed planning on this query
|
ERROR: hash partitioned table has overlapping shards
|
||||||
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
|
|
||||||
UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006;
|
||||||
UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006;
|
UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006;
|
||||||
-- empty tables
|
-- empty tables
|
||||||
|
|
Loading…
Reference in New Issue