From 6c7abc2ba5c9e7402da72e24702178b242d959ab Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 30 Mar 2016 14:43:46 +0300 Subject: [PATCH] Add fast shard pruning path for INSERTs on hash partitioned tables This commit adds a fast shard pruning path for INSERTs on hash-partitioned tables. The rationale behind this change is that, when a sorted shard interval array exists, a single index lookup on the array is enough to find the corresponding shard interval. Since this requires a shard interval array sorted on shardminvalue, this commit replaces shardIntervalArray with sortedShardIntervalArray in the metadata cache, and then uses the low-level API defined in multi_copy to handle the fast shard pruning. The performance impact of this change becomes more apparent as the number of shards of a distributed table grows: the previous implementation relied on a linear search through the shard intervals, whereas this commit uses a constant-time lookup on the shard interval array, making shard pruning far less dependent on the shard count. --- src/backend/distributed/commands/multi_copy.c | 209 +++------------- .../master/master_metadata_utility.c | 10 +- .../distributed/planner/multi_join_order.c | 58 ++--- .../planner/multi_logical_optimizer.c | 38 +-- .../planner/multi_physical_planner.c | 94 +------ .../planner/multi_router_planner.c | 118 ++++++++- .../distributed/test/prune_shard_list.c | 49 ++++ .../distributed/utils/metadata_cache.c | 235 +++++++++++++++++- .../distributed/utils/shardinterval_utils.c | 179 +++++++++++++ src/include/distributed/metadata_cache.h | 8 +- .../distributed/multi_physical_planner.h | 1 - src/include/distributed/shardinterval_utils.h | 35 +++ .../expected/multi_distribution_metadata.out | 2 +- .../regress/expected/multi_hash_pruning.out | 20 +- .../expected/multi_index_statements.out | 4 +- .../expected/multi_join_order_additional.out | 54 ++-- .../regress/expected/multi_join_pruning.out | 42 ++-- .../multi_large_table_join_planning.out | 86 +++--- .../multi_large_table_join_planning_0.out | 86 +++--- .../expected/multi_large_table_pruning.out | 18 +- .../multi_large_table_task_assignment.out | 84 +++--- .../multi_null_minmax_value_pruning.out | 54 ++-- .../expected/multi_partition_pruning.out | 20 +- .../expected/multi_prune_shard_list.out | 93 ++++++- .../multi_alter_table_statements.source | 10 +- src/test/regress/output/multi_subquery.source | 8 +- .../regress/sql/multi_prune_shard_list.sql | 52 ++++ 27 files changed, 1078 insertions(+), 589 deletions(-) create mode 100644 src/backend/distributed/utils/shardinterval_utils.c create mode 100644 src/include/distributed/shardinterval_utils.h diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 53b73c762..4c943969a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -68,6 +68,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" +#include "distributed/metadata_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -76,6 +77,7 @@ #include "distributed/multi_transaction.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -144,19 +146,6 @@ static
void CopyToNewShards(CopyStmt *copyStatement, char *completionTag); static void LockAllShards(List *shardIntervalList); static HTAB * CreateShardConnectionHash(void); static int CompareShardIntervalsById(const void *leftElement, const void *rightElement); -static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray, - int shardCount); -static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char - partitionMethod); -static ShardInterval * FindShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, char partitionMethod, - FmgrInfo *compareFunction, - FmgrInfo *hashFunction, bool useBinarySearch); -static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, - FmgrInfo *compareFunction); static ShardConnections * GetShardConnections(HTAB *shardConnectionHash, int64 shardId, bool *shardConnectionsFound); @@ -250,9 +239,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) uint32 columnCount = 0; Datum *columnValues = NULL; bool *columnNulls = NULL; - TypeCacheEntry *typeEntry = NULL; FmgrInfo *hashFunction = NULL; FmgrInfo *compareFunction = NULL; + bool hasUniformHashDistribution = false; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); int shardCount = 0; List *shardIntervalList = NULL; @@ -275,12 +265,11 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) Var *partitionColumn = PartitionColumn(tableId, 0); char partitionMethod = PartitionMethod(tableId); - /* resolve hash function for partition column */ - typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO); - hashFunction = &(typeEntry->hash_proc_finfo); + /* get hash function for partition column */ + hashFunction = cacheEntry->hashFunction; - /* resolve compare function for shard intervals */ - compareFunction = ShardIntervalCompareFunction(partitionColumn, partitionMethod); + /* get compare function for shard intervals */ + compareFunction = cacheEntry->shardIntervalCompareFunction; /* allocate column values and nulls arrays */ distributedRelation = heap_open(tableId, RowExclusiveLock); @@ -311,16 +300,32 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } } + /* error if any shard missing min/max values */ + if (cacheEntry->hasUninitializedShardInterval) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not start copy"), + errdetail("Distributed relation \"%s\" has shards " + "with missing shardminvalue/shardmaxvalue.", + relationName))); + } + + if (partitionMethod == DISTRIBUTE_BY_APPEND) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unsupported partition method %d", partitionMethod))); + } + /* prevent concurrent placement changes and non-commutative DML statements */ LockAllShards(shardIntervalList); /* initialize the shard interval cache */ - shardCount = list_length(shardIntervalList); - shardIntervalCache = SortedShardIntervalArray(shardIntervalList); + shardCount = cacheEntry->shardIntervalArrayLength; + shardIntervalCache = cacheEntry->sortedShardIntervalArray; + hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; /* determine whether to use binary search */ - if (partitionMethod != DISTRIBUTE_BY_HASH || - !IsUniformHashDistribution(shardIntervalCache, shardCount)) + if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) { useBinarySearch = true; } @@ -736,164 +741,6 @@ 
CompareShardIntervalsById(const void *leftElement, const void *rightElement) } -/* - * ShardIntervalCompareFunction returns the appropriate compare function for the - * partition column type. In case of hash-partitioning, it always returns the compare - * function for integers. - */ -static FmgrInfo * -ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod) -{ - FmgrInfo *compareFunction = NULL; - - if (partitionMethod == DISTRIBUTE_BY_HASH) - { - compareFunction = GetFunctionInfo(INT4OID, BTREE_AM_OID, BTORDER_PROC); - } - else if (partitionMethod == DISTRIBUTE_BY_RANGE) - { - compareFunction = GetFunctionInfo(partitionColumn->vartype, - BTREE_AM_OID, BTORDER_PROC); - } - else - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("unsupported partition method %d", partitionMethod))); - } - - return compareFunction; -} - - -/* - * IsUniformHashDistribution determines whether the given list of sorted shards - * has a uniform hash distribution, as produced by master_create_worker_shards. - */ -static bool -IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount) -{ - uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - int shardIndex = 0; - - for (shardIndex = 0; shardIndex < shardCount; shardIndex++) - { - ShardInterval *shardInterval = shardIntervalArray[shardIndex]; - int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); - int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); - - if (shardIndex == (shardCount - 1)) - { - shardMaxHashToken = INT32_MAX; - } - - if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken || - DatumGetInt32(shardInterval->maxValue) != shardMaxHashToken) - { - return false; - } - } - - return true; -} - - -/* - * FindShardInterval finds a single shard interval in the cache for the - * given partition column value. - */ -static ShardInterval * -FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, - int shardCount, char partitionMethod, FmgrInfo *compareFunction, - FmgrInfo *hashFunction, bool useBinarySearch) -{ - ShardInterval *shardInterval = NULL; - - if (partitionMethod == DISTRIBUTE_BY_HASH) - { - int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, - partitionColumnValue)); - if (useBinarySearch) - { - shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue), - shardIntervalCache, shardCount, - compareFunction); - } - else - { - uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; - int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; - - Assert(shardIndex <= shardCount); - - /* - * If the shard count is not power of 2, the range of the last - * shard becomes larger than others. For that extra piece of range, - * we still need to use the last shard. - */ - if (shardIndex == shardCount) - { - shardIndex = shardCount - 1; - } - - shardInterval = shardIntervalCache[shardIndex]; - } - } - else - { - shardInterval = SearchCachedShardInterval(partitionColumnValue, - shardIntervalCache, shardCount, - compareFunction); - } - - return shardInterval; -} - - -/* - * SearchCachedShardInterval performs a binary search for a shard interval matching a - * given partition column value and returns it. 
- */ -static ShardInterval * -SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, - int shardCount, FmgrInfo *compareFunction) -{ - int lowerBoundIndex = 0; - int upperBoundIndex = shardCount; - - while (lowerBoundIndex < upperBoundIndex) - { - int middleIndex = (lowerBoundIndex + upperBoundIndex) / 2; - int maxValueComparison = 0; - int minValueComparison = 0; - - minValueComparison = FunctionCall2Coll(compareFunction, - DEFAULT_COLLATION_OID, - partitionColumnValue, - shardIntervalCache[middleIndex]->minValue); - - if (DatumGetInt32(minValueComparison) < 0) - { - upperBoundIndex = middleIndex; - continue; - } - - maxValueComparison = FunctionCall2Coll(compareFunction, - DEFAULT_COLLATION_OID, - partitionColumnValue, - shardIntervalCache[middleIndex]->maxValue); - - if (DatumGetInt32(maxValueComparison) <= 0) - { - return shardIntervalCache[middleIndex]; - } - - lowerBoundIndex = middleIndex + 1; - } - - return NULL; -} - - /* * GetShardConnections finds existing connections for a shard in the hash * or opens new connections to each active placement and starts a (binary) COPY diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 1be66d944..05a598dbf 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -46,6 +46,9 @@ static uint64 * AllocateUint64(uint64 value); * LoadShardIntervalList returns a list of shard intervals related for a given * distributed table. The function returns an empty list if no shards can be * found for the given relation. + * Since LoadShardIntervalList relies on sortedShardIntervalArray, it returns + * a shard interval list whose elements are sorted on shardminvalue. Shard intervals + * with uninitialized shard min/max values are placed in the end of the list. */ List * LoadShardIntervalList(Oid relationId) @@ -59,7 +62,7 @@ LoadShardIntervalList(Oid relationId) ShardInterval *newShardInterval = NULL; newShardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval)); - CopyShardInterval(&cacheEntry->shardIntervalArray[i], newShardInterval); + CopyShardInterval(cacheEntry->sortedShardIntervalArray[i], newShardInterval); shardList = lappend(shardList, newShardInterval); } @@ -91,6 +94,9 @@ ShardIntervalCount(Oid relationId) /* * LoadShardList reads list of shards for given relationId from pg_dist_shard, * and returns the list of found shardIds. + * Since LoadShardList relies on sortedShardIntervalArray, it returns a shard + * list whose elements are sorted on shardminvalue. Shards with uninitialized + * shard min/max values are placed in the end of the list. 
*/ List * LoadShardList(Oid relationId) @@ -101,7 +107,7 @@ LoadShardList(Oid relationId) for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) { - ShardInterval *currentShardInterval = &cacheEntry->shardIntervalArray[i]; + ShardInterval *currentShardInterval = cacheEntry->sortedShardIntervalArray[i]; uint64 *shardIdPointer = AllocateUint64(currentShardInterval->shardId); shardList = lappend(shardList, shardIdPointer); diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 9f14e4c89..58ec3ce66 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -56,7 +56,6 @@ static List * MergeShardIntervals(List *leftShardIntervalList, List *rightShardIntervalList, JoinType joinType); static bool ShardIntervalsMatch(List *leftShardIntervalList, List *rightShardIntervalList); -static List * LoadSortedShardIntervalList(Oid relationId); static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClauseList); static List * BestJoinOrder(List *candidateJoinOrders); @@ -123,6 +122,22 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList) List *joinedTableList = NIL; JoinOrderNode *firstJoinNode = NULL; JoinOrderNode *currentJoinNode = NULL; + ListCell *tableEntryCell = NULL; + + foreach(tableEntryCell, tableEntryList) + { + TableEntry *rangeTableEntry = (TableEntry *) lfirst(tableEntryCell); + Oid relationId = rangeTableEntry->relationId; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + + if (cacheEntry->hasUninitializedShardInterval) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning on this query"), + errdetail("Shards of relations in outer join queries must " + "have shard min/max values."))); + } + } /* get the FROM section as a flattened list of JoinExpr nodes */ joinList = JoinExprList(fromExpr); @@ -159,8 +174,8 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList) joinClauseList = list_concat(joinClauseList, joinWhereClauseList); } - /* get the list of shards to check broadcast/local join possibility */ - candidateShardList = LoadSortedShardIntervalList(nextTable->relationId); + /* get the sorted list of shards to check broadcast/local join possibility */ + candidateShardList = LoadShardIntervalList(nextTable->relationId); /* find the best join rule type */ nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode, @@ -268,8 +283,7 @@ CreateFirstJoinOrderNode(FromExpr *fromExpr, List *tableEntryList) firstPartitionColumn, firstPartitionMethod); - firstJoinNode->shardIntervalList = - LoadSortedShardIntervalList(firstTable->relationId); + firstJoinNode->shardIntervalList = LoadShardIntervalList(firstTable->relationId); return firstJoinNode; } @@ -462,40 +476,6 @@ MergeShardIntervals(List *leftShardIntervalList, List *rightShardIntervalList, } -/* - * LoadSortedShardIntervalList loads a list of shard intervals from the metadata - * and sorts the list by the minimum value of the intervals. 
- */ -static List * -LoadSortedShardIntervalList(Oid relationId) -{ - List *shardIntervalList = NIL; - int shardCount = 0; - int intervalIndex = 0; - ShardInterval **sortedShardIntervalArray = NULL; - List *sortedShardIntervalList = NIL; - - shardIntervalList = LoadShardIntervalList(relationId); - - shardCount = list_length(shardIntervalList); - if (shardCount <= 1) - { - return shardIntervalList; - } - - sortedShardIntervalArray = SortedShardIntervalArray(shardIntervalList); - - for (intervalIndex = 0; intervalIndex < shardCount; intervalIndex++) - { - ShardInterval *shardInterval = sortedShardIntervalArray[intervalIndex]; - - sortedShardIntervalList = lappend(sortedShardIntervalList, shardInterval); - } - - return sortedShardIntervalList; -} - - /* * JoinOnColumns determines whether two columns are joined by a given join clause * list. diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 25c15d48d..6ce28687c 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -26,6 +26,7 @@ #include "catalog/pg_type.h" #include "commands/extension.h" #include "distributed/citus_nodes.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" @@ -137,7 +138,7 @@ static bool SupportedLateralQuery(Query *parentQuery, Query *lateralQuery); static bool JoinOnPartitionColumn(Query *query); static void ErrorIfUnsupportedShardDistribution(Query *query); static List * RelationIdList(Query *query); -static bool CoPartitionedTables(List *firstShardList, List *secondShardList); +static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); static bool ShardIntervalsEqual(ShardInterval *firstInterval, ShardInterval *secondInterval); static void ErrorIfUnsupportedFilters(Query *subquery); @@ -3386,7 +3387,7 @@ JoinOnPartitionColumn(Query *query) static void ErrorIfUnsupportedShardDistribution(Query *query) { - List *firstShardIntervalList = NIL; + Oid firstTableRelationId = InvalidOid; List *relationIdList = RelationIdList(query); ListCell *relationIdCell = NULL; uint32 relationIndex = 0; @@ -3425,21 +3426,21 @@ ErrorIfUnsupportedShardDistribution(Query *query) foreach(relationIdCell, relationIdList) { Oid relationId = lfirst_oid(relationIdCell); - List *currentShardIntervalList = LoadShardIntervalList(relationId); bool coPartitionedTables = false; + Oid currentRelationId = relationId; /* get shard list of first relation and continue for the next relation */ if (relationIndex == 0) { - firstShardIntervalList = currentShardIntervalList; + firstTableRelationId = relationId; relationIndex++; continue; } /* check if this table has 1-1 shard partitioning with first table */ - coPartitionedTables = CoPartitionedTables(firstShardIntervalList, - currentShardIntervalList); + coPartitionedTables = CoPartitionedTables(firstTableRelationId, + currentRelationId); if (!coPartitionedTables) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -3478,21 +3479,23 @@ RelationIdList(Query *query) /* - * CoPartitionedTables checks if given shard lists have 1-to-1 shard partitioning. - * It first sorts both list according to shard interval minimum values. Then it - * compares every shard interval in order and if any pair of shard intervals are - * not equal it returns false. 
+ * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard + * partitioning. It uses shard interval array that are sorted on interval minimum + * values. Then it compares every shard interval in order and if any pair of + * shard intervals are not equal it returns false. */ static bool -CoPartitionedTables(List *firstShardList, List *secondShardList) +CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) { bool coPartitionedTables = true; uint32 intervalIndex = 0; - ShardInterval **sortedFirstIntervalArray = NULL; - ShardInterval **sortedSecondIntervalArray = NULL; - - uint32 firstListShardCount = list_length(firstShardList); - uint32 secondListShardCount = list_length(secondShardList); + DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId); + DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId); + ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; + ShardInterval **sortedSecondIntervalArray = + secondTableCache->sortedShardIntervalArray; + uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; + uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; if (firstListShardCount != secondListShardCount) { @@ -3505,9 +3508,6 @@ CoPartitionedTables(List *firstShardList, List *secondShardList) return true; } - sortedFirstIntervalArray = SortedShardIntervalArray(firstShardList); - sortedSecondIntervalArray = SortedShardIntervalArray(secondShardList); - for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) { ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 853cb886e..0d13273fe 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -37,6 +37,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/shardinterval_utils.h" #include "distributed/task_tracker.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -110,8 +111,6 @@ static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Oid baseRelationId, BoundaryNodeJobType boundaryNodeJobType); static uint32 HashPartitionCount(void); -static int CompareShardIntervals(const void *leftElement, const void *rightElement, - FmgrInfo *typeCompareFunction); static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, uint32 shardIntervalCount); @@ -1716,10 +1715,17 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, else if (partitionType == RANGE_PARTITION_TYPE) { /* build the split point object for the table on the right-hand side */ - List *shardIntervalList = LoadShardIntervalList(baseRelationId); - uint32 shardCount = (uint32) list_length(shardIntervalList); - ShardInterval **sortedShardIntervalArray = - SortedShardIntervalArray(shardIntervalList); + DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId); + bool hasUninitializedShardInterval = false; + uint32 shardCount = cache->shardIntervalArrayLength; + ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray; + + hasUninitializedShardInterval = cache->hasUninitializedShardInterval; + if (hasUninitializedShardInterval) + { + ereport(ERROR, (errmsg("cannot range 
repartition shard with " + "missing min/max values"))); + } /* this join-type currently doesn't work for hash partitioned tables */ char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY = @@ -1754,78 +1760,6 @@ HashPartitionCount(void) } -/* - * SortedShardIntervalArray returns a sorted array of shard intervals for shards - * in the given shard list. The array elements are sorted in in ascending order - * according to shard interval's minimum value. - */ -ShardInterval ** -SortedShardIntervalArray(List *shardIntervalList) -{ - FmgrInfo *typeCompareFunction = NULL; - ListCell *shardIntervalCell = NULL; - uint32 shardIntervalIndex = 0; - - ShardInterval **shardIntervalArray = NULL; - uint32 shardIntervalCount = (uint32) list_length(shardIntervalList); - Assert(shardIntervalCount > 0); - - /* allocate an array for sorted shard intervals */ - shardIntervalArray = palloc0(shardIntervalCount * sizeof(ShardInterval *)); - - /* fill in the array with shard intervals */ - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - if (shardInterval->minValueExists && shardInterval->maxValueExists) - { - shardIntervalArray[shardIntervalIndex] = shardInterval; - } - else - { - ereport(ERROR, (errmsg("cannot range repartition shard " UINT64_FORMAT - " with missing min/max values", - shardInterval->shardId))); - } - - /* resolve the datum type and comparison function on first pass */ - if (shardIntervalIndex == 0) - { - Oid typeId = shardInterval->valueTypeId; - typeCompareFunction = GetFunctionInfo(typeId, BTREE_AM_OID, BTORDER_PROC); - } - shardIntervalIndex++; - } - - /* sort shard intervals by their minimum values in ascending order */ - qsort_arg(shardIntervalArray, shardIntervalCount, sizeof(ShardInterval *), - (qsort_arg_comparator) CompareShardIntervals, (void *) typeCompareFunction); - - return shardIntervalArray; -} - - -/* - * CompareShardIntervals acts as a helper function to compare two shard interval - * pointers by their minimum values, using the value's type comparison function. 
- */ -static int -CompareShardIntervals(const void *leftElement, const void *rightElement, - FmgrInfo *typeCompareFunction) -{ - ShardInterval **leftShardInterval = (ShardInterval **) leftElement; - ShardInterval **rightShardInterval = (ShardInterval **) rightElement; - - Datum leftDatum = (*leftShardInterval)->minValue; - Datum rightDatum = (*rightShardInterval)->minValue; - - Datum comparisonDatum = CompareCall2(typeCompareFunction, leftDatum, rightDatum); - int comparisonResult = DatumGetInt32(comparisonDatum); - - return comparisonResult; -} - - /* * SplitPointObject walks over shard intervals in the given array, extracts each * shard interval's minimum value, sorts and inserts these minimum values into a @@ -2031,7 +1965,6 @@ SubquerySqlTaskList(Job *job) List *shardIntervalList = LoadShardIntervalList(relationId); List *finalShardIntervalList = NIL; ListCell *fragmentCombinationCell = NULL; - ShardInterval **sortedIntervalArray = NULL; uint32 tableId = rangeTableIndex + 1; /* tableId starts from 1 */ uint32 finalShardCount = 0; uint32 shardIndex = 0; @@ -2056,12 +1989,11 @@ SubquerySqlTaskList(Job *job) return NIL; } - sortedIntervalArray = SortedShardIntervalArray(finalShardIntervalList); fragmentCombinationCell = list_head(fragmentCombinationList); for (shardIndex = 0; shardIndex < finalShardCount; shardIndex++) { - ShardInterval *shardInterval = sortedIntervalArray[shardIndex]; + ShardInterval *shardInterval = list_nth(finalShardIntervalList, shardIndex); RangeTableFragment *shardFragment = palloc0(fragmentSize); shardFragment->fragmentReference = &(shardInterval->shardId); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 90b654c90..1f0d44dc5 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -33,6 +33,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" #include "executor/execdesc.h" #include "lib/stringinfo.h" #if (PG_VERSION_NUM >= 90500) @@ -51,6 +52,7 @@ #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/relcache.h" +#include "utils/typcache.h" /* planner functions forward declarations */ @@ -62,8 +64,11 @@ static OnConflictExpr * RebuildOnConflict(Oid relationId, #endif static ShardInterval * TargetShardInterval(Query *query); static List * QueryRestrictList(Query *query); +static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); +static ShardInterval * FastShardPruning(Oid distributedTableId, + Const *partionColumnValue); static Oid ExtractFirstDistributedTableId(Query *query); -static Const * ExtractPartitionValue(Query *query, Var *partitionColumn); +static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *query); static Job * RouterQueryJob(Query *query, Task *task); static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); @@ -507,18 +512,19 @@ TargetShardInterval(Query *query) { CmdType commandType = query->commandType; bool selectTask = (commandType == CMD_SELECT); - List *restrictClauseList = NIL; List *prunedShardList = NIL; - Index tableId = 1; int prunedShardCount = 0; + int shardCount = 0; Oid distributedTableId = ExtractFirstDistributedTableId(query); - List *shardIntervalList = NIL; + DistTableCacheEntry *cacheEntry = 
DistributedTableCacheEntry(distributedTableId); + char partitionMethod = cacheEntry->partitionMethod; + bool fastShardPruningPossible = false; /* error out if no shards exist for the table */ - shardIntervalList = LoadShardIntervalList(distributedTableId); - if (shardIntervalList == NIL) + shardCount = cacheEntry->shardIntervalArrayLength; + if (shardCount == 0) { char *relationName = get_rel_name(distributedTableId); @@ -530,9 +536,29 @@ "and try again."))); } - restrictClauseList = QueryRestrictList(query); - prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, - shardIntervalList); + fastShardPruningPossible = FastShardPruningPossible(query->commandType, + partitionMethod); + if (fastShardPruningPossible) + { + uint32 rangeTableId = 1; + Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn); + ShardInterval *shardInterval = FastShardPruning(distributedTableId, + partitionValue); + + if (shardInterval != NULL) + { + prunedShardList = lappend(prunedShardList, shardInterval); + } + } + else + { + List *restrictClauseList = QueryRestrictList(query); + Index tableId = 1; + List *shardIntervalList = LoadShardIntervalList(distributedTableId); + + prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, + shardIntervalList); prunedShardCount = list_length(prunedShardList); if (prunedShardCount != 1) { @@ -554,6 +580,74 @@ } +/* + * FastShardPruningPossible returns true if the commandType is INSERT and the + * partition method is hash or range. + */ +static bool +FastShardPruningPossible(CmdType commandType, char partitionMethod) +{ + /* we currently only support INSERTs */ + if (commandType != CMD_INSERT) + { + return false; + } + + /* fast shard pruning is only supported for hash and range partitioned tables */ + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) + { + return true; + } + + return false; +} + + +/* + * FastShardPruning is a higher-level API for the FindShardInterval function. Given the + * relationId of the distributed table and the partitionValue, FastShardPruning finds the + * corresponding shard interval that the partitionValue falls into. FastShardPruning + * returns NULL if no ShardIntervals exist for the given partitionValue. + */ +static ShardInterval * +FastShardPruning(Oid distributedTableId, Const *partitionValue) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + int shardCount = cacheEntry->shardIntervalArrayLength; + ShardInterval **sortedShardIntervalArray = cacheEntry->sortedShardIntervalArray; + bool useBinarySearch = false; + char partitionMethod = cacheEntry->partitionMethod; + FmgrInfo *shardIntervalCompareFunction = cacheEntry->shardIntervalCompareFunction; + bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; + FmgrInfo *hashFunction = NULL; + ShardInterval *shardInterval = NULL; + + /* determine whether to use binary search */ + if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) + { + useBinarySearch = true; + } + + /* we only need hash functions for hash distributed tables */ + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + hashFunction = cacheEntry->hashFunction; + } + + /* + * Call FindShardInterval to find the corresponding shard interval for the + * given partition value.
+ */ + shardInterval = FindShardInterval(partitionValue->constvalue, + sortedShardIntervalArray, shardCount, + partitionMethod, + shardIntervalCompareFunction, hashFunction, + useBinarySearch); + + return shardInterval; +} + + /* * QueryRestrictList returns the restriction clauses for the query. For a SELECT * statement these are the where-clause expressions. For INSERT statements we @@ -572,7 +666,7 @@ QueryRestrictList(Query *query) Oid distributedTableId = ExtractFirstDistributedTableId(query); uint32 rangeTableId = 1; Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - Const *partitionValue = ExtractPartitionValue(query, partitionColumn); + Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn); OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber); @@ -628,11 +722,11 @@ ExtractFirstDistributedTableId(Query *query) /* * ExtractPartitionValue extracts the partition column value from a the target - * of a modification command. If a partition value is missing altogether or is + * of an INSERT command. If a partition value is missing altogether or is * NULL, this function throws an error. */ static Const * -ExtractPartitionValue(Query *query, Var *partitionColumn) +ExtractInsertPartitionValue(Query *query, Var *partitionColumn) { Const *partitionValue = NULL; TargetEntry *targetEntry = get_tle_by_resno(query->targetList, diff --git a/src/backend/distributed/test/prune_shard_list.c b/src/backend/distributed/test/prune_shard_list.c index b0d0eba9c..e78bfc802 100644 --- a/src/backend/distributed/test/prune_shard_list.c +++ b/src/backend/distributed/test/prune_shard_list.c @@ -22,6 +22,7 @@ #include "access/skey.h" #endif #include "catalog/pg_type.h" +#include "distributed/metadata_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/multi_join_order.h" #include "distributed/multi_physical_planner.h" @@ -38,6 +39,7 @@ /* local function forward declarations */ static Expr * MakeTextPartitionExpression(Oid distributedTableId, text *value); static ArrayType * PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList); +static ArrayType * SortedShardIntervalArray(Oid distributedTableId); /* declarations for dynamic loading */ @@ -46,6 +48,7 @@ PG_FUNCTION_INFO_V1(prune_using_single_value); PG_FUNCTION_INFO_V1(prune_using_either_value); PG_FUNCTION_INFO_V1(prune_using_both_values); PG_FUNCTION_INFO_V1(debug_equality_expression); +PG_FUNCTION_INFO_V1(print_sorted_shard_intervals); /* @@ -140,6 +143,21 @@ debug_equality_expression(PG_FUNCTION_ARGS) } +/* + * print_sorted_shard_intervals prints the sorted shard interval array that is in the + * metadata cache. This function aims to test sorting functionality. + */ +Datum +print_sorted_shard_intervals(PG_FUNCTION_ARGS) +{ + Oid distributedTableId = PG_GETARG_OID(0); + + ArrayType *shardIdArrayType = SortedShardIntervalArray(distributedTableId); + + PG_RETURN_ARRAYTYPE_P(shardIdArrayType); +} + + /* * MakeTextPartitionExpression returns an equality expression between the * specified table's partition column and the provided values. @@ -212,3 +230,34 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList) return shardIdArrayType; } + + +/* + * SortedShardIntervalArray simply returns the shard interval ids in the sorted shard + * interval cache as a datum array. 
+ */ +static ArrayType * +SortedShardIntervalArray(Oid distributedTableId) +{ + ArrayType *shardIdArrayType = NULL; + int shardIndex = 0; + Oid shardIdTypeId = INT8OID; + + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray; + int shardIdCount = cacheEntry->shardIntervalArrayLength; + Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum)); + + for (shardIndex = 0; shardIndex < shardIdCount; ++shardIndex) + { + ShardInterval *shardId = shardIntervalArray[shardIndex]; + Datum shardIdDatum = Int64GetDatum(shardId->shardId); + + shardIdDatumArray[shardIndex] = shardIdDatum; + } + + shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount, + shardIdTypeId); + + return shardIdArrayType; +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 105179208..0608d2df5 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -22,6 +22,8 @@ #include "distributed/metadata_cache.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/worker_protocol.h" #include "parser/parse_func.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -34,6 +36,7 @@ #include "utils/relfilenodemap.h" #include "utils/relmapper.h" #include "utils/syscache.h" +#include "utils/typcache.h" /* state which should be cleared upon DROP EXTENSION */ @@ -57,6 +60,16 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); +static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, + char partitionMethod); +static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray, + int shardCount, + FmgrInfo * + shardIntervalSortCompareFunction); +static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength); +static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, + int shardCount); static void InitializeDistTableCache(void); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); @@ -202,7 +215,12 @@ LookupDistTableCacheEntry(Oid relationId) char partitionMethod = 0; List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; - ShardInterval *shardIntervalArray = NULL; + ShardInterval **shardIntervalArray = NULL; + ShardInterval **sortedShardIntervalArray = NULL; + FmgrInfo *shardIntervalCompareFunction = NULL; + FmgrInfo *hashFunction = NULL; + bool hasUninitializedShardInterval = false; + bool hasUniformHashDistribution = false; void *hashKey = (void *) &relationId; if (DistTableCacheHash == NULL) @@ -257,7 +275,7 @@ LookupDistTableCacheEntry(Oid relationId) shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext, shardIntervalArrayLength * - sizeof(ShardInterval)); + sizeof(ShardInterval *)); foreach(distShardTupleCell, distShardTupleList) { @@ -266,9 +284,12 @@ LookupDistTableCacheEntry(Oid relationId) distShardTupleDesc, intervalTypeId, intervalTypeMod); + ShardInterval *newShardInterval = NULL; MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext); - CopyShardInterval(shardInterval, &shardIntervalArray[arrayIndex]); + newShardInterval = (ShardInterval 
*) palloc0(sizeof(ShardInterval)); + CopyShardInterval(shardInterval, newShardInterval); + shardIntervalArray[arrayIndex] = newShardInterval; MemoryContextSwitchTo(oldContext); @@ -280,6 +301,53 @@ LookupDistTableCacheEntry(Oid relationId) heap_close(distShardRelation, AccessShareLock); } + /* decide and allocate interval comparison function */ + if (shardIntervalArrayLength > 0) + { + MemoryContext oldContext = CurrentMemoryContext; + + /* allocate the comparison function in the cache context */ + oldContext = MemoryContextSwitchTo(CacheMemoryContext); + + shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray, + partitionMethod); + + MemoryContextSwitchTo(oldContext); + } + + /* sort the interval array */ + sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray, + shardIntervalArrayLength, + shardIntervalCompareFunction); + + /* check the shard distribution for hash partitioned tables */ + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + hasUniformHashDistribution = + HasUniformHashDistribution(sortedShardIntervalArray, + shardIntervalArrayLength); + } + + /* check if there exists any shard intervals with no min/max values */ + hasUninitializedShardInterval = + HasUninitializedShardInterval(sortedShardIntervalArray, shardIntervalArrayLength); + + /* we only need hash functions for hash distributed tables */ + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + TypeCacheEntry *typeEntry = NULL; + Node *partitionNode = stringToNode(partitionKeyString); + Var *partitionColumn = (Var *) partitionNode; + Assert(IsA(partitionNode, Var)); + typeEntry = lookup_type_cache(partitionColumn->vartype, + TYPECACHE_HASH_PROC_FINFO); + + hashFunction = MemoryContextAllocZero(CacheMemoryContext, + sizeof(FmgrInfo)); + + fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext); + } + cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL); /* zero out entry, but not the key part */ @@ -298,13 +366,151 @@ LookupDistTableCacheEntry(Oid relationId) cacheEntry->partitionKeyString = partitionKeyString; cacheEntry->partitionMethod = partitionMethod; cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; - cacheEntry->shardIntervalArray = shardIntervalArray; + cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; + cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; + cacheEntry->hashFunction = hashFunction; + cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval; + cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution; } return cacheEntry; } +/* + * ShardIntervalCompareFunction returns the appropriate compare function for the + * partition column type. In case of hash-partitioning, it always returns the compare + * function for integers. Callers of this function has to ensure that shardIntervalArray + * has at least one element. 
+ */ +static FmgrInfo * +ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod) +{ + FmgrInfo *shardIntervalCompareFunction = NULL; + Oid comparisonTypeId = InvalidOid; + + Assert(shardIntervalArray != NULL); + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + comparisonTypeId = INT4OID; + } + else + { + ShardInterval *shardInterval = shardIntervalArray[0]; + comparisonTypeId = shardInterval->valueTypeId; + } + + shardIntervalCompareFunction = GetFunctionInfo(comparisonTypeId, BTREE_AM_OID, + BTORDER_PROC); + + return shardIntervalCompareFunction; +} + + +/* + * SortShardIntervalArray sorts the input shardIntervalArray. Shard intervals with + * no min/max values are placed at the end of the array. + */ +static ShardInterval ** +SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount, + FmgrInfo *shardIntervalSortCompareFunction) +{ + ShardInterval **sortedShardIntervalArray = NULL; + + /* shortcut if there are no shard intervals in the array */ + if (shardCount == 0) + { + return shardIntervalArray; + } + + /* if a shard doesn't have min/max values, it's placed at the end of the array */ + qsort_arg(shardIntervalArray, shardCount, sizeof(ShardInterval *), + (qsort_arg_comparator) CompareShardIntervals, + (void *) shardIntervalSortCompareFunction); + + sortedShardIntervalArray = shardIntervalArray; + + return sortedShardIntervalArray; +} + + +/* + * HasUniformHashDistribution determines whether the given list of sorted shards + * has a uniform hash distribution, as produced by master_create_worker_shards for + * hash partitioned tables. + */ +static bool +HasUniformHashDistribution(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength) +{ + uint64 hashTokenIncrement = 0; + int shardIndex = 0; + + /* if there are no shards, there is no uniform distribution */ + if (shardIntervalArrayLength == 0) + { + return false; + } + + /* calculate the hash token increment */ + hashTokenIncrement = HASH_TOKEN_COUNT / shardIntervalArrayLength; + + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) + { + ShardInterval *shardInterval = shardIntervalArray[shardIndex]; + int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); + int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); + + if (shardIndex == (shardIntervalArrayLength - 1)) + { + shardMaxHashToken = INT32_MAX; + } + + if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken || + DatumGetInt32(shardInterval->maxValue) != shardMaxHashToken) + { + return false; + } + } + + return true; +} + + +/* + * HasUninitializedShardInterval returns true if the sortedShardIntervalArray contains + * any shard interval with missing min/max values. Callers of the function must + * ensure that the input shard interval array is sorted on shardminvalue and that + * uninitialized shard intervals are at the end of the array. + */ +static bool +HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount) +{ + bool hasUninitializedShardInterval = false; + ShardInterval *lastShardInterval = NULL; + + if (shardCount == 0) + { + return hasUninitializedShardInterval; + } + + Assert(sortedShardIntervalArray != NULL); + + /* + * Since the shard interval array is sorted and uninitialized ones are stored + * at the end of the array, checking the last element is enough.
+ */ + lastShardInterval = sortedShardIntervalArray[shardCount - 1]; + if (!lastShardInterval->minValueExists || !lastShardInterval->maxValueExists) + { + hasUninitializedShardInterval = true; + } + + return hasUninitializedShardInterval; +} + + /* * CitusHasBeenLoaded returns true if the citus extension has been created * in the current database and the extension script has been executed. Otherwise, @@ -628,7 +834,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) { - ShardInterval *shardInterval = &cacheEntry->shardIntervalArray[i]; + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; bool valueByVal = shardInterval->valueByVal; if (!valueByVal) @@ -643,11 +849,26 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) pfree(DatumGetPointer(shardInterval->maxValue)); } } + + pfree(shardInterval); } - pfree(cacheEntry->shardIntervalArray); - cacheEntry->shardIntervalArray = NULL; + pfree(cacheEntry->sortedShardIntervalArray); + cacheEntry->sortedShardIntervalArray = NULL; cacheEntry->shardIntervalArrayLength = 0; + + cacheEntry->hasUninitializedShardInterval = false; + cacheEntry->hasUniformHashDistribution = false; + + pfree(cacheEntry->shardIntervalCompareFunction); + cacheEntry->shardIntervalCompareFunction = NULL; + + /* we only allocated hash function for hash distributed tables */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + { + pfree(cacheEntry->hashFunction); + cacheEntry->hashFunction = NULL; + } } } diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c new file mode 100644 index 000000000..68acb56e0 --- /dev/null +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -0,0 +1,179 @@ +/*------------------------------------------------------------------------- + * + * shardinterval_utils.c + * + * This file contains functions to perform useful operations on shard intervals. + * + * Copyright (c) 2014-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/nbtree.h" +#include "catalog/pg_am.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_type.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/pg_dist_partition.h" +#include "distributed/worker_protocol.h" +#include "utils/catcache.h" +#include "utils/memutils.h" + + +static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, + FmgrInfo *compareFunction); + + +/* + * CompareShardIntervals acts as a helper function to compare two shard intervals + * by their minimum values, using the value's type comparison function. + * + * If a shard interval does not have min/max value, it's treated as being greater + * than the other. + */ +int +CompareShardIntervals(const void *leftElement, const void *rightElement, + FmgrInfo *typeCompareFunction) +{ + ShardInterval *leftShardInterval = *((ShardInterval **) leftElement); + ShardInterval *rightShardInterval = *((ShardInterval **) rightElement); + Datum leftDatum = 0; + Datum rightDatum = 0; + Datum comparisonDatum = 0; + int comparisonResult = 0; + + Assert(typeCompareFunction != NULL); + + /* + * Left element should be treated as the greater element in case it doesn't + * have min or max values. 
+ */ + if (!leftShardInterval->minValueExists || !leftShardInterval->maxValueExists) + { + comparisonResult = 1; + return comparisonResult; + } + + /* + * Right element should be treated as the greater element in case it doesn't + * have min or max values. + */ + if (!rightShardInterval->minValueExists || !rightShardInterval->maxValueExists) + { + comparisonResult = -1; + return comparisonResult; + } + + /* if both shard interval have min/max values, calculate the comparison result */ + leftDatum = leftShardInterval->minValue; + rightDatum = rightShardInterval->minValue; + + comparisonDatum = CompareCall2(typeCompareFunction, leftDatum, rightDatum); + comparisonResult = DatumGetInt32(comparisonDatum); + + return comparisonResult; +} + + +/* + * FindShardInterval finds a single shard interval in the cache for the + * given partition column value. + */ +ShardInterval * +FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, FmgrInfo *compareFunction, + FmgrInfo *hashFunction, bool useBinarySearch) +{ + ShardInterval *shardInterval = NULL; + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, + partitionColumnValue)); + if (useBinarySearch) + { + Assert(compareFunction != NULL); + + shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue), + shardIntervalCache, shardCount, + compareFunction); + } + else + { + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; + int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; + + Assert(shardIndex <= shardCount); + + /* + * If the shard count is not power of 2, the range of the last + * shard becomes larger than others. For that extra piece of range, + * we still need to use the last shard. + */ + if (shardIndex == shardCount) + { + shardIndex = shardCount - 1; + } + + shardInterval = shardIntervalCache[shardIndex]; + } + } + else + { + Assert(compareFunction != NULL); + + shardInterval = SearchCachedShardInterval(partitionColumnValue, + shardIntervalCache, shardCount, + compareFunction); + } + + return shardInterval; +} + + +/* + * SearchCachedShardInterval performs a binary search for a shard interval matching a + * given partition column value and returns it. 
+ */ +static ShardInterval * +SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction) +{ + int lowerBoundIndex = 0; + int upperBoundIndex = shardCount; + + while (lowerBoundIndex < upperBoundIndex) + { + int middleIndex = (lowerBoundIndex + upperBoundIndex) / 2; + int maxValueComparison = 0; + int minValueComparison = 0; + + minValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->minValue); + + if (DatumGetInt32(minValueComparison) < 0) + { + upperBoundIndex = middleIndex; + continue; + } + + maxValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->maxValue); + + if (DatumGetInt32(maxValueComparison) <= 0) + { + return shardIntervalCache[middleIndex]; + } + + lowerBoundIndex = middleIndex + 1; + } + + return NULL; +} diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 15dddc30a..32485e152 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -11,6 +11,7 @@ #ifndef METADATA_CACHE_H #define METADATA_CACHE_H +#include "fmgr.h" #include "distributed/master_metadata_utility.h" #include "distributed/pg_dist_partition.h" @@ -31,6 +32,8 @@ typedef struct bool isValid; bool isDistributedTable; + bool hasUninitializedShardInterval; + bool hasUniformHashDistribution; /* valid for hash partitioned tables */ /* pg_dist_partition metadata for this table */ char *partitionKeyString; @@ -38,7 +41,10 @@ typedef struct /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ int shardIntervalArrayLength; - ShardInterval *shardIntervalArray; + ShardInterval **sortedShardIntervalArray; + + FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ + FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ } DistTableCacheEntry; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 6b0fb4172..28fc6525b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -237,7 +237,6 @@ extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitio extern int CompareShardPlacements(const void *leftElement, const void *rightElement); /* Function declarations for sorting shards. */ -extern ShardInterval ** SortedShardIntervalArray(List *shardList); extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h new file mode 100644 index 000000000..9d8d1ccf2 --- /dev/null +++ b/src/include/distributed/shardinterval_utils.h @@ -0,0 +1,35 @@ +/*------------------------------------------------------------------------- + * + * shardinterval_utils.h + * + * Declarations for public utility functions related to shard intervals. + * + * Copyright (c) 2014-2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef SHARDINTERVAL_UTILS_H_ +#define SHARDINTERVAL_UTILS_H_ + +#include "distributed/master_metadata_utility.h" +#include "nodes/primnodes.h" + +/* OperatorCacheEntry contains information for each element in OperatorCache */ +typedef struct ShardIntervalCompareFunctionCacheEntry +{ + Var *partitionColumn; + char partitionMethod; + FmgrInfo *functionInfo; +} ShardIntervalCompareFunctionCacheEntry; + +extern int CompareShardIntervals(const void *leftElement, const void *rightElement, + FmgrInfo *typeCompareFunction); +extern ShardInterval * FindShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, + FmgrInfo *compareFunction, + FmgrInfo *hashFunction, bool useBinarySearch); + + +#endif /* SHARDINTERVAL_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index d626b179e..a096495e7 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -87,7 +87,7 @@ VALUES SELECT load_shard_id_array('events_hash'); load_shard_id_array --------------------- - {4,3,2,1} + {1,2,3,4} (1 row) -- should see array with first shard range diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index 782360c04..12b5317d3 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -73,25 +73,25 @@ DEBUG: predicate pruning for shardId 111 (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; -DEBUG: predicate pruning for shardId 113 DEBUG: predicate pruning for shardId 112 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 113 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -99,8 +99,8 @@ DEBUG: predicate pruning for shardId 110 EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL; DEBUG: predicate pruning for shardId 112 -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -231,14 +231,14 @@ DEBUG: predicate pruning for shardId 111 EXPLAIN SELECT count(*) FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2 WHERE orders1.o_orderkey = orders2.o_orderkey; -DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726] -DEBUG: join prunable 
for intervals [-1011077333,0] and [-1905060026,-1905060026] DEBUG: join prunable for intervals [-1905060026,-28094569] and [1134484726,1134484726] -DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0] -DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569] -DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026] DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0] DEBUG: join prunable for intervals [-1905060026,-1905060026] and [1134484726,1134484726] +DEBUG: join prunable for intervals [-1011077333,0] and [-1905060026,-1905060026] +DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726] +DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569] +DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026] +DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -252,8 +252,8 @@ EXPLAIN SELECT count(*) DEBUG: predicate pruning for shardId 113 DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 112 -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0] QUERY PLAN ---------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index d6f903a94..f32d15cf6 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -137,11 +137,11 @@ ERROR: creating unique indexes on append-partitioned tables is currently unsupp CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey); ERROR: relation "lineitem_orderkey_index" already exists CREATE INDEX try_index ON lineitem USING gist (l_orderkey); -WARNING: could not receive query results from localhost:57638 +WARNING: could not receive query results from localhost:57637 DETAIL: Client error: data type bigint has no default operator class for access method "gist" ERROR: could not execute DDL command on worker node shards CREATE INDEX try_index ON lineitem (non_existent_column); -WARNING: could not receive query results from localhost:57638 +WARNING: could not receive query results from localhost:57637 DETAIL: Client error: column "non_existent_column" does not exist ERROR: could not execute DDL command on worker node shards CREATE INDEX ON lineitem (l_orderkey); diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 9fd9cac07..ffc9ae245 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -8,34 +8,34 @@ SET client_min_messages TO DEBUG2; EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2 WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5; LOG: join order: [ "lineitem" ][ local partition join "lineitem" ] -DEBUG: join prunable for intervals [13921,14947] and [11554,13920] -DEBUG: join prunable for intervals [13921,14947] and [8997,11554] -DEBUG: join prunable for intervals [13921,14947] and [4965,5986] -DEBUG: join prunable for intervals 
[13921,14947] and [2497,4964] -DEBUG: join prunable for intervals [13921,14947] and [1,2496] -DEBUG: join prunable for intervals [11554,13920] and [13921,14947] -DEBUG: join prunable for intervals [11554,13920] and [4965,5986] -DEBUG: join prunable for intervals [11554,13920] and [2497,4964] -DEBUG: join prunable for intervals [11554,13920] and [1,2496] -DEBUG: join prunable for intervals [8997,11554] and [13921,14947] -DEBUG: join prunable for intervals [8997,11554] and [4965,5986] -DEBUG: join prunable for intervals [8997,11554] and [2497,4964] -DEBUG: join prunable for intervals [8997,11554] and [1,2496] -DEBUG: join prunable for intervals [4965,5986] and [13921,14947] -DEBUG: join prunable for intervals [4965,5986] and [11554,13920] -DEBUG: join prunable for intervals [4965,5986] and [8997,11554] -DEBUG: join prunable for intervals [4965,5986] and [2497,4964] -DEBUG: join prunable for intervals [4965,5986] and [1,2496] -DEBUG: join prunable for intervals [2497,4964] and [13921,14947] -DEBUG: join prunable for intervals [2497,4964] and [11554,13920] -DEBUG: join prunable for intervals [2497,4964] and [8997,11554] -DEBUG: join prunable for intervals [2497,4964] and [4965,5986] -DEBUG: join prunable for intervals [2497,4964] and [1,2496] -DEBUG: join prunable for intervals [1,2496] and [13921,14947] -DEBUG: join prunable for intervals [1,2496] and [11554,13920] -DEBUG: join prunable for intervals [1,2496] and [8997,11554] -DEBUG: join prunable for intervals [1,2496] and [4965,5986] DEBUG: join prunable for intervals [1,2496] and [2497,4964] +DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: join prunable for intervals [1,2496] and [8997,11554] +DEBUG: join prunable for intervals [1,2496] and [11554,13920] +DEBUG: join prunable for intervals [1,2496] and [13921,14947] +DEBUG: join prunable for intervals [2497,4964] and [1,2496] +DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [2497,4964] and [8997,11554] +DEBUG: join prunable for intervals [2497,4964] and [11554,13920] +DEBUG: join prunable for intervals [2497,4964] and [13921,14947] +DEBUG: join prunable for intervals [4965,5986] and [1,2496] +DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [8997,11554] +DEBUG: join prunable for intervals [4965,5986] and [11554,13920] +DEBUG: join prunable for intervals [4965,5986] and [13921,14947] +DEBUG: join prunable for intervals [8997,11554] and [1,2496] +DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [4965,5986] +DEBUG: join prunable for intervals [8997,11554] and [13921,14947] +DEBUG: join prunable for intervals [11554,13920] and [1,2496] +DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [4965,5986] +DEBUG: join prunable for intervals [11554,13920] and [13921,14947] +DEBUG: join prunable for intervals [13921,14947] and [1,2496] +DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [13921,14947] and [4965,5986] +DEBUG: join prunable for intervals [13921,14947] and [8997,11554] +DEBUG: join prunable for intervals [13921,14947] and [11554,13920] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported diff --git a/src/test/regress/expected/multi_join_pruning.out 
b/src/test/regress/expected/multi_join_pruning.out index 9627df180..6299768f3 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -9,12 +9,12 @@ SET client_min_messages TO DEBUG2; SET citus.large_table_shard_count TO 2; SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -22,12 +22,12 @@ DEBUG: join prunable for intervals [1,2496] and [8997,14946] SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 9030; -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 17996 | 3.0194630872483221 @@ -37,12 +37,12 @@ DEBUG: join prunable for intervals [8997,11554] and [1,5986] -- works as expected in this case. SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 20000; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102012 -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102012 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 sum | avg -----+----- | @@ -53,13 +53,13 @@ DEBUG: predicate pruning for shardId 102009 -- out all the shards, and leave us with an empty task list. 
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 6000 AND o_orderkey < 6000; -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102016 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -----+----- | @@ -72,8 +72,8 @@ DEBUG: join prunable for intervals [8997,11554] and [1,5986] EXPLAIN SELECT count(*) FROM array_partitioned_table table1, array_partitioned_table table2 WHERE table1.array_column = table2.array_column; -DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] +DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -82,8 +82,8 @@ DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1 EXPLAIN SELECT count(*) FROM composite_partitioned_table table1, composite_partitioned_table table2 WHERE table1.composite_column = table2.composite_column; -DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] +DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -93,8 +93,8 @@ DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] EXPLAIN SELECT count(*) FROM varchar_partitioned_table table1, varchar_partitioned_table table2 WHERE table1.varchar_column = table2.varchar_column; -DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] +DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index 842f164cc..4573c457f 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ b/src/test/regress/expected/multi_large_table_join_planning.out @@ -45,30 +45,30 @@ ORDER BY LIMIT 30; DEBUG: StartTransactionCommand DEBUG: push down of limit count: 30 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: 
join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] DEBUG: generated sql query for job 1250 and task 3 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 6 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 9 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 12 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 15 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 18 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102009 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: generated sql query for job 1250 and task 6 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 9 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 12 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, 
lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 15 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 18 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -83,10 +83,10 @@ DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: generated sql query for job 1252 and task 3 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT '30'::bigint" DEBUG: generated sql query for job 1252 and task 6 @@ -156,29 +156,29 @@ ORDER BY l_partkey, o_orderkey; DEBUG: StartTransactionCommand DEBUG: generated sql query for job 1253 and task 2 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 4 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 6 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 
lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 8 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 10 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 12 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102009 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1253 and task 4 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 6 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 8 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 10 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 12 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 -DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" -DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102015 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1254 and task 4 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out index 33700d111..9296d52e1 100644 --- a/src/test/regress/expected/multi_large_table_join_planning_0.out +++ b/src/test/regress/expected/multi_large_table_join_planning_0.out @@ -45,30 +45,30 @@ ORDER BY LIMIT 30; DEBUG: StartTransactionCommand DEBUG: push down of limit count: 30 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join 
prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] DEBUG: generated sql query for job 1250 and task 3 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 6 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 9 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 12 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 15 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 18 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102009 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: generated sql query for job 1250 and task 6 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 9 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 12 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = 
orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 15 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 18 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -83,10 +83,10 @@ DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: generated sql query for job 1252 and task 3 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT 30::bigint" DEBUG: generated sql query for job 1252 and task 6 @@ -156,29 +156,29 @@ ORDER BY l_partkey, o_orderkey; DEBUG: StartTransactionCommand DEBUG: generated sql query for job 1253 and task 2 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 4 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 6 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 8 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM 
lineitem_102011 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 10 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 12 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102009 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1253 and task 4 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 6 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 8 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 10 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 12 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 -DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" -DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102015 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1254 and task 4 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_pruning.out b/src/test/regress/expected/multi_large_table_pruning.out index 20b62fa4d..27908a6ca 100644 --- a/src/test/regress/expected/multi_large_table_pruning.out +++ b/src/test/regress/expected/multi_large_table_pruning.out @@ -16,10 +16,10 @@ WHERE o_custkey = c_custkey; DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 5 DEBUG: pruning merge fetch taskId 4 @@ -40,8 +40,8 @@ 
FROM WHERE o_custkey = c_custkey AND o_orderkey < 0; -DEBUG: predicate pruning for shardId 102016 DEBUG: predicate pruning for shardId 102015 +DEBUG: predicate pruning for shardId 102016 count ------- @@ -56,9 +56,9 @@ FROM WHERE o_custkey = c_custkey AND c_custkey < 0; +DEBUG: predicate pruning for shardId 102017 DEBUG: predicate pruning for shardId 102034 DEBUG: predicate pruning for shardId 102033 -DEBUG: predicate pruning for shardId 102017 count ------- @@ -115,12 +115,12 @@ FROM WHERE l_partkey = c_nationkey AND l_orderkey < 0; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102012 -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102012 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 count ------- diff --git a/src/test/regress/expected/multi_large_table_task_assignment.out b/src/test/regress/expected/multi_large_table_task_assignment.out index 7ba87fa94..23b7bd2ac 100644 --- a/src/test/regress/expected/multi_large_table_task_assignment.out +++ b/src/test/regress/expected/multi_large_table_task_assignment.out @@ -25,14 +25,14 @@ FROM WHERE o_custkey = c_custkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 5 DEBUG: pruning merge fetch taskId 4 @@ -64,40 +64,40 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 15 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: join prunable for intervals [1,2496] and [13921,14947] -DEBUG: join prunable for intervals [1,2496] and [11554,13920] -DEBUG: join prunable for intervals [1,2496] and [8997,11554] -DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 15 to node localhost:57637 +DEBUG: assigned task 18 to node localhost:57638 DEBUG: join prunable for intervals [1,2496] and [2497,4964] -DEBUG: join prunable for intervals [2497,4964] and [13921,14947] -DEBUG: join prunable for intervals [2497,4964] and [11554,13920] -DEBUG: join prunable for intervals [2497,4964] and [8997,11554] -DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: join prunable for 
intervals [1,2496] and [8997,11554] +DEBUG: join prunable for intervals [1,2496] and [11554,13920] +DEBUG: join prunable for intervals [1,2496] and [13921,14947] DEBUG: join prunable for intervals [2497,4964] and [1,2496] -DEBUG: join prunable for intervals [4965,5986] and [13921,14947] -DEBUG: join prunable for intervals [4965,5986] and [11554,13920] -DEBUG: join prunable for intervals [4965,5986] and [8997,11554] -DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [2497,4964] and [8997,11554] +DEBUG: join prunable for intervals [2497,4964] and [11554,13920] +DEBUG: join prunable for intervals [2497,4964] and [13921,14947] DEBUG: join prunable for intervals [4965,5986] and [1,2496] -DEBUG: join prunable for intervals [8997,11554] and [13921,14947] -DEBUG: join prunable for intervals [8997,11554] and [4965,5986] -DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [8997,11554] +DEBUG: join prunable for intervals [4965,5986] and [11554,13920] +DEBUG: join prunable for intervals [4965,5986] and [13921,14947] DEBUG: join prunable for intervals [8997,11554] and [1,2496] -DEBUG: join prunable for intervals [11554,13920] and [13921,14947] -DEBUG: join prunable for intervals [11554,13920] and [4965,5986] -DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [4965,5986] +DEBUG: join prunable for intervals [8997,11554] and [13921,14947] DEBUG: join prunable for intervals [11554,13920] and [1,2496] -DEBUG: join prunable for intervals [13921,14947] and [11554,13920] -DEBUG: join prunable for intervals [13921,14947] and [8997,11554] -DEBUG: join prunable for intervals [13921,14947] and [4965,5986] -DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [4965,5986] +DEBUG: join prunable for intervals [11554,13920] and [13921,14947] DEBUG: join prunable for intervals [13921,14947] and [1,2496] +DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [13921,14947] and [4965,5986] +DEBUG: join prunable for intervals [13921,14947] and [8997,11554] +DEBUG: join prunable for intervals [13921,14947] and [11554,13920] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 19 DEBUG: pruning merge fetch taskId 4 @@ -116,10 +116,10 @@ DEBUG: pruning merge fetch taskId 22 DETAIL: Creating dependency on merge taskId 54 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 24 to node localhost:57637 -DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 24 to node localhost:57638 DEBUG: propagating assignment from merge task 40 to constrained sql task 15 DEBUG: propagating assignment from merge task 47 to constrained sql task 21 DEBUG: CommitTransactionCommand @@ -154,15 +154,15 @@ FROM WHERE l_partkey = c_nationkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 10 to node 
localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_null_minmax_value_pruning.out b/src/test/regress/expected/multi_null_minmax_value_pruning.out index cb7ff7b06..80795483c 100644 --- a/src/test/regress/expected/multi_null_minmax_value_pruning.out +++ b/src/test/regress/expected/multi_null_minmax_value_pruning.out @@ -21,10 +21,10 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; -- Check that partition and join pruning works when min/max values exist -- Adding l_orderkey = 1 to make the query not router executable SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -43,12 +43,12 @@ DEBUG: predicate pruning for shardId 102010 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -58,10 +58,10 @@ DEBUG: join prunable for intervals [1,2496] and [8997,14946] -- partition or join pruning for the shard with null min value. 
UPDATE pg_dist_shard SET shardminvalue = NULL WHERE shardid = 102009; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -74,11 +74,11 @@ DEBUG: predicate pruning for shardId 102010 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -88,9 +88,9 @@ DEBUG: join prunable for intervals [2497,4964] and [8997,14946] -- don't apply partition or join pruning for this other shard either. UPDATE pg_dist_shard SET shardmaxvalue = NULL WHERE shardid = 102010; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -103,10 +103,10 @@ DEBUG: predicate pruning for shardId 102011 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -116,10 +116,10 @@ DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -- should apply partition and join pruning for this shard now. 
UPDATE pg_dist_shard SET shardminvalue = '0' WHERE shardid = 102009; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -132,11 +132,11 @@ DEBUG: predicate pruning for shardId 102009 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] DEBUG: join prunable for intervals [0,2496] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index ea969bddd..189b10594 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -6,10 +6,10 @@ SET client_min_messages TO DEBUG2; -- Adding additional l_orderkey = 1 to make this query not router executable SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -32,9 +32,9 @@ DEBUG: predicate pruning for shardId 102010 -- trigger the the creation of toasted tables and indexes. This in turn prints -- non-deterministic debug messages. To avoid this chain, we use l_linenumber. 
 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem
 	WHERE l_orderkey > 9030;
-DEBUG: predicate pruning for shardId 102011
-DEBUG: predicate pruning for shardId 102010
 DEBUG: predicate pruning for shardId 102009
+DEBUG: predicate pruning for shardId 102010
+DEBUG: predicate pruning for shardId 102011
   sum  |        avg
 -------+--------------------
  17999 | 3.0189533713518953
@@ -50,12 +50,12 @@ DEBUG: predicate pruning for shardId 102011
 -- The following query should prune out all shards and return empty results
 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem
 	WHERE l_orderkey > 20000;
-DEBUG: predicate pruning for shardId 102014
-DEBUG: predicate pruning for shardId 102013
-DEBUG: predicate pruning for shardId 102012
-DEBUG: predicate pruning for shardId 102011
-DEBUG: predicate pruning for shardId 102010
 DEBUG: predicate pruning for shardId 102009
+DEBUG: predicate pruning for shardId 102010
+DEBUG: predicate pruning for shardId 102011
+DEBUG: predicate pruning for shardId 102012
+DEBUG: predicate pruning for shardId 102013
+DEBUG: predicate pruning for shardId 102014
  sum | avg
 -----+-----
      |
diff --git a/src/test/regress/expected/multi_prune_shard_list.out b/src/test/regress/expected/multi_prune_shard_list.out
index 2f74b1991..44ae8203a 100644
--- a/src/test/regress/expected/multi_prune_shard_list.out
+++ b/src/test/regress/expected/multi_prune_shard_list.out
@@ -21,6 +21,10 @@ CREATE FUNCTION debug_equality_expression(regclass)
 	RETURNS cstring
 	AS 'citus'
 	LANGUAGE C STRICT;
+CREATE FUNCTION print_sorted_shard_intervals(regclass)
+	RETURNS text[]
+	AS 'citus'
+	LANGUAGE C STRICT;
 -- ===================================================================
 -- test shard pruning functionality
 -- ===================================================================
@@ -40,7 +44,7 @@ VALUES
 SELECT prune_using_no_values('pruning');
  prune_using_no_values
 -----------------------
- {13,12,11,10}
+ {10,11,12,13}
 (1 row)
 
 -- with a single value, expect a single shard
@@ -61,7 +65,7 @@ SELECT prune_using_single_value('pruning', NULL);
 SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
  prune_using_either_value
 --------------------------
- {12,11}
+ {11,12}
 (1 row)
 
 -- an AND clause with incompatible values returns no shards
@@ -85,3 +89,88 @@ SELECT debug_equality_expression('pruning');
 {OPEXPR :opno 98 :opfuncid 67 :opresulttype 16 :opretset false :opcollid 0 :inputcollid 100 :args ({VAR :varno 1 :varattno 1 :vartype 25 :vartypmod -1 :varcollid 100 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} {CONST :consttype 25 :consttypmod -1 :constcollid 100 :constlen -1 :constbyval false :constisnull true :location -1 :constvalue <>}) :location -1}
 (1 row)
 
+-- print the initial ordering of shard intervals
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,11,12,13}
+(1 row)
+
+-- clear the min/max values for one shard
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,12,13,11}
+(1 row)
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,13,11,12}
+(1 row)
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {13,10,11,12}
+(1 row)
+
+-- now all shard intervals are uninitialized
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,11,12,13}
+(1 row)
+
+-- now update the metadata so that the table is a range distributed table
+UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass;
+-- now the comparison is done via the partition column type, which is text
+UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10;
+UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11;
+UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12;
+UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13;
+-- print the ordering of shard intervals with range partitioning as well
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,11,12,13}
+(1 row)
+
+-- clear the min/max values for one shard
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,12,13,11}
+(1 row)
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,13,11,12}
+(1 row)
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {13,10,11,12}
+(1 row)
+
+-- now all shard intervals are uninitialized
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
+SELECT print_sorted_shard_intervals('pruning');
+ print_sorted_shard_intervals
+------------------------------
+ {10,11,12,13}
+(1 row)
+
diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source
index 9466c3170..7bab12bf2 100644
--- a/src/test/regress/output/multi_alter_table_statements.source
+++ b/src/test/regress/output/multi_alter_table_statements.source
@@ -260,8 +260,8 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER;
 NOTICE: relation "non_existent_table" does not exist, skipping
 ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
 ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
-WARNING: could not receive query results from localhost:57637
-DETAIL: Client error: column "non_existent_column" of relation "lineitem_alter_103009" does not exist
+WARNING: could not receive query results from localhost:57638
+DETAIL: Client error: column "non_existent_column" of relation "lineitem_alter_103000" does not exist
 ERROR: could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
 NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
@@ -360,15 +360,15 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subc
 -- Verify that we error out in case of postgres errors on supported statement
 -- types
 ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
-WARNING: could not receive query results from localhost:57637
+WARNING: could not receive query results from localhost:57638
 DETAIL: Client error: type "non_existent_type" does not exist
 ERROR: could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
-WARNING: could not receive query results from localhost:57637
+WARNING: could not receive query results from localhost:57638
 DETAIL: Client error: column "null_column" contains null values
 ERROR: could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
-WARNING: could not receive query results from localhost:57637
+WARNING: could not receive query results from localhost:57638
 DETAIL: Client error: invalid input syntax for integer: "a"
 ERROR: could not execute DDL command on worker node shards
 -- Verify that we error out on statements involving RENAME
diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source
index 0d8492e9b..e6724b673 100644
--- a/src/test/regress/output/multi_subquery.source
+++ b/src/test/regress/output/multi_subquery.source
@@ -349,9 +349,9 @@ SET client_min_messages TO DEBUG2;
 SELECT * FROM
 	(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
 AS foo;
-DEBUG: predicate pruning for shardId 102029
-DEBUG: predicate pruning for shardId 102027
 DEBUG: predicate pruning for shardId 102026
+DEBUG: predicate pruning for shardId 102027
+DEBUG: predicate pruning for shardId 102029
  count
 -------
 (0 rows)
@@ -359,9 +359,9 @@ DEBUG: predicate pruning for shardId 102026
 SELECT * FROM
 	(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
 AS foo;
-DEBUG: predicate pruning for shardId 102029
-DEBUG: predicate pruning for shardId 102028
 DEBUG: predicate pruning for shardId 102026
+DEBUG: predicate pruning for shardId 102028
+DEBUG: predicate pruning for shardId 102029
  count
 -------
 (0 rows)
diff --git a/src/test/regress/sql/multi_prune_shard_list.sql b/src/test/regress/sql/multi_prune_shard_list.sql
index 2a215c5a5..6db632e1e 100644
--- a/src/test/regress/sql/multi_prune_shard_list.sql
+++ b/src/test/regress/sql/multi_prune_shard_list.sql
@@ -27,6 +27,11 @@ CREATE FUNCTION debug_equality_expression(regclass)
 	AS 'citus'
 	LANGUAGE C STRICT;
 
+CREATE FUNCTION print_sorted_shard_intervals(regclass)
+	RETURNS text[]
+	AS 'citus'
+	LANGUAGE C STRICT;
+
 -- ===================================================================
 -- test shard pruning functionality
 -- ===================================================================
@@ -66,3 +71,50 @@ SELECT prune_using_both_values('pruning', 'tomato', 'rose');
 
 -- unit test of the equality expression generation code
 SELECT debug_equality_expression('pruning');
+
+-- print the initial ordering of shard intervals
+SELECT print_sorted_shard_intervals('pruning');
+
+-- clear the min/max values for one shard
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now all shard intervals are uninitialized
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now update the metadata so that the table is a range distributed table
+UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass;
+
+-- now the comparison is done via the partition column type, which is text
+UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10;
+UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11;
+UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12;
+UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13;
+
+-- print the ordering of shard intervals with range partitioning as well
+SELECT print_sorted_shard_intervals('pruning');
+
+-- clear the min/max values for one shard
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now let's have one more shard without min/max values
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
+SELECT print_sorted_shard_intervals('pruning');
+
+-- now all shard intervals are uninitialized
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
+SELECT print_sorted_shard_intervals('pruning');
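
The expected orderings above can be read as a simple rule: intervals whose min/max values are set are ordered by their minimum value, intervals whose shardminvalue/shardmaxvalue have been cleared land after them, and once every interval is uninitialized the array simply keeps its original order again (hence the final {10,11,12,13} result). The stand-alone C sketch below only illustrates that comparison rule; the struct and function names are invented for this example and are not the comparator added in shardinterval_utils.c.

/*
 * Minimal sketch of the ordering exercised by print_sorted_shard_intervals:
 * intervals without min/max values compare as "greater" and therefore sort
 * to the end of the array. Hypothetical types and names, for illustration.
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct IntervalStub
{
	int shardId;
	bool minMaxDefined;	/* false once shardminvalue/shardmaxvalue are NULL */
	int minValue;		/* only meaningful when minMaxDefined is true */
} IntervalStub;

static int
CompareIntervalStubs(const void *leftElement, const void *rightElement)
{
	const IntervalStub *left = (const IntervalStub *) leftElement;
	const IntervalStub *right = (const IntervalStub *) rightElement;

	/* uninitialized intervals sort after initialized ones */
	if (!left->minMaxDefined || !right->minMaxDefined)
	{
		return (int) right->minMaxDefined - (int) left->minMaxDefined;
	}

	/* otherwise order by minimum value */
	return (left->minValue > right->minValue) - (left->minValue < right->minValue);
}

int
main(void)
{
	/* shard 11 has had its min/max values cleared, as in the first UPDATE above */
	IntervalStub intervals[] = {
		{ 10, true, 0 }, { 11, false, 0 }, { 12, true, 2 }, { 13, true, 3 }
	};
	int intervalCount = sizeof(intervals) / sizeof(intervals[0]);
	int intervalIndex = 0;

	qsort(intervals, intervalCount, sizeof(IntervalStub), CompareIntervalStubs);

	for (intervalIndex = 0; intervalIndex < intervalCount; intervalIndex++)
	{
		printf("%d ", intervals[intervalIndex].shardId);	/* prints: 10 12 13 11 */
	}
	printf("\n");

	return 0;
}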