diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b1a16e699..b9531a4be 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -171,10 +171,6 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, DeferredErrorMessage **planningError); -static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, - Oid collation, - ShardInterval *firstInterval, - ShardInterval *secondInterval); static List * SqlTaskList(Job *job); static bool DependsOnHashPartitionJob(Job *job); static uint32 AnchorRangeTableId(List *rangeTableList); @@ -228,8 +224,6 @@ static List * MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex); static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId); static StringInfo ColumnTypeArrayString(List *targetEntryList); -static bool CoPlacedShardIntervals(ShardInterval *firstInterval, - ShardInterval *secondInterval); static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr); static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr); @@ -2648,12 +2642,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, /* - * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard - * placement matching. It first checks for the shard count, if tables don't have - * same amount shard then it returns false. Note that, if any table does not - * have any shard, it returns true. If two tables have same amount of shards, - * we check colocationIds for hash distributed tables and shardInterval's min - * max values for append and range distributed tables. + * CoPartitionedTables checks if given two distributed tables are co-located. */ bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) @@ -2666,38 +2655,6 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) CitusTableCacheEntry *firstTableCache = GetCitusTableCacheEntry(firstRelationId); CitusTableCacheEntry *secondTableCache = GetCitusTableCacheEntry(secondRelationId); - ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; - ShardInterval **sortedSecondIntervalArray = - secondTableCache->sortedShardIntervalArray; - uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; - uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; - FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; - - /* reference tables are always & only copartitioned with reference tables */ - if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) && - IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) - { - return true; - } - else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) || - IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) - { - return false; - } - - if (firstListShardCount != secondListShardCount) - { - return false; - } - - /* if there are not any shards just return true */ - if (firstListShardCount == 0) - { - return true; - } - - Assert(comparisonFunction != NULL); - /* * Check if the tables have the same colocation ID - if so, we know * they're colocated. 
@@ -2708,130 +2665,7 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) return true; } - /* - * For hash distributed tables two tables are accepted as colocated only if - * they have the same colocationId. Otherwise they may have same minimum and - * maximum values for each shard interval, yet hash function may result with - * different values for the same value. int vs bigint can be given as an - * example. - */ - if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) || - IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED)) - { - return false; - } - - - /* - * Don't compare unequal types - */ - Oid collation = firstTableCache->partitionColumn->varcollid; - if (firstTableCache->partitionColumn->vartype != - secondTableCache->partitionColumn->vartype || - collation != secondTableCache->partitionColumn->varcollid) - { - return false; - } - - - /* - * If not known to be colocated check if the remaining shards are - * anyway. Do so by comparing the shard interval arrays that are sorted on - * interval minimum values. Then it compares every shard interval in order - * and if any pair of shard intervals are not equal or they are not located - * in the same node it returns false. - */ - for (uint32 intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) - { - ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; - ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; - - bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, - collation, - firstInterval, - secondInterval); - if (!shardIntervalsEqual || !CoPlacedShardIntervals(firstInterval, - secondInterval)) - { - return false; - } - } - - return true; -} - - -/* - * CoPlacedShardIntervals checks whether the given intervals located in the same nodes. - */ -static bool -CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval) -{ - List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( - firstInterval->shardId); - List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( - secondInterval->shardId); - ListCell *firstShardPlacementCell = NULL; - ListCell *secondShardPlacementCell = NULL; - - /* Shards must have same number of placements */ - if (list_length(firstShardPlacementList) != list_length(secondShardPlacementList)) - { - return false; - } - - firstShardPlacementList = SortList(firstShardPlacementList, CompareShardPlacements); - secondShardPlacementList = SortList(secondShardPlacementList, CompareShardPlacements); - - forboth(firstShardPlacementCell, firstShardPlacementList, secondShardPlacementCell, - secondShardPlacementList) - { - ShardPlacement *firstShardPlacement = (ShardPlacement *) lfirst( - firstShardPlacementCell); - ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst( - secondShardPlacementCell); - - if (firstShardPlacement->nodeId != secondShardPlacement->nodeId) - { - return false; - } - } - - return true; -} - - -/* - * ShardIntervalsEqual checks if given shard intervals have equal min/max values. 
- */ -static bool -ShardIntervalsEqual(FmgrInfo *comparisonFunction, Oid collation, - ShardInterval *firstInterval, ShardInterval *secondInterval) -{ - bool shardIntervalsEqual = false; - - Datum firstMin = firstInterval->minValue; - Datum firstMax = firstInterval->maxValue; - Datum secondMin = secondInterval->minValue; - Datum secondMax = secondInterval->maxValue; - - if (firstInterval->minValueExists && firstInterval->maxValueExists && - secondInterval->minValueExists && secondInterval->maxValueExists) - { - Datum minDatum = FunctionCall2Coll(comparisonFunction, collation, firstMin, - secondMin); - Datum maxDatum = FunctionCall2Coll(comparisonFunction, collation, firstMax, - secondMax); - int firstComparison = DatumGetInt32(minDatum); - int secondComparison = DatumGetInt32(maxDatum); - - if (firstComparison == 0 && secondComparison == 0) - { - shardIntervalsEqual = true; - } - } - - return shardIntervalsEqual; + return false; } diff --git a/src/test/regress/expected/ignoring_orphaned_shards.out b/src/test/regress/expected/ignoring_orphaned_shards.out index a73dc8906..d6e4916af 100644 --- a/src/test/regress/expected/ignoring_orphaned_shards.out +++ b/src/test/regress/expected/ignoring_orphaned_shards.out @@ -324,6 +324,9 @@ SELECT create_distributed_table('range2', 'id', 'range'); (1 row) CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); +-- Mark tables co-located +UPDATE pg_dist_partition SET colocationid = 30001 +WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass; -- Move shard placement and DON'T clean it up, now range1 and range2 are -- colocated, but only range2 has an orphaned shard. SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); @@ -339,7 +342,7 @@ SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid 92448600 | 1 | 57637 (2 rows) --- Make sure that tables are detected as colocated +-- Make sure co-located join works SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; id | id --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index d0eaa1a4d..405962dbc 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -169,7 +169,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] EXPLAIN (COSTS OFF) SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; -LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ] +LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- Aggregate diff --git a/src/test/regress/expected/multi_join_order_tpch_repartition.out b/src/test/regress/expected/multi_join_order_tpch_repartition.out index 959673bd6..e26a4bfec 100644 --- a/src/test/regress/expected/multi_join_order_tpch_repartition.out +++ b/src/test/regress/expected/multi_join_order_tpch_repartition.out @@ -53,7 +53,7 @@ GROUP BY ORDER BY revenue DESC, o_orderdate; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ] QUERY PLAN 
--------------------------------------------------------------------- Sort @@ -97,16 +97,14 @@ GROUP BY c_comment ORDER BY revenue DESC; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ] - QUERY PLAN +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ] + QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: (sum(remote_scan.revenue)) DESC - -> HashAggregate - Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment - -> Custom Scan (Citus Adaptive) - explain statements for distributed queries are not enabled -(6 rows) + Sort Key: remote_scan.revenue DESC + -> Custom Scan (Citus Adaptive) + explain statements for distributed queries are not enabled +(4 rows) -- Query #19 from the TPC-H decision support benchmark (modified) EXPLAIN (COSTS OFF) @@ -139,7 +137,7 @@ WHERE AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK') AND l_shipinstruct = 'DELIVER IN PERSON' ); -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join "part_append" ] QUERY PLAN --------------------------------------------------------------------- Aggregate @@ -159,7 +157,7 @@ WHERE c_custkey = o_custkey GROUP BY l_partkey; -LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ] +LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- HashAggregate diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index e1bcda671..13f569a4e 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -66,24 +66,62 @@ ORDER BY DEBUG: Router planner does not support append-partitioned tables. 
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] -DEBUG: pruning merge fetch taskId 1 -DETAIL: Creating dependency on merge taskId 5 -DEBUG: pruning merge fetch taskId 3 -DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 11 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 13 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 18 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 23 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 28 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 l_partkey | o_orderkey | count 
--------------------------------------------------------------------- 18 | 12005 | 1 diff --git a/src/test/regress/expected/multi_repartition_join_pruning.out b/src/test/regress/expected/multi_repartition_join_pruning.out index 733a7205e..285263eea 100644 --- a/src/test/regress/expected/multi_repartition_join_pruning.out +++ b/src/test/regress/expected/multi_repartition_join_pruning.out @@ -14,28 +14,47 @@ FROM WHERE o_custkey = c_custkey; DEBUG: Router planner does not support append-partitioned tables. -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 QUERY PLAN --------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Adaptive) - Task Count: 3 + Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(10 rows) SELECT count(*) @@ -44,18 +63,34 @@ FROM WHERE o_custkey = c_custkey; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 count --------------------------------------------------------------------- 2985 @@ -72,28 +107,47 @@ WHERE o_custkey = c_custkey AND o_orderkey < 0; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 QUERY PLAN --------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Adaptive) - Task Count: 3 + Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(10 rows) SELECT count(*) @@ -103,18 +157,34 @@ WHERE o_custkey = c_custkey AND o_orderkey < 0 AND o_orderkey > 0; DEBUG: Router planner does not support append-partitioned tables. 
-DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 count --------------------------------------------------------------------- 0 @@ -139,8 +209,11 @@ DEBUG: Router planner does not support append-partitioned tables. Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 2 - Merge Task Count: 3 -(7 rows) + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(10 rows) SELECT count(*) @@ -359,7 +432,10 @@ DEBUG: Router planner does not support append-partitioned tables. -> MapMergeJob Map Task Count: 0 Merge Task Count: 0 -(6 rows) + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(9 rows) -- execute once, to verify that's handled SELECT @@ -389,7 +465,10 @@ DEBUG: Router planner does not support append-partitioned tables. 
-> MapMergeJob Map Task Count: 0 Merge Task Count: 0 -(6 rows) + -> MapMergeJob + Map Task Count: 0 + Merge Task Count: 0 +(9 rows) EXPLAIN (COSTS OFF) SELECT diff --git a/src/test/regress/expected/multi_repartition_join_ref.out b/src/test/regress/expected/multi_repartition_join_ref.out index 9d14058ed..4f2a4da7b 100644 --- a/src/test/regress/expected/multi_repartition_join_ref.out +++ b/src/test/regress/expected/multi_repartition_join_ref.out @@ -13,7 +13,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -41,7 +41,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -69,7 +69,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -96,7 +96,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ cartesian product reference join "supplier" ] +LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ cartesian product reference join "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -124,7 +124,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -152,7 +152,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -180,7 +180,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -208,7 +208,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ reference join "supplier" ] +LOG: join order: [ 
"lineitem" ][ dual partition join "part_append" ][ reference join "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_repartition_join_task_assignment.out b/src/test/regress/expected/multi_repartition_join_task_assignment.out index c94012a27..a7426a0f9 100644 --- a/src/test/regress/expected/multi_repartition_join_task_assignment.out +++ b/src/test/regress/expected/multi_repartition_join_task_assignment.out @@ -24,18 +24,38 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: no sharding pruning constraints on customer_append found DEBUG: shard count after pruning for customer_append: 3 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 16 +DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx @@ -56,24 +76,44 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: Router planner does not support append-partitioned tables. 
+DEBUG: no sharding pruning constraints on customer_append found +DEBUG: shard count after pruning for customer_append: 3 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx DEBUG: no sharding pruning constraints on lineitem found DEBUG: shard count after pruning for lineitem: 2 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx -DEBUG: no sharding pruning constraints on customer_append found -DEBUG: shard count after pruning for customer_append: 3 -DEBUG: join prunable for intervals [1,1000] and [1001,2000] -DEBUG: join prunable for intervals [1,1000] and [6001,7000] -DEBUG: join prunable for intervals [1001,2000] and [1,1000] -DEBUG: join prunable for intervals [1001,2000] and [6001,7000] -DEBUG: join prunable for intervals [6001,7000] and [1,1000] -DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 2 DETAIL: Creating dependency on merge taskId 3 -DEBUG: pruning merge fetch taskId 3 -DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 8 DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 16 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index e79fdd5c7..4342f6dea 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1931,26 +1931,34 @@ DEBUG: query has a single distribution column value: 2 SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) WHERE ar.author_id = 3; DEBUG: found no worker with all shard placements -DEBUG: join prunable for intervals [1,10] and [11,20] -DEBUG: join prunable for intervals [1,10] and [21,30] -DEBUG: join prunable for intervals [1,10] and [31,40] -DEBUG: join prunable for intervals [11,20] and [1,10] -DEBUG: join prunable for intervals [11,20] and [21,30] -DEBUG: join prunable for intervals [11,20] and [31,40] -DEBUG: join prunable for intervals [21,30] and [1,10] -DEBUG: join prunable for intervals [21,30] and [11,20] -DEBUG: join prunable for intervals [21,30] and [31,40] -DEBUG: join prunable for intervals [31,40] and [1,10] 
-DEBUG: join prunable for intervals [31,40] and [11,20] -DEBUG: join prunable for intervals [31,40] and [21,30] +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 2 -DEBUG: pruning merge fetch taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 6 +DETAIL: Creating dependency on merge taskId 10 DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 20 id | author_id | title | word_count | name | id --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index 2261e0f02..f4c4ccc21 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -823,8 +823,7 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot push down this subquery -DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning +ERROR: shard counts of co-located tables do not match -- Check that we can prune shards in subqueries with VARCHAR partition columns CREATE TABLE subquery_pruning_varchar_test_table ( diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out index bf3ca55ae..5d646633d 100644 --- a/src/test/regress/expected/non_colocated_join_order.out +++ b/src/test/regress/expected/non_colocated_join_order.out @@ -22,10 +22,10 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append'); \copy test_table_2 FROM STDIN DELIMITER ',' SET citus.log_multi_join_order to TRUE; SET client_min_messages to DEBUG1; --- Since we both have same amount of shards and they are colocated on the same node --- local join logic will be triggered. 
+SET citus.enable_repartition_joins TO on; +-- when joining append tables we always get dual re-partition joins SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -LOG: join order: [ "test_table_1" ][ local partition join "test_table_2" ] +LOG: join order: [ "test_table_1" ][ dual partition join "test_table_2" ] count --------------------------------------------------------------------- 6 diff --git a/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source b/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source index e42f31005..d1f308db9 100644 --- a/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source +++ b/src/test/regress/input/multi_behavioral_analytics_create_table_superuser.source @@ -141,7 +141,7 @@ CREATE TABLE events ( event_type character varying(255), event_time bigint ); -SELECT master_create_distributed_table('events', 'composite_id', 'range'); +SELECT create_distributed_table('events', 'composite_id', 'range'); SELECT master_create_empty_shard('events') AS new_shard_id \gset @@ -181,7 +181,11 @@ CREATE TABLE users ( composite_id user_composite_type, lastseen bigint ); -SELECT master_create_distributed_table('users', 'composite_id', 'range'); +SELECT create_distributed_table('users', 'composite_id', 'range'); + +-- we will guarantee co-locatedness for these tables +UPDATE pg_dist_partition SET colocationid = 20001 +WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass; SELECT master_create_empty_shard('users') AS new_shard_id \gset @@ -228,7 +232,7 @@ CREATE TABLE lineitem_subquery ( l_shipmode char(10) not null, l_comment varchar(44) not null, PRIMARY KEY(l_orderkey, l_linenumber) ); -SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); +SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); CREATE TABLE orders_subquery ( o_orderkey bigint not null, @@ -241,7 +245,11 @@ CREATE TABLE orders_subquery ( o_shippriority integer not null, o_comment varchar(79) not null, PRIMARY KEY(o_orderkey) ); -SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); +SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range'); + +-- we will guarantee co-locatedness for these tabes +UPDATE pg_dist_partition SET colocationid = 20002 +WHERE logicalrelid = 'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass; SET citus.enable_router_execution TO 'false'; diff --git a/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source b/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source index 12e94074e..3813cef17 100644 --- a/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source +++ b/src/test/regress/output/multi_behavioral_analytics_create_table_superuser.source @@ -7,9 +7,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -20,9 +20,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- 
+ (1 row) SELECT run_command_on_master_and_workers($f$ @@ -33,9 +33,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -45,9 +45,9 @@ SELECT run_command_on_master_and_workers($f$ AS 'record_eq' IMMUTABLE; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -58,9 +58,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -71,9 +71,9 @@ SELECT run_command_on_master_and_workers($f$ IMMUTABLE RETURNS NULL ON NULL INPUT; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -84,9 +84,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = gt_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -97,9 +97,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = ge_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- ... use that function to create a custom equality operator... @@ -117,9 +117,9 @@ SELECT run_command_on_master_and_workers($f$ hashes ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -130,9 +130,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = le_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -143,9 +143,9 @@ SELECT run_command_on_master_and_workers($f$ PROCEDURE = lt_user_composite_type_function ); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- ... and create a custom operator family for hash indexes... 
@@ -153,9 +153,9 @@ SELECT run_command_on_master_and_workers($f$ CREATE OPERATOR FAMILY cats_2_op_fam USING hash; $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) -- We need to define two different operator classes for the composite types @@ -172,9 +172,9 @@ SELECT run_command_on_master_and_workers($f$ FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) SELECT run_command_on_master_and_workers($f$ @@ -184,9 +184,9 @@ SELECT run_command_on_master_and_workers($f$ OPERATOR 1 = (user_composite_type, user_composite_type), FUNCTION 1 test_composite_type_hash(user_composite_type); $f$); - run_command_on_master_and_workers ------------------------------------ - + run_command_on_master_and_workers +--------------------------------------------------------------------- + (1 row) CREATE TABLE events ( @@ -195,10 +195,10 @@ CREATE TABLE events ( event_type character varying(255), event_time bigint ); -SELECT master_create_distributed_table('events', 'composite_id', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('events', 'composite_id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('events') AS new_shard_id @@ -222,12 +222,15 @@ CREATE TABLE users ( composite_id user_composite_type, lastseen bigint ); -SELECT master_create_distributed_table('users', 'composite_id', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('users', 'composite_id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) +-- we will guarantee co-locatedness for these tables +UPDATE pg_dist_partition SET colocationid = 20001 +WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass; SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' @@ -264,10 +267,10 @@ CREATE TABLE lineitem_subquery ( l_shipmode char(10) not null, l_comment varchar(44) not null, PRIMARY KEY(l_orderkey, l_linenumber) ); -SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) CREATE TABLE orders_subquery ( @@ -281,12 +284,15 @@ CREATE TABLE orders_subquery ( o_shippriority integer not null, o_comment varchar(79) not null, PRIMARY KEY(o_orderkey) ); -SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); - master_create_distributed_table ---------------------------------- - +SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range'); + create_distributed_table +--------------------------------------------------------------------- + (1 row) +-- we will guarantee co-locatedness for these tabes +UPDATE pg_dist_partition SET colocationid = 20002 +WHERE logicalrelid = 
'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass; SET citus.enable_router_execution TO 'false'; -- Check that we don't crash if there are not any shards. SELECT @@ -302,9 +308,9 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; - avg ------ - + avg +--------------------------------------------------------------------- + (1 row) -- Load data into tables. diff --git a/src/test/regress/sql/ignoring_orphaned_shards.sql b/src/test/regress/sql/ignoring_orphaned_shards.sql index a3aa74db9..2b56269ef 100644 --- a/src/test/regress/sql/ignoring_orphaned_shards.sql +++ b/src/test/regress/sql/ignoring_orphaned_shards.sql @@ -128,12 +128,16 @@ CREATE TABLE range2(id int); SELECT create_distributed_table('range2', 'id', 'range'); CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); +-- Mark tables co-located +UPDATE pg_dist_partition SET colocationid = 30001 +WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass; + -- Move shard placement and DON'T clean it up, now range1 and range2 are -- colocated, but only range2 has an orphaned shard. SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid; --- Make sure that tables are detected as colocated +-- Make sure co-located join works SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; -- Make sure we can create a foreign key on community edition, because diff --git a/src/test/regress/sql/non_colocated_join_order.sql b/src/test/regress/sql/non_colocated_join_order.sql index 631d1f3aa..4c77f68c2 100644 --- a/src/test/regress/sql/non_colocated_join_order.sql +++ b/src/test/regress/sql/non_colocated_join_order.sql @@ -36,9 +36,9 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append'); SET citus.log_multi_join_order to TRUE; SET client_min_messages to DEBUG1; +SET citus.enable_repartition_joins TO on; --- Since we both have same amount of shards and they are colocated on the same node --- local join logic will be triggered. +-- when joining append tables we always get dual re-partition joins SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -- Add two shards placement of interval [8,10] to test_table_1
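
With this change, CoPartitionedTables() no longer falls back to comparing shard min/max values and shard placements for range- and append-distributed tables; two tables are treated as co-located only when they share a colocation ID, which is why the regression tests above now set pg_dist_partition.colocationid explicitly. A minimal sketch of that pattern, assuming two existing range-distributed tables named range1 and range2 and an otherwise unused colocation ID (30001, mirroring the value used in ignoring_orphaned_shards.sql):

-- Mark the tables as co-located by assigning them the same colocation ID.
UPDATE pg_dist_partition SET colocationid = 30001
WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;

-- Confirm both tables now report the same colocation ID; a co-located join
-- between them can then be planned without comparing shard intervals.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('range1'::regclass, 'range2'::regclass)
ORDER BY logicalrelid;

Without a shared colocation ID, joins between such tables are planned as dual re-partition joins (as the updated join-order logs above show) rather than as local or single-range partition joins.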