diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 707bfcb6f..c0e446e3a 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -55,7 +55,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext, uint32 taskIdIndex, - bool allRelationsJoinedOnPartitionKey); + bool allRelationsJoinedOnPartitionKey, + DeferredErrorMessage **routerPlannerError); static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, @@ -259,7 +260,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, targetShardInterval, plannerRestrictionContext, taskIdIndex, - allDistributionKeysInQueryAreEqual); + allDistributionKeysInQueryAreEqual, + &distributedPlan-> + planningError); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } /* add the task if it could be created */ if (modifyTask != NULL) @@ -408,7 +416,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, PlannerRestrictionContext *plannerRestrictionContext, uint32 taskIdIndex, - bool safeToPushdownSubquery) + bool safeToPushdownSubquery, + DeferredErrorMessage **routerPlannerError) { Query *copiedQuery = copyObject(originalQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); @@ -517,10 +526,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter if (planningError) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Select query cannot be pushed down to the worker."))); + *routerPlannerError = planningError; + return NULL; } diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 65d374e70..9aa0a5181 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -989,10 +989,62 @@ select create_distributed_table('test', 'x'); (1 row) set citus.enable_repartition_joins to true; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; +EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(11 rows) + SET client_min_messages TO DEBUG1; -insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); +INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 20 +(1 row) + +TRUNCATE test; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; +EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(11 rows) + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 20 +(1 row) + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index e6f4bfa87..ea2f3190a 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -460,9 +460,26 @@ create table test(x int, y int); select create_distributed_table('test', 'x'); set citus.enable_repartition_joins to true; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; + +EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); + SET client_min_messages TO DEBUG1; -insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); +INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); RESET client_min_messages; +SELECT count(*) FROM test; + +TRUNCATE test; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; + +EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); +RESET client_min_messages; + +SELECT count(*) FROM test; + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;