Don't use distributed insert/select for repartitioned joins

pull/3451/head
Hadi Moshayedi 2020-01-30 15:25:00 -08:00
parent 2e8c118a8f
commit 264530311a
3 changed files with 85 additions and 9 deletions

View File

@ -55,7 +55,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,
uint32 taskIdIndex, uint32 taskIdIndex,
bool allRelationsJoinedOnPartitionKey); bool allRelationsJoinedOnPartitionKey,
DeferredErrorMessage **routerPlannerError);
static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *subqueryRte,
@ -259,7 +260,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
targetShardInterval, targetShardInterval,
plannerRestrictionContext, plannerRestrictionContext,
taskIdIndex, taskIdIndex,
allDistributionKeysInQueryAreEqual); allDistributionKeysInQueryAreEqual,
&distributedPlan->
planningError);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
/* add the task if it could be created */ /* add the task if it could be created */
if (modifyTask != NULL) if (modifyTask != NULL)
@ -408,7 +416,8 @@ static Task *
RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval,
PlannerRestrictionContext *plannerRestrictionContext, PlannerRestrictionContext *plannerRestrictionContext,
uint32 taskIdIndex, uint32 taskIdIndex,
bool safeToPushdownSubquery) bool safeToPushdownSubquery,
DeferredErrorMessage **routerPlannerError)
{ {
Query *copiedQuery = copyObject(originalQuery); Query *copiedQuery = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery);
@ -517,10 +526,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
if (planningError) if (planningError)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), *routerPlannerError = planningError;
errmsg("cannot perform distributed planning for the given " return NULL;
"modification"),
errdetail("Select query cannot be pushed down to the worker.")));
} }

View File

@ -989,10 +989,62 @@ select create_distributed_table('test', 'x');
(1 row) (1 row)
set citus.enable_repartition_joins to true; set citus.enable_repartition_joins to true;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(11 rows)
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages; RESET client_min_messages;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
20
(1 row)
TRUNCATE test;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(11 rows)
SET client_min_messages TO DEBUG1;
INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
20
(1 row)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;

View File

@ -460,9 +460,26 @@ create table test(x int, y int);
select create_distributed_table('test', 'x'); select create_distributed_table('test', 'x');
set citus.enable_repartition_joins to true; set citus.enable_repartition_joins to true;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
RESET client_min_messages; RESET client_min_messages;
SELECT count(*) FROM test;
TRUNCATE test;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
SET client_min_messages TO DEBUG1;
INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
RESET client_min_messages;
SELECT count(*) FROM test;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;