diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 707bfcb6f..c0e446e3a 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -55,7 +55,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext, uint32 taskIdIndex, - bool allRelationsJoinedOnPartitionKey); + bool allRelationsJoinedOnPartitionKey, + DeferredErrorMessage **routerPlannerError); static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, @@ -259,7 +260,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, targetShardInterval, plannerRestrictionContext, taskIdIndex, - allDistributionKeysInQueryAreEqual); + allDistributionKeysInQueryAreEqual, + &distributedPlan-> + planningError); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } /* add the task if it could be created */ if (modifyTask != NULL) @@ -408,7 +416,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, PlannerRestrictionContext *plannerRestrictionContext, uint32 taskIdIndex, - bool safeToPushdownSubquery) + bool safeToPushdownSubquery, + DeferredErrorMessage **routerPlannerError) { Query *copiedQuery = copyObject(originalQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); @@ -517,10 +526,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter if (planningError) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Select query cannot be pushed down to the worker."))); + *routerPlannerError = planningError; + return NULL; } diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 65d374e70..9aa0a5181 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -989,10 +989,62 @@ select create_distributed_table('test', 'x'); (1 row) set citus.enable_repartition_joins to true; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; +EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(11 rows) + SET client_min_messages TO DEBUG1; -insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); +INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 20 +(1 row) + +TRUNCATE test; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; +EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 3 + Merge Task Count: 4 +(11 rows) + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 20 +(1 row) + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index c8398ccbb..c2697bbe6 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -851,13 +851,10 @@ DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_t DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 1073741823))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 2147483647))) DEBUG: Plan is router executable - -- the following is a very tricky query for Citus - -- although we do not support pushing down JOINs on non-partition - -- columns here it is safe to push it down given that we're looking for - -- a specific value (i.e., value_1 = 12) on the joining column. - -- Note that the query always hits the same shard on raw_events_second - -- and this query wouldn't have worked if we're to use different worker - -- count or shard replication factor +SET client_min_messages TO WARNING; + -- following query should use repartitioned joins and results should + -- be routed via coordinator + SET citus.enable_repartition_joins TO true; INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -865,8 +862,6 @@ DEBUG: Plan is router executable raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_1 = 12; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. -- some unsupported LEFT/INNER JOINs -- JOIN on one table with partition column other is not INSERT INTO agg_events (user_id) @@ -874,16 +869,13 @@ DETAIL: Select query cannot be pushed down to the worker. raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- same as the above with INNER JOIN INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. -- a not meaningful query INSERT INTO agg_events (user_id) @@ -891,24 +883,33 @@ DETAIL: Select query cannot be pushed down to the worker. FROM raw_events_first, raw_events_second WHERE raw_events_first.user_id = raw_events_first.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot perform distributed planning on this query +DETAIL: Cartesian products are currently unsupported -- both tables joined on non-partition columns INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- same as the above with INNER JOIN + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + -- even if there is a filter on the partition key, since the join is not on the partition key we reject -- this query INSERT INTO agg_events (user_id) @@ -918,38 +919,70 @@ FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.user_id = 10; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- same as the above with INNER JOIN + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.user_id = 10; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + -- make things a bit more complicate with IN clauses + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. - -- implicit join on non partition column should also not be pushed down +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + + -- implicit join on non partition column should also not be pushed down, + -- so we fall back to route via coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. - -- the following is again a tricky query for Citus - -- if the given filter was on value_1 as shown in the above, Citus could - -- push it down. But here the query is refused +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + +RESET client_min_messages; + -- The following is again a tricky query for Citus. If the given filter was + -- on value_1 as shown in the above, Citus could push it down and use + -- distributed INSERT/SELECT. But we instead fall back to route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -957,11 +990,19 @@ DETAIL: Select query cannot be pushed down to the worker. raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. - -- lets do some unsupported query tests with subqueries +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + -- foo is not joined on the partition key so the query is not - -- pushed down + -- pushed down. So instead we route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -990,10 +1031,23 @@ DETAIL: Select query cannot be pushed down to the worker. ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + -- if the given filter was on value_1 as shown in the above, Citus could - -- push it down. But here the query is refused + -- push it down. But here the query falls back to route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -1001,11 +1055,19 @@ DETAIL: Select query cannot be pushed down to the worker. raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. - -- lets do some unsupported query tests with subqueries +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + -- foo is not joined on the partition key so the query is not - -- pushed down + -- pushed down, and it falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1034,8 +1096,19 @@ DETAIL: Select query cannot be pushed down to the worker. ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + INSERT INTO agg_events (value_4_agg, value_1_agg, @@ -1050,8 +1123,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, raw_events_second WHERE raw_events_first.user_id != raw_events_second.user_id GROUP BY raw_events_second.user_id) AS foo; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +SET client_min_messages TO DEBUG2; -- INSERT returns NULL partition key value via coordinator INSERT INTO agg_events (value_4_agg, @@ -1290,7 +1363,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.id AS user_ DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'user_id' +SET client_min_messages TO WARNING; -- cannot pushdown the query since the JOIN is not equi JOIN +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1318,9 +1395,23 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + -- cannot pushdown since foo2 is not join on partition key +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1349,9 +1440,26 @@ FROM ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> HashAggregate + Group Key: remote_scan.id + Filter: (pg_catalog.sum(remote_scan.worker_column_4) > '10'::numeric) + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(11 rows) + -- cannot push down since foo doesn't have en equi join +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1380,10 +1488,24 @@ FROM ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + -- some unsupported LATERAL JOINs -- join on averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1400,9 +1522,21 @@ FROM raw_events_first WHERE value_4 = reference_ids.user_id) as averages ON true GROUP BY averages.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + -- join among reference_ids and averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1418,9 +1552,23 @@ FROM FROM raw_events_first) as averages ON averages.value_4 = reference_ids.user_id GROUP BY averages.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + -- join among the agg_ids and averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1438,24 +1586,44 @@ FROM JOIN LATERAL (SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id) GROUP BY averages.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not supported subqueries in WHERE clause --- since the selected value in the WHERE is not --- partition key +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + +-- Selected value in the WHERE is not partition key, so we cannot use distributed +-- INSERT/SELECT and falls back route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id FROM raw_events_first WHERE user_id IN (SELECT value_1 FROM raw_events_second); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + -- same as above but slightly more complex -- since it also includes subquery in FROM as well +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT f2.id FROM + (SELECT id FROM (SELECT reference_table.user_id AS id @@ -1477,9 +1645,19 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT value_1 FROM raw_events_second); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + -- some more semi-anti join tests +SET client_min_messages TO DEBUG2; -- join in where INSERT INTO raw_events_second (user_id) @@ -1493,7 +1671,11 @@ DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away DEBUG: Plan is router executable +RESET client_min_messages; -- we cannot push this down since it is NOT IN +-- we use repartition insert/select instead +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id @@ -1501,8 +1683,18 @@ FROM raw_events_first WHERE user_id NOT IN (SELECT raw_events_second.user_id FROM raw_events_second, raw_events_first WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 1 +(6 rows) + +SET client_min_messages TO DEBUG2; -- safe to push down INSERT INTO raw_events_second (user_id) @@ -1563,7 +1755,11 @@ DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_t DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 1073741823)) GROUP BY id DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 2147483647)) GROUP BY id DEBUG: Plan is router executable +RESET client_min_messages; -- cannot push down since the f.id IN is matched with value_1 +-- we use repartition insert/select instead +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id @@ -1591,8 +1787,18 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT value_1 FROM raw_events_second)); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + +SET client_min_messages TO DEBUG2; -- same as above, but this time is it safe to push down since -- f.id IN is matched with user_id INSERT INTO raw_events_second @@ -1627,6 +1833,7 @@ DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 1073741823))) DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 2147483647))) DEBUG: Plan is router executable +RESET client_min_messages; -- cannot push down since top level user_id is matched with NOT IN INSERT INTO raw_events_second (user_id) @@ -1655,8 +1862,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot pushdown the subquery +DETAIL: There exist a reference table in the outer part of the outer join -- cannot push down since join is not equi join (f.id > f2.id) INSERT INTO raw_events_second (user_id) @@ -1685,8 +1892,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id > f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot pushdown the subquery +DETAIL: There exist a reference table in the outer part of the outer join -- we currently not support grouping sets INSERT INTO agg_events (user_id, @@ -1697,8 +1904,6 @@ SELECT user_id, Sum(value_2) AS sum_val2 FROM raw_events_second GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); -DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries -DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP HINT: Consider using an equality filter on the distributed table's partition column. -- set back to INFO diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index 400f60669..d069cf7d3 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -274,8 +274,9 @@ DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conf DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; --- Following query is not supported since error checks of the subquery pushdown planner --- and insert select planner have not been unified. It should work after unifying them. +-- Following query is supported by using repartition join for the insert/select +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) WITH cte AS ( SELECT col_1, col_2 @@ -288,8 +289,17 @@ SELECT source_table_1.col_2 FROM cte, source_table_1 WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + SET citus.enable_cte_inlining TO true; -- Tests with foreign key to reference table CREATE TABLE test_ref_table (key int PRIMARY KEY); diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index d37928073..46c29b801 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -30,8 +30,7 @@ FROM ( ) t GROUP BY user_id ) q; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator --------------------------------------------------------------------- --------------------------------------------------------------------- -- Funnel grouped by whether or not a user has done an event @@ -350,8 +349,7 @@ FROM ( GROUP BY user_id ) AS shard_union ORDER BY user_lastseen DESC; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since lateral join is not on the partition key INSERT INTO agg_results_third (user_id, agg_time, value_2_agg) SELECT @@ -379,8 +377,7 @@ FROM ( GROUP BY user_id ) AS shard_union ORDER BY user_lastseen DESC; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since lateral join is not on the partition key INSERT INTO agg_results_third (user_id, agg_time, value_2_agg) SELECT @@ -408,40 +405,80 @@ FROM ( GROUP BY user_id ) AS shard_union ORDER BY user_lastseen DESC; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Count the number of distinct users_table who are in segment X and Y and Z --------------------------------------------------------------------- --------------------------------------------------------------------- --- not pushable since partition key is NOT IN +-- not pushable since partition key is NOT IN. Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not pushable since partition key is not selected from the second subquery +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + +-- not pushable since partition key is not selected from the second subquery. +-- Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not pushable since second subquery does not return bare partition key +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + +-- not pushable since second subquery does not return bare partition key. +-- Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + --------------------------------------------------------------------- --------------------------------------------------------------------- -- Find customers who have done X, and satisfy other customer specific criteria @@ -453,16 +490,14 @@ SELECT user_id, value_2 FROM users_table WHERE value_1 > 101 AND value_1 < 110 AND value_2 >= 5 AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND user_id!=users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since the join is not on the partition key INSERT INTO agg_results_third(user_id, value_2_agg) SELECT user_id, value_2 FROM users_table WHERE value_1 > 101 AND value_1 < 110 AND value_2 >= 5 AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND event_type = users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Customers who haven’t done X, and satisfy other customer specific criteria @@ -474,16 +509,14 @@ SELECT user_id, value_2 FROM users_table WHERE value_1 = 101 AND value_2 >= 5 AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since the join is not the partition key INSERT INTO agg_results_third(user_id, value_2_agg) SELECT user_id, value_2 FROM users_table WHERE value_1 = 101 AND value_2 >= 5 AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND event_type=users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Customers who have done X and Y, and satisfy other customer specific criteria @@ -496,8 +529,7 @@ SELECT user_id, value_2 FROM users_table WHERE AND value_2 >= 5 AND EXISTS (SELECT user_id FROM events_table WHERE event_type!=100 AND value_3 > 100 AND user_id=users_table.user_id) AND EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Customers who have done X and haven’t done Y, and satisfy other customer specific criteria @@ -509,8 +541,7 @@ SELECT user_id, value_2 FROM users_table WHERE value_2 >= 5 AND EXISTS (SELECT user_id FROM events_table WHERE event_type > 100 AND event_type <= 300 AND value_3 > 100 AND user_id!=users_table.user_id) AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type > 300 AND event_type <= 350 AND value_3 > 100 AND user_id=users_table.user_id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Customers who have done X more than 2 times, and satisfy other customer specific criteria @@ -532,8 +563,7 @@ INSERT INTO agg_results_third(user_id, value_2_agg) AND user_id != users_table.user_id GROUP BY user_id HAVING Count(*) > 2); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since the second join is not on the partition key INSERT INTO agg_results_third(user_id, value_2_agg) SELECT user_id, @@ -550,8 +580,7 @@ INSERT INTO agg_results_third(user_id, value_2_agg) AND event_type = users_table.user_id GROUP BY user_id HAVING Count(*) > 2); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- not pushable since the second join is not on the partition key INSERT INTO agg_results_third(user_id, value_2_agg) SELECT user_id, @@ -568,14 +597,15 @@ INSERT INTO agg_results_third(user_id, value_2_agg) AND user_id = users_table.value_1 GROUP BY user_id HAVING Count(*) > 2); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns --------------------------------------------------------------------- --------------------------------------------------------------------- -- Find me all users_table who has done some event and has filters --------------------------------------------------------------------- --------------------------------------------------------------------- --- not pushable due to NOT IN +-- not pushable due to NOT IN. Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -586,9 +616,21 @@ And user_id NOT in From users_table Where value_1 = 15 And value_2 > 25); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not pushable since we're not selecting the partition key +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + +-- not pushable since we're not selecting the partition key. +-- Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -599,10 +641,21 @@ And user_id in From users_table Where value_1 = 15 And value_2 > 25); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + -- not pushable since we're not selecting the partition key - -- from the events table + -- from the events table. Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -613,34 +666,83 @@ And event_type in From users_table Where value_1 = 15 And value_2 > 25); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) + --------------------------------------------------------------------- --------------------------------------------------------------------- -- Which events_table did people who has done some specific events_table --------------------------------------------------------------------- --------------------------------------------------------------------- --- not pushable due to NOT IN +-- not pushable due to NOT IN. Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not pushable due to not selecting the partition key +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id, remote_scan.event_type + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + +-- not pushable due to not selecting the partition key. Use pull to coordinator. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. --- not pushable due to not comparing user id from the events table +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id, remote_scan.event_type + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + +-- not pushable due to not comparing user id from the events table. +-- Use pull to coordinator. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> HashAggregate + Group Key: remote_scan.user_id, remote_scan.event_type + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(8 rows) + --------------------------------------------------------------------- --------------------------------------------------------------------- -- Find my assets that have the highest probability and fetch their metadata @@ -662,8 +764,7 @@ FROM ) temp ON users_table.user_id = temp.user_id WHERE users_table.value_1 < 50; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- not pushable since the join is not on the partition key INSERT INTO agg_results_third(user_id, value_1_agg, value_3_agg) SELECT @@ -680,8 +781,8 @@ FROM ) temp ON users_table.user_id = temp.user_id WHERE users_table.value_1 < 50; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- supported via recursive planning INSERT INTO agg_results (user_id, agg_time, value_2_agg) SELECT diff --git a/src/test/regress/expected/with_dml.out b/src/test/regress/expected/with_dml.out index 766d4cece..07a25f686 100644 --- a/src/test/regress/expected/with_dml.out +++ b/src/test/regress/expected/with_dml.out @@ -70,9 +70,13 @@ WHERE AND EXISTS (SELECT * FROM ids_to_delete); DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 5) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table SET dept = (distributed_table.dept OPERATOR(pg_catalog.+) 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer OPERATOR(pg_catalog.<) 60)) some_tenants WHERE ((some_tenants.tenant_id OPERATOR(pg_catalog.=) ids_to_delete.tenant_id) AND (distributed_table.tenant_id OPERATOR(pg_catalog.=) some_tenants.tenant_id) AND (EXISTS (SELECT ids_to_delete_1.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete_1))) --- this query errors out since we've some hard --- errors in the INSERT ... SELECT pushdown --- which prevents to fallback to recursive planning +SET client_min_messages TO WARNING; +-- this query falls back repartitioned insert/select since we've some hard +-- errors in the INSERT ... SELECT pushdown which prevents to fallback to +-- recursive planning +SELECT * FROM +coordinator_plan($Q$ +EXPLAIN (costs off) WITH ids_to_upsert AS ( SELECT tenant_id FROM distributed_table WHERE dept > 7 @@ -81,8 +85,14 @@ INSERT INTO distributed_table SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +$Q$) s +WHERE s LIKE '%INSERT/SELECT method%'; + query_plan +--------------------------------------------------------------------- + INSERT/SELECT method: repartition +(1 row) + +SET client_min_messages TO DEBUG1; -- the following query is very similar to the above one -- but this time the query is pulled to coordinator since -- we return before hitting any hard errors diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index e6f4bfa87..ea2f3190a 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -460,9 +460,26 @@ create table test(x int, y int); select create_distributed_table('test', 'x'); set citus.enable_repartition_joins to true; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; + +EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); + SET client_min_messages TO DEBUG1; -insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); +INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); RESET client_min_messages; +SELECT count(*) FROM test; + +TRUNCATE test; +INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; + +EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); +RESET client_min_messages; + +SELECT count(*) FROM test; + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index bb9ea4619..117d4fc6c 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -623,13 +623,11 @@ FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id IN (19, 20, 21); - -- the following is a very tricky query for Citus - -- although we do not support pushing down JOINs on non-partition - -- columns here it is safe to push it down given that we're looking for - -- a specific value (i.e., value_1 = 12) on the joining column. - -- Note that the query always hits the same shard on raw_events_second - -- and this query wouldn't have worked if we're to use different worker - -- count or shard replication factor +SET client_min_messages TO WARNING; + + -- following query should use repartitioned joins and results should + -- be routed via coordinator + SET citus.enable_repartition_joins TO true; INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -669,11 +667,15 @@ FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -- same as the above with INNER JOIN + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; +$Q$); -- even if there is a filter on the partition key, since the join is not on the partition key we reject -- this query @@ -686,32 +688,48 @@ WHERE raw_events_first.user_id = 10; -- same as the above with INNER JOIN + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.user_id = 10; +$Q$); -- make things a bit more complicate with IN clauses + -- we support this with route to coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +$Q$); - -- implicit join on non partition column should also not be pushed down + -- implicit join on non partition column should also not be pushed down, + -- so we fall back to route via coordinator + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; +$Q$); - -- the following is again a tricky query for Citus - -- if the given filter was on value_1 as shown in the above, Citus could - -- push it down. But here the query is refused +RESET client_min_messages; + + -- The following is again a tricky query for Citus. If the given filter was + -- on value_1 as shown in the above, Citus could push it down and use + -- distributed INSERT/SELECT. But we instead fall back to route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -719,10 +737,12 @@ WHERE raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; +$Q$); - -- lets do some unsupported query tests with subqueries -- foo is not joined on the partition key so the query is not - -- pushed down + -- pushed down. So instead we route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -751,9 +771,12 @@ WHERE ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; +$Q$); -- if the given filter was on value_1 as shown in the above, Citus could - -- push it down. But here the query is refused + -- push it down. But here the query falls back to route via coordinator. + SELECT coordinator_plan($Q$ + EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -761,10 +784,12 @@ WHERE raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; +$Q$); - -- lets do some unsupported query tests with subqueries -- foo is not joined on the partition key so the query is not - -- pushed down + -- pushed down, and it falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -793,6 +818,7 @@ WHERE ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; +$Q$); INSERT INTO agg_events (value_4_agg, @@ -810,6 +836,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, GROUP BY raw_events_second.user_id) AS foo; +SET client_min_messages TO DEBUG2; + -- INSERT returns NULL partition key value via coordinator INSERT INTO agg_events (value_4_agg, @@ -941,7 +969,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); +SET client_min_messages TO WARNING; + -- cannot pushdown the query since the JOIN is not equi JOIN +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -969,9 +1002,12 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; - +$Q$); -- cannot pushdown since foo2 is not join on partition key +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1000,8 +1036,12 @@ FROM ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; +$Q$); -- cannot push down since foo doesn't have en equi join +-- falls back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT @@ -1030,10 +1070,13 @@ FROM ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; - +$Q$); -- some unsupported LATERAL JOINs -- join on averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1050,8 +1093,12 @@ FROM raw_events_first WHERE value_4 = reference_ids.user_id) as averages ON true GROUP BY averages.user_id; +$Q$); -- join among reference_ids and averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1067,8 +1114,12 @@ FROM FROM raw_events_first) as averages ON averages.value_4 = reference_ids.user_id GROUP BY averages.user_id; +$Q$); -- join among the agg_ids and averages is not on the partition key +-- should fall back to route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id, value_4_agg) SELECT averages.user_id, avg(averages.value_4) @@ -1086,19 +1137,24 @@ FROM JOIN LATERAL (SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id) GROUP BY averages.user_id; +$Q$); --- not supported subqueries in WHERE clause --- since the selected value in the WHERE is not --- partition key +-- Selected value in the WHERE is not partition key, so we cannot use distributed +-- INSERT/SELECT and falls back route via coordinator +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id FROM raw_events_first WHERE user_id IN (SELECT value_1 FROM raw_events_second); +$Q$); -- same as above but slightly more complex -- since it also includes subquery in FROM as well +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_events (user_id) SELECT f2.id FROM @@ -1124,9 +1180,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT value_1 FROM raw_events_second); +$Q$); -- some more semi-anti join tests +SET client_min_messages TO DEBUG2; + -- join in where INSERT INTO raw_events_second (user_id) @@ -1136,7 +1195,12 @@ WHERE user_id IN (SELECT raw_events_second.user_id FROM raw_events_second, raw_events_first WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); +RESET client_min_messages; + -- we cannot push this down since it is NOT IN +-- we use repartition insert/select instead +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id @@ -1144,7 +1208,9 @@ FROM raw_events_first WHERE user_id NOT IN (SELECT raw_events_second.user_id FROM raw_events_second, raw_events_first WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); +$Q$); +SET client_min_messages TO DEBUG2; -- safe to push down INSERT INTO raw_events_second @@ -1195,8 +1261,12 @@ WHERE NOT EXISTS (SELECT 1 GROUP BY outer_most.id; +RESET client_min_messages; -- cannot push down since the f.id IN is matched with value_1 +-- we use repartition insert/select instead +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO raw_events_second (user_id) SELECT user_id @@ -1224,6 +1294,9 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT value_1 FROM raw_events_second)); +$Q$); + +SET client_min_messages TO DEBUG2; -- same as above, but this time is it safe to push down since -- f.id IN is matched with user_id @@ -1255,6 +1328,8 @@ ON (f.id = f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); +RESET client_min_messages; + -- cannot push down since top level user_id is matched with NOT IN INSERT INTO raw_events_second (user_id) diff --git a/src/test/regress/sql/multi_insert_select_conflict.sql b/src/test/regress/sql/multi_insert_select_conflict.sql index 9e5e3cb34..cd0057fe0 100644 --- a/src/test/regress/sql/multi_insert_select_conflict.sql +++ b/src/test/regress/sql/multi_insert_select_conflict.sql @@ -153,8 +153,9 @@ UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); RESET client_min_messages; --- Following query is not supported since error checks of the subquery pushdown planner --- and insert select planner have not been unified. It should work after unifying them. +-- Following query is supported by using repartition join for the insert/select +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) WITH cte AS ( SELECT col_1, col_2 @@ -167,6 +168,7 @@ SELECT source_table_1.col_2 FROM cte, source_table_1 WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING; +$Q$); SET citus.enable_cte_inlining TO true; diff --git a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql index 9dc9373c8..3cc90ca2d 100644 --- a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql +++ b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql @@ -416,29 +416,40 @@ ORDER BY user_lastseen DESC; ------------------------------------ ------------------------------------ --- not pushable since partition key is NOT IN +-- not pushable since partition key is NOT IN. Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); +$Q$); --- not pushable since partition key is not selected from the second subquery +-- not pushable since partition key is not selected from the second subquery. +-- Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); +$Q$); --- not pushable since second subquery does not return bare partition key +-- not pushable since second subquery does not return bare partition key. +-- Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third (user_id) SELECT DISTINCT user_id FROM users_table WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); +$Q$); ------------------------------------ ------------------------------------ @@ -568,7 +579,9 @@ INSERT INTO agg_results_third(user_id, value_2_agg) ------------------------------------ ------------------------------------ --- not pushable due to NOT IN +-- not pushable due to NOT IN. Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -579,8 +592,12 @@ And user_id NOT in From users_table Where value_1 = 15 And value_2 > 25); +$Q$); --- not pushable since we're not selecting the partition key +-- not pushable since we're not selecting the partition key. +-- Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -591,9 +608,12 @@ And user_id in From users_table Where value_1 = 15 And value_2 > 25); +$Q$); -- not pushable since we're not selecting the partition key - -- from the events table + -- from the events table. Use repartition insert/select. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id) Select user_id From events_table @@ -604,6 +624,7 @@ And event_type in From users_table Where value_1 = 15 And value_2 > 25); +$Q$); ------------------------------------ ------------------------------------ @@ -611,23 +632,33 @@ And event_type in ------------------------------------ ------------------------------------ --- not pushable due to NOT IN +-- not pushable due to NOT IN. Use pull to coordinator instead. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; +$Q$); --- not pushable due to not selecting the partition key +-- not pushable due to not selecting the partition key. Use pull to coordinator. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; +$Q$); --- not pushable due to not comparing user id from the events table +-- not pushable due to not comparing user id from the events table. +-- Use pull to coordinator. +SELECT coordinator_plan($Q$ +EXPLAIN (costs off) INSERT INTO agg_results_third(user_id, value_1_agg) SELECT user_id, event_type FROM events_table WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) GROUP BY user_id, event_type; +$Q$); ------------------------------------ ------------------------------------ diff --git a/src/test/regress/sql/with_dml.sql b/src/test/regress/sql/with_dml.sql index e6f52aa7d..0a3052c2f 100644 --- a/src/test/regress/sql/with_dml.sql +++ b/src/test/regress/sql/with_dml.sql @@ -56,9 +56,14 @@ WHERE AND distributed_table.tenant_id = some_tenants.tenant_id AND EXISTS (SELECT * FROM ids_to_delete); --- this query errors out since we've some hard --- errors in the INSERT ... SELECT pushdown --- which prevents to fallback to recursive planning +SET client_min_messages TO WARNING; + +-- this query falls back repartitioned insert/select since we've some hard +-- errors in the INSERT ... SELECT pushdown which prevents to fallback to +-- recursive planning +SELECT * FROM +coordinator_plan($Q$ +EXPLAIN (costs off) WITH ids_to_upsert AS ( SELECT tenant_id FROM distributed_table WHERE dept > 7 @@ -67,6 +72,10 @@ INSERT INTO distributed_table SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; +$Q$) s +WHERE s LIKE '%INSERT/SELECT method%'; + +SET client_min_messages TO DEBUG1; -- the following query is very similar to the above one -- but this time the query is pulled to coordinator since