From e1d21292a22e15cec44fa553ff5eda2314a96346 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 27 Apr 2017 16:03:05 -0700 Subject: [PATCH 1/4] Fix: Make FindShardIntervalIndex robust against 0 shards. --- .../distributed/utils/shardinterval_utils.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 8a63375e8..5133da261 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -247,11 +247,12 @@ FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry) * the searched value. Note that the searched value must be the hashed value * of the original value if the distribution method is hash. * - * Note that, if the searched value can not be found for hash partitioned tables, - * we error out. This should only happen if something is terribly wrong, either - * metadata tables are corrupted or we have a bug somewhere. Such as a hash - * function which returns a value not in the range of [INT32_MIN, INT32_MAX] can - * fire this. + * Note that, if the searched value can not be found for hash partitioned + * tables, we error out (unless there are no shards, in which case + * INVALID_SHARD_INDEX is returned). This should only happen if something is + * terribly wrong, either metadata tables are corrupted or we have a bug + * somewhere. Such as a hash function which returns a value not in the range + * of [INT32_MIN, INT32_MAX] can fire this. 
*/ static int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry) @@ -264,6 +265,11 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry) !cacheEntry->hasUniformHashDistribution); int shardIndex = INVALID_SHARD_INDEX; + if (shardCount == 0) + { + return INVALID_SHARD_INDEX; + } + if (partitionMethod == DISTRIBUTE_BY_HASH) { if (useBinarySearch) From 14014b79cab49864ff1cb36838c8fb641bae289d Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 25 Apr 2017 20:26:40 -0700 Subject: [PATCH 2/4] Build DistTableCacheEntry->shardIntervalCompareFunction even for 0 shards. Previously we, unnecessarily, used the first shard's type information to look up the comparison function. But that information is already available, so use it. That's helpful because we sometimes want to access the comparator function even if there are no shards. --- .../distributed/utils/metadata_cache.c | 68 ++++--------------- src/include/distributed/metadata_cache.h | 2 +- 2 files changed, 15 insertions(+), 55 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index a48405597..710526aaa 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -134,8 +134,6 @@ static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); -static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, - char partitionMethod); static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount, FmgrInfo * @@ -622,6 +620,10 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; int shardIndex = 0; +
GetPartitionTypeInputInfo(cacheEntry->partitionKeyString, + cacheEntry->partitionMethod, + &intervalTypeId, + &intervalTypeMod); distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); shardIntervalArrayLength = list_length(distShardTupleList); @@ -631,13 +633,6 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation); ListCell *distShardTupleCell = NULL; int arrayIndex = 0; - Oid intervalTypeId = InvalidOid; - int32 intervalTypeMod = -1; - - GetPartitionTypeInputInfo(cacheEntry->partitionKeyString, - cacheEntry->partitionMethod, - &intervalTypeId, - &intervalTypeMod); shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext, shardIntervalArrayLength * @@ -677,23 +672,19 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) } /* decide and allocate interval comparison function */ - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (intervalTypeId != InvalidOid) + { + /* allocate the comparison function in the cache context */ + MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext); + + shardIntervalCompareFunction = GetFunctionInfo(intervalTypeId, BTREE_AM_OID, + BTORDER_PROC); + MemoryContextSwitchTo(oldContext); + } + else { shardIntervalCompareFunction = NULL; } - else if (shardIntervalArrayLength > 0) - { - MemoryContext oldContext = CurrentMemoryContext; - - /* allocate the comparison function in the cache context */ - oldContext = MemoryContextSwitchTo(CacheMemoryContext); - - shardIntervalCompareFunction = - ShardIntervalCompareFunction(shardIntervalArray, - cacheEntry->partitionMethod); - - MemoryContextSwitchTo(oldContext); - } /* reference tables has a single shard which is not initialized */ if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) @@ -798,37 +789,6 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) } -/* - * ShardIntervalCompareFunction returns the appropriate compare function for the - * partition column type. 
In case of hash-partitioning, it always returns the compare - * function for integers. Callers of this function has to ensure that shardIntervalArray - * has at least one element. - */ -static FmgrInfo * -ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod) -{ - FmgrInfo *shardIntervalCompareFunction = NULL; - Oid comparisonTypeId = InvalidOid; - - Assert(shardIntervalArray != NULL); - - if (partitionMethod == DISTRIBUTE_BY_HASH) - { - comparisonTypeId = INT4OID; - } - else - { - ShardInterval *shardInterval = shardIntervalArray[0]; - comparisonTypeId = shardInterval->valueTypeId; - } - - shardIntervalCompareFunction = GetFunctionInfo(comparisonTypeId, BTREE_AM_OID, - BTORDER_PROC); - - return shardIntervalCompareFunction; -} - - /* * SortedShardIntervalArray sorts the input shardIntervalArray. Shard intervals with * no min/max values are placed at the end of the array. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 24324aa56..d80b4a038 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -49,7 +49,7 @@ typedef struct int shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray; - FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ + FmgrInfo *shardIntervalCompareFunction; /* NULL if DISTRIBUTE_BY_NONE */ FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ /* pg_dist_shard_placement metadata */ From 5695452661cb720dfceb37cea595494c07b6da33 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 25 Apr 2017 23:29:10 -0700 Subject: [PATCH 3/4] Add DistTableCacheEntry->shardColumnCompareFunction. That's useful when comparing values a hash-partitioned table is filtered by. The existing shardIntervalCompareFunction is about comparing hashed values, not unhashed ones. The added btree opclass function is so we can get a comparator back.
This should be changed much more widely, but is not necessary so far. --- .../distributed/utils/metadata_cache.c | 39 ++++++++++++++++++- src/include/distributed/metadata_cache.h | 10 ++++- .../regress/expected/multi_data_types.out | 12 ++++-- src/test/regress/sql/multi_data_types.sql | 13 +++++-- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 710526aaa..a36dd5c46 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -156,6 +156,7 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation static List * LookupDistShardTuples(Oid relationId); static Oid LookupShardRelation(int64 shardId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, + Oid *columnTypeId, int32 *columnTypeMod, Oid *intervalTypeId, int32 *intervalTypeMod); static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, @@ -617,11 +618,19 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) ShardInterval **shardIntervalArray = NULL; ShardInterval **sortedShardIntervalArray = NULL; FmgrInfo *shardIntervalCompareFunction = NULL; + FmgrInfo *shardColumnCompareFunction = NULL; List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; int shardIndex = 0; + Oid columnTypeId = InvalidOid; + int32 columnTypeMod = -1; + Oid intervalTypeId = InvalidOid; + int32 intervalTypeMod = -1; + GetPartitionTypeInputInfo(cacheEntry->partitionKeyString, cacheEntry->partitionMethod, + &columnTypeId, + &columnTypeMod, &intervalTypeId, &intervalTypeMod); @@ -671,7 +680,22 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) heap_close(distShardRelation, AccessShareLock); } - /* decide and allocate interval comparison function */ + /* look up value comparison function */ + if (columnTypeId != InvalidOid) + { + /* allocate the 
comparison function in the cache context */ + MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext); + + shardColumnCompareFunction = GetFunctionInfo(columnTypeId, BTREE_AM_OID, + BTORDER_PROC); + MemoryContextSwitchTo(oldContext); + } + else + { + shardColumnCompareFunction = NULL; + } + + /* look up interval comparison function */ if (intervalTypeId != InvalidOid) { /* allocate the comparison function in the cache context */ @@ -785,6 +809,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; + cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; } @@ -2375,8 +2400,11 @@ LookupShardRelation(int64 shardId) */ static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, + Oid *columnTypeId, int32 *columnTypeMod, Oid *intervalTypeId, int32 *intervalTypeMod) { + *columnTypeId = InvalidOid; + *columnTypeMod = -1; *intervalTypeId = InvalidOid; *intervalTypeMod = -1; @@ -2391,18 +2419,25 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, *intervalTypeId = partitionColumn->vartype; *intervalTypeMod = partitionColumn->vartypmod; + *columnTypeId = partitionColumn->vartype; + *columnTypeMod = partitionColumn->vartypmod; break; } case DISTRIBUTE_BY_HASH: { + Node *partitionNode = stringToNode(partitionKeyString); + Var *partitionColumn = (Var *) partitionNode; + Assert(IsA(partitionNode, Var)); + *intervalTypeId = INT4OID; + *columnTypeId = partitionColumn->vartype; + *columnTypeMod = partitionColumn->vartypmod; break; } case DISTRIBUTE_BY_NONE: { - *intervalTypeId = InvalidOid; break; } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index d80b4a038..5cc2bbf26 100644 --- a/src/include/distributed/metadata_cache.h +++ 
b/src/include/distributed/metadata_cache.h @@ -49,7 +49,15 @@ typedef struct int shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray; - FmgrInfo *shardIntervalCompareFunction; /* NULL if DISTRIBUTE_BY_NONE */ + /* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */ + FmgrInfo *shardColumnCompareFunction; + + /* + * Comparator for partition interval type (different from + * shardColumnCompareFunction if hash-partitioned), NULL if + * DISTRIBUTE_BY_NONE. + */ + FmgrInfo *shardIntervalCompareFunction; FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ /* pg_dist_shard_placement metadata */ diff --git a/src/test/regress/expected/multi_data_types.out b/src/test/regress/expected/multi_data_types.out index 76fd44367..1cb4d1f9b 100644 --- a/src/test/regress/expected/multi_data_types.out +++ b/src/test/regress/expected/multi_data_types.out @@ -10,8 +10,13 @@ CREATE TYPE test_composite_type AS ( ); -- ... as well as a function to use as its comparator... CREATE FUNCTION equal_test_composite_type_function(test_composite_type, test_composite_type) RETURNS boolean -AS 'select $1.i = $2.i AND $1.i2 = $2.i2;' -LANGUAGE SQL +LANGUAGE 'internal' +AS 'record_eq' +IMMUTABLE +RETURNS NULL ON NULL INPUT; +CREATE FUNCTION cmp_test_composite_type_function(test_composite_type, test_composite_type) RETURNS int +LANGUAGE 'internal' +AS 'btrecordcmp' IMMUTABLE RETURNS NULL ON NULL INPUT; -- ... use that function to create a custom equality operator...
@@ -34,7 +39,8 @@ RETURNS NULL ON NULL INPUT; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS cats_op_fam_clas3 DEFAULT FOR TYPE test_composite_type USING BTREE AS -OPERATOR 3 = (test_composite_type, test_composite_type); +OPERATOR 3 = (test_composite_type, test_composite_type), +FUNCTION 1 cmp_test_composite_type_function(test_composite_type, test_composite_type); CREATE OPERATOR CLASS cats_op_fam_class DEFAULT FOR TYPE test_composite_type USING HASH AS OPERATOR 1 = (test_composite_type, test_composite_type), diff --git a/src/test/regress/sql/multi_data_types.sql b/src/test/regress/sql/multi_data_types.sql index 315550474..3dd6eb0f7 100644 --- a/src/test/regress/sql/multi_data_types.sql +++ b/src/test/regress/sql/multi_data_types.sql @@ -15,8 +15,14 @@ CREATE TYPE test_composite_type AS ( -- ... as well as a function to use as its comparator... CREATE FUNCTION equal_test_composite_type_function(test_composite_type, test_composite_type) RETURNS boolean -AS 'select $1.i = $2.i AND $1.i2 = $2.i2;' -LANGUAGE SQL +LANGUAGE 'internal' +AS 'record_eq' +IMMUTABLE +RETURNS NULL ON NULL INPUT; + +CREATE FUNCTION cmp_test_composite_type_function(test_composite_type, test_composite_type) RETURNS int +LANGUAGE 'internal' +AS 'btrecordcmp' IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -44,7 +50,8 @@ RETURNS NULL ON NULL INPUT; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS cats_op_fam_clas3 DEFAULT FOR TYPE test_composite_type USING BTREE AS -OPERATOR 3 = (test_composite_type, test_composite_type); +OPERATOR 3 = (test_composite_type, test_composite_type), +FUNCTION 1 cmp_test_composite_type_function(test_composite_type, test_composite_type); CREATE OPERATOR CLASS cats_op_fam_class DEFAULT FOR TYPE test_composite_type USING HASH AS From d9470fcf53046b4447fc3578d0bfd53e1d212d9e Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 27 Apr 2017 18:17:14 -0700 Subject: [PATCH 4/4] Add DistTableCacheEntry->hasOverlappingShardInterval. 
This determines whether it's possible to perform binary search on sortedShardIntervalArray or not. If e.g. two shards have overlapping ranges, that'd be prohibitive. That'll be useful in a later commit introducing faster shard pruning. --- .../distributed/utils/metadata_cache.c | 80 +++++++++++++++++++ src/include/distributed/metadata_cache.h | 1 + .../input/multi_outer_join_reference.source | 2 +- .../output/multi_outer_join_reference.source | 5 +- 4 files changed, 84 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index a36dd5c46..15062f78e 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -145,6 +145,9 @@ static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArr static void ErrorIfInstalledVersionMismatch(void); static char * AvailableExtensionVersion(void); static char * InstalledExtensionVersion(void); +static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + FmgrInfo *shardIntervalSortCompareFunction); static void InitializeDistTableCache(void); static void InitializeWorkerNodeCache(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); @@ -714,6 +717,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { cacheEntry->hasUninitializedShardInterval = true; + cacheEntry->hasOverlappingShardInterval = true; /* * Note that during create_reference_table() call, @@ -742,6 +746,35 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) cacheEntry->hasUninitializedShardInterval = HasUninitializedShardInterval(sortedShardIntervalArray, shardIntervalArrayLength); + + if (!cacheEntry->hasUninitializedShardInterval) + { + cacheEntry->hasOverlappingShardInterval = + HasOverlappingShardInterval(sortedShardIntervalArray, + shardIntervalArrayLength, +
shardIntervalCompareFunction); + } + else + { + cacheEntry->hasOverlappingShardInterval = true; + } + + /* + * If table is hash-partitioned and has shards, there never should be + * any uninitialized shards. Historically we've not prevented that for + * range partitioned tables, but it might be a good idea to start + * doing so. + */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->hasUninitializedShardInterval) + { + ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards"))); + } + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->hasOverlappingShardInterval) + { + ereport(ERROR, (errmsg("hash partitioned table has overlapping shards"))); + } } @@ -917,6 +950,52 @@ HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shar } +/* + * HasOverlappingShardInterval determines whether the given list of sorted + * shards has overlapping ranges. + */ +static bool +HasOverlappingShardInterval(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + FmgrInfo *shardIntervalSortCompareFunction) +{ + int shardIndex = 0; + ShardInterval *lastShardInterval = NULL; + Datum comparisonDatum = 0; + int comparisonResult = 0; + + /* zero/a single shard can't overlap */ + if (shardIntervalArrayLength < 2) + { + return false; + } + + lastShardInterval = shardIntervalArray[0]; + for (shardIndex = 1; shardIndex < shardIntervalArrayLength; shardIndex++) + { + ShardInterval *curShardInterval = shardIntervalArray[shardIndex]; + + /* only called if !hasUninitializedShardInterval */ + Assert(lastShardInterval->minValueExists && lastShardInterval->maxValueExists); + Assert(curShardInterval->minValueExists && curShardInterval->maxValueExists); + + comparisonDatum = CompareCall2(shardIntervalSortCompareFunction, + lastShardInterval->maxValue, + curShardInterval->minValue); + comparisonResult = DatumGetInt32(comparisonDatum); + + if (comparisonResult >= 0) + { + return true; + } + + lastShardInterval =
curShardInterval; + } + + return false; +} + + /* * CitusHasBeenLoaded returns true if the citus extension has been created * in the current database and the extension script has been executed. Otherwise, @@ -2138,6 +2217,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) cacheEntry->shardIntervalArrayLength = 0; cacheEntry->hasUninitializedShardInterval = false; cacheEntry->hasUniformHashDistribution = false; + cacheEntry->hasOverlappingShardInterval = false; } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 5cc2bbf26..ae83940d4 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -38,6 +38,7 @@ typedef struct bool isDistributedTable; bool hasUninitializedShardInterval; bool hasUniformHashDistribution; /* valid for hash partitioned tables */ + bool hasOverlappingShardInterval; /* pg_dist_partition metadata for this table */ char *partitionKeyString; diff --git a/src/test/regress/input/multi_outer_join_reference.source b/src/test/regress/input/multi_outer_join_reference.source index f1e5946c4..9a106eb61 100644 --- a/src/test/regress/input/multi_outer_join_reference.source +++ b/src/test/regress/input/multi_outer_join_reference.source @@ -166,7 +166,7 @@ FROM -- load some more data \copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|' --- Update shards so that they do not have 1-1 matching. We should error here. +-- Update shards so that they do not have 1-1 matching, triggering an error. 
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006; UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006; SELECT diff --git a/src/test/regress/output/multi_outer_join_reference.source b/src/test/regress/output/multi_outer_join_reference.source index 82d1b88ed..a2b17a0fa 100644 --- a/src/test/regress/output/multi_outer_join_reference.source +++ b/src/test/regress/output/multi_outer_join_reference.source @@ -228,15 +228,14 @@ LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_ -- load some more data \copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|' --- Update shards so that they do not have 1-1 matching. We should error here. +-- Update shards so that they do not have 1-1 matching, triggering an error. UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006; UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006; SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey); -ERROR: cannot perform distributed planning on this query -DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning +ERROR: hash partitioned table has overlapping shards UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006; UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006; -- empty tables