diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 53b73c762..42bbec3c7 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -76,6 +76,7 @@
 #include "distributed/multi_transaction.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardinterval_utils.h"
 #include "distributed/worker_protocol.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -144,19 +145,6 @@
 static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
 static void LockAllShards(List *shardIntervalList);
 static HTAB * CreateShardConnectionHash(void);
 static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
-static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray,
-									  int shardCount);
-static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char
-											   partitionMethod);
-static ShardInterval * FindShardInterval(Datum partitionColumnValue,
-										 ShardInterval **shardIntervalCache,
-										 int shardCount, char partitionMethod,
-										 FmgrInfo *compareFunction,
-										 FmgrInfo *hashFunction, bool useBinarySearch);
-static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
-												 ShardInterval **shardIntervalCache,
-												 int shardCount,
-												 FmgrInfo *compareFunction);
 static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
 											  int64 shardId,
 											  bool *shardConnectionsFound);
@@ -250,9 +238,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	uint32 columnCount = 0;
 	Datum *columnValues = NULL;
 	bool *columnNulls = NULL;
-	TypeCacheEntry *typeEntry = NULL;
 	FmgrInfo *hashFunction = NULL;
 	FmgrInfo *compareFunction = NULL;
+	bool hasUniformHashDistribution = false;
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
 
 	int shardCount = 0;
 	List *shardIntervalList = NULL;
@@ -275,12 +264,11 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	Var *partitionColumn = PartitionColumn(tableId, 0);
 	char partitionMethod = PartitionMethod(tableId);
 
-	/* resolve hash function for partition column */
-	typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO);
-	hashFunction = &(typeEntry->hash_proc_finfo);
+	/* get hash function for partition column */
+	hashFunction = cacheEntry->hashFunction;
 
-	/* resolve compare function for shard intervals */
-	compareFunction = ShardIntervalCompareFunction(partitionColumn, partitionMethod);
+	/* get compare function for shard intervals */
+	compareFunction = cacheEntry->shardIntervalCompareFunction;
 
 	/* allocate column values and nulls arrays */
 	distributedRelation = heap_open(tableId, RowExclusiveLock);
@@ -311,16 +299,26 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 		}
 	}
 
+	/* error if any shard is missing min/max values */
+	if (cacheEntry->hasUninitializedShardInterval)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("could not start copy"),
+						errdetail("Distributed relation \"%s\" has shards "
+								  "with missing shardminvalue/shardmaxvalue.",
+								  relationName)));
+	}
+
 	/* prevent concurrent placement changes and non-commutative DML statements */
 	LockAllShards(shardIntervalList);
 
 	/* initialize the shard interval cache */
-	shardCount = list_length(shardIntervalList);
-	shardIntervalCache = SortedShardIntervalArray(shardIntervalList);
+	shardCount = cacheEntry->shardIntervalArrayLength;
+	shardIntervalCache = cacheEntry->sortedShardIntervalArray;
+	hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
 
 	/* determine whether to use binary search */
-	if (partitionMethod != DISTRIBUTE_BY_HASH ||
-		!IsUniformHashDistribution(shardIntervalCache, shardCount))
+	if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
 	{
 		useBinarySearch = true;
 	}
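[Editorial note] The rewritten COPY path above chooses between O(1) hash-index arithmetic and binary search based on two cached flags. The following standalone sketch restates that decision in isolation; it is illustration rather than part of the patch, the helper name UseBinarySearchForShardLookup is hypothetical, and the DistTableCacheEntry fields are the ones this patch adds to metadata_cache.h.

/*
 * Illustrative sketch (not part of the patch): pick the shard lookup strategy
 * from the cached metadata, mirroring the logic in CopyToExistingShards.
 */
static bool
UseBinarySearchForShardLookup(DistTableCacheEntry *cacheEntry)
{
	/*
	 * Direct index arithmetic is only safe for hash-partitioned tables whose
	 * shards evenly split the hash token space.
	 */
	if (cacheEntry->partitionMethod != DISTRIBUTE_BY_HASH)
	{
		return true;
	}

	if (!cacheEntry->hasUniformHashDistribution)
	{
		return true;
	}

	return false;
}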
@@ -736,164 +734,6 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
 }
 
 
-/*
- * ShardIntervalCompareFunction returns the appropriate compare function for the
- * partition column type. In case of hash-partitioning, it always returns the compare
- * function for integers.
- */
-static FmgrInfo *
-ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod)
-{
-	FmgrInfo *compareFunction = NULL;
-
-	if (partitionMethod == DISTRIBUTE_BY_HASH)
-	{
-		compareFunction = GetFunctionInfo(INT4OID, BTREE_AM_OID, BTORDER_PROC);
-	}
-	else if (partitionMethod == DISTRIBUTE_BY_RANGE)
-	{
-		compareFunction = GetFunctionInfo(partitionColumn->vartype,
-										  BTREE_AM_OID, BTORDER_PROC);
-	}
-	else
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("unsupported partition method %d", partitionMethod)));
-	}
-
-	return compareFunction;
-}
-
-
-/*
- * IsUniformHashDistribution determines whether the given list of sorted shards
- * has a uniform hash distribution, as produced by master_create_worker_shards.
- */
-static bool
-IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount)
-{
-	uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
-	int shardIndex = 0;
-
-	for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
-	{
-		ShardInterval *shardInterval = shardIntervalArray[shardIndex];
-		int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
-		int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
-
-		if (shardIndex == (shardCount - 1))
-		{
-			shardMaxHashToken = INT32_MAX;
-		}
-
-		if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken ||
-			DatumGetInt32(shardInterval->maxValue) != shardMaxHashToken)
-		{
-			return false;
-		}
-	}
-
-	return true;
-}
-
-
-/*
- * FindShardInterval finds a single shard interval in the cache for the
- * given partition column value.
- */
-static ShardInterval *
-FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
-				  int shardCount, char partitionMethod, FmgrInfo *compareFunction,
-				  FmgrInfo *hashFunction, bool useBinarySearch)
-{
-	ShardInterval *shardInterval = NULL;
-
-	if (partitionMethod == DISTRIBUTE_BY_HASH)
-	{
-		int hashedValue = DatumGetInt32(FunctionCall1(hashFunction,
-													  partitionColumnValue));
-		if (useBinarySearch)
-		{
-			shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue),
-													  shardIntervalCache, shardCount,
-													  compareFunction);
-		}
-		else
-		{
-			uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
-			int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement;
-
-			Assert(shardIndex <= shardCount);
-
-			/*
-			 * If the shard count is not power of 2, the range of the last
-			 * shard becomes larger than others. For that extra piece of range,
-			 * we still need to use the last shard.
-			 */
-			if (shardIndex == shardCount)
-			{
-				shardIndex = shardCount - 1;
-			}
-
-			shardInterval = shardIntervalCache[shardIndex];
-		}
-	}
-	else
-	{
-		shardInterval = SearchCachedShardInterval(partitionColumnValue,
-												  shardIntervalCache, shardCount,
-												  compareFunction);
-	}
-
-	return shardInterval;
-}
-
-
-/*
- * SearchCachedShardInterval performs a binary search for a shard interval matching a
- * given partition column value and returns it.
- */
-static ShardInterval *
-SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
-						  int shardCount, FmgrInfo *compareFunction)
-{
-	int lowerBoundIndex = 0;
-	int upperBoundIndex = shardCount;
-
-	while (lowerBoundIndex < upperBoundIndex)
-	{
-		int middleIndex = (lowerBoundIndex + upperBoundIndex) / 2;
-		int maxValueComparison = 0;
-		int minValueComparison = 0;
-
-		minValueComparison = FunctionCall2Coll(compareFunction,
-											   DEFAULT_COLLATION_OID,
-											   partitionColumnValue,
-											   shardIntervalCache[middleIndex]->minValue);
-
-		if (DatumGetInt32(minValueComparison) < 0)
-		{
-			upperBoundIndex = middleIndex;
-			continue;
-		}
-
-		maxValueComparison = FunctionCall2Coll(compareFunction,
-											   DEFAULT_COLLATION_OID,
-											   partitionColumnValue,
-											   shardIntervalCache[middleIndex]->maxValue);
-
-		if (DatumGetInt32(maxValueComparison) <= 0)
-		{
-			return shardIntervalCache[middleIndex];
-		}
-
-		lowerBoundIndex = middleIndex + 1;
-	}
-
-	return NULL;
-}
-
-
 /*
  * GetShardConnections finds existing connections for a shard in the hash
  * or opens new connections to each active placement and starts a (binary) COPY
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index 1be66d944..05a598dbf 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -46,6 +46,9 @@ static uint64 * AllocateUint64(uint64 value);
  * LoadShardIntervalList returns a list of shard intervals related for a given
  * distributed table. The function returns an empty list if no shards can be
  * found for the given relation.
+ * Since LoadShardIntervalList relies on sortedShardIntervalArray, it returns
+ * a shard interval list whose elements are sorted on shardminvalue. Shard intervals
+ * with uninitialized shard min/max values are placed at the end of the list.
 */
 List *
 LoadShardIntervalList(Oid relationId)
@@ -59,7 +62,7 @@ LoadShardIntervalList(Oid relationId)
 		ShardInterval *newShardInterval = NULL;
 		newShardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval));
 
-		CopyShardInterval(&cacheEntry->shardIntervalArray[i], newShardInterval);
+		CopyShardInterval(cacheEntry->sortedShardIntervalArray[i], newShardInterval);
 
 		shardList = lappend(shardList, newShardInterval);
 	}
@@ -91,6 +94,9 @@ ShardIntervalCount(Oid relationId)
 /*
  * LoadShardList reads list of shards for given relationId from pg_dist_shard,
  * and returns the list of found shardIds.
+ * Since LoadShardList relies on sortedShardIntervalArray, it returns a shard
+ * list whose elements are sorted on shardminvalue. Shards with uninitialized
+ * shard min/max values are placed at the end of the list.
 */
 List *
 LoadShardList(Oid relationId)
@@ -101,7 +107,7 @@ LoadShardList(Oid relationId)
 
 	for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
 	{
-		ShardInterval *currentShardInterval = &cacheEntry->shardIntervalArray[i];
+		ShardInterval *currentShardInterval = cacheEntry->sortedShardIntervalArray[i];
 		uint64 *shardIdPointer = AllocateUint64(currentShardInterval->shardId);
 
 		shardList = lappend(shardList, shardIdPointer);
diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c
index 9f14e4c89..58ec3ce66 100644
--- a/src/backend/distributed/planner/multi_join_order.c
+++ b/src/backend/distributed/planner/multi_join_order.c
@@ -56,7 +56,6 @@ static List * MergeShardIntervals(List *leftShardIntervalList,
 								  List *rightShardIntervalList,
 								  JoinType joinType);
 static bool ShardIntervalsMatch(List *leftShardIntervalList,
 								List *rightShardIntervalList);
-static List * LoadSortedShardIntervalList(Oid relationId);
 static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList,
 								List *joinClauseList);
 static List * BestJoinOrder(List *candidateJoinOrders);
@@ -123,6 +122,22 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
 	List *joinedTableList = NIL;
 	JoinOrderNode *firstJoinNode = NULL;
 	JoinOrderNode *currentJoinNode = NULL;
+	ListCell *tableEntryCell = NULL;
+
+	foreach(tableEntryCell, tableEntryList)
+	{
+		TableEntry *rangeTableEntry = (TableEntry *) lfirst(tableEntryCell);
+		Oid relationId = rangeTableEntry->relationId;
+		DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
+
+		if (cacheEntry->hasUninitializedShardInterval)
+		{
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot perform distributed planning on this query"),
+							errdetail("Shards of relations in outer join queries must "
+									  "have shard min/max values.")));
+		}
+	}
 
 	/* get the FROM section as a flattened list of JoinExpr nodes */
 	joinList = JoinExprList(fromExpr);
@@ -159,8 +174,8 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
 			joinClauseList = list_concat(joinClauseList, joinWhereClauseList);
 		}
 
-		/* get the list of shards to check broadcast/local join possibility */
-		candidateShardList = LoadSortedShardIntervalList(nextTable->relationId);
+		/* get the sorted list of shards to check broadcast/local join possibility */
+		candidateShardList = LoadShardIntervalList(nextTable->relationId);
 
 		/* find the best join rule type */
 		nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode,
@@ -268,8 +283,7 @@ CreateFirstJoinOrderNode(FromExpr *fromExpr, List *tableEntryList)
 											firstPartitionColumn,
 											firstPartitionMethod);
 
-	firstJoinNode->shardIntervalList =
-		LoadSortedShardIntervalList(firstTable->relationId);
+	firstJoinNode->shardIntervalList = LoadShardIntervalList(firstTable->relationId);
 
 	return firstJoinNode;
 }
@@ -462,40 +476,6 @@ MergeShardIntervals(List *leftShardIntervalList, List *rightShardIntervalList,
 }
 
 
-/*
- * LoadSortedShardIntervalList loads a list of shard intervals from the metadata
- * and sorts the list by the minimum value of the intervals.
- */
-static List *
-LoadSortedShardIntervalList(Oid relationId)
-{
-	List *shardIntervalList = NIL;
-	int shardCount = 0;
-	int intervalIndex = 0;
-	ShardInterval **sortedShardIntervalArray = NULL;
-	List *sortedShardIntervalList = NIL;
-
-	shardIntervalList = LoadShardIntervalList(relationId);
-
-	shardCount = list_length(shardIntervalList);
-	if (shardCount <= 1)
-	{
-		return shardIntervalList;
-	}
-
-	sortedShardIntervalArray = SortedShardIntervalArray(shardIntervalList);
-
-	for (intervalIndex = 0; intervalIndex < shardCount; intervalIndex++)
-	{
-		ShardInterval *shardInterval = sortedShardIntervalArray[intervalIndex];
-
-		sortedShardIntervalList = lappend(sortedShardIntervalList, shardInterval);
-	}
-
-	return sortedShardIntervalList;
-}
-
-
 /*
  * JoinOnColumns determines whether two columns are joined by a given join clause
  * list.
diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 25c15d48d..e68804d8c 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "distributed/citus_nodes.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/multi_logical_optimizer.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/multi_physical_planner.h"
@@ -137,7 +138,7 @@ static bool SupportedLateralQuery(Query *parentQuery, Query *lateralQuery);
 static bool JoinOnPartitionColumn(Query *query);
 static void ErrorIfUnsupportedShardDistribution(Query *query);
 static List * RelationIdList(Query *query);
-static bool CoPartitionedTables(List *firstShardList, List *secondShardList);
+static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
 static bool ShardIntervalsEqual(ShardInterval *firstInterval,
 								ShardInterval *secondInterval);
 static void ErrorIfUnsupportedFilters(Query *subquery);
@@ -2310,8 +2311,8 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
 		 */
 		partitionMethod = PartitionMethod(relationId);
 
-		if (partitionMethod == DISTRIBUTE_BY_RANGE
-			|| partitionMethod == DISTRIBUTE_BY_HASH)
+		if (partitionMethod == DISTRIBUTE_BY_RANGE ||
+			partitionMethod == DISTRIBUTE_BY_HASH)
 		{
 			Var *tablePartitionColumn = tableNode->partitionColumn;
 			bool groupedByPartitionColumn = false;
@@ -3386,7 +3387,7 @@ JoinOnPartitionColumn(Query *query)
 static void
 ErrorIfUnsupportedShardDistribution(Query *query)
 {
-	List *firstShardIntervalList = NIL;
+	Oid firstTableRelationId = InvalidOid;
 	List *relationIdList = RelationIdList(query);
 	ListCell *relationIdCell = NULL;
 	uint32 relationIndex = 0;
@@ -3425,21 +3426,21 @@ ErrorIfUnsupportedShardDistribution(Query *query)
 	foreach(relationIdCell, relationIdList)
 	{
 		Oid relationId = lfirst_oid(relationIdCell);
-		List *currentShardIntervalList = LoadShardIntervalList(relationId);
 		bool coPartitionedTables = false;
+		Oid currentRelationId = relationId;
 
 		/* get shard list of first relation and continue for the next relation */
 		if (relationIndex == 0)
 		{
-			firstShardIntervalList = currentShardIntervalList;
+			firstTableRelationId = relationId;
 			relationIndex++;
 
 			continue;
 		}
 
 		/* check if this table has 1-1 shard partitioning with first table */
-		coPartitionedTables = CoPartitionedTables(firstShardIntervalList,
-												  currentShardIntervalList);
+		coPartitionedTables = CoPartitionedTables(firstTableRelationId,
+												  currentRelationId);
 		if (!coPartitionedTables)
 		{
 			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -3478,21 +3479,23 @@ RelationIdList(Query *query)
 
 
 /*
- * CoPartitionedTables checks if given shard lists have 1-to-1 shard partitioning.
- * It first sorts both list according to shard interval minimum values. Then it
- * compares every shard interval in order and if any pair of shard intervals are
- * not equal it returns false.
+ * CoPartitionedTables checks if the given two distributed tables have 1-to-1 shard
+ * partitioning. It uses the shard interval arrays that are sorted on interval minimum
+ * values. Then it compares every shard interval in order and if any pair of
+ * shard intervals are not equal it returns false.
 */
 static bool
-CoPartitionedTables(List *firstShardList, List *secondShardList)
+CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 {
 	bool coPartitionedTables = true;
 	uint32 intervalIndex = 0;
-	ShardInterval **sortedFirstIntervalArray = NULL;
-	ShardInterval **sortedSecondIntervalArray = NULL;
-
-	uint32 firstListShardCount = list_length(firstShardList);
-	uint32 secondListShardCount = list_length(secondShardList);
+	DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
+	DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
+	ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
+	ShardInterval **sortedSecondIntervalArray =
+		secondTableCache->sortedShardIntervalArray;
+	uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
+	uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
 
 	if (firstListShardCount != secondListShardCount)
 	{
@@ -3505,9 +3508,6 @@ CoPartitionedTables(List *firstShardList, List *secondShardList)
 		return true;
 	}
 
-	sortedFirstIntervalArray = SortedShardIntervalArray(firstShardList);
-	sortedSecondIntervalArray = SortedShardIntervalArray(secondShardList);
-
 	for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
 	{
 		ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
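[Editorial note] CoPartitionedTables above reduces co-partitioning to a pairwise walk over two pre-sorted interval arrays. Below is a minimal sketch of that comparison over plain int32 bounds, assuming both arrays are sorted the same way; the IntervalBounds type and the function name are illustrative only, not part of the patch.

/*
 * Illustrative sketch (not part of the patch): two tables are co-partitioned
 * when their sorted shard intervals match pairwise.
 */
typedef struct IntervalBounds
{
	int32 minValue;
	int32 maxValue;
} IntervalBounds;

static bool
IntervalArraysMatchPairwise(IntervalBounds *first, IntervalBounds *second,
							uint32 intervalCount)
{
	uint32 intervalIndex = 0;

	for (intervalIndex = 0; intervalIndex < intervalCount; intervalIndex++)
	{
		/* any mismatching pair means the shard placements cannot be joined 1-to-1 */
		if (first[intervalIndex].minValue != second[intervalIndex].minValue ||
			first[intervalIndex].maxValue != second[intervalIndex].maxValue)
		{
			return false;
		}
	}

	return true;
}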
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 853cb886e..9d927c99f 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -37,6 +37,7 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
+#include "distributed/shardinterval_utils.h"
 #include "distributed/task_tracker.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -110,8 +111,6 @@ static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList,
 									  Oid baseRelationId,
 									  BoundaryNodeJobType boundaryNodeJobType);
 static uint32 HashPartitionCount(void);
-static int CompareShardIntervals(const void *leftElement, const void *rightElement,
-								 FmgrInfo *typeCompareFunction);
 static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
 									uint32 shardIntervalCount);
@@ -1716,10 +1715,17 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
 	else if (partitionType == RANGE_PARTITION_TYPE)
 	{
 		/* build the split point object for the table on the right-hand side */
-		List *shardIntervalList = LoadShardIntervalList(baseRelationId);
-		uint32 shardCount = (uint32) list_length(shardIntervalList);
-		ShardInterval **sortedShardIntervalArray =
-			SortedShardIntervalArray(shardIntervalList);
+		DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId);
+		bool hasUninitializedShardInterval = false;
+		uint32 shardCount = cache->shardIntervalArrayLength;
+		ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray;
+
+		hasUninitializedShardInterval = cache->hasUninitializedShardInterval;
+		if (hasUninitializedShardInterval)
+		{
+			ereport(ERROR, (errmsg("cannot range repartition shard with "
+								   "missing min/max values")));
+		}
 
 		/* this join-type currently doesn't work for hash partitioned tables */
 		char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY =
@@ -1754,78 +1760,6 @@ HashPartitionCount(void)
 }
 
 
-/*
- * SortedShardIntervalArray returns a sorted array of shard intervals for shards
- * in the given shard list. The array elements are sorted in in ascending order
- * according to shard interval's minimum value.
- */
-ShardInterval **
-SortedShardIntervalArray(List *shardIntervalList)
-{
-	FmgrInfo *typeCompareFunction = NULL;
-	ListCell *shardIntervalCell = NULL;
-	uint32 shardIntervalIndex = 0;
-
-	ShardInterval **shardIntervalArray = NULL;
-	uint32 shardIntervalCount = (uint32) list_length(shardIntervalList);
-	Assert(shardIntervalCount > 0);
-
-	/* allocate an array for sorted shard intervals */
-	shardIntervalArray = palloc0(shardIntervalCount * sizeof(ShardInterval *));
-
-	/* fill in the array with shard intervals */
-	foreach(shardIntervalCell, shardIntervalList)
-	{
-		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-		if (shardInterval->minValueExists && shardInterval->maxValueExists)
-		{
-			shardIntervalArray[shardIntervalIndex] = shardInterval;
-		}
-		else
-		{
-			ereport(ERROR, (errmsg("cannot range repartition shard " UINT64_FORMAT
-								   " with missing min/max values",
-								   shardInterval->shardId)));
-		}
-
-		/* resolve the datum type and comparison function on first pass */
-		if (shardIntervalIndex == 0)
-		{
-			Oid typeId = shardInterval->valueTypeId;
-			typeCompareFunction = GetFunctionInfo(typeId, BTREE_AM_OID, BTORDER_PROC);
-		}
-		shardIntervalIndex++;
-	}
-
-	/* sort shard intervals by their minimum values in ascending order */
-	qsort_arg(shardIntervalArray, shardIntervalCount, sizeof(ShardInterval *),
-			  (qsort_arg_comparator) CompareShardIntervals, (void *) typeCompareFunction);
-
-	return shardIntervalArray;
-}
-
-
-/*
- * CompareShardIntervals acts as a helper function to compare two shard interval
- * pointers by their minimum values, using the value's type comparison function.
- */
-static int
-CompareShardIntervals(const void *leftElement, const void *rightElement,
-					  FmgrInfo *typeCompareFunction)
-{
-	ShardInterval **leftShardInterval = (ShardInterval **) leftElement;
-	ShardInterval **rightShardInterval = (ShardInterval **) rightElement;
-
-	Datum leftDatum = (*leftShardInterval)->minValue;
-	Datum rightDatum = (*rightShardInterval)->minValue;
-
-	Datum comparisonDatum = CompareCall2(typeCompareFunction, leftDatum, rightDatum);
-	int comparisonResult = DatumGetInt32(comparisonDatum);
-
-	return comparisonResult;
-}
-
-
 /*
  * SplitPointObject walks over shard intervals in the given array, extracts each
  * shard interval's minimum value, sorts and inserts these minimum values into a
@@ -2031,10 +1965,9 @@ SubquerySqlTaskList(Job *job)
 		List *shardIntervalList = LoadShardIntervalList(relationId);
 		List *finalShardIntervalList = NIL;
 		ListCell *fragmentCombinationCell = NULL;
-		ShardInterval **sortedIntervalArray = NULL;
+		ListCell *shardIntervalCell = NULL;
 		uint32 tableId = rangeTableIndex + 1; /* tableId starts from 1 */
 		uint32 finalShardCount = 0;
-		uint32 shardIndex = 0;
 
 		if (opExpressionList != NIL)
 		{
@@ -2056,12 +1989,11 @@ SubquerySqlTaskList(Job *job)
 			return NIL;
 		}
 
-		sortedIntervalArray = SortedShardIntervalArray(finalShardIntervalList);
 		fragmentCombinationCell = list_head(fragmentCombinationList);
 
-		for (shardIndex = 0; shardIndex < finalShardCount; shardIndex++)
+		foreach(shardIntervalCell, finalShardIntervalList)
 		{
-			ShardInterval *shardInterval = sortedIntervalArray[shardIndex];
+			ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 			RangeTableFragment *shardFragment = palloc0(fragmentSize);
 			shardFragment->fragmentReference = &(shardInterval->shardId);
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 90b654c90..4a5df7826 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -33,6 +33,7 @@
 #include "distributed/citus_ruleutils.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardinterval_utils.h"
 #include "executor/execdesc.h"
 #include "lib/stringinfo.h"
 #if (PG_VERSION_NUM >= 90500)
@@ -51,6 +52,7 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/relcache.h"
+#include "utils/typcache.h"
 
 
 /* planner functions forward declarations */
@@ -62,8 +64,11 @@ static OnConflictExpr * RebuildOnConflict(Oid relationId,
 #endif
 static ShardInterval * TargetShardInterval(Query *query);
 static List * QueryRestrictList(Query *query);
+static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
+static ShardInterval * FastShardPruning(Oid distributedTableId,
+										Const *partitionColumnValue);
 static Oid ExtractFirstDistributedTableId(Query *query);
-static Const * ExtractPartitionValue(Query *query, Var *partitionColumn);
+static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
 static Task * RouterSelectTask(Query *query);
 static Job * RouterQueryJob(Query *query, Task *task);
 static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
@@ -507,18 +512,19 @@ TargetShardInterval(Query *query)
 {
 	CmdType commandType = query->commandType;
 	bool selectTask = (commandType == CMD_SELECT);
-	List *restrictClauseList = NIL;
 	List *prunedShardList = NIL;
-	Index tableId = 1;
 	int prunedShardCount = 0;
+	int shardCount = 0;
 
 	Oid distributedTableId = ExtractFirstDistributedTableId(query);
-	List *shardIntervalList = NIL;
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+	char partitionMethod = cacheEntry->partitionMethod;
+	bool fastShardPruningPossible = false;
 
 	/* error out if no shards exist for the table */
-	shardIntervalList = LoadShardIntervalList(distributedTableId);
-	if (shardIntervalList == NIL)
+	shardCount = cacheEntry->shardIntervalArrayLength;
+	if (shardCount == 0)
 	{
 		char *relationName = get_rel_name(distributedTableId);
 
@@ -530,9 +536,31 @@
 						"and try again.")));
 	}
 
-	restrictClauseList = QueryRestrictList(query);
-	prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
-									 shardIntervalList);
+	fastShardPruningPossible = FastShardPruningPossible(query->commandType,
+														partitionMethod);
+	if (fastShardPruningPossible)
+	{
+		uint32 rangeTableId = 1;
+		Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
+		Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
+		ShardInterval *shardInterval = FastShardPruning(distributedTableId,
+														partitionValue);
+
+		if (shardInterval != NULL)
+		{
+			prunedShardList = lappend(prunedShardList, shardInterval);
+		}
+	}
+	else
+	{
+		List *restrictClauseList = QueryRestrictList(query);
+		Index tableId = 1;
+		List *shardIntervalList = LoadShardIntervalList(distributedTableId);
+
+		prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
+										 shardIntervalList);
+	}
+
 	prunedShardCount = list_length(prunedShardList);
 	if (prunedShardCount != 1)
 	{
@@ -554,6 +582,74 @@ TargetShardInterval(Query *query)
 }
 
 
+/*
+ * FastShardPruningPossible returns true if the commandType is INSERT and the
+ * partition method is hash or range.
+ */
+static bool
+FastShardPruningPossible(CmdType commandType, char partitionMethod)
+{
+	/* we currently only support INSERTs */
+	if (commandType != CMD_INSERT)
+	{
+		return false;
+	}
+
+	/* fast shard pruning is only supported for hash and range partitioned tables */
+	if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
+	{
+		return true;
+	}
+
+	return false;
+}
+
+
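[Editorial note] FastShardPruningPossible above gates the fast path; the next function performs it. As a usage sketch, an INSERT router would combine the two roughly as follows. RouteInsert is a hypothetical name; PartitionColumn, ExtractInsertPartitionValue, and FastShardPruning are the functions defined in this file.

/*
 * Illustrative sketch (not part of the patch): resolve the single target shard
 * for an INSERT. A NULL result means no shard covers the partition value and
 * the caller must raise an error.
 */
static ShardInterval *
RouteInsert(Query *query, Oid distributedTableId)
{
	uint32 rangeTableId = 1;
	Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
	Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);

	return FastShardPruning(distributedTableId, partitionValue);
}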
+/*
+ * FastShardPruning is a higher-level API for FindShardInterval. Given the
+ * relationId of the distributed table and a partition value, it finds the
+ * corresponding shard interval that the partition value should be in.
+ * FastShardPruning returns NULL if no shard interval exists for the given
+ * partition value.
+ */
+static ShardInterval *
+FastShardPruning(Oid distributedTableId, Const *partitionValue)
+{
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+	int shardCount = cacheEntry->shardIntervalArrayLength;
+	ShardInterval **sortedShardIntervalArray = cacheEntry->sortedShardIntervalArray;
+	bool useBinarySearch = false;
+	char partitionMethod = cacheEntry->partitionMethod;
+	FmgrInfo *shardIntervalCompareFunction = cacheEntry->shardIntervalCompareFunction;
+	bool hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
+	FmgrInfo *hashFunction = NULL;
+	ShardInterval *shardInterval = NULL;
+
+	/* determine whether to use binary search */
+	if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
+	{
+		useBinarySearch = true;
+	}
+
+	/* we only need hash functions for hash distributed tables */
+	if (partitionMethod == DISTRIBUTE_BY_HASH)
+	{
+		hashFunction = cacheEntry->hashFunction;
+	}
+
+	/*
+	 * Call FindShardInterval to find the corresponding shard interval for the
+	 * given partition value.
+	 */
+	shardInterval = FindShardInterval(partitionValue->constvalue,
+									  sortedShardIntervalArray, shardCount,
+									  partitionMethod,
+									  shardIntervalCompareFunction, hashFunction,
+									  useBinarySearch);
+
+	return shardInterval;
+}
+
+
 /*
  * QueryRestrictList returns the restriction clauses for the query. For a SELECT
  * statement these are the where-clause expressions. For INSERT statements we
@@ -572,7 +668,7 @@ QueryRestrictList(Query *query)
 		Oid distributedTableId = ExtractFirstDistributedTableId(query);
 		uint32 rangeTableId = 1;
 		Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
-		Const *partitionValue = ExtractPartitionValue(query, partitionColumn);
+		Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
 		OpExpr *equalityExpr = MakeOpExpression(partitionColumn,
 												BTEqualStrategyNumber);
@@ -628,11 +724,11 @@ ExtractFirstDistributedTableId(Query *query)
 
 /*
- * ExtractPartitionValue extracts the partition column value from a the target
- * of a modification command. If a partition value is missing altogether or is
+ * ExtractInsertPartitionValue extracts the partition column value from the
+ * target of an INSERT command. If a partition value is missing altogether or is
  * NULL, this function throws an error.
 */
 static Const *
-ExtractPartitionValue(Query *query, Var *partitionColumn)
+ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
 {
 	Const *partitionValue = NULL;
 	TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
diff --git a/src/backend/distributed/test/prune_shard_list.c b/src/backend/distributed/test/prune_shard_list.c
index b0d0eba9c..e78bfc802 100644
--- a/src/backend/distributed/test/prune_shard_list.c
+++ b/src/backend/distributed/test/prune_shard_list.c
@@ -22,6 +22,7 @@
 #include "access/skey.h"
 #endif
 #include "catalog/pg_type.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_physical_planner.h"
@@ -38,6 +39,7 @@
 /* local function forward declarations */
 static Expr * MakeTextPartitionExpression(Oid distributedTableId, text *value);
 static ArrayType * PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList);
+static ArrayType * SortedShardIntervalArray(Oid distributedTableId);
 
 
 /* declarations for dynamic loading */
@@ -46,6 +48,7 @@ PG_FUNCTION_INFO_V1(prune_using_single_value);
 PG_FUNCTION_INFO_V1(prune_using_either_value);
 PG_FUNCTION_INFO_V1(prune_using_both_values);
 PG_FUNCTION_INFO_V1(debug_equality_expression);
+PG_FUNCTION_INFO_V1(print_sorted_shard_intervals);
 
 
 /*
@@ -140,6 +143,21 @@ debug_equality_expression(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * print_sorted_shard_intervals prints the sorted shard interval array that is in the
+ * metadata cache. This function aims to test sorting functionality.
+ */
+Datum
+print_sorted_shard_intervals(PG_FUNCTION_ARGS)
+{
+	Oid distributedTableId = PG_GETARG_OID(0);
+
+	ArrayType *shardIdArrayType = SortedShardIntervalArray(distributedTableId);
+
+	PG_RETURN_ARRAYTYPE_P(shardIdArrayType);
+}
+
+
 /*
  * MakeTextPartitionExpression returns an equality expression between the
  * specified table's partition column and the provided values.
@@ -212,3 +230,34 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
 
 	return shardIdArrayType;
 }
+
+
+/*
+ * SortedShardIntervalArray simply returns the shard interval ids in the sorted shard
+ * interval cache as a datum array.
+ */
+static ArrayType *
+SortedShardIntervalArray(Oid distributedTableId)
+{
+	ArrayType *shardIdArrayType = NULL;
+	int shardIndex = 0;
+	Oid shardIdTypeId = INT8OID;
+
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+	ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
+	int shardIdCount = cacheEntry->shardIntervalArrayLength;
+	Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
+
+	for (shardIndex = 0; shardIndex < shardIdCount; ++shardIndex)
+	{
+		ShardInterval *shardId = shardIntervalArray[shardIndex];
+		Datum shardIdDatum = Int64GetDatum(shardId->shardId);
+
+		shardIdDatumArray[shardIndex] = shardIdDatum;
+	}
+
+	shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount,
+											 shardIdTypeId);
+
+	return shardIdArrayType;
+}
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index 105179208..9c8a81edc 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -12,6 +12,7 @@
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/nbtree.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_namespace.h"
@@ -22,6 +23,8 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
+#include "distributed/shardinterval_utils.h"
+#include "distributed/worker_protocol.h"
 #include "parser/parse_func.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -34,6 +37,7 @@
 #include "utils/relfilenodemap.h"
 #include "utils/relmapper.h"
 #include "utils/syscache.h"
+#include "utils/typcache.h"
 
 
 /* state which should be cleared upon DROP EXTENSION */
@@ -57,6 +61,16 @@ static ScanKeyData DistShardScanKey[1];
 
 /* local function forward declarations */
 static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
+static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
+											   char partitionMethod);
+static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
+											   int shardCount,
+											   FmgrInfo *shardIntervalSortCompareFunction);
+static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
+									   int shardIntervalArrayLength);
+static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
+										  int shardCount);
 static void InitializeDistTableCache(void);
 static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
 static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
@@ -202,7 +216,12 @@ LookupDistTableCacheEntry(Oid relationId)
 	char partitionMethod = 0;
 	List *distShardTupleList = NIL;
 	int shardIntervalArrayLength = 0;
-	ShardInterval *shardIntervalArray = NULL;
+	ShardInterval **shardIntervalArray = NULL;
+	ShardInterval **sortedShardIntervalArray = NULL;
+	FmgrInfo *shardIntervalCompareFunction = NULL;
+	FmgrInfo *hashFunction = NULL;
+	bool hasUninitializedShardInterval = false;
+	bool hasUniformHashDistribution = false;
 	void *hashKey = (void *) &relationId;
 
 	if (DistTableCacheHash == NULL)
@@ -257,7 +276,7 @@ LookupDistTableCacheEntry(Oid relationId)
 
 		shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
 													shardIntervalArrayLength *
-													sizeof(ShardInterval));
+													sizeof(ShardInterval *));
 
 		foreach(distShardTupleCell, distShardTupleList)
 		{
@@ -266,9 +285,12 @@ LookupDistTableCacheEntry(Oid relationId)
 														 distShardTupleDesc,
 														 intervalTypeId,
 														 intervalTypeMod);
+			ShardInterval *newShardInterval = NULL;
 			MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
 
-			CopyShardInterval(shardInterval, &shardIntervalArray[arrayIndex]);
+			newShardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval));
+			CopyShardInterval(shardInterval, newShardInterval);
+			shardIntervalArray[arrayIndex] = newShardInterval;
 
 			MemoryContextSwitchTo(oldContext);
 
@@ -280,6 +302,50 @@ LookupDistTableCacheEntry(Oid relationId)
 		heap_close(distShardRelation, AccessShareLock);
 	}
 
+	/* decide and allocate interval comparison function */
+	if (shardIntervalArrayLength > 0)
+	{
+		MemoryContext oldContext = CurrentMemoryContext;
+
+		/* allocate the comparison function in the cache context */
+		oldContext = MemoryContextSwitchTo(CacheMemoryContext);
+
+		shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray,
+																	partitionMethod);
+
+		MemoryContextSwitchTo(oldContext);
+	}
+
+	/* sort the interval array */
+	sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
+													  shardIntervalArrayLength,
+													  shardIntervalCompareFunction);
+
+	/* check if there exist any shard intervals with no min/max values */
+	hasUninitializedShardInterval =
+		HasUninitializedShardInterval(sortedShardIntervalArray,
+									  shardIntervalArrayLength);
+
+	/* we only need hash functions for hash distributed tables */
+	if (partitionMethod == DISTRIBUTE_BY_HASH)
+	{
+		TypeCacheEntry *typeEntry = NULL;
+		Node *partitionNode = stringToNode(partitionKeyString);
+		Var *partitionColumn = (Var *) partitionNode;
+		Assert(IsA(partitionNode, Var));
+		typeEntry = lookup_type_cache(partitionColumn->vartype,
+									  TYPECACHE_HASH_PROC_FINFO);
+
+		hashFunction = MemoryContextAllocZero(CacheMemoryContext,
+											  sizeof(FmgrInfo));
+
+		fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
+
+		/* check the shard distribution for hash partitioned tables */
+		hasUniformHashDistribution =
+			HasUniformHashDistribution(sortedShardIntervalArray,
+									   shardIntervalArrayLength);
+	}
+
 	cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL);
 
 	/* zero out entry, but not the key part */
@@ -298,13 +364,151 @@ LookupDistTableCacheEntry(Oid relationId)
 	cacheEntry->partitionKeyString = partitionKeyString;
 	cacheEntry->partitionMethod = partitionMethod;
 	cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
-	cacheEntry->shardIntervalArray = shardIntervalArray;
+	cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
+	cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
+	cacheEntry->hashFunction = hashFunction;
+	cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval;
+	cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution;
 
 	return cacheEntry;
 }
 
 
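[Editorial note] LookupDistTableCacheEntry above is careful to copy everything a cache entry points at into CacheMemoryContext, because per-query memory contexts are reset long before the entry is invalidated. The pattern it uses for the hash function is shown in isolation below; CopyIntoCacheContext is a hypothetical name, while MemoryContextSwitchTo, palloc0, and fmgr_info_copy are the real PostgreSQL APIs used in the hunk above.

/*
 * Illustrative sketch (not part of the patch): copy an FmgrInfo so that it
 * survives for the lifetime of the relation cache entry.
 */
static FmgrInfo *
CopyIntoCacheContext(FmgrInfo *sourceFunction)
{
	/* allocate in CacheMemoryContext, not in the per-query context */
	MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
	FmgrInfo *cachedFunction = palloc0(sizeof(FmgrInfo));

	fmgr_info_copy(cachedFunction, sourceFunction, CacheMemoryContext);
	MemoryContextSwitchTo(oldContext);

	return cachedFunction;
}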
+/*
+ * ShardIntervalCompareFunction returns the appropriate compare function for the
+ * partition column type. In case of hash-partitioning, it always returns the
+ * compare function for integers. Callers of this function have to ensure that
+ * shardIntervalArray has at least one element.
+ */
+static FmgrInfo *
+ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod)
+{
+	FmgrInfo *shardIntervalCompareFunction = NULL;
+	Oid comparisonTypeId = InvalidOid;
+
+	Assert(shardIntervalArray != NULL);
+
+	if (partitionMethod == DISTRIBUTE_BY_HASH)
+	{
+		comparisonTypeId = INT4OID;
+	}
+	else
+	{
+		ShardInterval *shardInterval = shardIntervalArray[0];
+		comparisonTypeId = shardInterval->valueTypeId;
+	}
+
+	shardIntervalCompareFunction = GetFunctionInfo(comparisonTypeId, BTREE_AM_OID,
+												   BTORDER_PROC);
+
+	return shardIntervalCompareFunction;
+}
+
+
+/*
+ * SortShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
+ * no min/max values are placed at the end of the array.
+ */
+static ShardInterval **
+SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
+					   FmgrInfo *shardIntervalSortCompareFunction)
+{
+	ShardInterval **sortedShardIntervalArray = NULL;
+
+	/* short cut if there are no shard intervals in the array */
+	if (shardCount == 0)
+	{
+		return shardIntervalArray;
+	}
+
+	/* if a shard doesn't have min/max values, it's placed at the end of the array */
+	qsort_arg(shardIntervalArray, shardCount, sizeof(ShardInterval *),
+			  (qsort_arg_comparator) CompareShardIntervals,
+			  (void *) shardIntervalSortCompareFunction);
+
+	sortedShardIntervalArray = shardIntervalArray;
+
+	return sortedShardIntervalArray;
+}
+
+
+/*
+ * HasUniformHashDistribution determines whether the given array of sorted shards
+ * has a uniform hash distribution, as produced by master_create_worker_shards for
+ * hash partitioned tables.
+ */
+static bool
+HasUniformHashDistribution(ShardInterval **shardIntervalArray,
+						   int shardIntervalArrayLength)
+{
+	uint64 hashTokenIncrement = 0;
+	int shardIndex = 0;
+
+	/* if there are no shards, there is no uniform distribution */
+	if (shardIntervalArrayLength == 0)
+	{
+		return false;
+	}
+
+	/* calculate the hash token increment */
+	hashTokenIncrement = HASH_TOKEN_COUNT / shardIntervalArrayLength;
+
+	for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
+	{
+		ShardInterval *shardInterval = shardIntervalArray[shardIndex];
+		int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
+		int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
+
+		if (shardIndex == (shardIntervalArrayLength - 1))
+		{
+			shardMaxHashToken = INT32_MAX;
+		}
+
+		if (DatumGetInt32(shardInterval->minValue) != shardMinHashToken ||
+			DatumGetInt32(shardInterval->maxValue) != shardMaxHashToken)
+		{
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
+/*
+ * HasUninitializedShardInterval returns true if the sorted shard interval array
+ * contains a shard interval with missing min/max values. Callers of the function
+ * must ensure that the input shard interval array is sorted on shardminvalue and
+ * that uninitialized shard intervals are at the end of the array.
+ */
+static bool
+HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount)
+{
+	bool hasUninitializedShardInterval = false;
+	ShardInterval *lastShardInterval = NULL;
+
+	if (shardCount == 0)
+	{
+		return hasUninitializedShardInterval;
+	}
+
+	Assert(sortedShardIntervalArray != NULL);
+
+	/*
+	 * Since the shard interval array is sorted, and uninitialized ones are
+	 * stored at the end of the array, checking the last element is enough.
+ */ + lastShardInterval = sortedShardIntervalArray[shardCount - 1]; + if (!lastShardInterval->minValueExists || !lastShardInterval->maxValueExists) + { + hasUninitializedShardInterval = true; + } + + return hasUninitializedShardInterval; +} + + /* * CitusHasBeenLoaded returns true if the citus extension has been created * in the current database and the extension script has been executed. Otherwise, @@ -628,7 +832,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) { - ShardInterval *shardInterval = &cacheEntry->shardIntervalArray[i]; + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; bool valueByVal = shardInterval->valueByVal; if (!valueByVal) @@ -643,11 +847,26 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) pfree(DatumGetPointer(shardInterval->maxValue)); } } + + pfree(shardInterval); } - pfree(cacheEntry->shardIntervalArray); - cacheEntry->shardIntervalArray = NULL; + pfree(cacheEntry->sortedShardIntervalArray); + cacheEntry->sortedShardIntervalArray = NULL; cacheEntry->shardIntervalArrayLength = 0; + + cacheEntry->hasUninitializedShardInterval = false; + cacheEntry->hasUniformHashDistribution = false; + + pfree(cacheEntry->shardIntervalCompareFunction); + cacheEntry->shardIntervalCompareFunction = NULL; + + /* we only allocated hash function for hash distributed tables */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + { + pfree(cacheEntry->hashFunction); + cacheEntry->hashFunction = NULL; + } } } diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c new file mode 100644 index 000000000..68acb56e0 --- /dev/null +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -0,0 +1,179 @@ +/*------------------------------------------------------------------------- + * + * shardinterval_utils.c + * + * This file contains functions to perform useful operations on shard intervals. + * + * Copyright (c) 2014-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/nbtree.h" +#include "catalog/pg_am.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_type.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/pg_dist_partition.h" +#include "distributed/worker_protocol.h" +#include "utils/catcache.h" +#include "utils/memutils.h" + + +static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, + FmgrInfo *compareFunction); + + +/* + * CompareShardIntervals acts as a helper function to compare two shard intervals + * by their minimum values, using the value's type comparison function. + * + * If a shard interval does not have min/max value, it's treated as being greater + * than the other. + */ +int +CompareShardIntervals(const void *leftElement, const void *rightElement, + FmgrInfo *typeCompareFunction) +{ + ShardInterval *leftShardInterval = *((ShardInterval **) leftElement); + ShardInterval *rightShardInterval = *((ShardInterval **) rightElement); + Datum leftDatum = 0; + Datum rightDatum = 0; + Datum comparisonDatum = 0; + int comparisonResult = 0; + + Assert(typeCompareFunction != NULL); + + /* + * Left element should be treated as the greater element in case it doesn't + * have min or max values. 
+ */ + if (!leftShardInterval->minValueExists || !leftShardInterval->maxValueExists) + { + comparisonResult = 1; + return comparisonResult; + } + + /* + * Right element should be treated as the greater element in case it doesn't + * have min or max values. + */ + if (!rightShardInterval->minValueExists || !rightShardInterval->maxValueExists) + { + comparisonResult = -1; + return comparisonResult; + } + + /* if both shard interval have min/max values, calculate the comparison result */ + leftDatum = leftShardInterval->minValue; + rightDatum = rightShardInterval->minValue; + + comparisonDatum = CompareCall2(typeCompareFunction, leftDatum, rightDatum); + comparisonResult = DatumGetInt32(comparisonDatum); + + return comparisonResult; +} + + +/* + * FindShardInterval finds a single shard interval in the cache for the + * given partition column value. + */ +ShardInterval * +FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, FmgrInfo *compareFunction, + FmgrInfo *hashFunction, bool useBinarySearch) +{ + ShardInterval *shardInterval = NULL; + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, + partitionColumnValue)); + if (useBinarySearch) + { + Assert(compareFunction != NULL); + + shardInterval = SearchCachedShardInterval(Int32GetDatum(hashedValue), + shardIntervalCache, shardCount, + compareFunction); + } + else + { + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; + int shardIndex = (uint32) (hashedValue - INT32_MIN) / hashTokenIncrement; + + Assert(shardIndex <= shardCount); + + /* + * If the shard count is not power of 2, the range of the last + * shard becomes larger than others. For that extra piece of range, + * we still need to use the last shard. + */ + if (shardIndex == shardCount) + { + shardIndex = shardCount - 1; + } + + shardInterval = shardIntervalCache[shardIndex]; + } + } + else + { + Assert(compareFunction != NULL); + + shardInterval = SearchCachedShardInterval(partitionColumnValue, + shardIntervalCache, shardCount, + compareFunction); + } + + return shardInterval; +} + + +/* + * SearchCachedShardInterval performs a binary search for a shard interval matching a + * given partition column value and returns it. 
+ */ +static ShardInterval * +SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction) +{ + int lowerBoundIndex = 0; + int upperBoundIndex = shardCount; + + while (lowerBoundIndex < upperBoundIndex) + { + int middleIndex = (lowerBoundIndex + upperBoundIndex) / 2; + int maxValueComparison = 0; + int minValueComparison = 0; + + minValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->minValue); + + if (DatumGetInt32(minValueComparison) < 0) + { + upperBoundIndex = middleIndex; + continue; + } + + maxValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->maxValue); + + if (DatumGetInt32(maxValueComparison) <= 0) + { + return shardIntervalCache[middleIndex]; + } + + lowerBoundIndex = middleIndex + 1; + } + + return NULL; +} diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 15dddc30a..32485e152 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -11,6 +11,7 @@ #ifndef METADATA_CACHE_H #define METADATA_CACHE_H +#include "fmgr.h" #include "distributed/master_metadata_utility.h" #include "distributed/pg_dist_partition.h" @@ -31,6 +32,8 @@ typedef struct bool isValid; bool isDistributedTable; + bool hasUninitializedShardInterval; + bool hasUniformHashDistribution; /* valid for hash partitioned tables */ /* pg_dist_partition metadata for this table */ char *partitionKeyString; @@ -38,7 +41,10 @@ typedef struct /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ int shardIntervalArrayLength; - ShardInterval *shardIntervalArray; + ShardInterval **sortedShardIntervalArray; + + FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ + FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ } DistTableCacheEntry; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 6b0fb4172..28fc6525b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -237,7 +237,6 @@ extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitio extern int CompareShardPlacements(const void *leftElement, const void *rightElement); /* Function declarations for sorting shards. */ -extern ShardInterval ** SortedShardIntervalArray(List *shardList); extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h new file mode 100644 index 000000000..9d8d1ccf2 --- /dev/null +++ b/src/include/distributed/shardinterval_utils.h @@ -0,0 +1,35 @@ +/*------------------------------------------------------------------------- + * + * shardinterval_utils.h + * + * Declarations for public utility functions related to shard intervals. + * + * Copyright (c) 2014-2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef SHARDINTERVAL_UTILS_H_ +#define SHARDINTERVAL_UTILS_H_ + +#include "distributed/master_metadata_utility.h" +#include "nodes/primnodes.h" + +/* OperatorCacheEntry contains information for each element in OperatorCache */ +typedef struct ShardIntervalCompareFunctionCacheEntry +{ + Var *partitionColumn; + char partitionMethod; + FmgrInfo *functionInfo; +} ShardIntervalCompareFunctionCacheEntry; + +extern int CompareShardIntervals(const void *leftElement, const void *rightElement, + FmgrInfo *typeCompareFunction); +extern ShardInterval * FindShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, char partitionMethod, + FmgrInfo *compareFunction, + FmgrInfo *hashFunction, bool useBinarySearch); + + +#endif /* SHARDINTERVAL_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index d626b179e..a096495e7 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -87,7 +87,7 @@ VALUES SELECT load_shard_id_array('events_hash'); load_shard_id_array --------------------- - {4,3,2,1} + {1,2,3,4} (1 row) -- should see array with first shard range diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index 782360c04..12b5317d3 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -73,25 +73,25 @@ DEBUG: predicate pruning for shardId 111 (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; -DEBUG: predicate pruning for shardId 113 DEBUG: predicate pruning for shardId 112 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 113 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported (1 row) EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -99,8 +99,8 @@ DEBUG: predicate pruning for shardId 110 EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL; DEBUG: predicate pruning for shardId 112 -DEBUG: predicate pruning for shardId 111 DEBUG: predicate pruning for shardId 110 +DEBUG: predicate pruning for shardId 111 QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -231,14 +231,14 @@ DEBUG: predicate pruning for shardId 111 EXPLAIN SELECT count(*) FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2 WHERE orders1.o_orderkey = orders2.o_orderkey; -DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726] -DEBUG: join prunable 
 DEBUG: join prunable for intervals [-1905060026,-28094569] and [1134484726,1134484726]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026]
 DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
 DEBUG: join prunable for intervals [-1905060026,-1905060026] and [1134484726,1134484726]
+DEBUG: join prunable for intervals [-1011077333,0] and [-1905060026,-1905060026]
+DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726]
+DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569]
+DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026]
+DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0]
 QUERY PLAN
 ----------------------------------------------------------------------
 explain statements for distributed queries are currently unsupported
 (1 row)
 
@@ -252,8 +252,8 @@ EXPLAIN SELECT count(*)
 DEBUG: predicate pruning for shardId 113
 DEBUG: predicate pruning for shardId 111
 DEBUG: predicate pruning for shardId 112
-DEBUG: predicate pruning for shardId 111
 DEBUG: predicate pruning for shardId 110
+DEBUG: predicate pruning for shardId 111
 DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
 QUERY PLAN
 ----------------------------------------------------------------------
diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out
index d6f903a94..f32d15cf6 100644
--- a/src/test/regress/expected/multi_index_statements.out
+++ b/src/test/regress/expected/multi_index_statements.out
@@ -137,11 +137,11 @@ ERROR: creating unique indexes on append-partitioned tables is currently unsupp
 CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
 ERROR: relation "lineitem_orderkey_index" already exists
 CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
-WARNING: could not receive query results from localhost:57638
+WARNING: could not receive query results from localhost:57637
 DETAIL: Client error: data type bigint has no default operator class for access method "gist"
 ERROR: could not execute DDL command on worker node shards
 CREATE INDEX try_index ON lineitem (non_existent_column);
-WARNING: could not receive query results from localhost:57638
+WARNING: could not receive query results from localhost:57637
 DETAIL: Client error: column "non_existent_column" does not exist
 ERROR: could not execute DDL command on worker node shards
 CREATE INDEX ON lineitem (l_orderkey);
diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out
index 9fd9cac07..ffc9ae245 100644
--- a/src/test/regress/expected/multi_join_order_additional.out
+++ b/src/test/regress/expected/multi_join_order_additional.out
@@ -8,34 +8,34 @@ SET client_min_messages TO DEBUG2;
 EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2
 	WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5;
 LOG: join order: [ "lineitem" ][ local partition join "lineitem" ]
-DEBUG: join prunable for intervals [13921,14947] and [11554,13920]
-DEBUG: join prunable for intervals [13921,14947] and [8997,11554]
-DEBUG: join prunable for intervals [13921,14947] and [4965,5986]
-DEBUG: join prunable for intervals [13921,14947] and [2497,4964]
[13921,14947] and [2497,4964] -DEBUG: join prunable for intervals [13921,14947] and [1,2496] -DEBUG: join prunable for intervals [11554,13920] and [13921,14947] -DEBUG: join prunable for intervals [11554,13920] and [4965,5986] -DEBUG: join prunable for intervals [11554,13920] and [2497,4964] -DEBUG: join prunable for intervals [11554,13920] and [1,2496] -DEBUG: join prunable for intervals [8997,11554] and [13921,14947] -DEBUG: join prunable for intervals [8997,11554] and [4965,5986] -DEBUG: join prunable for intervals [8997,11554] and [2497,4964] -DEBUG: join prunable for intervals [8997,11554] and [1,2496] -DEBUG: join prunable for intervals [4965,5986] and [13921,14947] -DEBUG: join prunable for intervals [4965,5986] and [11554,13920] -DEBUG: join prunable for intervals [4965,5986] and [8997,11554] -DEBUG: join prunable for intervals [4965,5986] and [2497,4964] -DEBUG: join prunable for intervals [4965,5986] and [1,2496] -DEBUG: join prunable for intervals [2497,4964] and [13921,14947] -DEBUG: join prunable for intervals [2497,4964] and [11554,13920] -DEBUG: join prunable for intervals [2497,4964] and [8997,11554] -DEBUG: join prunable for intervals [2497,4964] and [4965,5986] -DEBUG: join prunable for intervals [2497,4964] and [1,2496] -DEBUG: join prunable for intervals [1,2496] and [13921,14947] -DEBUG: join prunable for intervals [1,2496] and [11554,13920] -DEBUG: join prunable for intervals [1,2496] and [8997,11554] -DEBUG: join prunable for intervals [1,2496] and [4965,5986] DEBUG: join prunable for intervals [1,2496] and [2497,4964] +DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: join prunable for intervals [1,2496] and [8997,11554] +DEBUG: join prunable for intervals [1,2496] and [11554,13920] +DEBUG: join prunable for intervals [1,2496] and [13921,14947] +DEBUG: join prunable for intervals [2497,4964] and [1,2496] +DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [2497,4964] and [8997,11554] +DEBUG: join prunable for intervals [2497,4964] and [11554,13920] +DEBUG: join prunable for intervals [2497,4964] and [13921,14947] +DEBUG: join prunable for intervals [4965,5986] and [1,2496] +DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [8997,11554] +DEBUG: join prunable for intervals [4965,5986] and [11554,13920] +DEBUG: join prunable for intervals [4965,5986] and [13921,14947] +DEBUG: join prunable for intervals [8997,11554] and [1,2496] +DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [4965,5986] +DEBUG: join prunable for intervals [8997,11554] and [13921,14947] +DEBUG: join prunable for intervals [11554,13920] and [1,2496] +DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [4965,5986] +DEBUG: join prunable for intervals [11554,13920] and [13921,14947] +DEBUG: join prunable for intervals [13921,14947] and [1,2496] +DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [13921,14947] and [4965,5986] +DEBUG: join prunable for intervals [13921,14947] and [8997,11554] +DEBUG: join prunable for intervals [13921,14947] and [11554,13920] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported diff --git a/src/test/regress/expected/multi_join_pruning.out 
b/src/test/regress/expected/multi_join_pruning.out index 9627df180..6299768f3 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -9,12 +9,12 @@ SET client_min_messages TO DEBUG2; SET citus.large_table_shard_count TO 2; SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -22,12 +22,12 @@ DEBUG: join prunable for intervals [1,2496] and [8997,14946] SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 9030; -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 17996 | 3.0194630872483221 @@ -37,12 +37,12 @@ DEBUG: join prunable for intervals [8997,11554] and [1,5986] -- works as expected in this case. SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 20000; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102012 -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102012 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 sum | avg -----+----- | @@ -53,13 +53,13 @@ DEBUG: predicate pruning for shardId 102009 -- out all the shards, and leave us with an empty task list. 
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_orderkey > 6000 AND o_orderkey < 6000; -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102016 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -----+----- | @@ -72,8 +72,8 @@ DEBUG: join prunable for intervals [8997,11554] and [1,5986] EXPLAIN SELECT count(*) FROM array_partitioned_table table1, array_partitioned_table table2 WHERE table1.array_column = table2.array_column; -DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] +DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -82,8 +82,8 @@ DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1 EXPLAIN SELECT count(*) FROM composite_partitioned_table table1, composite_partitioned_table table2 WHERE table1.composite_column = table2.composite_column; -DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] +DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported @@ -93,8 +93,8 @@ DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] EXPLAIN SELECT count(*) FROM varchar_partitioned_table table1, varchar_partitioned_table table2 WHERE table1.varchar_column = table2.varchar_column; -DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] +DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] QUERY PLAN ---------------------------------------------------------------------- explain statements for distributed queries are currently unsupported diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index 842f164cc..4573c457f 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ b/src/test/regress/expected/multi_large_table_join_planning.out @@ -45,30 +45,30 @@ ORDER BY LIMIT 30; DEBUG: StartTransactionCommand DEBUG: push down of limit count: 30 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: 
join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] DEBUG: generated sql query for job 1250 and task 3 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 6 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 9 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 12 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 15 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: generated sql query for job 1250 and task 18 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102009 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: generated sql query for job 1250 and task 6 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 9 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 12 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, 
lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 15 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: generated sql query for job 1250 and task 18 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > '10'::numeric)" +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -83,10 +83,10 @@ DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: generated sql query for job 1252 and task 3 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT '30'::bigint" DEBUG: generated sql query for job 1252 and task 6 @@ -156,29 +156,29 @@ ORDER BY l_partkey, o_orderkey; DEBUG: StartTransactionCommand DEBUG: generated sql query for job 1253 and task 2 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 4 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 6 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 
lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 8 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 10 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 12 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102009 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1253 and task 4 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 6 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 8 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 10 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 12 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 -DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" -DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102015 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1254 and task 4 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out index 33700d111..9296d52e1 100644 --- a/src/test/regress/expected/multi_large_table_join_planning_0.out +++ b/src/test/regress/expected/multi_large_table_join_planning_0.out @@ -45,30 +45,30 @@ ORDER BY LIMIT 30; DEBUG: StartTransactionCommand DEBUG: push down of limit count: 30 -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join 
prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] DEBUG: generated sql query for job 1250 and task 3 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 6 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 9 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 12 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 15 -DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: generated sql query for job 1250 and task 18 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102009 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: generated sql query for job 1250 and task 6 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102010 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 9 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102011 lineitem JOIN orders_102015 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 12 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102012 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = 
orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 15 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102013 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: generated sql query for job 1250 and task 18 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_102014 lineitem JOIN orders_102016 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE (orders.o_totalprice > 10::numeric)" +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -83,10 +83,10 @@ DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: generated sql query for job 1252 and task 3 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT 30::bigint" DEBUG: generated sql query for job 1252 and task 6 @@ -156,29 +156,29 @@ ORDER BY l_partkey, o_orderkey; DEBUG: StartTransactionCommand DEBUG: generated sql query for job 1253 and task 2 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 4 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 6 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 8 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM 
lineitem_102011 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 10 -DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" -DEBUG: generated sql query for job 1253 and task 12 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102009 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1253 and task 4 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102010 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 6 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102011 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 8 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102012 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 10 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102013 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for job 1253 and task 12 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_102014 lineitem WHERE (l_quantity < 5.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 -DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" -DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102015 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for job 1254 and task 4 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_102016 orders WHERE (o_totalprice <> 4.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_pruning.out b/src/test/regress/expected/multi_large_table_pruning.out index 20b62fa4d..27908a6ca 100644 --- a/src/test/regress/expected/multi_large_table_pruning.out +++ b/src/test/regress/expected/multi_large_table_pruning.out @@ -16,10 +16,10 @@ WHERE o_custkey = c_custkey; DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 5 DEBUG: pruning merge fetch taskId 4 @@ -40,8 +40,8 @@ 
FROM WHERE o_custkey = c_custkey AND o_orderkey < 0; -DEBUG: predicate pruning for shardId 102016 DEBUG: predicate pruning for shardId 102015 +DEBUG: predicate pruning for shardId 102016 count ------- @@ -56,9 +56,9 @@ FROM WHERE o_custkey = c_custkey AND c_custkey < 0; +DEBUG: predicate pruning for shardId 102017 DEBUG: predicate pruning for shardId 102034 DEBUG: predicate pruning for shardId 102033 -DEBUG: predicate pruning for shardId 102017 count ------- @@ -115,12 +115,12 @@ FROM WHERE l_partkey = c_nationkey AND l_orderkey < 0; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102012 -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102012 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 count ------- diff --git a/src/test/regress/expected/multi_large_table_task_assignment.out b/src/test/regress/expected/multi_large_table_task_assignment.out index 7ba87fa94..23b7bd2ac 100644 --- a/src/test/regress/expected/multi_large_table_task_assignment.out +++ b/src/test/regress/expected/multi_large_table_task_assignment.out @@ -25,14 +25,14 @@ FROM WHERE o_custkey = c_custkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 5 DEBUG: pruning merge fetch taskId 4 @@ -64,40 +64,40 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 15 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: join prunable for intervals [1,2496] and [13921,14947] -DEBUG: join prunable for intervals [1,2496] and [11554,13920] -DEBUG: join prunable for intervals [1,2496] and [8997,11554] -DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 15 to node localhost:57637 +DEBUG: assigned task 18 to node localhost:57638 DEBUG: join prunable for intervals [1,2496] and [2497,4964] -DEBUG: join prunable for intervals [2497,4964] and [13921,14947] -DEBUG: join prunable for intervals [2497,4964] and [11554,13920] -DEBUG: join prunable for intervals [2497,4964] and [8997,11554] -DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [1,2496] and [4965,5986] +DEBUG: join prunable for 
intervals [1,2496] and [8997,11554] +DEBUG: join prunable for intervals [1,2496] and [11554,13920] +DEBUG: join prunable for intervals [1,2496] and [13921,14947] DEBUG: join prunable for intervals [2497,4964] and [1,2496] -DEBUG: join prunable for intervals [4965,5986] and [13921,14947] -DEBUG: join prunable for intervals [4965,5986] and [11554,13920] -DEBUG: join prunable for intervals [4965,5986] and [8997,11554] -DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [2497,4964] and [4965,5986] +DEBUG: join prunable for intervals [2497,4964] and [8997,11554] +DEBUG: join prunable for intervals [2497,4964] and [11554,13920] +DEBUG: join prunable for intervals [2497,4964] and [13921,14947] DEBUG: join prunable for intervals [4965,5986] and [1,2496] -DEBUG: join prunable for intervals [8997,11554] and [13921,14947] -DEBUG: join prunable for intervals [8997,11554] and [4965,5986] -DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [2497,4964] +DEBUG: join prunable for intervals [4965,5986] and [8997,11554] +DEBUG: join prunable for intervals [4965,5986] and [11554,13920] +DEBUG: join prunable for intervals [4965,5986] and [13921,14947] DEBUG: join prunable for intervals [8997,11554] and [1,2496] -DEBUG: join prunable for intervals [11554,13920] and [13921,14947] -DEBUG: join prunable for intervals [11554,13920] and [4965,5986] -DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [2497,4964] +DEBUG: join prunable for intervals [8997,11554] and [4965,5986] +DEBUG: join prunable for intervals [8997,11554] and [13921,14947] DEBUG: join prunable for intervals [11554,13920] and [1,2496] -DEBUG: join prunable for intervals [13921,14947] and [11554,13920] -DEBUG: join prunable for intervals [13921,14947] and [8997,11554] -DEBUG: join prunable for intervals [13921,14947] and [4965,5986] -DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [2497,4964] +DEBUG: join prunable for intervals [11554,13920] and [4965,5986] +DEBUG: join prunable for intervals [11554,13920] and [13921,14947] DEBUG: join prunable for intervals [13921,14947] and [1,2496] +DEBUG: join prunable for intervals [13921,14947] and [2497,4964] +DEBUG: join prunable for intervals [13921,14947] and [4965,5986] +DEBUG: join prunable for intervals [13921,14947] and [8997,11554] +DEBUG: join prunable for intervals [13921,14947] and [11554,13920] DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 19 DEBUG: pruning merge fetch taskId 4 @@ -116,10 +116,10 @@ DEBUG: pruning merge fetch taskId 22 DETAIL: Creating dependency on merge taskId 54 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 24 to node localhost:57637 -DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 24 to node localhost:57638 DEBUG: propagating assignment from merge task 40 to constrained sql task 15 DEBUG: propagating assignment from merge task 47 to constrained sql task 21 DEBUG: CommitTransactionCommand @@ -154,15 +154,15 @@ FROM WHERE l_partkey = c_nationkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 10 to node 
localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_null_minmax_value_pruning.out b/src/test/regress/expected/multi_null_minmax_value_pruning.out index cb7ff7b06..80795483c 100644 --- a/src/test/regress/expected/multi_null_minmax_value_pruning.out +++ b/src/test/regress/expected/multi_null_minmax_value_pruning.out @@ -21,10 +21,10 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; -- Check that partition and join pruning works when min/max values exist -- Adding l_orderkey = 1 to make the query not router executable SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -43,12 +43,12 @@ DEBUG: predicate pruning for shardId 102010 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -DEBUG: join prunable for intervals [2497,4964] and [8997,14946] DEBUG: join prunable for intervals [1,2496] and [8997,14946] +DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -58,10 +58,10 @@ DEBUG: join prunable for intervals [1,2496] and [8997,14946] -- partition or join pruning for the shard with null min value. 
UPDATE pg_dist_shard SET shardminvalue = NULL WHERE shardid = 102009; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -74,11 +74,11 @@ DEBUG: predicate pruning for shardId 102010 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] DEBUG: join prunable for intervals [2497,4964] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -88,9 +88,9 @@ DEBUG: join prunable for intervals [2497,4964] and [8997,14946] -- don't apply partition or join pruning for this other shard either. UPDATE pg_dist_shard SET shardmaxvalue = NULL WHERE shardid = 102010; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -103,10 +103,10 @@ DEBUG: predicate pruning for shardId 102011 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 @@ -116,10 +116,10 @@ DEBUG: join prunable for intervals [4965,5986] and [8997,14946] -- should apply partition and join pruning for this shard now. 
UPDATE pg_dist_shard SET shardminvalue = '0' WHERE shardid = 102009; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -132,11 +132,11 @@ DEBUG: predicate pruning for shardId 102009 SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; -DEBUG: join prunable for intervals [13921,14947] and [1,5986] -DEBUG: join prunable for intervals [11554,13920] and [1,5986] -DEBUG: join prunable for intervals [8997,11554] and [1,5986] -DEBUG: join prunable for intervals [4965,5986] and [8997,14946] DEBUG: join prunable for intervals [0,2496] and [8997,14946] +DEBUG: join prunable for intervals [4965,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,11554] and [1,5986] +DEBUG: join prunable for intervals [11554,13920] and [1,5986] +DEBUG: join prunable for intervals [13921,14947] and [1,5986] sum | avg -------+-------------------- 36086 | 3.0076679446574429 diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index ea969bddd..189b10594 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -6,10 +6,10 @@ SET client_min_messages TO DEBUG2; -- Adding additional l_orderkey = 1 to make this query not router executable SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -32,9 +32,9 @@ DEBUG: predicate pruning for shardId 102010 -- trigger the creation of toasted tables and indexes. This in turn prints -- non-deterministic debug messages. To avoid this chain, we use l_linenumber.
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 sum | avg -------+-------------------- 17999 | 3.0189533713518953 @@ -50,12 +50,12 @@ DEBUG: predicate pruning for shardId 102011 -- The following query should prune out all shards and return empty results SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem WHERE l_orderkey > 20000; -DEBUG: predicate pruning for shardId 102014 -DEBUG: predicate pruning for shardId 102013 -DEBUG: predicate pruning for shardId 102012 -DEBUG: predicate pruning for shardId 102011 -DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102009 +DEBUG: predicate pruning for shardId 102010 +DEBUG: predicate pruning for shardId 102011 +DEBUG: predicate pruning for shardId 102012 +DEBUG: predicate pruning for shardId 102013 +DEBUG: predicate pruning for shardId 102014 sum | avg -----+----- | diff --git a/src/test/regress/expected/multi_prune_shard_list.out b/src/test/regress/expected/multi_prune_shard_list.out index 2f74b1991..44ae8203a 100644 --- a/src/test/regress/expected/multi_prune_shard_list.out +++ b/src/test/regress/expected/multi_prune_shard_list.out @@ -21,6 +21,10 @@ CREATE FUNCTION debug_equality_expression(regclass) RETURNS cstring AS 'citus' LANGUAGE C STRICT; +CREATE FUNCTION print_sorted_shard_intervals(regclass) + RETURNS text[] + AS 'citus' + LANGUAGE C STRICT; -- =================================================================== -- test shard pruning functionality -- =================================================================== @@ -40,7 +44,7 @@ VALUES SELECT prune_using_no_values('pruning'); prune_using_no_values ----------------------- - {13,12,11,10} + {10,11,12,13} (1 row) -- with a single value, expect a single shard @@ -61,7 +65,7 @@ SELECT prune_using_single_value('pruning', NULL); SELECT prune_using_either_value('pruning', 'tomato', 'petunia'); prune_using_either_value -------------------------- - {12,11} + {11,12} (1 row) -- an AND clause with incompatible values returns no shards @@ -85,3 +89,88 @@ SELECT debug_equality_expression('pruning'); {OPEXPR :opno 98 :opfuncid 67 :opresulttype 16 :opretset false :opcollid 0 :inputcollid 100 :args ({VAR :varno 1 :varattno 1 :vartype 25 :vartypmod -1 :varcollid 100 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} {CONST :consttype 25 :consttypmod -1 :constcollid 100 :constlen -1 :constbyval false :constisnull true :location -1 :constvalue <>}) :location -1} (1 row) +-- print the initial ordering of shard intervals +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,11,12,13} +(1 row) + +-- clear min/max values for one shard +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,12,13,11} +(1 row) + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,13,11,12} +(1 row) + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL,
shardmaxvalue = NULL WHERE shardid = 10; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {13,10,11,12} +(1 row) + +-- all shard intervals are uninitialized +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,11,12,13} +(1 row) + +-- now update the metadata so that the table is a range distributed table +UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass; +-- now the comparison is done via the partition column type, which is text +UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10; +UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11; +UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12; +UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13; +-- print the ordering of shard intervals with range partitioning as well +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,11,12,13} +(1 row) + +-- clear min/max values for one shard +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,12,13,11} +(1 row) + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,13,11,12} +(1 row) + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {13,10,11,12} +(1 row) + +-- all shard intervals are uninitialized +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; +SELECT print_sorted_shard_intervals('pruning'); + print_sorted_shard_intervals +------------------------------ + {10,11,12,13} +(1 row) + diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 4d03e19f6..b794a172c 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -105,8 +105,8 @@ DEBUG: Creating router plan DEBUG: Plan is router executable -- first, test zero-shard SELECT, which should return an empty row SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; -DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103300 +DEBUG: predicate pruning for shardId 103301 count ------- @@ -1120,7 +1120,6 @@ DEBUG: predicate pruning for shardId 103301 -- insert query is router plannable even under task-tracker INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814); DEBUG: Creating router plan -DEBUG: predicate pruning for shardId 103301 DEBUG: Plan is router executable -- verify insert is successful (not router plannable and executable) SELECT id diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 9466c3170..7bab12bf2 100644 ---
a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -260,8 +260,8 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER; NOTICE: relation "non_existent_table" does not exist, skipping ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER; ALTER TABLE lineitem_alter DROP COLUMN non_existent_column; -WARNING: could not receive query results from localhost:57637 -DETAIL: Client error: column "non_existent_column" of relation "lineitem_alter_103009" does not exist +WARNING: could not receive query results from localhost:57638 +DETAIL: Client error: column "non_existent_column" of relation "lineitem_alter_103000" does not exist ERROR: could not execute DDL command on worker node shards ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column; NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping @@ -360,15 +360,15 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subc -- Verify that we error out in case of postgres errors on supported statement -- types ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type; -WARNING: could not receive query results from localhost:57637 +WARNING: could not receive query results from localhost:57638 DETAIL: Client error: type "non_existent_type" does not exist ERROR: could not execute DDL command on worker node shards ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL; -WARNING: could not receive query results from localhost:57637 +WARNING: could not receive query results from localhost:57638 DETAIL: Client error: column "null_column" contains null values ERROR: could not execute DDL command on worker node shards ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a'; -WARNING: could not receive query results from localhost:57637 +WARNING: could not receive query results from localhost:57638 DETAIL: Client error: invalid input syntax for integer: "a" ERROR: could not execute DDL command on worker node shards -- Verify that we error out on statements involving RENAME diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index 0d8492e9b..e6724b673 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -349,9 +349,9 @@ SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) AS foo; -DEBUG: predicate pruning for shardId 102029 -DEBUG: predicate pruning for shardId 102027 DEBUG: predicate pruning for shardId 102026 +DEBUG: predicate pruning for shardId 102027 +DEBUG: predicate pruning for shardId 102029 count ------- (0 rows) @@ -359,9 +359,9 @@ DEBUG: predicate pruning for shardId 102026 SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a) AS foo; -DEBUG: predicate pruning for shardId 102029 -DEBUG: predicate pruning for shardId 102028 DEBUG: predicate pruning for shardId 102026 +DEBUG: predicate pruning for shardId 102028 +DEBUG: predicate pruning for shardId 102029 count ------- (0 rows) diff --git a/src/test/regress/sql/multi_prune_shard_list.sql b/src/test/regress/sql/multi_prune_shard_list.sql index 2a215c5a5..6db632e1e 100644 --- a/src/test/regress/sql/multi_prune_shard_list.sql +++ b/src/test/regress/sql/multi_prune_shard_list.sql @@ -27,6 +27,11 @@ CREATE FUNCTION 
debug_equality_expression(regclass) AS 'citus' LANGUAGE C STRICT; +CREATE FUNCTION print_sorted_shard_intervals(regclass) + RETURNS text[] + AS 'citus' + LANGUAGE C STRICT; + -- =================================================================== -- test shard pruning functionality -- =================================================================== @@ -66,3 +71,50 @@ SELECT prune_using_both_values('pruning', 'tomato', 'rose'); -- unit test of the equality expression generation code SELECT debug_equality_expression('pruning'); + +-- print the initial ordering of shard intervals +SELECT print_sorted_shard_intervals('pruning'); + +-- clear min/max values for one shard +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; +SELECT print_sorted_shard_intervals('pruning'); + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; +SELECT print_sorted_shard_intervals('pruning'); + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10; +SELECT print_sorted_shard_intervals('pruning'); + +-- all shard intervals are uninitialized +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; +SELECT print_sorted_shard_intervals('pruning'); + +-- now update the metadata so that the table is a range distributed table +UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass; + +-- now the comparison is done via the partition column type, which is text +UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10; +UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11; +UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12; +UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13; + +-- print the ordering of shard intervals with range partitioning as well +SELECT print_sorted_shard_intervals('pruning'); + +-- clear min/max values for one shard +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; +SELECT print_sorted_shard_intervals('pruning'); + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; +SELECT print_sorted_shard_intervals('pruning'); + +-- now let's have one more shard without min/max values +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10; +SELECT print_sorted_shard_intervals('pruning'); + +-- all shard intervals are uninitialized +UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; +SELECT print_sorted_shard_intervals('pruning');
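
The reordered DEBUG lines and shard-id arrays throughout the expected output above all follow from one change: shard intervals are now cached in a single ascending sorted array, and FindShardInterval either computes the shard index directly (when the hash token distribution is uniform) or binary-searches that sorted array. The standalone C sketch below illustrates those two lookup strategies; it is illustrative only, not the Citus implementation, and SketchShardInterval, UniformShardIndex, SearchSortedIntervals, and the HASH_TOKEN_COUNT value are assumptions modeled on this patch.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* 2^32 hash tokens spread over the int32 range; value assumed from this patch */
#define HASH_TOKEN_COUNT 4294967296ULL

/* simplified stand-in for ShardInterval: hash-partitioned min/max tokens */
typedef struct SketchShardInterval
{
	int32_t minValue;
	int32_t maxValue;
} SketchShardInterval;

/* direct lookup; valid only when shards uniformly cover the hash token space */
static int
UniformShardIndex(int32_t hashedValue, int shardCount)
{
	uint64_t hashTokenIncrement = HASH_TOKEN_COUNT / (uint64_t) shardCount;

	/* shift into [0, 2^32) using unsigned arithmetic to avoid signed overflow */
	uint32_t zeroBasedValue = (uint32_t) hashedValue - (uint32_t) INT32_MIN;
	int shardIndex = (int) (zeroBasedValue / hashTokenIncrement);

	/*
	 * If the shard count does not evenly divide the token space, the last
	 * shard covers a slightly larger range; map the remainder into it.
	 */
	if (shardIndex == shardCount)
	{
		shardIndex = shardCount - 1;
	}

	return shardIndex;
}

/* fallback lookup: binary search over intervals sorted in ascending order */
static SketchShardInterval *
SearchSortedIntervals(int32_t value, SketchShardInterval *intervals,
					  int shardCount)
{
	int lowIndex = 0;
	int highIndex = shardCount - 1;

	while (lowIndex <= highIndex)
	{
		int middleIndex = lowIndex + (highIndex - lowIndex) / 2;
		SketchShardInterval *candidate = &intervals[middleIndex];

		if (value < candidate->minValue)
		{
			highIndex = middleIndex - 1;
		}
		else if (value > candidate->maxValue)
		{
			lowIndex = middleIndex + 1;
		}
		else
		{
			return candidate;
		}
	}

	return NULL;	/* no shard interval covers this value */
}

int
main(void)
{
	/* a uniform four-shard layout, as master_create_worker_shards would build */
	SketchShardInterval intervals[] = {
		{ INT32_MIN, -1073741825 }, { -1073741824, -1 },
		{ 0, 1073741823 }, { 1073741824, INT32_MAX }
	};
	int32_t hashedValue = 42;

	printf("direct index: %d\n", UniformShardIndex(hashedValue, 4));
	printf("search finds min: %d\n",
		   SearchSortedIntervals(hashedValue, intervals, 4)->minValue);
	return 0;
}

The O(1) arithmetic path corresponds to what the hasUniformHashDistribution flag enables; any layout the shard creation UDF did not produce, as well as range partitioning, falls back to the binary search, which is why the cached interval array must be kept sorted and why uninitialized (NULL min/max) intervals are either sorted to the end or rejected up front.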