diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 50fbbe976..708d371c6 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -194,7 +194,8 @@ static StringInfo MergeTableQueryString(uint32 taskIdIndex, List *targetEntryLis
 static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex,
 											   Query *reduceQuery);
 static uint32 FinalTargetEntryCount(List *targetEntryList);
-
+static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
+								   ShardInterval *secondInterval);
 
 /*
  * CreatePhysicalDistributedPlan is the entry point for physical plan generation. The
@@ -2320,7 +2321,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 	 * If not known to be colocated check if the remaining shards are
 	 * anyway. Do so by comparing the shard interval arrays that are sorted on
 	 * interval minimum values. Then it compares every shard interval in order
-	 * and if any pair of shard intervals are not equal it returns false.
+	 * and if any pair of shard intervals are not equal or are not placed on
+	 * the same nodes, it returns false.
 	 */
 	for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
 	{
@@ -2330,7 +2332,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 		bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, firstInterval,
 													   secondInterval);
 
-		if (!shardIntervalsEqual)
+		if (!shardIntervalsEqual || !CoPlacedShardIntervals(firstInterval,
+															secondInterval))
 		{
 			coPartitionedTables = false;
 			break;
@@ -2341,6 +2344,45 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 }
 
 
+/*
+ * CoPlacedShardIntervals checks whether the given intervals are placed on the same nodes.
+ */
+static bool
+CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
+{
+	List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId);
+	List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId);
+	ListCell *firstShardPlacementCell = NULL;
+	ListCell *secondShardPlacementCell = NULL;
+
+	/* shards must have the same number of placements */
+	if (list_length(firstShardPlacementList) != list_length(secondShardPlacementList))
+	{
+		return false;
+	}
+
+	firstShardPlacementList = SortList(firstShardPlacementList, CompareShardPlacements);
+	secondShardPlacementList = SortList(secondShardPlacementList, CompareShardPlacements);
+
+	forboth(firstShardPlacementCell, firstShardPlacementList, secondShardPlacementCell,
+			secondShardPlacementList)
+	{
+		ShardPlacement *firstShardPlacement = (ShardPlacement *) lfirst(
+			firstShardPlacementCell);
+		ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst(
+			secondShardPlacementCell);
+
+		if (strcmp(firstShardPlacement->nodeName, secondShardPlacement->nodeName) != 0 ||
+			firstShardPlacement->nodePort != secondShardPlacement->nodePort)
+		{
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
 /*
  * ShardIntervalsEqual checks if given shard intervals have equal min/max values.
  */
diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out
new file mode 100644
index 000000000..a3441d241
--- /dev/null
+++ b/src/test/regress/expected/non_colocated_join_order.out
@@ -0,0 +1,55 @@
+--
+-- NON_COLOCATED_JOIN_ORDER
+--
+-- Tests to check that shard placements must match for the local join logic to be chosen.
+CREATE TABLE test_table_1(id int, value_1 int);
+SELECT master_create_distributed_table('test_table_1', 'id', 'append');
+ master_create_distributed_table
+---------------------------------
+ 
+(1 row)
+
+SET citus.large_table_shard_count to 1;
+\copy test_table_1 FROM STDIN DELIMITER ','
+\copy test_table_1 FROM STDIN DELIMITER ','
+CREATE TABLE test_table_2(id int, value_1 int);
+SELECT master_create_distributed_table('test_table_2', 'id', 'append');
+ master_create_distributed_table
+---------------------------------
+ 
+(1 row)
+
+\copy test_table_2 FROM STDIN DELIMITER ','
+\copy test_table_2 FROM STDIN DELIMITER ','
+SET citus.log_multi_join_order to TRUE;
+SET client_min_messages to DEBUG1;
+-- Since both tables have the same number of shards and their placements are on the same nodes,
+-- the local join logic will be triggered.
+SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
+LOG: join order: [ "test_table_1" ][ local partition join "test_table_2" ]
+ count
+-------
+     6
+(1 row)
+
+-- Add two placements of a new shard covering interval [8,10] to test_table_1
+SET citus.shard_replication_factor to 2;
+\copy test_table_1 FROM STDIN DELIMITER ','
+-- Add a single placement of a new shard covering interval [8,10] to test_table_2
+SET citus.shard_replication_factor to 1;
+\copy test_table_2 FROM STDIN DELIMITER ','
+-- Although the shard intervals of the relations are the same, they have a different number of
+-- placements for the interval [8,10], so the repartition join logic will be triggered.
+SET citus.enable_repartition_joins to ON;
+SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
+LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ]
+DEBUG: cannot use real time executor with repartition jobs
+HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
+ count
+-------
+     9
+(1 row)
+
+SET client_min_messages TO default;
+DROP TABLE test_table_1;
+DROP TABLE test_table_2;
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 461d773dc..25afd63df 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -45,7 +45,7 @@ test: multi_partitioning_utils multi_partitioning
 # ----------
 test: subquery_basics subquery_local_tables subquery_executors subquery_and_cte set_operations set_operation_and_local_tables
 test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where
-test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins
+test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
 test: subquery_prepared_statements
 
 # ----------
diff --git a/src/test/regress/sql/non_colocated_join_order.sql b/src/test/regress/sql/non_colocated_join_order.sql
new file mode 100644
index 000000000..345cfd672
--- /dev/null
+++ b/src/test/regress/sql/non_colocated_join_order.sql
@@ -0,0 +1,71 @@
+--
+-- NON_COLOCATED_JOIN_ORDER
+--
+
+-- Tests to check that shard placements must match for the local join logic to be chosen.
+
+CREATE TABLE test_table_1(id int, value_1 int);
+SELECT master_create_distributed_table('test_table_1', 'id', 'append');
+SET citus.large_table_shard_count to 1;
+
+\copy test_table_1 FROM STDIN DELIMITER ','
+1,2
+2,3
+3,4
+\.
+
+\copy test_table_1 FROM STDIN DELIMITER ','
+5,2
+6,3
+7,4
+\.
+
+CREATE TABLE test_table_2(id int, value_1 int);
+SELECT master_create_distributed_table('test_table_2', 'id', 'append');
+
+\copy test_table_2 FROM STDIN DELIMITER ','
+1,2
+2,3
+3,4
+\.
+
+\copy test_table_2 FROM STDIN DELIMITER ','
+5,2
+6,3
+7,4
+\.
+
+SET citus.log_multi_join_order to TRUE;
+SET client_min_messages to DEBUG1;
+
+-- Since both tables have the same number of shards and their placements are on the same nodes,
+-- the local join logic will be triggered.
+SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
+
+-- Add two placements of a new shard covering interval [8,10] to test_table_1
+SET citus.shard_replication_factor to 2;
+
+\copy test_table_1 FROM STDIN DELIMITER ','
+8,2
+9,3
+10,4
+\.
+
+-- Add a single placement of a new shard covering interval [8,10] to test_table_2
+SET citus.shard_replication_factor to 1;
+
+\copy test_table_2 FROM STDIN DELIMITER ','
+8,2
+9,3
+10,4
+\.
+
+-- Although the shard intervals of the relations are the same, they have a different number of
+-- placements for the interval [8,10], so the repartition join logic will be triggered.
+SET citus.enable_repartition_joins to ON;
+SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
+
+SET client_min_messages TO default;
+
+DROP TABLE test_table_1;
+DROP TABLE test_table_2;
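
Reading aid (not part of the patch): the heart of the change is the pairwise placement comparison in CoPlacedShardIntervals, which sorts both placement lists and then requires every pair to agree on node name and port. The standalone C program below is a minimal sketch of that technique on plain arrays; the NodePlacement struct, the PlacementsMatch helper, and the worker names are hypothetical stand-ins for Citus' ShardPlacement lists, SortList, and CompareShardPlacements.

/*
 * Simplified, self-contained illustration of the co-placement check:
 * two shards are considered co-placed only if they have the same number
 * of placements and, after sorting, each pair agrees on (nodeName, nodePort).
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct NodePlacement
{
	char nodeName[64];
	int nodePort;
} NodePlacement;

/* qsort comparator: order placements by node name, then by port */
static int
ComparePlacements(const void *left, const void *right)
{
	const NodePlacement *first = (const NodePlacement *) left;
	const NodePlacement *second = (const NodePlacement *) right;
	int nameCompare = strcmp(first->nodeName, second->nodeName);

	if (nameCompare != 0)
	{
		return nameCompare;
	}

	return first->nodePort - second->nodePort;
}

/* returns true only if both shards are placed on exactly the same nodes */
static bool
PlacementsMatch(NodePlacement *first, int firstCount,
				NodePlacement *second, int secondCount)
{
	int placementIndex = 0;

	/* shards must have the same number of placements */
	if (firstCount != secondCount)
	{
		return false;
	}

	/* sort both sides so the pairwise comparison is order independent */
	qsort(first, firstCount, sizeof(NodePlacement), ComparePlacements);
	qsort(second, secondCount, sizeof(NodePlacement), ComparePlacements);

	for (placementIndex = 0; placementIndex < firstCount; placementIndex++)
	{
		if (strcmp(first[placementIndex].nodeName,
				   second[placementIndex].nodeName) != 0 ||
			first[placementIndex].nodePort != second[placementIndex].nodePort)
		{
			return false;
		}
	}

	return true;
}

int
main(void)
{
	/* shard A is replicated to two workers, shard B lives on only one of them */
	NodePlacement shardA[] = { { "worker-1", 5432 }, { "worker-2", 5432 } };
	NodePlacement shardB[] = { { "worker-1", 5432 } };

	/* prints "co-placed: no", mirroring the regression test scenario above */
	printf("co-placed: %s\n", PlacementsMatch(shardA, 2, shardB, 1) ? "yes" : "no");

	return 0;
}

Sorting before comparing makes the check independent of the order in which placements are listed, which is the same reason the patch sorts with CompareShardPlacements before its forboth loop.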