diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index a2b59e1a9..cdfd94cc5 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -1020,7 +1020,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, currentPartitionMethod, currentAnchorTable); } - else + else if (candidatePartitionMethod == DISTRIBUTE_BY_RANGE) { return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, currentPartitionColumnList, @@ -1059,7 +1059,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, candidatePartitionMethod, candidateTable); } - else + else if (currentPartitionMethod == DISTRIBUTE_BY_RANGE) { return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 0691df7cb..3bab7d1bc 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -234,6 +234,12 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) continue; } + /* append-distributed tables do not have a strict partition column */ + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) + { + continue; + } + if (isPartitionColumn) { FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b1a16e699..a8f4f50c9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -171,10 +171,6 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, DeferredErrorMessage **planningError); -static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, - Oid collation, - ShardInterval *firstInterval, - ShardInterval *secondInterval); static List * SqlTaskList(Job *job); static bool DependsOnHashPartitionJob(Job *job); static uint32 AnchorRangeTableId(List *rangeTableList); @@ -228,8 +224,6 @@ static List * MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex); static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId); static StringInfo ColumnTypeArrayString(List *targetEntryList); -static bool CoPlacedShardIntervals(ShardInterval *firstInterval, - ShardInterval *secondInterval); static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr); static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr); @@ -2451,16 +2445,6 @@ ErrorIfUnsupportedShardDistribution(Query *query) } else { - CitusTableCacheEntry *distTableEntry = GetCitusTableCacheEntry(relationId); - if (distTableEntry->hasOverlappingShardInterval) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently append partitioned relations " - "with overlapping shard intervals are " - "not supported"))); - } - appendDistributedRelationCount++; } } @@ -2648,56 +2632,24 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, /* - * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard - * placement matching. It first checks for the shard count, if tables don't have - * same amount shard then it returns false. 
Note that, if any table does not - * have any shard, it returns true. If two tables have same amount of shards, - * we check colocationIds for hash distributed tables and shardInterval's min - * max values for append and range distributed tables. + * CoPartitionedTables checks if given two distributed tables are co-located. */ bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) { - if (firstRelationId == secondRelationId) - { - return true; - } - CitusTableCacheEntry *firstTableCache = GetCitusTableCacheEntry(firstRelationId); CitusTableCacheEntry *secondTableCache = GetCitusTableCacheEntry(secondRelationId); - ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; - ShardInterval **sortedSecondIntervalArray = - secondTableCache->sortedShardIntervalArray; - uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; - uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; - FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; - - /* reference tables are always & only copartitioned with reference tables */ - if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) && - IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) - { - return true; - } - else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) || - IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) + if (firstTableCache->partitionMethod == DISTRIBUTE_BY_APPEND || + secondTableCache->partitionMethod == DISTRIBUTE_BY_APPEND) { + /* + * Append-distributed tables can have overlapping shards. Therefore they are + * never co-partitioned, not even with themselves. + */ return false; } - if (firstListShardCount != secondListShardCount) - { - return false; - } - - /* if there are not any shards just return true */ - if (firstListShardCount == 0) - { - return true; - } - - Assert(comparisonFunction != NULL); - /* * Check if the tables have the same colocation ID - if so, we know * they're colocated. @@ -2708,130 +2660,16 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) return true; } - /* - * For hash distributed tables two tables are accepted as colocated only if - * they have the same colocationId. Otherwise they may have same minimum and - * maximum values for each shard interval, yet hash function may result with - * different values for the same value. int vs bigint can be given as an - * example. - */ - if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) || - IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED)) + if (firstRelationId == secondRelationId) { - return false; + /* + * Even without an explicit co-location ID, non-append tables can be considered + * co-located with themselves. + */ + return true; } - - /* - * Don't compare unequal types - */ - Oid collation = firstTableCache->partitionColumn->varcollid; - if (firstTableCache->partitionColumn->vartype != - secondTableCache->partitionColumn->vartype || - collation != secondTableCache->partitionColumn->varcollid) - { - return false; - } - - - /* - * If not known to be colocated check if the remaining shards are - * anyway. Do so by comparing the shard interval arrays that are sorted on - * interval minimum values. Then it compares every shard interval in order - * and if any pair of shard intervals are not equal or they are not located - * in the same node it returns false. 
- */ - for (uint32 intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) - { - ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; - ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; - - bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, - collation, - firstInterval, - secondInterval); - if (!shardIntervalsEqual || !CoPlacedShardIntervals(firstInterval, - secondInterval)) - { - return false; - } - } - - return true; -} - - -/* - * CoPlacedShardIntervals checks whether the given intervals located in the same nodes. - */ -static bool -CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval) -{ - List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( - firstInterval->shardId); - List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( - secondInterval->shardId); - ListCell *firstShardPlacementCell = NULL; - ListCell *secondShardPlacementCell = NULL; - - /* Shards must have same number of placements */ - if (list_length(firstShardPlacementList) != list_length(secondShardPlacementList)) - { - return false; - } - - firstShardPlacementList = SortList(firstShardPlacementList, CompareShardPlacements); - secondShardPlacementList = SortList(secondShardPlacementList, CompareShardPlacements); - - forboth(firstShardPlacementCell, firstShardPlacementList, secondShardPlacementCell, - secondShardPlacementList) - { - ShardPlacement *firstShardPlacement = (ShardPlacement *) lfirst( - firstShardPlacementCell); - ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst( - secondShardPlacementCell); - - if (firstShardPlacement->nodeId != secondShardPlacement->nodeId) - { - return false; - } - } - - return true; -} - - -/* - * ShardIntervalsEqual checks if given shard intervals have equal min/max values. 
- */ -static bool -ShardIntervalsEqual(FmgrInfo *comparisonFunction, Oid collation, - ShardInterval *firstInterval, ShardInterval *secondInterval) -{ - bool shardIntervalsEqual = false; - - Datum firstMin = firstInterval->minValue; - Datum firstMax = firstInterval->maxValue; - Datum secondMin = secondInterval->minValue; - Datum secondMax = secondInterval->maxValue; - - if (firstInterval->minValueExists && firstInterval->maxValueExists && - secondInterval->minValueExists && secondInterval->maxValueExists) - { - Datum minDatum = FunctionCall2Coll(comparisonFunction, collation, firstMin, - secondMin); - Datum maxDatum = FunctionCall2Coll(comparisonFunction, collation, firstMax, - secondMax); - int firstComparison = DatumGetInt32(minDatum); - int secondComparison = DatumGetInt32(maxDatum); - - if (firstComparison == 0 && secondComparison == 0) - { - shardIntervalsEqual = true; - } - } - - return shardIntervalsEqual; + return false; } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index eb4394256..437bef6b4 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -85,6 +85,7 @@ typedef struct AttributeEquivalenceClassMember static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext); +static bool ContextContainsAppendRelation(RelationRestrictionContext *restrictionContext); static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo); static Var * FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid, Index relationRteIndex, Index *partitionKeyIndex); @@ -236,6 +237,29 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) } +/* + * ContextContainsAppendRelation determines whether the given + * RelationRestrictionContext contains any append-distributed tables. + */ +static bool +ContextContainsAppendRelation(RelationRestrictionContext *restrictionContext) +{ + ListCell *relationRestrictionCell = NULL; + + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); + + if (IsCitusTableType(relationRestriction->relationId, APPEND_DISTRIBUTED)) + { + return true; + } + } + + return false; +} + + /* * SafeToPushdownUnionSubquery returns true if all the relations are returns * partition keys in the same ordinal position and there is no reference table @@ -504,6 +528,12 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *restrictionCon /* there is a single distributed relation, no need to continue */ return true; } + else if (ContextContainsAppendRelation( + restrictionContext->relationRestrictionContext)) + { + /* we never consider append-distributed tables co-located */ + return false; + } List *attributeEquivalenceList = GenerateAllAttributeEquivalences(restrictionContext); @@ -1906,6 +1936,17 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio continue; } + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) + { + /* + * If we got to this point, it means there are multiple distributed + * relations and at least one of them is append-distributed. Since + * we do not consider append-distributed tables to be co-located, + * we can immediately return false. 
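+			 * Append-distributed shards are also allowed to have overlapping
+			 * intervals, so a shard-by-shard interval comparison could not
+			 * prove co-location here anyway.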
+ */ + return false; + } + int colocationId = TableColocationId(relationId); if (initialColocationId == INVALID_COLOCATION_ID) diff --git a/src/test/regress/expected/ignoring_orphaned_shards.out b/src/test/regress/expected/ignoring_orphaned_shards.out index a73dc8906..d6e4916af 100644 --- a/src/test/regress/expected/ignoring_orphaned_shards.out +++ b/src/test/regress/expected/ignoring_orphaned_shards.out @@ -324,6 +324,9 @@ SELECT create_distributed_table('range2', 'id', 'range'); (1 row) CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); +-- Mark tables co-located +UPDATE pg_dist_partition SET colocationid = 30001 +WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass; -- Move shard placement and DON'T clean it up, now range1 and range2 are -- colocated, but only range2 has an orphaned shard. SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); @@ -339,7 +342,7 @@ SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid 92448600 | 1 | 57637 (2 rows) --- Make sure that tables are detected as colocated +-- Make sure co-located join works SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; id | id --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 832b64a31..e2fe379d2 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1070,11 +1070,14 @@ Aggregate Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob - Map Task Count: 1 + Map Task Count: 4 Merge Task Count: 4 -> MapMergeJob Map Task Count: 2 - Merge Task Count: 1 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 1 + Merge Task Count: 4 -> MapMergeJob Map Task Count: 1 Merge Task Count: 4 @@ -1102,12 +1105,16 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Tasks Shown": "None, not supported for re-partition queries", "Dependent Jobs": [ { - "Map Task Count": 1, + "Map Task Count": 4, "Merge Task Count": 4, "Dependent Jobs": [ { "Map Task Count": 2, - "Merge Task Count": 1 + "Merge Task Count": 4 + }, + { + "Map Task Count": 1, + "Merge Task Count": 4 } ] }, @@ -1154,12 +1161,16 @@ EXPLAIN (COSTS FALSE, FORMAT XML) None, not supported for re-partition queries - 1 + 4 4 2 - 1 + 4 + + + 1 + 4 diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index d0eaa1a4d..405962dbc 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -169,7 +169,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] EXPLAIN (COSTS OFF) SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; -LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ] +LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- Aggregate diff --git a/src/test/regress/expected/multi_join_order_tpch_repartition.out b/src/test/regress/expected/multi_join_order_tpch_repartition.out index 959673bd6..e26a4bfec 100644 --- a/src/test/regress/expected/multi_join_order_tpch_repartition.out +++ b/src/test/regress/expected/multi_join_order_tpch_repartition.out @@ -53,7 +53,7 @@ GROUP BY ORDER BY revenue DESC, o_orderdate; -LOG: 
join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- Sort @@ -97,16 +97,14 @@ GROUP BY c_comment ORDER BY revenue DESC; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ] - QUERY PLAN +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ] + QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: (sum(remote_scan.revenue)) DESC - -> HashAggregate - Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment - -> Custom Scan (Citus Adaptive) - explain statements for distributed queries are not enabled -(6 rows) + Sort Key: remote_scan.revenue DESC + -> Custom Scan (Citus Adaptive) + explain statements for distributed queries are not enabled +(4 rows) -- Query #19 from the TPC-H decision support benchmark (modified) EXPLAIN (COSTS OFF) @@ -139,7 +137,7 @@ WHERE AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK') AND l_shipinstruct = 'DELIVER IN PERSON' ); -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join "part_append" ] QUERY PLAN --------------------------------------------------------------------- Aggregate @@ -159,7 +157,7 @@ WHERE c_custkey = o_custkey GROUP BY l_partkey; -LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ] +LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- HashAggregate diff --git a/src/test/regress/expected/multi_join_pruning.out b/src/test/regress/expected/multi_join_pruning.out index 77131da45..df6d94197 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -101,7 +101,7 @@ EXPLAIN (COSTS OFF) SELECT count(*) FROM array_partitioned_table table1, array_partitioned_table table2 WHERE table1.array_column = table2.array_column; -DEBUG: Router planner does not support append-partitioned tables. +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] QUERY PLAN @@ -115,7 +115,7 @@ EXPLAIN (COSTS OFF) SELECT count(*) FROM composite_partitioned_table table1, composite_partitioned_table table2 WHERE table1.composite_column = table2.composite_column; -DEBUG: Router planner does not support append-partitioned tables. 
+DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] QUERY PLAN @@ -130,7 +130,7 @@ EXPLAIN (COSTS OFF) SELECT count(*) FROM varchar_partitioned_table table1, varchar_partitioned_table table2 WHERE table1.varchar_column = table2.varchar_column; -DEBUG: Router planner does not support append-partitioned tables. +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] QUERY PLAN diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index 1d65721b7..87eee868a 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -57,7 +57,7 @@ CREATE TABLE varchar_partitioned_table ( varchar_column varchar(100) ); -SELECT create_distributed_table('varchar_partitioned_table', 'varchar_column', 'append'); +SELECT create_distributed_table('varchar_partitioned_table', 'varchar_column', 'range'); create_distributed_table --------------------------------------------------------------------- @@ -85,7 +85,7 @@ CREATE TABLE array_partitioned_table ( array_column text[] ); -SELECT create_distributed_table('array_partitioned_table', 'array_column', 'append'); +SELECT create_distributed_table('array_partitioned_table', 'array_column', 'range'); create_distributed_table --------------------------------------------------------------------- @@ -121,7 +121,7 @@ CREATE TABLE composite_partitioned_table ( composite_column composite_type ); -SELECT create_distributed_table('composite_partitioned_table', 'composite_column', 'append'); +SELECT create_distributed_table('composite_partitioned_table', 'composite_column', 'range'); create_distributed_table --------------------------------------------------------------------- @@ -150,40 +150,37 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, SET client_min_messages TO ERROR; EXPLAIN (COSTS OFF) SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2'; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate - -> Custom Scan (Citus Adaptive) - Task Count: 1 - Tasks Shown: All - -> Task - Error: Could not get remote plan. -(6 rows) + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Error: Could not get remote plan. +(5 rows) EXPLAIN (COSTS OFF) SELECT count(*) FROM array_partitioned_table WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}'; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate - -> Custom Scan (Citus Adaptive) - Task Count: 1 - Tasks Shown: All - -> Task - Error: Could not get remote plan. -(6 rows) + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Error: Could not get remote plan. +(5 rows) EXPLAIN (COSTS OFF) SELECT count(*) FROM composite_partitioned_table WHERE composite_column < '(b,5,c)'::composite_type; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate - -> Custom Scan (Citus Adaptive) - Task Count: 1 - Tasks Shown: All - -> Task - Error: Could not get remote plan. 
-(6 rows) + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Error: Could not get remote plan. +(5 rows) SET client_min_messages TO NOTICE; diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index e1bcda671..13f569a4e 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -66,24 +66,62 @@ ORDER BY DEBUG: Router planner does not support append-partitioned tables. DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] -DEBUG: pruning merge fetch taskId 1 -DETAIL: Creating dependency on merge taskId 5 -DEBUG: pruning merge fetch taskId 3 -DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 11 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 13 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch 
taskId 4 +DETAIL: Creating dependency on merge taskId 18 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 23 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 28 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 l_partkey | o_orderkey | count --------------------------------------------------------------------- 18 | 12005 | 1 diff --git a/src/test/regress/expected/multi_repartition_join_pruning.out b/src/test/regress/expected/multi_repartition_join_pruning.out index 733a7205e..285263eea 100644 --- a/src/test/regress/expected/multi_repartition_join_pruning.out +++ b/src/test/regress/expected/multi_repartition_join_pruning.out @@ -14,28 +14,47 @@ FROM WHERE o_custkey = c_custkey; DEBUG: Router planner does not support append-partitioned tables. -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 QUERY PLAN --------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Adaptive) - Task Count: 3 + Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(10 rows) SELECT count(*) @@ -44,18 +63,34 @@ FROM WHERE o_custkey = c_custkey; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 count --------------------------------------------------------------------- 2985 @@ -72,28 +107,47 @@ WHERE o_custkey = c_custkey AND o_orderkey < 0; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 QUERY PLAN --------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Adaptive) - Task Count: 3 + Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(10 rows) SELECT count(*) @@ -103,18 +157,34 @@ WHERE o_custkey = c_custkey AND o_orderkey < 0 AND o_orderkey > 0; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 count --------------------------------------------------------------------- 0 @@ -139,8 +209,11 @@ DEBUG: Router planner does not support append-partitioned tables. Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(10 rows) SELECT count(*) @@ -359,7 +432,10 @@ DEBUG: Router planner does not support append-partitioned tables. -> MapMergeJob Map Task Count: 0 Merge Task Count: 0 -(6 rows) + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(9 rows) -- execute once, to verify that's handled SELECT @@ -389,7 +465,10 @@ DEBUG: Router planner does not support append-partitioned tables. 
-> MapMergeJob Map Task Count: 0 Merge Task Count: 0 -(6 rows) + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(9 rows) EXPLAIN (COSTS OFF) SELECT diff --git a/src/test/regress/expected/multi_repartition_join_ref.out b/src/test/regress/expected/multi_repartition_join_ref.out index 9d14058ed..4f2a4da7b 100644 --- a/src/test/regress/expected/multi_repartition_join_ref.out +++ b/src/test/regress/expected/multi_repartition_join_ref.out @@ -13,7 +13,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -41,7 +41,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -69,7 +69,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -96,7 +96,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ cartesian product reference join "supplier" ] +LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ cartesian product reference join "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -124,7 +124,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -152,7 +152,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -180,7 +180,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -208,7 +208,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ reference join "supplier" ] +LOG: join order: [ 
"lineitem" ][ dual partition join "part_append" ][ reference join "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_repartition_join_task_assignment.out b/src/test/regress/expected/multi_repartition_join_task_assignment.out index c94012a27..a7426a0f9 100644 --- a/src/test/regress/expected/multi_repartition_join_task_assignment.out +++ b/src/test/regress/expected/multi_repartition_join_task_assignment.out @@ -24,18 +24,38 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: no sharding pruning constraints on customer_append found DEBUG: shard count after pruning for customer_append: 3 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 +DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx @@ -56,24 +76,44 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: Router planner does not support append-partitioned tables. 
+DEBUG: no sharding pruning constraints on customer_append found +DEBUG: shard count after pruning for customer_append: 3 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx DEBUG: no sharding pruning constraints on lineitem found DEBUG: shard count after pruning for lineitem: 2 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx -DEBUG: no sharding pruning constraints on customer_append found -DEBUG: shard count after pruning for customer_append: 3 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 2 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 -DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 8 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 16 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index e79fdd5c7..4342f6dea 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1931,26 +1931,34 @@ DEBUG: query has a single distribution column value: 2 SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) WHERE ar.author_id = 3; DEBUG: found no worker with all shard placements -DEBUG: join prunable for intervals [1,10] and [11,20] -DEBUG: join prunable for intervals [1,10] and [21,30] -DEBUG: join prunable for intervals [1,10] and [31,40] -DEBUG: join prunable for intervals [11,20] and [1,10] -DEBUG: join prunable for intervals [11,20] and [21,30] -DEBUG: join prunable for intervals [11,20] and [31,40] -DEBUG: join prunable for intervals [21,30] and [1,10] -DEBUG: join prunable for intervals [21,30] and [11,20] -DEBUG: join prunable for intervals [21,30] and [31,40] -DEBUG: join prunable for intervals [31,40] and [1,10] 
-DEBUG: join prunable for intervals [31,40] and [11,20] -DEBUG: join prunable for intervals [31,40] and [21,30] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 2 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 6 +DETAIL: Creating dependency on merge taskId 10 DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 20 id | author_id | title | word_count | name | id --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index 2261e0f02..f4c4ccc21 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -823,8 +823,7 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot push down this subquery -DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning +ERROR: shard counts of co-located tables do not match -- Check that we can prune shards in subqueries with VARCHAR partition columns CREATE TABLE subquery_pruning_varchar_test_table ( diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out index e8e95f8be..5d646633d 100644 --- a/src/test/regress/expected/non_colocated_join_order.out +++ b/src/test/regress/expected/non_colocated_join_order.out @@ -22,10 +22,10 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append'); \copy test_table_2 FROM STDIN DELIMITER ',' SET citus.log_multi_join_order to TRUE; SET client_min_messages to DEBUG1; --- Since we both have same amount of shards and they are colocated on the same node --- local join logic will be triggered. +SET citus.enable_repartition_joins TO on; +-- when joining append tables we always get dual re-partition joins SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -LOG: join order: [ "test_table_1" ][ local partition join "test_table_2" ] +LOG: join order: [ "test_table_1" ][ dual partition join "test_table_2" ] count --------------------------------------------------------------------- 6 @@ -41,7 +41,7 @@ SET citus.shard_replication_factor to 1; -- for interval [8,10] repartition join logic will be triggered. 
SET citus.enable_repartition_joins to ON; SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ] +LOG: join order: [ "test_table_1" ][ dual partition join "test_table_2" ] count --------------------------------------------------------------------- 9 diff --git a/src/test/regress/expected/subquery_append.out b/src/test/regress/expected/subquery_append.out new file mode 100644 index 000000000..c023bc92c --- /dev/null +++ b/src/test/regress/expected/subquery_append.out @@ -0,0 +1,233 @@ +CREATE SCHEMA subquery_append; +SET search_path TO subquery_append; +CREATE TABLE append_table (key text, value int, extra int default 0); +CREATE INDEX ON append_table (key); +SELECT create_distributed_table('append_table', 'key', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM master_create_empty_shard('append_table'); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_create_empty_shard('append_table'); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE ref_table (value int); +CREATE INDEX ON ref_table (value); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +\COPY append_table (key,value) FROM STDIN WITH CSV +\COPY append_table (key,value) FROM STDIN WITH CSV +\COPY ref_table FROM STDIN WITH CSV +-- exercise some optimizer pushdown features with subqueries +SELECT count(*) FROM (SELECT random() FROM append_table) u; + count +--------------------------------------------------------------------- + 12 +(1 row) + +SELECT * FROM (SELECT DISTINCT key FROM append_table) sub ORDER BY 1 LIMIT 3; + key +--------------------------------------------------------------------- + abc + bcd + cde +(3 rows) + +SELECT DISTINCT key FROM (SELECT key FROM append_table) sub ORDER BY 1 LIMIT 3; + key +--------------------------------------------------------------------- + abc + bcd + cde +(3 rows) + +SELECT key, max(v) FROM (SELECT key, value + 1 AS v FROM append_table) sub GROUP BY key ORDER BY 1,2 LIMIT 3; + key | max +--------------------------------------------------------------------- + abc | 235 + bcd | 235 + cde | 346 +(3 rows) + +SELECT v, max(key) FROM (SELECT key, value + 1 AS v FROM append_table) sub GROUP BY v ORDER BY 1,2 LIMIT 3; + v | max +--------------------------------------------------------------------- + 1 | jkl + 2 | ijk + 124 | hij +(3 rows) + +SELECT key, row_number() OVER (ORDER BY value) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; + key | row_number +--------------------------------------------------------------------- + abc | 6 + abc | 9 + bcd | 4 +(3 rows) + +SELECT key, row_number() OVER (ORDER BY value PARTITION BY key) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; +ERROR: syntax error at or near "PARTITION" +SELECT key, row_number() OVER (ORDER BY value) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; + key | row_number +--------------------------------------------------------------------- + abc | 6 + abc | 9 + bcd | 4 +(3 rows) + +SELECT key, row_number() OVER (PARTITION BY key) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; + key | 
row_number +--------------------------------------------------------------------- + abc | 1 + abc | 2 + bcd | 1 +(3 rows) + +-- try some joins in subqueries +SELECT key, count(*) FROM (SELECT *, random() FROM append_table a JOIN append_table b USING (key)) u GROUP BY key ORDER BY 1,2 LIMIT 3; +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +SELECT key, count(*) FROM (SELECT *, random() FROM append_table a JOIN ref_table b USING (value)) u GROUP BY key ORDER BY 1,2 LIMIT 3; + key | count +--------------------------------------------------------------------- + abc | 1 + abc | 1 + bcd | 2 +(3 rows) + +SELECT key, value FROM append_table a WHERE value IN (SELECT * FROM ref_table) ORDER BY 1,2; + key | value +--------------------------------------------------------------------- + abc | 123 + abc | 234 + bcd | 123 + bcd | 234 + cde | 345 + efg | 123 + efg | 234 + hij | 123 + hij | 234 +(9 rows) + +SELECT key, value FROM append_table a WHERE key IN (SELECT key FROM append_table WHERE value > 100) ORDER BY 1,2; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +SELECT key, value FROM append_table a WHERE value = (SELECT max(value) FROM ref_table) ORDER BY 1,2; + key | value +--------------------------------------------------------------------- + cde | 345 +(1 row) + +SELECT key, value FROM append_table a WHERE value = (SELECT max(value) FROM ref_table r WHERE a.value = r.value) ORDER BY 1,2; + key | value +--------------------------------------------------------------------- + abc | 123 + abc | 234 + bcd | 123 + bcd | 234 + cde | 345 + efg | 123 + efg | 234 + hij | 123 + hij | 234 +(9 rows) + +SELECT key, (SELECT max(value) FROM ref_table r WHERE r.value = a.value) FROM append_table a ORDER BY 1,2; + key | max +--------------------------------------------------------------------- + abc | 123 + abc | 234 + bcd | 123 + bcd | 234 + cde | 345 + def | + efg | 123 + efg | 234 + hij | 123 + hij | 234 + ijk | + jkl | +(12 rows) + +-- Test delete +BEGIN; +DELETE FROM append_table a WHERE a.value IN (SELECT s FROM generate_series(1,100) s); +SELECT count(*) FROM append_table; + count +--------------------------------------------------------------------- + 11 +(1 row) + +DELETE FROM append_table a USING ref_table r WHERE a.value = r.value; +SELECT count(*) FROM append_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + +DELETE FROM append_table WHERE value < 2; +SELECT count(*) FROM append_table; + count +--------------------------------------------------------------------- + 1 +(1 row) + +DELETE FROM append_table; +SELECT count(*) FROM append_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DELETE FROM append_table a USING append_table b WHERE a.key = b.key; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +END; +-- Test update +BEGIN; +UPDATE append_table a SET extra = 1 WHERE a.value IN (SELECT s FROM generate_series(1,100) s); +SELECT count(*) FROM append_table WHERE extra = 1; + count +--------------------------------------------------------------------- + 1 +(1 row) + +UPDATE append_table a SET extra = 1 FROM ref_table r WHERE a.value = r.value; +SELECT count(*) FROM append_table WHERE extra = 1; + count 
+--------------------------------------------------------------------- + 10 +(1 row) + +UPDATE append_table SET extra = 1 WHERE value < 2; +SELECT count(*) FROM append_table WHERE extra = 1; + count +--------------------------------------------------------------------- + 11 +(1 row) + +UPDATE append_table SET extra = 1; +SELECT count(*) FROM append_table WHERE extra = 1; + count +--------------------------------------------------------------------- + 12 +(1 row) + +UPDATE append_table a sET extra = 1 FROM append_table b WHERE a.key = b.key; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +END; +DROP SCHEMA subquery_append CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table append_table +drop cascades to table ref_table diff --git a/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source b/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source index e42f31005..d1f308db9 100644 --- a/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source +++ b/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source @@ -141,7 +141,7 @@ CREATE TABLE events ( event_type character varying(255), event_time bigint ); -SELECT master_create_distributed_table('events', 'composite_id', 'range'); +SELECT create_distributed_table('events', 'composite_id', 'range'); SELECT master_create_empty_shard('events') AS new_shard_id \gset @@ -181,7 +181,11 @@ CREATE TABLE users ( composite_id user_composite_type, lastseen bigint ); -SELECT master_create_distributed_table('users', 'composite_id', 'range'); +SELECT create_distributed_table('users', 'composite_id', 'range'); + +-- we will guarantee co-locatedness for these tables +UPDATE pg_dist_partition SET colocationid = 20001 +WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass; SELECT master_create_empty_shard('users') AS new_shard_id \gset @@ -228,7 +232,7 @@ CREATE TABLE lineitem_subquery ( l_shipmode char(10) not null, l_comment varchar(44) not null, PRIMARY KEY(l_orderkey, l_linenumber) ); -SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); +SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); CREATE TABLE orders_subquery ( o_orderkey bigint not null, @@ -241,7 +245,11 @@ CREATE TABLE orders_subquery ( o_shippriority integer not null, o_comment varchar(79) not null, PRIMARY KEY(o_orderkey) ); -SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); +SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range'); + +-- we will guarantee co-locatedness for these tabes +UPDATE pg_dist_partition SET colocationid = 20002 +WHERE logicalrelid = 'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass; SET citus.enable_router_execution TO 'false'; diff --git a/src/test/regress/input/multi_outer_join.source b/src/test/regress/input/multi_outer_join.source index 820e9c625..f4708da13 100644 --- a/src/test/regress/input/multi_outer_join.source +++ b/src/test/regress/input/multi_outer_join.source @@ -16,7 +16,7 @@ CREATE TABLE multi_outer_join_left l_mktsegment char(10) not null, l_comment varchar(117) not null ); -SELECT master_create_distributed_table('multi_outer_join_left', 'l_custkey', 'append'); +SELECT create_distributed_table('multi_outer_join_left', 'l_custkey', 'hash'); CREATE TABLE multi_outer_join_right ( @@ -29,7 +29,7 @@ CREATE TABLE 
multi_outer_join_right r_mktsegment char(10) not null, r_comment varchar(117) not null ); -SELECT master_create_distributed_table('multi_outer_join_right', 'r_custkey', 'append'); +SELECT create_distributed_table('multi_outer_join_right', 'r_custkey', 'hash'); CREATE TABLE multi_outer_join_right_reference ( @@ -55,7 +55,7 @@ CREATE TABLE multi_outer_join_third t_mktsegment char(10) not null, t_comment varchar(117) not null ); -SELECT master_create_distributed_table('multi_outer_join_third', 't_custkey', 'append'); +SELECT create_distributed_table('multi_outer_join_third', 't_custkey', 'hash'); CREATE TABLE multi_outer_join_third_reference ( @@ -70,18 +70,8 @@ CREATE TABLE multi_outer_join_third_reference ); SELECT create_reference_table('multi_outer_join_third_reference'); - --- Make sure we do not crash if both tables have no shards -SELECT - min(l_custkey), max(l_custkey) -FROM - multi_outer_join_left a LEFT JOIN multi_outer_join_third b ON (l_custkey = t_custkey); - --- Left table is a large table \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-1-10.data' with delimiter '|' \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|' - --- Right table is a small table \copy multi_outer_join_right FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|' \copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|' @@ -187,8 +177,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); -- empty tables -SELECT * FROM master_apply_delete_command('DELETE FROM multi_outer_join_left'); -SELECT * FROM master_apply_delete_command('DELETE FROM multi_outer_join_right'); +TRUNCATE multi_outer_join_left; +TRUNCATE multi_outer_join_right; -- reload shards with 1-1 matching \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-subset-11-20.data' with delimiter '|' @@ -482,7 +472,8 @@ FROM (multi_outer_join_right r1 LEFT OUTER JOIN multi_outer_join_left l1 ON (l1.l_custkey = r1.r_custkey)) AS test(c_custkey, c_nationkey) - INNER JOIN multi_outer_join_third t1 ON (test.c_custkey = t1.t_custkey); + INNER JOIN multi_outer_join_third t1 ON (test.c_custkey = t1.t_custkey) +ORDER BY 1; -- simple test to ensure anti-joins work with hash-partitioned tables CREATE TABLE left_values(val int); diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index f45224301..a167440e5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -50,7 +50,7 @@ test: union_pushdown test: set_operation_and_local_tables test: subqueries_deep subquery_view subquery_partitioning subqueries_not_supported -test: subquery_in_targetlist subquery_in_where subquery_complex_target_list +test: subquery_in_targetlist subquery_in_where subquery_complex_target_list subquery_append test: subquery_prepared_statements test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order test: cte_inline recursive_view_local_table values diff --git a/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source b/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source index 12e94074e..3813cef17 100644 --- a/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source +++ b/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source @@ -7,9 +7,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - 
run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -20,9 +20,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -33,9 +33,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -45,9 +45,9 @@ SELECT run_command_on_master_and_workers($f$ AS 'record_eq' IMMUTABLE; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -58,9 +58,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -71,9 +71,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -84,9 +84,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = gt_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -97,9 +97,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = ge_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- ... use that function to create a custom equality operator... 
@@ -117,9 +117,9 @@ SELECT run_command_on_master_and_workers($f$ hashes ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -130,9 +130,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = le_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -143,9 +143,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = lt_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- ... and create a custom operator family for hash indexes... @@ -153,9 +153,9 @@ SELECT run_command_on_master_and_workers($f$ CREATE OPERATOR FAMILY cats_2_op_fam USING hash; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- We need to define two different operator classes for the composite types @@ -172,9 +172,9 @@ SELECT run_command_on_master_and_workers($f$ FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -184,9 +184,9 @@ SELECT run_command_on_master_and_workers($f$ OPERATOR 1 = (user_composite_type, user_composite_type), FUNCTION 1 test_composite_type_hash(user_composite_type); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) CREATE TABLE events ( @@ -195,10 +195,10 @@ CREATE TABLE events ( event_type character varying(255), event_time bigint ); -SELECT master_create_distributed_table('events', 'composite_id', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('events', 'composite_id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('events') AS new_shard_id @@ -222,12 +222,15 @@ CREATE TABLE users ( composite_id user_composite_type, lastseen bigint ); -SELECT master_create_distributed_table('users', 'composite_id', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('users', 'composite_id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) +-- we will guarantee co-locatedness for these tables +UPDATE pg_dist_partition SET colocationid = 20001 +WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass; SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' @@ -264,10 +267,10 @@ CREATE TABLE lineitem_subquery ( l_shipmode char(10) not null, l_comment 
varchar(44) not null,
 PRIMARY KEY(l_orderkey, l_linenumber) );
-SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
- master_create_distributed_table
----------------------------------
-
+SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
 (1 row)

 CREATE TABLE orders_subquery (
@@ -281,12 +284,15 @@ CREATE TABLE orders_subquery (
 o_shippriority integer not null,
 o_comment varchar(79) not null,
 PRIMARY KEY(o_orderkey) );
-SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
- master_create_distributed_table
----------------------------------
-
+SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
 (1 row)

+-- we will guarantee co-locatedness for these tables
+UPDATE pg_dist_partition SET colocationid = 20002
+WHERE logicalrelid = 'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass;
 SET citus.enable_router_execution TO 'false';
 -- Check that we don't crash if there are not any shards.
 SELECT
@@ -302,9 +308,9 @@ FROM
 l_orderkey = o_orderkey
 GROUP BY l_orderkey) AS unit_prices;
- avg
------ 
-
+ avg
+---------------------------------------------------------------------
+
 (1 row)

 -- Load data into tables.
diff --git a/src/test/regress/output/multi_outer_join.source b/src/test/regress/output/multi_outer_join.source
index e4b43ec0d..94a152989 100644
--- a/src/test/regress/output/multi_outer_join.source
+++ b/src/test/regress/output/multi_outer_join.source
@@ -12,10 +12,10 @@ CREATE TABLE multi_outer_join_left
 l_mktsegment char(10) not null,
 l_comment varchar(117) not null
 );
-SELECT master_create_distributed_table('multi_outer_join_left', 'l_custkey', 'append');
- master_create_distributed_table
----------------------------------
-
+SELECT create_distributed_table('multi_outer_join_left', 'l_custkey', 'hash');
+ create_distributed_table
+---------------------------------------------------------------------
+
 (1 row)

 CREATE TABLE multi_outer_join_right
@@ -29,10 +29,10 @@ CREATE TABLE multi_outer_join_right
 r_mktsegment char(10) not null,
 r_comment varchar(117) not null
 );
-SELECT master_create_distributed_table('multi_outer_join_right', 'r_custkey', 'append');
- master_create_distributed_table
----------------------------------
-
+SELECT create_distributed_table('multi_outer_join_right', 'r_custkey', 'hash');
+ create_distributed_table
+---------------------------------------------------------------------
+
 (1 row)

 CREATE TABLE multi_outer_join_right_reference
@@ -47,9 +47,9 @@ CREATE TABLE multi_outer_join_right_reference
 r_comment varchar(117) not null
 );
 SELECT create_reference_table('multi_outer_join_right_reference');
- create_reference_table
------------------------- 
-
+ create_reference_table
+---------------------------------------------------------------------
+
 (1 row)

 CREATE TABLE multi_outer_join_third
@@ -63,10 +63,10 @@ CREATE TABLE multi_outer_join_third
 t_mktsegment char(10) not null,
 t_comment varchar(117) not null
 );
-SELECT master_create_distributed_table('multi_outer_join_third', 't_custkey', 'append');
- master_create_distributed_table
----------------------------------
-
+SELECT create_distributed_table('multi_outer_join_third', 't_custkey', 'hash');
+ create_distributed_table
+---------------------------------------------------------------------
+
 (1
row) CREATE TABLE multi_outer_join_third_reference @@ -81,25 +81,13 @@ CREATE TABLE multi_outer_join_third_reference t_comment varchar(117) not null ); SELECT create_reference_table('multi_outer_join_third_reference'); - create_reference_table ------------------------- - + create_reference_table +--------------------------------------------------------------------- + (1 row) --- Make sure we do not crash if both tables have no shards -SELECT - min(l_custkey), max(l_custkey) -FROM - multi_outer_join_left a LEFT JOIN multi_outer_join_third b ON (l_custkey = t_custkey); - min | max ------+----- - | -(1 row) - --- Left table is a large table \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-1-10.data' with delimiter '|' \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|' --- Right table is a small table \copy multi_outer_join_right FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|' \copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|' -- Make sure we do not crash if one table has no shards @@ -107,14 +95,18 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_third b ON (l_custkey = t_custkey); -ERROR: shard counts of co-located tables do not match + min | max +--------------------------------------------------------------------- + 1 | 20 +(1 row) + SELECT min(t_custkey), max(t_custkey) FROM multi_outer_join_third a LEFT JOIN multi_outer_join_right_reference b ON (r_custkey = t_custkey); - min | max ------+----- - | + min | max +--------------------------------------------------------------------- + | (1 row) -- Third table is a single shard table with all data @@ -125,8 +117,8 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey); - min | max ------+----- + min | max +--------------------------------------------------------------------- 1 | 20 (1 row) @@ -135,8 +127,8 @@ SELECT count(*) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_nationkey = r_nationkey); - count -------- + count +--------------------------------------------------------------------- 28 (1 row) @@ -147,8 +139,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey) WHERE r_custkey IS NULL; - min | max ------+----- + min | max +--------------------------------------------------------------------- 16 | 20 (1 row) @@ -159,8 +151,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey) WHERE r_custkey IS NULL OR r_custkey = 5; - min | max ------+----- + min | max +--------------------------------------------------------------------- 5 | 20 (1 row) @@ -172,8 +164,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey) WHERE r_custkey = 5 or r_custkey > 15; - min | max ------+----- + min | max +--------------------------------------------------------------------- 5 | 5 (1 row) @@ -183,8 +175,8 @@ SELECT FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey AND r_custkey = 5); - count | count --------+------- + count | count +--------------------------------------------------------------------- 20 | 1 (1 row) @@ -194,8 +186,8 @@ SELECT FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey AND r_custkey = -1 /* nonexistant 
*/); - count | count --------+------- + count | count +--------------------------------------------------------------------- 20 | 0 (1 row) @@ -205,8 +197,8 @@ SELECT FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey AND l_custkey = -1 /* nonexistant */); - count | count --------+------- + count | count +--------------------------------------------------------------------- 20 | 0 (1 row) @@ -215,14 +207,18 @@ SELECT min(r_custkey), max(r_custkey) FROM multi_outer_join_left a RIGHT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); -ERROR: shard counts of co-located tables do not match + min | max +--------------------------------------------------------------------- + 1 | 15 +(1 row) + -- Reverse right join should be same as left join SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_right_reference a RIGHT JOIN multi_outer_join_left b ON (l_custkey = r_custkey); - min | max ------+----- + min | max +--------------------------------------------------------------------- 1 | 20 (1 row) @@ -233,24 +229,14 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); - min | max ------+----- + min | max +--------------------------------------------------------------------- 1 | 20 (1 row) -- empty tables -SELECT * FROM master_apply_delete_command('DELETE FROM multi_outer_join_left'); - master_apply_delete_command ------------------------------ - 2 -(1 row) - -SELECT * FROM master_apply_delete_command('DELETE FROM multi_outer_join_right'); - master_apply_delete_command ------------------------------ - 2 -(1 row) - +TRUNCATE multi_outer_join_left; +TRUNCATE multi_outer_join_right; -- reload shards with 1-1 matching \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-subset-11-20.data' with delimiter '|' \copy multi_outer_join_left FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|' @@ -262,8 +248,8 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); - min | max ------+----- + min | max +--------------------------------------------------------------------- 11 | 30 (1 row) @@ -280,8 +266,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE r_custkey IS NULL; - min | max ------+----- + min | max +--------------------------------------------------------------------- 23 | 29 (1 row) @@ -292,8 +278,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE r_custkey IS NULL OR r_custkey = 15; - min | max ------+----- + min | max +--------------------------------------------------------------------- 23 | 29 (1 row) @@ -305,8 +291,8 @@ FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE r_custkey = 21 or r_custkey < 10; - min | max ------+----- + min | max +--------------------------------------------------------------------- 21 | 21 (1 row) @@ -316,8 +302,8 @@ SELECT FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey AND r_custkey = 21); - count | count --------+------- + count | count +--------------------------------------------------------------------- 17 | 1 (1 row) @@ -326,8 +312,8 @@ SELECT min(r_custkey), max(r_custkey) FROM multi_outer_join_left a RIGHT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); - min | max ------+----- + min | max 
+--------------------------------------------------------------------- 11 | 30 (1 row) @@ -336,8 +322,8 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_right a RIGHT JOIN multi_outer_join_left b ON (l_custkey = r_custkey); - min | max ------+----- + min | max +--------------------------------------------------------------------- 11 | 30 (1 row) @@ -351,8 +337,8 @@ FROM RIGHT JOIN multi_outer_join_left l2 ON (r2.r_custkey = l2.l_custkey) ORDER BY 1 LIMIT 1; - l_custkey ------------ + l_custkey +--------------------------------------------------------------------- 11 (1 row) @@ -368,9 +354,9 @@ WHERE r1.r_custkey is NULL ORDER BY 1 LIMIT 1; - l_custkey ------------ - + l_custkey +--------------------------------------------------------------------- + (1 row) -- Three way join 2-2-1 (local + broadcast join) should work @@ -381,8 +367,8 @@ FROM LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY l_custkey, r_custkey, t_custkey; - l_custkey | r_custkey | t_custkey ------------+-----------+----------- + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- 11 | 11 | 11 12 | 12 | 12 14 | 14 | 14 @@ -392,13 +378,13 @@ ORDER BY l_custkey, r_custkey, t_custkey; 20 | 20 | 20 21 | 21 | 21 22 | 22 | 22 - 23 | | + 23 | | 24 | 24 | 24 - 25 | | + 25 | | 26 | 26 | 26 27 | 27 | 27 28 | 28 | 28 - 29 | | + 29 | | 30 | 30 | 30 (17 rows) @@ -420,17 +406,17 @@ FROM RIGHT JOIN multi_outer_join_right r1 ON (t1.t_custkey = r1.r_custkey) LEFT JOIN multi_outer_join_left l1 ON (r1.r_custkey = l1.l_custkey) ORDER BY t_custkey, r_custkey, l_custkey; - t_custkey | r_custkey | l_custkey ------------+-----------+----------- + t_custkey | r_custkey | l_custkey +--------------------------------------------------------------------- 11 | 11 | 11 12 | 12 | 12 - 13 | 13 | + 13 | 13 | 14 | 14 | 14 - 15 | 15 | + 15 | 15 | 16 | 16 | 16 17 | 17 | 17 18 | 18 | 18 - 19 | 19 | + 19 | 19 | 20 | 20 | 20 21 | 21 | 21 22 | 22 | 22 @@ -451,11 +437,11 @@ FROM WHERE l_custkey is NULL ORDER BY t_custkey, r_custkey, l_custkey; - t_custkey | r_custkey | l_custkey ------------+-----------+----------- - 13 | 13 | - 15 | 15 | - 19 | 19 | + t_custkey | r_custkey | l_custkey +--------------------------------------------------------------------- + 13 | 13 | + 15 | 15 | + 19 | 19 | (3 rows) -- Cascading right join with single shard left most table @@ -466,8 +452,8 @@ FROM RIGHT JOIN multi_outer_join_right r1 ON (t1.t_custkey = r1.r_custkey) RIGHT JOIN multi_outer_join_left l1 ON (r1.r_custkey = l1.l_custkey) ORDER BY 1,2,3; - t_custkey | r_custkey | l_custkey ------------+-----------+----------- + t_custkey | r_custkey | l_custkey +--------------------------------------------------------------------- 11 | 11 | 11 12 | 12 | 12 14 | 14 | 14 @@ -494,19 +480,19 @@ FROM multi_outer_join_left l1 FULL JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) ORDER BY 1 DESC, 2 DESC; - l_custkey | r_custkey ------------+----------- + l_custkey | r_custkey +--------------------------------------------------------------------- | 19 | 15 | 13 30 | 30 - 29 | + 29 | 28 | 28 27 | 27 26 | 26 - 25 | + 25 | 24 | 24 - 23 | + 23 | 22 | 22 21 | 21 20 | 20 @@ -524,14 +510,14 @@ SELECT FROM multi_outer_join_left l1 FULL JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) -WHERE +WHERE r_custkey is NULL ORDER BY 1 DESC, 2 DESC; - l_custkey | r_custkey ------------+----------- - 29 | - 25 | 
- 23 | + l_custkey | r_custkey +--------------------------------------------------------------------- + 29 | + 25 | + 23 | (3 rows) -- full outer join + anti (left) should work with 1-1 matched shards @@ -540,11 +526,11 @@ SELECT FROM multi_outer_join_left l1 FULL JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) -WHERE +WHERE l_custkey is NULL ORDER BY 1 DESC, 2 DESC; - l_custkey | r_custkey ------------+----------- + l_custkey | r_custkey +--------------------------------------------------------------------- | 19 | 15 | 13 @@ -556,17 +542,17 @@ SELECT FROM multi_outer_join_left l1 FULL JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) -WHERE +WHERE l_custkey is NULL or r_custkey is NULL ORDER BY 1 DESC, 2 DESC; - l_custkey | r_custkey ------------+----------- + l_custkey | r_custkey +--------------------------------------------------------------------- | 19 | 15 | 13 - 29 | - 25 | - 23 | + 29 | + 25 | + 23 | (6 rows) -- full outer join should error out for mismatched shards @@ -576,7 +562,40 @@ FROM multi_outer_join_left l1 FULL JOIN multi_outer_join_third t1 ON (l1.l_custkey = t1.t_custkey) ORDER BY 1 DESC, 2 DESC; -ERROR: shard counts of co-located tables do not match + l_custkey | t_custkey +--------------------------------------------------------------------- + | 19 + | 15 + | 13 + | 10 + | 9 + | 8 + | 7 + | 6 + | 5 + | 4 + | 3 + | 2 + | 1 + 30 | 30 + 29 | 29 + 28 | 28 + 27 | 27 + 26 | 26 + 25 | 25 + 24 | 24 + 23 | 23 + 22 | 22 + 21 | 21 + 20 | 20 + 18 | 18 + 17 | 17 + 16 | 16 + 14 | 14 + 12 | 12 + 11 | 11 +(30 rows) + -- inner join + single shard left join should work SELECT l_custkey, r_custkey, t_custkey @@ -585,8 +604,8 @@ FROM INNER JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY 1 DESC, 2 DESC, 3 DESC; - l_custkey | r_custkey | t_custkey ------------+-----------+----------- + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- 30 | 30 | 30 28 | 28 | 28 27 | 27 | 27 @@ -611,16 +630,16 @@ FROM INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey) LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) ORDER BY 1 DESC, 2 DESC, 3 DESC; - l_custkey | t_custkey | r_custkey ------------+-----------+----------- + l_custkey | t_custkey | r_custkey +--------------------------------------------------------------------- 30 | 30 | 30 - 29 | 29 | + 29 | 29 | 28 | 28 | 28 27 | 27 | 27 26 | 26 | 26 - 25 | 25 | + 25 | 25 | 24 | 24 | 24 - 23 | 23 | + 23 | 23 | 22 | 22 | 22 21 | 21 | 21 20 | 20 | 20 @@ -641,8 +660,8 @@ FROM LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) ORDER BY t_custkey, l_custkey, r_custkey; - t_custkey | l_custkey | r_custkey ------------+-----------+----------- + t_custkey | l_custkey | r_custkey +--------------------------------------------------------------------- 11 | 11 | 11 12 | 12 | 12 14 | 14 | 14 @@ -652,13 +671,13 @@ ORDER BY 20 | 20 | 20 21 | 21 | 21 22 | 22 | 22 - 23 | 23 | + 23 | 23 | 24 | 24 | 24 - 25 | 25 | + 25 | 25 | 26 | 26 | 26 27 | 27 | 27 28 | 28 | 28 - 29 | 29 | + 29 | 29 | 30 | 30 | 30 (17 rows) @@ -670,16 +689,16 @@ FROM INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey) LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) ORDER BY 1 DESC, 2 DESC, 3 DESC; - l_custkey | t_custkey | r_custkey ------------+-----------+----------- + l_custkey | t_custkey | r_custkey 
+--------------------------------------------------------------------- 30 | 30 | 30 - 29 | 29 | + 29 | 29 | 28 | 28 | 28 27 | 27 | 27 26 | 26 | 26 - 25 | 25 | + 25 | 25 | 24 | 24 | 24 - 23 | 23 | + 23 | 23 | 22 | 22 | 22 21 | 21 | 21 20 | 20 | 20 @@ -701,24 +720,24 @@ FROM WHERE r_custkey is NULL ORDER BY 1 DESC, 2 DESC, 3 DESC; - l_custkey | t_custkey | r_custkey ------------+-----------+----------- - 29 | 29 | - 25 | 25 | - 23 | 23 | + l_custkey | t_custkey | r_custkey +--------------------------------------------------------------------- + 29 | 29 | + 25 | 25 | + 23 | 23 | (3 rows) --- Test joinExpr aliases by performing an outer-join. -SELECT +-- Test joinExpr aliases by performing an outer-join. +SELECT t_custkey -FROM - (multi_outer_join_right r1 +FROM + (multi_outer_join_right r1 LEFT OUTER JOIN multi_outer_join_left l1 ON (l1.l_custkey = r1.r_custkey)) AS test(c_custkey, c_nationkey) INNER JOIN multi_outer_join_third_reference t1 ON (test.c_custkey = t1.t_custkey) ORDER BY 1 DESC; - t_custkey ------------ + t_custkey +--------------------------------------------------------------------- 30 28 27 @@ -755,8 +774,8 @@ LEFT JOIN ( GROUP BY l1.l_custkey ORDER BY cnt DESC, l1.l_custkey DESC LIMIT 20; - l_custkey | cnt ------------+----- + l_custkey | cnt +--------------------------------------------------------------------- 30 | 1 29 | 1 28 | 1 @@ -782,45 +801,88 @@ SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); -ERROR: cannot push down this subquery -DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported + min | max +--------------------------------------------------------------------- + 1 | 1000 +(1 row) + SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a RIGHT JOIN multi_outer_join_right b ON (l_custkey = r_custkey); -ERROR: cannot push down this subquery -DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported + min | max +--------------------------------------------------------------------- + 11 | 30 +(1 row) + SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a FULL JOIN multi_outer_join_right b ON (l_custkey = r_custkey); -ERROR: cannot push down this subquery -DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported -SELECT + min | max +--------------------------------------------------------------------- + 1 | 1000 +(1 row) + +SELECT t_custkey -FROM - (multi_outer_join_right r1 +FROM + (multi_outer_join_right r1 LEFT OUTER JOIN multi_outer_join_left l1 ON (l1.l_custkey = r1.r_custkey)) AS test(c_custkey, c_nationkey) - INNER JOIN multi_outer_join_third t1 ON (test.c_custkey = t1.t_custkey); -ERROR: cannot push down this subquery -DETAIL: Currently append partitioned relations with overlapping shard intervals are not supported + INNER JOIN multi_outer_join_third t1 ON (test.c_custkey = t1.t_custkey) +ORDER BY 1; + t_custkey +--------------------------------------------------------------------- + 11 + 11 + 12 + 12 + 13 + 14 + 14 + 15 + 16 + 16 + 17 + 17 + 18 + 18 + 19 + 20 + 20 + 21 + 21 + 22 + 22 + 24 + 24 + 26 + 26 + 27 + 27 + 28 + 28 + 30 + 30 +(31 rows) + -- simple test to ensure anti-joins work with hash-partitioned tables CREATE TABLE left_values(val int); SET citus.shard_count to 16; SET citus.shard_replication_factor to 1; SELECT create_distributed_table('left_values', 'val'); - create_distributed_table --------------------------- - + 
create_distributed_table +--------------------------------------------------------------------- + (1 row) \copy left_values from stdin CREATE TABLE right_values(val int); SELECT create_distributed_table('right_values', 'val'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) \copy right_values from stdin @@ -832,9 +894,9 @@ FROM WHERE r.val IS NULL ORDER BY 1 DESC, 2 DESC; - val | val ------+----- - 5 | - 1 | + val | val +--------------------------------------------------------------------- + 5 | + 1 | (2 rows) diff --git a/src/test/regress/sql/ignoring_orphaned_shards.sql b/src/test/regress/sql/ignoring_orphaned_shards.sql index a3aa74db9..2b56269ef 100644 --- a/src/test/regress/sql/ignoring_orphaned_shards.sql +++ b/src/test/regress/sql/ignoring_orphaned_shards.sql @@ -128,12 +128,16 @@ CREATE TABLE range2(id int); SELECT create_distributed_table('range2', 'id', 'range'); CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); +-- Mark tables co-located +UPDATE pg_dist_partition SET colocationid = 30001 +WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass; + -- Move shard placement and DON'T clean it up, now range1 and range2 are -- colocated, but only range2 has an orphaned shard. SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid; --- Make sure that tables are detected as colocated +-- Make sure co-located join works SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; -- Make sure we can create a foreign key on community edition, because diff --git a/src/test/regress/sql/multi_partition_pruning.sql b/src/test/regress/sql/multi_partition_pruning.sql index 1fcb17b61..4e849b803 100644 --- a/src/test/regress/sql/multi_partition_pruning.sql +++ b/src/test/regress/sql/multi_partition_pruning.sql @@ -38,7 +38,7 @@ CREATE TABLE varchar_partitioned_table ( varchar_column varchar(100) ); -SELECT create_distributed_table('varchar_partitioned_table', 'varchar_column', 'append'); +SELECT create_distributed_table('varchar_partitioned_table', 'varchar_column', 'range'); -- Create logical shards and shard placements with shardid 100,101 @@ -67,7 +67,7 @@ CREATE TABLE array_partitioned_table ( array_column text[] ); -SELECT create_distributed_table('array_partitioned_table', 'array_column', 'append'); +SELECT create_distributed_table('array_partitioned_table', 'array_column', 'range'); SET client_min_messages TO DEBUG2; -- Create logical shard with shardid 102, 103 @@ -105,7 +105,7 @@ CREATE TABLE composite_partitioned_table ( composite_column composite_type ); -SELECT create_distributed_table('composite_partitioned_table', 'composite_column', 'append'); +SELECT create_distributed_table('composite_partitioned_table', 'composite_column', 'range'); SET client_min_messages TO DEBUG2; -- Create logical shard with shardid 104, 105 diff --git a/src/test/regress/sql/non_colocated_join_order.sql b/src/test/regress/sql/non_colocated_join_order.sql index 631d1f3aa..4c77f68c2 100644 --- a/src/test/regress/sql/non_colocated_join_order.sql +++ b/src/test/regress/sql/non_colocated_join_order.sql @@ -36,9 +36,9 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append'); SET citus.log_multi_join_order to TRUE; SET client_min_messages to DEBUG1; +SET 
citus.enable_repartition_joins TO on; --- Since we both have same amount of shards and they are colocated on the same node --- local join logic will be triggered. +-- when joining append tables we always get dual re-partition joins SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -- Add two shards placement of interval [8,10] to test_table_1 diff --git a/src/test/regress/sql/subquery_append.sql b/src/test/regress/sql/subquery_append.sql new file mode 100644 index 000000000..02f02e9f4 --- /dev/null +++ b/src/test/regress/sql/subquery_append.sql @@ -0,0 +1,89 @@ +CREATE SCHEMA subquery_append; +SET search_path TO subquery_append; + +CREATE TABLE append_table (key text, value int, extra int default 0); +CREATE INDEX ON append_table (key); + +SELECT create_distributed_table('append_table', 'key', 'append'); +SELECT 1 FROM master_create_empty_shard('append_table'); +SELECT 1 FROM master_create_empty_shard('append_table'); + +CREATE TABLE ref_table (value int); +CREATE INDEX ON ref_table (value); +SELECT create_reference_table('ref_table'); + +\COPY append_table (key,value) FROM STDIN WITH CSV +abc,234 +bcd,123 +bcd,234 +cde,345 +def,456 +efg,234 +\. + +\COPY append_table (key,value) FROM STDIN WITH CSV +abc,123 +efg,123 +hij,123 +hij,234 +ijk,1 +jkl,0 +\. + +\COPY ref_table FROM STDIN WITH CSV +123 +234 +345 +\. + +-- exercise some optimizer pushdown features with subqueries +SELECT count(*) FROM (SELECT random() FROM append_table) u; + +SELECT * FROM (SELECT DISTINCT key FROM append_table) sub ORDER BY 1 LIMIT 3; +SELECT DISTINCT key FROM (SELECT key FROM append_table) sub ORDER BY 1 LIMIT 3; + +SELECT key, max(v) FROM (SELECT key, value + 1 AS v FROM append_table) sub GROUP BY key ORDER BY 1,2 LIMIT 3; +SELECT v, max(key) FROM (SELECT key, value + 1 AS v FROM append_table) sub GROUP BY v ORDER BY 1,2 LIMIT 3; + +SELECT key, row_number() OVER (ORDER BY value) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; +SELECT key, row_number() OVER (ORDER BY value PARTITION BY key) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; + +SELECT key, row_number() OVER (ORDER BY value) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; +SELECT key, row_number() OVER (PARTITION BY key) FROM (SELECT key, value, random() FROM append_table) sub ORDER BY 1,2 LIMIT 3; + +-- try some joins in subqueries +SELECT key, count(*) FROM (SELECT *, random() FROM append_table a JOIN append_table b USING (key)) u GROUP BY key ORDER BY 1,2 LIMIT 3; +SELECT key, count(*) FROM (SELECT *, random() FROM append_table a JOIN ref_table b USING (value)) u GROUP BY key ORDER BY 1,2 LIMIT 3; +SELECT key, value FROM append_table a WHERE value IN (SELECT * FROM ref_table) ORDER BY 1,2; +SELECT key, value FROM append_table a WHERE key IN (SELECT key FROM append_table WHERE value > 100) ORDER BY 1,2; +SELECT key, value FROM append_table a WHERE value = (SELECT max(value) FROM ref_table) ORDER BY 1,2; +SELECT key, value FROM append_table a WHERE value = (SELECT max(value) FROM ref_table r WHERE a.value = r.value) ORDER BY 1,2; +SELECT key, (SELECT max(value) FROM ref_table r WHERE r.value = a.value) FROM append_table a ORDER BY 1,2; + +-- Test delete +BEGIN; +DELETE FROM append_table a WHERE a.value IN (SELECT s FROM generate_series(1,100) s); +SELECT count(*) FROM append_table; +DELETE FROM append_table a USING ref_table r WHERE a.value = r.value; +SELECT count(*) FROM append_table; +DELETE FROM append_table WHERE 
value < 2;
+SELECT count(*) FROM append_table;
+DELETE FROM append_table;
+SELECT count(*) FROM append_table;
+DELETE FROM append_table a USING append_table b WHERE a.key = b.key;
+END;
+
+-- Test update
+BEGIN;
+UPDATE append_table a SET extra = 1 WHERE a.value IN (SELECT s FROM generate_series(1,100) s);
+SELECT count(*) FROM append_table WHERE extra = 1;
+UPDATE append_table a SET extra = 1 FROM ref_table r WHERE a.value = r.value;
+SELECT count(*) FROM append_table WHERE extra = 1;
+UPDATE append_table SET extra = 1 WHERE value < 2;
+SELECT count(*) FROM append_table WHERE extra = 1;
+UPDATE append_table SET extra = 1;
+SELECT count(*) FROM append_table WHERE extra = 1;
+UPDATE append_table a SET extra = 1 FROM append_table b WHERE a.key = b.key;
+END;
+
+DROP SCHEMA subquery_append CASCADE;
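
The test changes above all encode the same planner rule: append-distributed tables can have overlapping shard intervals, so they are never treated as co-partitioned (not even with themselves), and any join between two append tables must take the repartitioning path. Below is a minimal sketch for observing this by hand; it is not part of the patch, it assumes a running Citus cluster with the usual regression-test defaults, and the table name demo_append is illustrative only.

-- not part of the patch: a hand-run sketch of the new append-join behavior
CREATE TABLE demo_append (key text, value int);
SELECT create_distributed_table('demo_append', 'key', 'append');
SELECT 1 FROM master_create_empty_shard('demo_append');
SELECT 1 FROM master_create_empty_shard('demo_append');

-- Append shards may overlap, so even a self-join on the distribution
-- column is no longer planned as a co-located join:
SELECT count(*) FROM demo_append a JOIN demo_append b USING (key);
-- ERROR: the query contains a join that requires repartitioning
-- HINT: Set citus.enable_repartition_joins to on to enable repartitioning

-- With the GUC enabled, the planner falls back to a dual re-partition join:
SET citus.enable_repartition_joins TO on;
SELECT count(*) FROM demo_append a JOIN demo_append b USING (key);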