mirror of https://github.com/citusdata/citus.git
Disable co-located joins for append-distributed tables
parent dfad73d918
commit b97e5081c7
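This commit stops the planner from treating two append-distributed tables as co-located, so an equi-join between them now runs as a re-partition join rather than a co-located (local partition) join; the regression output below changes from "local partition join" to "dual partition join" accordingly. A minimal sketch of the resulting behavior, assuming two illustrative append-distributed tables t1 and t2 (the table names are hypothetical; the settings are the ones already used by these tests):

CREATE TABLE t1 (id int);
CREATE TABLE t2 (id int);
SELECT create_distributed_table('t1', 'id', 'append');
SELECT create_distributed_table('t2', 'id', 'append');
-- repartition joins must be enabled, because the join is no longer co-located
SET citus.enable_repartition_joins TO on;
SET citus.log_multi_join_order to TRUE;
-- previously this could be planned as [ "t1" ][ local partition join "t2" ];
-- after this change it is planned as [ "t1" ][ dual partition join "t2" ]
SELECT count(*) FROM t1, t2 WHERE t1.id = t2.id;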
@@ -171,10 +171,6 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
                                      TaskType taskType,
                                      bool modifyRequiresCoordinatorEvaluation,
                                      DeferredErrorMessage **planningError);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
                                Oid collation,
                                ShardInterval *firstInterval,
                                ShardInterval *secondInterval);
static List * SqlTaskList(Job *job);
static bool DependsOnHashPartitionJob(Job *job);
static uint32 AnchorRangeTableId(List *rangeTableList);

@@ -228,8 +224,6 @@ static List * MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList,
                            uint32 taskIdIndex);
static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId);
static StringInfo ColumnTypeArrayString(List *targetEntryList);
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
                                   ShardInterval *secondInterval);

static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr);
static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr);

@@ -2648,12 +2642,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,


/*
 * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard
 * placement matching. It first checks for the shard count, if tables don't have
 * same amount shard then it returns false. Note that, if any table does not
 * have any shard, it returns true. If two tables have same amount of shards,
 * we check colocationIds for hash distributed tables and shardInterval's min
 * max values for append and range distributed tables.
 * CoPartitionedTables checks if given two distributed tables are co-located.
 */
bool
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
@@ -2666,38 +2655,6 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
    CitusTableCacheEntry *firstTableCache = GetCitusTableCacheEntry(firstRelationId);
    CitusTableCacheEntry *secondTableCache = GetCitusTableCacheEntry(secondRelationId);

    ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
    ShardInterval **sortedSecondIntervalArray =
        secondTableCache->sortedShardIntervalArray;
    uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
    uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
    FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;

    /* reference tables are always & only copartitioned with reference tables */
    if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) &&
        IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY))
    {
        return true;
    }
    else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) ||
             IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY))
    {
        return false;
    }

    if (firstListShardCount != secondListShardCount)
    {
        return false;
    }

    /* if there are not any shards just return true */
    if (firstListShardCount == 0)
    {
        return true;
    }

    Assert(comparisonFunction != NULL);

    /*
     * Check if the tables have the same colocation ID - if so, we know
     * they're colocated.
@@ -2708,130 +2665,7 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
        return true;
    }

    /*
     * For hash distributed tables two tables are accepted as colocated only if
     * they have the same colocationId. Otherwise they may have same minimum and
     * maximum values for each shard interval, yet hash function may result with
     * different values for the same value. int vs bigint can be given as an
     * example.
     */
    if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) ||
        IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED))
    {
        return false;
    }


    /*
     * Don't compare unequal types
     */
    Oid collation = firstTableCache->partitionColumn->varcollid;
    if (firstTableCache->partitionColumn->vartype !=
        secondTableCache->partitionColumn->vartype ||
        collation != secondTableCache->partitionColumn->varcollid)
    {
        return false;
    }


    /*
     * If not known to be colocated check if the remaining shards are
     * anyway. Do so by comparing the shard interval arrays that are sorted on
     * interval minimum values. Then it compares every shard interval in order
     * and if any pair of shard intervals are not equal or they are not located
     * in the same node it returns false.
     */
    for (uint32 intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
    {
        ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
        ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex];

        bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
                                                       collation,
                                                       firstInterval,
                                                       secondInterval);
        if (!shardIntervalsEqual || !CoPlacedShardIntervals(firstInterval,
                                                            secondInterval))
        {
            return false;
        }
    }

    return true;
}

/*
 * CoPlacedShardIntervals checks whether the given intervals located in the same nodes.
 */
static bool
CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
{
    List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
        firstInterval->shardId);
    List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
        secondInterval->shardId);
    ListCell *firstShardPlacementCell = NULL;
    ListCell *secondShardPlacementCell = NULL;

    /* Shards must have same number of placements */
    if (list_length(firstShardPlacementList) != list_length(secondShardPlacementList))
    {
        return false;
    }

    firstShardPlacementList = SortList(firstShardPlacementList, CompareShardPlacements);
    secondShardPlacementList = SortList(secondShardPlacementList, CompareShardPlacements);

    forboth(firstShardPlacementCell, firstShardPlacementList, secondShardPlacementCell,
            secondShardPlacementList)
    {
        ShardPlacement *firstShardPlacement = (ShardPlacement *) lfirst(
            firstShardPlacementCell);
        ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst(
            secondShardPlacementCell);

        if (firstShardPlacement->nodeId != secondShardPlacement->nodeId)
        {
            return false;
        }
    }

    return true;
}

/*
 * ShardIntervalsEqual checks if given shard intervals have equal min/max values.
 */
static bool
ShardIntervalsEqual(FmgrInfo *comparisonFunction, Oid collation,
                    ShardInterval *firstInterval, ShardInterval *secondInterval)
{
    bool shardIntervalsEqual = false;

    Datum firstMin = firstInterval->minValue;
    Datum firstMax = firstInterval->maxValue;
    Datum secondMin = secondInterval->minValue;
    Datum secondMax = secondInterval->maxValue;

    if (firstInterval->minValueExists && firstInterval->maxValueExists &&
        secondInterval->minValueExists && secondInterval->maxValueExists)
    {
        Datum minDatum = FunctionCall2Coll(comparisonFunction, collation, firstMin,
                                           secondMin);
        Datum maxDatum = FunctionCall2Coll(comparisonFunction, collation, firstMax,
                                           secondMax);
        int firstComparison = DatumGetInt32(minDatum);
        int secondComparison = DatumGetInt32(maxDatum);

        if (firstComparison == 0 && secondComparison == 0)
        {
            shardIntervalsEqual = true;
        }
    }

    return shardIntervalsEqual;
    return false;
}

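With the interval- and placement-comparison path removed above, CoPartitionedTables only reports two distributed tables as co-partitioned when both are reference tables or when they share a colocation ID. The range-table regression tests below rely on exactly that: they assign a common colocationid by hand. A minimal sketch of that setup, reusing the relation names, test helper, and shard bounds the tests themselves use (the bounds are the tests' values, not a requirement):

CREATE TABLE range1(id int);
SELECT create_distributed_table('range1', 'id', 'range');
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- mark the two range tables as co-located so the join below can be pushed down
UPDATE pg_dist_partition SET colocationid = 30001
WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;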
@@ -324,6 +324,9 @@ SELECT create_distributed_table('range2', 'id', 'range');
(1 row)

CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- Mark tables co-located
UPDATE pg_dist_partition SET colocationid = 30001
WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');

@@ -339,7 +342,7 @@ SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid
92448600 | 1 | 57637
(2 rows)

-- Make sure that tables are detected as colocated
-- Make sure co-located join works
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
id | id
---------------------------------------------------------------------

@@ -169,7 +169,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ]
LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate

@@ -53,7 +53,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ]
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
Sort

@@ -97,16 +97,14 @@ GROUP BY
c_comment
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ]
QUERY PLAN
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ]
QUERY PLAN
---------------------------------------------------------------------
Sort
Sort Key: (sum(remote_scan.revenue)) DESC
-> HashAggregate
Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
(6 rows)
Sort Key: remote_scan.revenue DESC
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
(4 rows)

-- Query #19 from the TPC-H decision support benchmark (modified)
EXPLAIN (COSTS OFF)

@@ -139,7 +137,7 @@ WHERE
AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK')
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate

@@ -159,7 +157,7 @@ WHERE
c_custkey = o_custkey
GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ]
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
HashAggregate

@@ -66,24 +66,62 @@ ORDER BY
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 11
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 12
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 13
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 18
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 23
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 28
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
l_partkey | o_orderkey | count
---------------------------------------------------------------------
18 | 12005 | 1

@@ -14,28 +14,47 @@ FROM
WHERE
o_custkey = c_custkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 3
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 2
Merge Task Count: 3
(7 rows)
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(10 rows)

SELECT
count(*)

@@ -44,18 +63,34 @@ FROM
WHERE
o_custkey = c_custkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
count
---------------------------------------------------------------------
2985

@@ -72,28 +107,47 @@ WHERE
o_custkey = c_custkey AND
o_orderkey < 0;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 3
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 2
Merge Task Count: 3
(7 rows)
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(10 rows)

SELECT
count(*)

@@ -103,18 +157,34 @@ WHERE
o_custkey = c_custkey AND
o_orderkey < 0 AND o_orderkey > 0;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
count
---------------------------------------------------------------------
0

@@ -139,8 +209,11 @@ DEBUG: Router planner does not support append-partitioned tables.
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 2
Merge Task Count: 3
(7 rows)
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 0
Merge Task Count: 0
(10 rows)

SELECT
count(*)

@@ -359,7 +432,10 @@ DEBUG: Router planner does not support append-partitioned tables.
-> MapMergeJob
Map Task Count: 0
Merge Task Count: 0
(6 rows)
-> MapMergeJob
Map Task Count: 0
Merge Task Count: 0
(9 rows)

-- execute once, to verify that's handled
SELECT

@@ -389,7 +465,10 @@ DEBUG: Router planner does not support append-partitioned tables.
-> MapMergeJob
Map Task Count: 0
Merge Task Count: 0
(6 rows)
-> MapMergeJob
Map Task Count: 0
Merge Task Count: 0
(9 rows)

EXPLAIN (COSTS OFF)
SELECT

@@ -13,7 +13,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -41,7 +41,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -69,7 +69,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -96,7 +96,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ cartesian product reference join "supplier" ]
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ cartesian product reference join "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -124,7 +124,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -152,7 +152,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -180,7 +180,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -208,7 +208,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ reference join "supplier" ]
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ reference join "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

@@ -24,18 +24,38 @@ DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: no sharding pruning constraints on customer_append found
DEBUG: shard count after pruning for customer_append: 3
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 16
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx

@@ -56,24 +76,44 @@ WHERE
o_custkey = c_custkey AND
o_orderkey = l_orderkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: no sharding pruning constraints on customer_append found
DEBUG: shard count after pruning for customer_append: 3
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: no sharding pruning constraints on lineitem found
DEBUG: shard count after pruning for lineitem: 2
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: no sharding pruning constraints on customer_append found
DEBUG: shard count after pruning for customer_append: 3
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 16
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 12
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx

@@ -1931,26 +1931,34 @@ DEBUG: query has a single distribution column value: 2
SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id)
WHERE ar.author_id = 3;
DEBUG: found no worker with all shard placements
DEBUG: join prunable for intervals [1,10] and [11,20]
DEBUG: join prunable for intervals [1,10] and [21,30]
DEBUG: join prunable for intervals [1,10] and [31,40]
DEBUG: join prunable for intervals [11,20] and [1,10]
DEBUG: join prunable for intervals [11,20] and [21,30]
DEBUG: join prunable for intervals [11,20] and [31,40]
DEBUG: join prunable for intervals [21,30] and [1,10]
DEBUG: join prunable for intervals [21,30] and [11,20]
DEBUG: join prunable for intervals [21,30] and [31,40]
DEBUG: join prunable for intervals [31,40] and [1,10]
DEBUG: join prunable for intervals [31,40] and [11,20]
DEBUG: join prunable for intervals [31,40] and [21,30]
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 6
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 20
id | author_id | title | word_count | name | id
---------------------------------------------------------------------
(0 rows)

@@ -823,8 +823,7 @@ FROM
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
ERROR: shard counts of co-located tables do not match
-- Check that we can prune shards in subqueries with VARCHAR partition columns
CREATE TABLE subquery_pruning_varchar_test_table
(

@@ -22,10 +22,10 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append');
\copy test_table_2 FROM STDIN DELIMITER ','
SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
-- Since we both have same amount of shards and they are colocated on the same node
-- local join logic will be triggered.
SET citus.enable_repartition_joins TO on;
-- when joining append tables we always get dual re-partition joins
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
LOG: join order: [ "test_table_1" ][ local partition join "test_table_2" ]
LOG: join order: [ "test_table_1" ][ dual partition join "test_table_2" ]
count
---------------------------------------------------------------------
6

@@ -141,7 +141,7 @@ CREATE TABLE events (
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
SELECT create_distributed_table('events', 'composite_id', 'range');

SELECT master_create_empty_shard('events') AS new_shard_id
\gset

@@ -181,7 +181,11 @@ CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
SELECT create_distributed_table('users', 'composite_id', 'range');

-- we will guarantee co-locatedness for these tables
UPDATE pg_dist_partition SET colocationid = 20001
WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass;

SELECT master_create_empty_shard('users') AS new_shard_id
\gset

@@ -228,7 +232,7 @@ CREATE TABLE lineitem_subquery (
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');

CREATE TABLE orders_subquery (
o_orderkey bigint not null,

@@ -241,7 +245,11 @@ CREATE TABLE orders_subquery (
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range');

-- we will guarantee co-locatedness for these tabes
UPDATE pg_dist_partition SET colocationid = 20002
WHERE logicalrelid = 'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass;

SET citus.enable_router_execution TO 'false';

@@ -7,9 +7,9 @@ SELECT run_command_on_master_and_workers($f$
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -20,9 +20,9 @@ SELECT run_command_on_master_and_workers($f$
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -33,9 +33,9 @@ SELECT run_command_on_master_and_workers($f$
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -45,9 +45,9 @@ SELECT run_command_on_master_and_workers($f$
AS 'record_eq'
IMMUTABLE;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -58,9 +58,9 @@ SELECT run_command_on_master_and_workers($f$
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -71,9 +71,9 @@ SELECT run_command_on_master_and_workers($f$
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -84,9 +84,9 @@ SELECT run_command_on_master_and_workers($f$
PROCEDURE = gt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -97,9 +97,9 @@ SELECT run_command_on_master_and_workers($f$
PROCEDURE = ge_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

-- ... use that function to create a custom equality operator...

@@ -117,9 +117,9 @@ SELECT run_command_on_master_and_workers($f$
hashes
);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -130,9 +130,9 @@ SELECT run_command_on_master_and_workers($f$
PROCEDURE = le_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -143,9 +143,9 @@ SELECT run_command_on_master_and_workers($f$
PROCEDURE = lt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

-- ... and create a custom operator family for hash indexes...

@@ -153,9 +153,9 @@ SELECT run_command_on_master_and_workers($f$

CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

-- We need to define two different operator classes for the composite types

@@ -172,9 +172,9 @@ SELECT run_command_on_master_and_workers($f$

FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

SELECT run_command_on_master_and_workers($f$

@@ -184,9 +184,9 @@ SELECT run_command_on_master_and_workers($f$
OPERATOR 1 = (user_composite_type, user_composite_type),
FUNCTION 1 test_composite_type_hash(user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------

run_command_on_master_and_workers
---------------------------------------------------------------------

(1 row)

CREATE TABLE events (

@@ -195,10 +195,10 @@ CREATE TABLE events (
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
master_create_distributed_table
---------------------------------

SELECT create_distributed_table('events', 'composite_id', 'range');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT master_create_empty_shard('events') AS new_shard_id

@@ -222,12 +222,15 @@ CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
master_create_distributed_table
---------------------------------

SELECT create_distributed_table('users', 'composite_id', 'range');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- we will guarantee co-locatedness for these tables
UPDATE pg_dist_partition SET colocationid = 20001
WHERE logicalrelid = 'events'::regclass OR logicalrelid = 'users'::regclass;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'

@@ -264,10 +267,10 @@ CREATE TABLE lineitem_subquery (
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
master_create_distributed_table
---------------------------------

SELECT create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE orders_subquery (

@@ -281,12 +284,15 @@ CREATE TABLE orders_subquery (
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
master_create_distributed_table
---------------------------------

SELECT create_distributed_table('orders_subquery', 'o_orderkey', 'range');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- we will guarantee co-locatedness for these tabes
UPDATE pg_dist_partition SET colocationid = 20002
WHERE logicalrelid = 'orders_subquery'::regclass OR logicalrelid = 'lineitem_subquery'::regclass;
SET citus.enable_router_execution TO 'false';
-- Check that we don't crash if there are not any shards.
SELECT

@@ -302,9 +308,9 @@ FROM
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
avg
-----

avg
---------------------------------------------------------------------

(1 row)

-- Load data into tables.

@@ -128,12 +128,16 @@ CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');

-- Mark tables co-located
UPDATE pg_dist_partition SET colocationid = 30001
WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;

-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;

-- Make sure that tables are detected as colocated
-- Make sure co-located join works
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;

-- Make sure we can create a foreign key on community edition, because

@@ -36,9 +36,9 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'append');

SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
SET citus.enable_repartition_joins TO on;

-- Since we both have same amount of shards and they are colocated on the same node
-- local join logic will be triggered.
-- when joining append tables we always get dual re-partition joins
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;

-- Add two shards placement of interval [8,10] to test_table_1