Add co-placement check to CoPartition function

pull/2103/head
velioglu 2018-04-12 14:16:48 +03:00
parent b4c9e2c1ea
commit 1b92812be2
4 changed files with 172 additions and 4 deletions

View File

@@ -194,7 +194,8 @@ static StringInfo MergeTableQueryString(uint32 taskIdIndex, List *targetEntryList
 static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex,
                                                Query *reduceQuery);
 static uint32 FinalTargetEntryCount(List *targetEntryList);
+static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
+                                   ShardInterval *secondInterval);

 /*
  * CreatePhysicalDistributedPlan is the entry point for physical plan generation. The
@@ -2320,7 +2321,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
     * If not known to be colocated check if the remaining shards are
     * anyway. Do so by comparing the shard interval arrays that are sorted on
     * interval minimum values. Then it compares every shard interval in order
-    * and if any pair of shard intervals are not equal it returns false.
+    * and if any pair of shard intervals is not equal, or the pair is not
+    * placed on the same node, it returns false.
     */
    for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
    {
@@ -2330,7 +2332,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
        bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
                                                       firstInterval,
                                                       secondInterval);
-       if (!shardIntervalsEqual)
+       if (!shardIntervalsEqual || !CoPlacedShardIntervals(firstInterval,
+                                                           secondInterval))
        {
            coPartitionedTables = false;
            break;
@@ -2341,6 +2344,45 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 }


+/*
+ * CoPlacedShardIntervals checks whether the given shard intervals are placed
+ * on the same set of nodes.
+ */
+static bool
+CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
+{
+    List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId);
+    List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId);
+    ListCell *firstShardPlacementCell = NULL;
+    ListCell *secondShardPlacementCell = NULL;
+
+    /* shards must have the same number of placements */
+    if (list_length(firstShardPlacementList) != list_length(secondShardPlacementList))
+    {
+        return false;
+    }
+
+    firstShardPlacementList = SortList(firstShardPlacementList, CompareShardPlacements);
+    secondShardPlacementList = SortList(secondShardPlacementList, CompareShardPlacements);
+
+    forboth(firstShardPlacementCell, firstShardPlacementList, secondShardPlacementCell,
+            secondShardPlacementList)
+    {
+        ShardPlacement *firstShardPlacement =
+            (ShardPlacement *) lfirst(firstShardPlacementCell);
+        ShardPlacement *secondShardPlacement =
+            (ShardPlacement *) lfirst(secondShardPlacementCell);
+
+        if (strcmp(firstShardPlacement->nodeName, secondShardPlacement->nodeName) != 0 ||
+            firstShardPlacement->nodePort != secondShardPlacement->nodePort)
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+
 /*
  * ShardIntervalsEqual checks if given shard intervals have equal min/max values.
  */
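The condition enforced by the new CoPlacedShardIntervals() check can also be expressed against the Citus metadata, which helps when reasoning about why a join is not treated as co-located. The query below is only an illustrative sketch, not part of this commit; it assumes the standard pg_dist_shard and pg_dist_shard_placement catalogs and borrows the table names from the regression test added further down.

-- Sketch only: list shard pairs whose intervals match but whose placement
-- node sets differ; these are exactly the pairs the new check rejects.
WITH shard_nodes AS (
    SELECT s.logicalrelid, s.shardid, s.shardminvalue, s.shardmaxvalue,
           array_agg(p.nodename || ':' || p.nodeport
                     ORDER BY p.nodename, p.nodeport) AS node_set
    FROM pg_dist_shard s
    JOIN pg_dist_shard_placement p USING (shardid)
    GROUP BY s.logicalrelid, s.shardid, s.shardminvalue, s.shardmaxvalue
)
SELECT a.shardid AS first_shard, b.shardid AS second_shard,
       a.node_set AS first_nodes, b.node_set AS second_nodes
FROM shard_nodes a
JOIN shard_nodes b
  ON a.shardminvalue = b.shardminvalue AND a.shardmaxvalue = b.shardmaxvalue
WHERE a.logicalrelid = 'test_table_1'::regclass
  AND b.logicalrelid = 'test_table_2'::regclass
  AND a.node_set <> b.node_set;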

View File

@@ -0,0 +1,55 @@
--
-- NON_COLOCATED_JOIN_ORDER
--
-- Tests to check that shard placements must match for the local join logic to be chosen.
CREATE TABLE test_table_1(id int, value_1 int);
SELECT master_create_distributed_table('test_table_1', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
SET citus.large_table_shard_count to 1;
\copy test_table_1 FROM STDIN DELIMITER ','
\copy test_table_1 FROM STDIN DELIMITER ','
CREATE TABLE test_table_2(id int, value_1 int);
SELECT master_create_distributed_table('test_table_2', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
\copy test_table_2 FROM STDIN DELIMITER ','
\copy test_table_2 FROM STDIN DELIMITER ','
SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
-- Since both tables have the same number of shards and the shards are placed on the same
-- node, the local join logic will be triggered.
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
LOG: join order: [ "test_table_1" ][ local partition join "test_table_2" ]
count
-------
6
(1 row)
-- Add two placements of the [8,10] shard interval to test_table_1
SET citus.shard_replication_factor to 2;
\copy test_table_1 FROM STDIN DELIMITER ','
-- Add a single placement of the [8,10] shard interval to test_table_2
SET citus.shard_replication_factor to 1;
\copy test_table_2 FROM STDIN DELIMITER ','
-- Although the shard intervals of the relations are the same, the [8,10] shards have a
-- different number of placements, so the repartition join logic will be triggered.
SET citus.enable_repartition_joins to ON;
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ]
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
count
-------
9
(1 row)
SET client_min_messages TO default;
DROP TABLE test_table_1;
DROP TABLE test_table_2;

View File

@@ -45,7 +45,7 @@ test: multi_partitioning_utils multi_partitioning
 # ----------
 test: subquery_basics subquery_local_tables subquery_executors subquery_and_cte set_operations set_operation_and_local_tables
 test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where
-test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins
+test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
 test: subquery_prepared_statements

 # ----------

View File

@@ -0,0 +1,71 @@
--
-- NON_COLOCATED_JOIN_ORDER
--
-- Tests to check that shard placements must match for the local join logic to be chosen.
CREATE TABLE test_table_1(id int, value_1 int);
SELECT master_create_distributed_table('test_table_1', 'id', 'append');
SET citus.large_table_shard_count to 1;
\copy test_table_1 FROM STDIN DELIMITER ','
1,2
2,3
3,4
\.
\copy test_table_1 FROM STDIN DELIMITER ','
5,2
6,3
7,4
\.
CREATE TABLE test_table_2(id int, value_1 int);
SELECT master_create_distributed_table('test_table_2', 'id', 'append');
\copy test_table_2 FROM STDIN DELIMITER ','
1,2
2,3
3,4
\.
\copy test_table_2 FROM STDIN DELIMITER ','
5,2
6,3
7,4
\.
SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
-- Since both tables have the same number of shards and the shards are placed on the same
-- node, the local join logic will be triggered.
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
-- Add two placements of the [8,10] shard interval to test_table_1
SET citus.shard_replication_factor to 2;
\copy test_table_1 FROM STDIN DELIMITER ','
8,2
9,3
10,4
\.
-- Add a single placement of the [8,10] shard interval to test_table_2
SET citus.shard_replication_factor to 1;
\copy test_table_2 FROM STDIN DELIMITER ','
8,2
9,3
10,4
\.
-- Although the shard intervals of the relations are the same, the [8,10] shards have a
-- different number of placements, so the repartition join logic will be triggered.
SET citus.enable_repartition_joins to ON;
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
SET client_min_messages TO default;
DROP TABLE test_table_1;
DROP TABLE test_table_2;
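Before the two DROP TABLE statements, the placement mismatch that drives the second join order can be inspected by hand. The query below is a sketch for manual verification only, assuming the standard Citus pg_dist_shard and pg_dist_shard_placement metadata; it is not part of the test file.

-- Sketch only: show how many placements each shard of the two test tables has
-- and where they live; the [8,10] shard of test_table_1 should report two
-- placements while the matching shard of test_table_2 reports one.
SELECT s.logicalrelid AS distributed_table,
       s.shardid,
       s.shardminvalue,
       s.shardmaxvalue,
       count(*) AS placement_count,
       array_agg(p.nodename || ':' || p.nodeport
                 ORDER BY p.nodename, p.nodeport) AS placements
FROM pg_dist_shard s
JOIN pg_dist_shard_placement p USING (shardid)
WHERE s.logicalrelid IN ('test_table_1'::regclass, 'test_table_2'::regclass)
GROUP BY s.logicalrelid, s.shardid, s.shardminvalue, s.shardmaxvalue
ORDER BY s.shardid;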