diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 744e851f6..916c0500b 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -139,7 +139,7 @@ static bool JoinOnPartitionColumn(Query *query);
 static void ErrorIfUnsupportedShardDistribution(Query *query);
 static List * RelationIdList(Query *query);
 static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
-static bool ShardIntervalsEqual(FmgrInfo* comparisonFunction, ShardInterval *firstInterval,
+static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
								ShardInterval *secondInterval);
 static void ErrorIfUnsupportedFilters(Query *subquery);
 static bool EqualOpExpressionLists(List *firstOpExpressionList,
@@ -3515,8 +3515,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 		ShardInterval *secondInterval = &sortedSecondIntervalArray[intervalIndex];
 
 		bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
-		                                              firstInterval,
-		                                              secondInterval);
+		                                               firstInterval,
+		                                               secondInterval);
 		if (!shardIntervalsEqual)
 		{
 			coPartitionedTables = false;
@@ -3529,11 +3529,12 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 
 
 /*
- * ShardIntervalsEqual checks if given shard intervals have equal min/max values.
+ * ShardIntervalsEqual checks if the given shard intervals have equal min/max
+ * values under the given comparison function.
  */
 static bool
-ShardIntervalsEqual(FmgrInfo* comparisonFunction, ShardInterval *firstInterval,
-					ShardInterval *secondInterval)
+ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
+					ShardInterval *secondInterval)
 {
 	bool shardIntervalsEqual = false;
 	Datum firstMin = 0;
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 47208225c..41689c466 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -2003,7 +2003,7 @@ SubquerySqlTaskList(Job *job)
 	uint32 anchorRangeTableId = 0;
 	uint32 rangeTableIndex = 0;
 	const uint32 fragmentSize = sizeof(RangeTableFragment);
-	uint64 maxTableSize = 0;
+	uint64 maxPrunedTableSize = 0;
 
 	/* find filters on partition columns */
 	ExtractQueryWalker((Node *) subquery, &queryList);
@@ -2045,7 +2045,7 @@ SubquerySqlTaskList(Job *job)
 		uint32 tableId = rangeTableIndex + 1; /* tableId starts from 1 */
 		uint32 finalShardCount = 0;
 		uint32 shardIndex = 0;
-		uint64 tableSize = 0;
+		uint64 prunedTableSize = 0;
 
 		if (opExpressionList != NIL)
 		{
@@ -2073,9 +2073,6 @@ SubquerySqlTaskList(Job *job)
 		for (shardIndex = 0; shardIndex < finalShardCount; shardIndex++)
 		{
 			ShardInterval *shardInterval = sortedIntervalArray[shardIndex];
-
-			tableSize += ShardLength(shardInterval->shardId);
-
 			RangeTableFragment *shardFragment = palloc0(fragmentSize);
 			shardFragment->fragmentReference = &(shardInterval->shardId);
 			shardFragment->fragmentType = CITUS_RTE_RELATION;
@@ -2095,10 +2092,18 @@ SubquerySqlTaskList(Job *job)
 				/* get next fragment for the first relation list */
 				fragmentCombinationCell = lnext(fragmentCombinationCell);
 			}
+
+			prunedTableSize += ShardLength(shardInterval->shardId);
 		}
 
-		if (anchorRangeTableId == 0 || tableSize > maxTableSize){
-			maxTableSize = tableSize;
+		/*
+		 * The anchor table's shards are not shipped between workers when that
+		 * can be avoided, so picking the largest table as the anchor reduces
+		 * network traffic.
+		 */
+		if (anchorRangeTableId == 0 || prunedTableSize > maxPrunedTableSize)
+		{
+			maxPrunedTableSize = prunedTableSize;
 			anchorRangeTableId = tableId;
 		}
 
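
For reviewers who want the context the last multi_logical_optimizer.c hunk truncates: below is a minimal sketch of how the body of ShardIntervalsEqual plausibly applies comparisonFunction, assuming the usual Citus ShardInterval fields (minValue/maxValue Datums plus the minValueExists/maxValueExists flags) and PostgreSQL's FunctionCall2 from fmgr.h. The Sketch suffix and the include of the ShardInterval header are assumptions for illustration, not the verbatim source.

```c
#include "fmgr.h"  /* FmgrInfo, FunctionCall2, DatumGetInt32 */
#include "distributed/master_metadata_utility.h"  /* ShardInterval (assumed header) */

/* Sketch: two shard intervals are equal iff both bounds exist and compare as 0. */
static bool
ShardIntervalsEqualSketch(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
						  ShardInterval *secondInterval)
{
	bool shardIntervalsEqual = false;

	/* without both bounds on both sides, equality cannot be established */
	if (firstInterval->minValueExists && firstInterval->maxValueExists &&
		secondInterval->minValueExists && secondInterval->maxValueExists)
	{
		/* a btree comparison function returns <0, 0, or >0, strcmp-style */
		int minComparison = DatumGetInt32(FunctionCall2(comparisonFunction,
														firstInterval->minValue,
														secondInterval->minValue));
		int maxComparison = DatumGetInt32(FunctionCall2(comparisonFunction,
														firstInterval->maxValue,
														secondInterval->maxValue));

		shardIntervalsEqual = (minComparison == 0 && maxComparison == 0);
	}

	return shardIntervalsEqual;
}
```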
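On the multi_physical_planner.c side, the rename from tableSize/maxTableSize to prunedTableSize/maxPrunedTableSize spells out that the anchor is chosen by size after shard pruning, and the new comment records why: the anchor table's fragments stay put while the other relations' fragments are fetched to them, so the largest table should win. With hypothetical numbers: if orders retains 3 GB of shards after pruning and lineitem retains 1 GB, anchoring on orders means shipping roughly 1 GB of lineitem fragments rather than 3 GB the other way around.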