Merge pull request #3451 from citusdata/insert_select_partitioned_joins_2

Don't error out when subquery in INSERT/SELECT is not router plannable.
pull/3460/head
Hadi Moshayedi 2020-02-03 13:29:04 -08:00 committed by GitHub
commit 13d27cb280
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 704 additions and 185 deletions

View File

@ -55,7 +55,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,
uint32 taskIdIndex, uint32 taskIdIndex,
bool allRelationsJoinedOnPartitionKey); bool allRelationsJoinedOnPartitionKey,
DeferredErrorMessage **routerPlannerError);
static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *subqueryRte,
@ -259,7 +260,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
targetShardInterval, targetShardInterval,
plannerRestrictionContext, plannerRestrictionContext,
taskIdIndex, taskIdIndex,
allDistributionKeysInQueryAreEqual); allDistributionKeysInQueryAreEqual,
&distributedPlan->
planningError);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
/* add the task if it could be created */ /* add the task if it could be created */
if (modifyTask != NULL) if (modifyTask != NULL)
@ -408,7 +416,8 @@ static Task *
RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval,
PlannerRestrictionContext *plannerRestrictionContext, PlannerRestrictionContext *plannerRestrictionContext,
uint32 taskIdIndex, uint32 taskIdIndex,
bool safeToPushdownSubquery) bool safeToPushdownSubquery,
DeferredErrorMessage **routerPlannerError)
{ {
Query *copiedQuery = copyObject(originalQuery); Query *copiedQuery = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery);
@ -517,10 +526,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
if (planningError) if (planningError)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), *routerPlannerError = planningError;
errmsg("cannot perform distributed planning for the given " return NULL;
"modification"),
errdetail("Select query cannot be pushed down to the worker.")));
} }

View File

@ -989,10 +989,62 @@ select create_distributed_table('test', 'x');
(1 row) (1 row)
set citus.enable_repartition_joins to true; set citus.enable_repartition_joins to true;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(11 rows)
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages; RESET client_min_messages;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
20
(1 row)
TRUNCATE test;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
(11 rows)
SET client_min_messages TO DEBUG1;
INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
20
(1 row)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;

View File

@ -851,13 +851,10 @@ DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_t
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 1073741823))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 1073741823)))
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 2147483647))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(raw_events_first.user_id) OPERATOR(pg_catalog.<=) 2147483647)))
DEBUG: Plan is router executable DEBUG: Plan is router executable
-- the following is a very tricky query for Citus SET client_min_messages TO WARNING;
-- although we do not support pushing down JOINs on non-partition -- following query should use repartitioned joins and results should
-- columns here it is safe to push it down given that we're looking for -- be routed via coordinator
-- a specific value (i.e., value_1 = 12) on the joining column. SET citus.enable_repartition_joins TO true;
-- Note that the query always hits the same shard on raw_events_second
-- and this query wouldn't have worked if we're to use different worker
-- count or shard replication factor
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -865,8 +862,6 @@ DEBUG: Plan is router executable
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_1 = 12; AND raw_events_first.value_1 = 12;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- some unsupported LEFT/INNER JOINs -- some unsupported LEFT/INNER JOINs
-- JOIN on one table with partition column other is not -- JOIN on one table with partition column other is not
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
@ -874,16 +869,13 @@ DETAIL: Select query cannot be pushed down to the worker.
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- not a meaningful query -- not a meaningful query
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
@ -891,24 +883,33 @@ DETAIL: Select query cannot be pushed down to the worker.
FROM raw_events_first, FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_first.user_id = raw_events_first.value_1; WHERE raw_events_first.user_id = raw_events_first.value_1;
ERROR: cannot perform distributed planning for the given modification ERROR: cannot perform distributed planning on this query
DETAIL: Select query cannot be pushed down to the worker. DETAIL: Cartesian products are currently unsupported
-- both tables joined on non-partition columns -- both tables joined on non-partition columns
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- even if there is a filter on the partition key, since the join is not on the partition key we reject -- even if there is a filter on the partition key, since the join is not on the partition key we reject
-- this query -- this query
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
@ -918,38 +919,70 @@ FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE WHERE
raw_events_first.user_id = 10; raw_events_first.user_id = 10;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.user_id = 10; WHERE raw_events_first.user_id = 10;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- make things a bit more complicated with IN clauses -- make things a bit more complicated with IN clauses
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- implicit join on non partition column should also not be pushed down ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- implicit join on non partition column should also not be pushed down,
-- so we fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
FROM raw_events_first, FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1; WHERE raw_events_second.user_id = raw_events_first.value_1;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- the following is again a tricky query for Citus ---------------------------------------------------------------------
-- if the given filter was on value_1 as shown in the above, Citus could Custom Scan (Citus INSERT ... SELECT)
-- push it down. But here the query is refused INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
RESET client_min_messages;
-- The following is again a tricky query for Citus. If the given filter was
-- on value_1 as shown in the above, Citus could push it down and use
-- distributed INSERT/SELECT. But we instead fall back to route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -957,11 +990,19 @@ DETAIL: Select query cannot be pushed down to the worker.
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_2 = 12; AND raw_events_first.value_2 = 12;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- lets do some unsupported query tests with subqueries ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- foo is not joined on the partition key so the query is not -- foo is not joined on the partition key so the query is not
-- pushed down -- pushed down. So instead we route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -990,10 +1031,23 @@ DETAIL: Select query cannot be pushed down to the worker.
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- if the given filter was on value_1 as shown in the above, Citus could -- if the given filter was on value_1 as shown in the above, Citus could
-- push it down. But here the query is refused -- push it down. But here the query falls back to route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -1001,11 +1055,19 @@ DETAIL: Select query cannot be pushed down to the worker.
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_2 = 12; AND raw_events_first.value_2 = 12;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- lets do some unsupported query tests with subqueries ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- foo is not joined on the partition key so the query is not -- foo is not joined on the partition key so the query is not
-- pushed down -- pushed down, and it falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1034,8 +1096,19 @@ DETAIL: Select query cannot be pushed down to the worker.
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
value_1_agg, value_1_agg,
@ -1050,8 +1123,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second raw_events_second
WHERE raw_events_first.user_id != raw_events_second.user_id WHERE raw_events_first.user_id != raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo; GROUP BY raw_events_second.user_id) AS foo;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DETAIL: Select query cannot be pushed down to the worker. SET client_min_messages TO DEBUG2;
-- INSERT returns NULL partition key value via coordinator -- INSERT returns NULL partition key value via coordinator
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -1290,7 +1363,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.id AS user_
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'user_id' DEBUG: partitioning SELECT query by column index 0 with name 'user_id'
SET client_min_messages TO WARNING;
-- cannot pushdown the query since the JOIN is not equi JOIN -- cannot pushdown the query since the JOIN is not equi JOIN
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1318,9 +1395,23 @@ outer_most.id, max(outer_most.value)
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id != f2.id)) as outer_most ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id; GROUP BY outer_most.id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- cannot push down since foo2 is not joined on the partition key -- cannot push down since foo2 is not joined on the partition key
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1349,9 +1440,26 @@ FROM
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> HashAggregate
Group Key: remote_scan.id
Filter: (pg_catalog.sum(remote_scan.worker_column_4) > '10'::numeric)
-> Custom Scan (Citus Adaptive)
Task Count: 4
(11 rows)
-- cannot push down since foo doesn't have an equi join -- cannot push down since foo doesn't have an equi join
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1380,10 +1488,24 @@ FROM
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- some unsupported LATERAL JOINs -- some unsupported LATERAL JOINs
-- join on averages is not on the partition key -- join on averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1400,9 +1522,21 @@ FROM
raw_events_first WHERE raw_events_first WHERE
value_4 = reference_ids.user_id) as averages ON true value_4 = reference_ids.user_id) as averages ON true
GROUP BY averages.user_id; GROUP BY averages.user_id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- join among reference_ids and averages is not on the partition key -- join among reference_ids and averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1418,9 +1552,23 @@ FROM
FROM FROM
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
GROUP BY averages.user_id; GROUP BY averages.user_id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- join among the agg_ids and averages is not on the partition key -- join among the agg_ids and averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1438,24 +1586,44 @@ FROM
JOIN LATERAL JOIN LATERAL
(SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id) (SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id)
GROUP BY averages.user_id; GROUP BY averages.user_id;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not supported subqueries in WHERE clause ---------------------------------------------------------------------
-- since the selected value in the WHERE is not Custom Scan (Citus INSERT ... SELECT)
-- partition key INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- Selected value in the WHERE is not the partition key, so we cannot use distributed
-- INSERT/SELECT and fall back to repartitioned INSERT/SELECT
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
FROM raw_events_first FROM raw_events_first
WHERE user_id IN (SELECT value_1 WHERE user_id IN (SELECT value_1
FROM raw_events_second); FROM raw_events_second);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- same as above but slightly more complex -- same as above but slightly more complex
-- since it also includes subquery in FROM as well -- since it also includes subquery in FROM as well
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT f2.id FROM SELECT f2.id FROM
(SELECT (SELECT
id id
FROM (SELECT reference_table.user_id AS id FROM (SELECT reference_table.user_id AS id
@ -1477,9 +1645,19 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT value_1 WHERE f.id IN (SELECT value_1
FROM raw_events_second); FROM raw_events_second);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- some more semi-anti join tests -- some more semi-anti join tests
SET client_min_messages TO DEBUG2;
-- join in where -- join in where
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1493,7 +1671,11 @@ DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned
DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away
DEBUG: Plan is router executable DEBUG: Plan is router executable
RESET client_min_messages;
-- we cannot push this down since it is NOT IN -- we cannot push this down since it is NOT IN
-- we use repartition insert/select instead
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
@ -1501,8 +1683,18 @@ FROM raw_events_first
WHERE user_id NOT IN (SELECT raw_events_second.user_id WHERE user_id NOT IN (SELECT raw_events_second.user_id
FROM raw_events_second, raw_events_first FROM raw_events_second, raw_events_first
WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 1
(6 rows)
SET client_min_messages TO DEBUG2;
-- safe to push down -- safe to push down
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1563,7 +1755,11 @@ DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_t
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 1073741823)) GROUP BY id DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) 
(10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 1073741823)) GROUP BY id
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 2147483647)) GROUP BY id DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_4_agg) SELECT id, max(value) AS max FROM (SELECT f2.id, f2.v4 AS value FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.reference_table_13300012 reference_table ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)))) foo) f LEFT JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) 
(10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))) outer_most WHERE ((worker_hash(id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(id) OPERATOR(pg_catalog.<=) 2147483647)) GROUP BY id
DEBUG: Plan is router executable DEBUG: Plan is router executable
RESET client_min_messages;
-- cannot push down since the f.id IN is matched with value_1 -- cannot push down since the f.id IN is matched with value_1
-- we use repartition insert/select instead
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
@ -1591,8 +1787,18 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT value_1 WHERE f.id IN (SELECT value_1
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
SET client_min_messages TO DEBUG2;
-- same as above, but this time is it safe to push down since -- same as above, but this time is it safe to push down since
-- f.id IN is matched with user_id -- f.id IN is matched with user_id
INSERT INTO raw_events_second INSERT INTO raw_events_second
@ -1627,6 +1833,7 @@ DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 1073741823))) DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first_1, 
public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 0) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 1073741823)))
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 2147483647))) DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id OPERATOR(pg_catalog.=) ANY (SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, public.reference_table_13300012 reference_table WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first_1.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first_1, 
public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first_1.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.user_id HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) WHERE (f.id OPERATOR(pg_catalog.=) ANY (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second)))) AND ((worker_hash(user_id) OPERATOR(pg_catalog.>=) 1073741824) AND (worker_hash(user_id) OPERATOR(pg_catalog.<=) 2147483647)))
DEBUG: Plan is router executable DEBUG: Plan is router executable
RESET client_min_messages;
-- cannot push down since top level user_id is matched with NOT IN -- cannot push down since top level user_id is matched with NOT IN
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1655,8 +1862,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot perform distributed planning for the given modification ERROR: cannot pushdown the subquery
DETAIL: Select query cannot be pushed down to the worker. DETAIL: There exist a reference table in the outer part of the outer join
-- cannot push down since join is not equi join (f.id > f2.id) -- cannot push down since join is not equi join (f.id > f2.id)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1685,8 +1892,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id > f2.id) ON (f.id > f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot perform distributed planning for the given modification ERROR: cannot pushdown the subquery
DETAIL: Select query cannot be pushed down to the worker. DETAIL: There exist a reference table in the outer part of the outer join
-- we currently not support grouping sets -- we currently not support grouping sets
INSERT INTO agg_events INSERT INTO agg_events
(user_id, (user_id,
@ -1697,8 +1904,6 @@ SELECT user_id,
Sum(value_2) AS sum_val2 Sum(value_2) AS sum_val2
FROM raw_events_second FROM raw_events_second
GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) );
DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
HINT: Consider using an equality filter on the distributed table's partition column. HINT: Consider using an equality filter on the distributed table's partition column.
-- set back to INFO -- set back to INFO

View File

@ -274,8 +274,9 @@ DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conf
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages; RESET client_min_messages;
-- Following query is not supported since error checks of the subquery pushdown planner -- Following query is supported by using repartition join for the insert/select
-- and insert select planner have not been unified. It should work after unifying them. SELECT coordinator_plan($Q$
EXPLAIN (costs off)
WITH cte AS ( WITH cte AS (
SELECT SELECT
col_1, col_2 col_1, col_2
@ -288,8 +289,17 @@ SELECT
source_table_1.col_2 source_table_1.col_2
FROM cte, source_table_1 FROM cte, source_table_1
WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING; WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
SET citus.enable_cte_inlining TO true; SET citus.enable_cte_inlining TO true;
-- Tests with foreign key to reference table -- Tests with foreign key to reference table
CREATE TABLE test_ref_table (key int PRIMARY KEY); CREATE TABLE test_ref_table (key int PRIMARY KEY);

View File

@ -30,8 +30,7 @@ FROM (
) t ) t
GROUP BY user_id GROUP BY user_id
) q; ) q;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Funnel grouped by whether or not a user has done an event -- Funnel grouped by whether or not a user has done an event
@ -350,8 +349,7 @@ FROM (
GROUP BY user_id GROUP BY user_id
) AS shard_union ) AS shard_union
ORDER BY user_lastseen DESC; ORDER BY user_lastseen DESC;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since lateral join is not on the partition key -- not pushable since lateral join is not on the partition key
INSERT INTO agg_results_third (user_id, agg_time, value_2_agg) INSERT INTO agg_results_third (user_id, agg_time, value_2_agg)
SELECT SELECT
@ -379,8 +377,7 @@ FROM (
GROUP BY user_id GROUP BY user_id
) AS shard_union ) AS shard_union
ORDER BY user_lastseen DESC; ORDER BY user_lastseen DESC;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since lateral join is not on the partition key -- not pushable since lateral join is not on the partition key
INSERT INTO agg_results_third (user_id, agg_time, value_2_agg) INSERT INTO agg_results_third (user_id, agg_time, value_2_agg)
SELECT SELECT
@ -408,40 +405,80 @@ FROM (
GROUP BY user_id GROUP BY user_id
) AS shard_union ) AS shard_union
ORDER BY user_lastseen DESC; ORDER BY user_lastseen DESC;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Count the number of distinct users_table who are in segment X and Y and Z -- Count the number of distinct users_table who are in segment X and Y and Z
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- not pushable since partition key is NOT IN -- not pushable since partition key is NOT IN. Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not pushable since partition key is not selected from the second subquery ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- not pushable since partition key is not selected from the second subquery.
-- Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not pushable since second subquery does not return bare partition key ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- not pushable since second subquery does not return bare partition key.
-- Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Find customers who have done X, and satisfy other customer specific criteria -- Find customers who have done X, and satisfy other customer specific criteria
@ -453,16 +490,14 @@ SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110 value_1 > 101 AND value_1 < 110
AND value_2 >= 5 AND value_2 >= 5
AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND user_id!=users_table.user_id); AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND user_id!=users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since the join is not on the partition key -- not pushable since the join is not on the partition key
INSERT INTO agg_results_third(user_id, value_2_agg) INSERT INTO agg_results_third(user_id, value_2_agg)
SELECT user_id, value_2 FROM users_table WHERE SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110 value_1 > 101 AND value_1 < 110
AND value_2 >= 5 AND value_2 >= 5
AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND event_type = users_table.user_id); AND EXISTS (SELECT user_id FROM events_table WHERE event_type>101 AND event_type < 110 AND value_3 > 100 AND event_type = users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Customers who havent done X, and satisfy other customer specific criteria -- Customers who havent done X, and satisfy other customer specific criteria
@ -474,16 +509,14 @@ SELECT user_id, value_2 FROM users_table WHERE
value_1 = 101 value_1 = 101
AND value_2 >= 5 AND value_2 >= 5
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id); AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since the join is not the partition key -- not pushable since the join is not the partition key
INSERT INTO agg_results_third(user_id, value_2_agg) INSERT INTO agg_results_third(user_id, value_2_agg)
SELECT user_id, value_2 FROM users_table WHERE SELECT user_id, value_2 FROM users_table WHERE
value_1 = 101 value_1 = 101
AND value_2 >= 5 AND value_2 >= 5
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND event_type=users_table.user_id); AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND event_type=users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Customers who have done X and Y, and satisfy other customer specific criteria -- Customers who have done X and Y, and satisfy other customer specific criteria
@ -496,8 +529,7 @@ SELECT user_id, value_2 FROM users_table WHERE
AND value_2 >= 5 AND value_2 >= 5
AND EXISTS (SELECT user_id FROM events_table WHERE event_type!=100 AND value_3 > 100 AND user_id=users_table.user_id) AND EXISTS (SELECT user_id FROM events_table WHERE event_type!=100 AND value_3 > 100 AND user_id=users_table.user_id)
AND EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id); AND EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Customers who have done X and havent done Y, and satisfy other customer specific criteria -- Customers who have done X and havent done Y, and satisfy other customer specific criteria
@ -509,8 +541,7 @@ SELECT user_id, value_2 FROM users_table WHERE
value_2 >= 5 value_2 >= 5
AND EXISTS (SELECT user_id FROM events_table WHERE event_type > 100 AND event_type <= 300 AND value_3 > 100 AND user_id!=users_table.user_id) AND EXISTS (SELECT user_id FROM events_table WHERE event_type > 100 AND event_type <= 300 AND value_3 > 100 AND user_id!=users_table.user_id)
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type > 300 AND event_type <= 350 AND value_3 > 100 AND user_id=users_table.user_id); AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type > 300 AND event_type <= 350 AND value_3 > 100 AND user_id=users_table.user_id);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Customers who have done X more than 2 times, and satisfy other customer specific criteria -- Customers who have done X more than 2 times, and satisfy other customer specific criteria
@ -532,8 +563,7 @@ INSERT INTO agg_results_third(user_id, value_2_agg)
AND user_id != users_table.user_id AND user_id != users_table.user_id
GROUP BY user_id GROUP BY user_id
HAVING Count(*) > 2); HAVING Count(*) > 2);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since the second join is not on the partition key -- not pushable since the second join is not on the partition key
INSERT INTO agg_results_third(user_id, value_2_agg) INSERT INTO agg_results_third(user_id, value_2_agg)
SELECT user_id, SELECT user_id,
@ -550,8 +580,7 @@ INSERT INTO agg_results_third(user_id, value_2_agg)
AND event_type = users_table.user_id AND event_type = users_table.user_id
GROUP BY user_id GROUP BY user_id
HAVING Count(*) > 2); HAVING Count(*) > 2);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since the second join is not on the partition key -- not pushable since the second join is not on the partition key
INSERT INTO agg_results_third(user_id, value_2_agg) INSERT INTO agg_results_third(user_id, value_2_agg)
SELECT user_id, SELECT user_id,
@ -568,14 +597,15 @@ INSERT INTO agg_results_third(user_id, value_2_agg)
AND user_id = users_table.value_1 AND user_id = users_table.value_1
GROUP BY user_id GROUP BY user_id
HAVING Count(*) > 2); HAVING Count(*) > 2);
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DETAIL: Select query cannot be pushed down to the worker.
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Find me all users_table who has done some event and has filters -- Find me all users_table who has done some event and has filters
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- not pushable due to NOT IN -- not pushable due to NOT IN. Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -586,9 +616,21 @@ And user_id NOT in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not pushable since we're not selecting the partition key ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- not pushable since we're not selecting the partition key.
-- Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -599,10 +641,21 @@ And user_id in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- not pushable since we're not selecting the partition key -- not pushable since we're not selecting the partition key
-- from the events table -- from the events table. Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -613,34 +666,83 @@ And event_type in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: repartition
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Which events_table did people who has done some specific events_table -- Which events_table did people who has done some specific events_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- not pushable due to NOT IN -- not pushable due to NOT IN. Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not pushable due to not selecting the partition key ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.event_type
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- not pushable due to not selecting the partition key. Use pull to coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505) WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
-- not pushable due to not comparing user id from the events table ---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.event_type
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
-- not pushable due to not comparing user id from the events table.
-- Use pull to coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
ERROR: cannot perform distributed planning for the given modification $Q$);
DETAIL: Select query cannot be pushed down to the worker. coordinator_plan
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.event_type
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(8 rows)
--------------------------------------------------------------------- ---------------------------------------------------------------------
--------------------------------------------------------------------- ---------------------------------------------------------------------
-- Find my assets that have the highest probability and fetch their metadata -- Find my assets that have the highest probability and fetch their metadata
@ -662,8 +764,7 @@ FROM
) temp ) temp
ON users_table.user_id = temp.user_id ON users_table.user_id = temp.user_id
WHERE users_table.value_1 < 50; WHERE users_table.value_1 < 50;
ERROR: cannot perform distributed planning for the given modification ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DETAIL: Select query cannot be pushed down to the worker.
-- not pushable since the join is not on the partition key -- not pushable since the join is not on the partition key
INSERT INTO agg_results_third(user_id, value_1_agg, value_3_agg) INSERT INTO agg_results_third(user_id, value_1_agg, value_3_agg)
SELECT SELECT
@ -680,8 +781,8 @@ FROM
) temp ) temp
ON users_table.user_id = temp.user_id ON users_table.user_id = temp.user_id
WHERE users_table.value_1 < 50; WHERE users_table.value_1 < 50;
ERROR: cannot perform distributed planning for the given modification ERROR: the query contains a join that requires repartitioning
DETAIL: Select query cannot be pushed down to the worker. HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- supported via recursive planning -- supported via recursive planning
INSERT INTO agg_results (user_id, agg_time, value_2_agg) INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT SELECT

View File

@ -70,9 +70,13 @@ WHERE
AND EXISTS (SELECT * FROM ids_to_delete); AND EXISTS (SELECT * FROM ids_to_delete);
DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 5) DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table SET dept = (distributed_table.dept OPERATOR(pg_catalog.+) 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer OPERATOR(pg_catalog.<) 60)) some_tenants WHERE ((some_tenants.tenant_id OPERATOR(pg_catalog.=) ids_to_delete.tenant_id) AND (distributed_table.tenant_id OPERATOR(pg_catalog.=) some_tenants.tenant_id) AND (EXISTS (SELECT ids_to_delete_1.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete_1))) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table SET dept = (distributed_table.dept OPERATOR(pg_catalog.+) 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer OPERATOR(pg_catalog.<) 60)) some_tenants WHERE ((some_tenants.tenant_id OPERATOR(pg_catalog.=) ids_to_delete.tenant_id) AND (distributed_table.tenant_id OPERATOR(pg_catalog.=) some_tenants.tenant_id) AND (EXISTS (SELECT ids_to_delete_1.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete_1)))
-- this query errors out since we've some hard SET client_min_messages TO WARNING;
-- errors in the INSERT ... SELECT pushdown -- this query falls back repartitioned insert/select since we've some hard
-- which prevents to fallback to recursive planning -- errors in the INSERT ... SELECT pushdown which prevents to fallback to
-- recursive planning
SELECT * FROM
coordinator_plan($Q$
EXPLAIN (costs off)
WITH ids_to_upsert AS WITH ids_to_upsert AS
( (
SELECT tenant_id FROM distributed_table WHERE dept > 7 SELECT tenant_id FROM distributed_table WHERE dept > 7
@ -81,8 +85,14 @@ INSERT INTO distributed_table
SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table
WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id
ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; ON CONFLICT (tenant_id) DO UPDATE SET dept = 8;
ERROR: cannot perform distributed planning for the given modification $Q$) s
DETAIL: Select query cannot be pushed down to the worker. WHERE s LIKE '%INSERT/SELECT method%';
query_plan
---------------------------------------------------------------------
INSERT/SELECT method: repartition
(1 row)
SET client_min_messages TO DEBUG1;
-- the following query is very similar to the above one -- the following query is very similar to the above one
-- but this time the query is pulled to coordinator since -- but this time the query is pulled to coordinator since
-- we return before hitting any hard errors -- we return before hitting any hard errors

View File

@ -460,9 +460,26 @@ create table test(x int, y int);
select create_distributed_table('test', 'x'); select create_distributed_table('test', 'x');
set citus.enable_repartition_joins to true; set citus.enable_repartition_joins to true;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
insert into test(y, x) select a.x, b.y from test a JOIN test b USING (y); INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
RESET client_min_messages; RESET client_min_messages;
SELECT count(*) FROM test;
TRUNCATE test;
INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
SET client_min_messages TO DEBUG1;
INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
RESET client_min_messages;
SELECT count(*) FROM test;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;

View File

@ -623,13 +623,11 @@ FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id IN (19, 20, 21); WHERE raw_events_second.user_id IN (19, 20, 21);
-- the following is a very tricky query for Citus SET client_min_messages TO WARNING;
-- although we do not support pushing down JOINs on non-partition
-- columns here it is safe to push it down given that we're looking for -- following query should use repartitioned joins and results should
-- a specific value (i.e., value_1 = 12) on the joining column. -- be routed via coordinator
-- Note that the query always hits the same shard on raw_events_second SET citus.enable_repartition_joins TO true;
-- and this query wouldn't have worked if we're to use different worker
-- count or shard replication factor
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -669,11 +667,15 @@ FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
$Q$);
-- even if there is a filter on the partition key, since the join is not on the partition key we reject -- even if there is a filter on the partition key, since the join is not on the partition key we reject
-- this query -- this query
@ -686,32 +688,48 @@ WHERE
raw_events_first.user_id = 10; raw_events_first.user_id = 10;
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.user_id = 10; WHERE raw_events_first.user_id = 10;
$Q$);
-- make things a bit more complicated with IN clauses -- make things a bit more complicated with IN clauses
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
$Q$);
-- implicit join on non partition column should also not be pushed down -- implicit join on non partition column should also not be pushed down,
-- so we fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
FROM raw_events_first, FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1; WHERE raw_events_second.user_id = raw_events_first.value_1;
$Q$);
-- the following is again a tricky query for Citus RESET client_min_messages;
-- if the given filter was on value_1 as shown in the above, Citus could
-- push it down. But here the query is refused -- The following is again a tricky query for Citus. If the given filter was
-- on value_1 as shown in the above, Citus could push it down and use
-- distributed INSERT/SELECT. But we instead fall back to route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -719,10 +737,12 @@ WHERE
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_2 = 12; AND raw_events_first.value_2 = 12;
$Q$);
-- lets do some unsupported query tests with subqueries
-- foo is not joined on the partition key so the query is not -- foo is not joined on the partition key so the query is not
-- pushed down -- pushed down. So instead we route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -751,9 +771,12 @@ WHERE
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
$Q$);
-- if the given filter was on value_1 as shown in the above, Citus could -- if the given filter was on value_1 as shown in the above, Citus could
-- push it down. But here the query is refused -- push it down. But here the query falls back to route via coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -761,10 +784,12 @@ WHERE
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_2 = 12; AND raw_events_first.value_2 = 12;
$Q$);
-- lets do some unsupported query tests with subqueries
-- foo is not joined on the partition key so the query is not -- foo is not joined on the partition key so the query is not
-- pushed down -- pushed down, and it falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -793,6 +818,7 @@ WHERE
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
$Q$);
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -810,6 +836,8 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
GROUP BY raw_events_second.user_id) AS foo; GROUP BY raw_events_second.user_id) AS foo;
SET client_min_messages TO DEBUG2;
-- INSERT returns NULL partition key value via coordinator -- INSERT returns NULL partition key value via coordinator
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -941,7 +969,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id); ON (f.id = f2.id);
SET client_min_messages TO WARNING;
-- cannot pushdown the query since the JOIN is not equi JOIN -- cannot pushdown the query since the JOIN is not equi JOIN
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -969,9 +1002,12 @@ outer_most.id, max(outer_most.value)
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id != f2.id)) as outer_most ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id; GROUP BY outer_most.id;
$Q$);
-- cannot pushdown since foo2 is not joined on partition key -- cannot pushdown since foo2 is not joined on partition key
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1000,8 +1036,12 @@ FROM
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
$Q$);
-- cannot push down since foo doesn't have an equi join -- cannot push down since foo doesn't have an equi join
-- falls back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
SELECT SELECT
@ -1030,10 +1070,13 @@ FROM
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
$Q$);
-- some unsupported LATERAL JOINs -- some unsupported LATERAL JOINs
-- join on averages is not on the partition key -- join on averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1050,8 +1093,12 @@ FROM
raw_events_first WHERE raw_events_first WHERE
value_4 = reference_ids.user_id) as averages ON true value_4 = reference_ids.user_id) as averages ON true
GROUP BY averages.user_id; GROUP BY averages.user_id;
$Q$);
-- join among reference_ids and averages is not on the partition key -- join among reference_ids and averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1067,8 +1114,12 @@ FROM
FROM FROM
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
GROUP BY averages.user_id; GROUP BY averages.user_id;
$Q$);
-- join among the agg_ids and averages is not on the partition key -- join among the agg_ids and averages is not on the partition key
-- should fall back to route via coordinator
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events (user_id, value_4_agg) INSERT INTO agg_events (user_id, value_4_agg)
SELECT SELECT
averages.user_id, avg(averages.value_4) averages.user_id, avg(averages.value_4)
@ -1086,19 +1137,24 @@ FROM
JOIN LATERAL JOIN LATERAL
(SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id) (SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id)
GROUP BY averages.user_id; GROUP BY averages.user_id;
$Q$);
-- not supported subqueries in WHERE clause -- Selected value in the WHERE is not partition key, so we cannot use distributed
-- since the selected value in the WHERE is not -- INSERT/SELECT and fall back to route via coordinator
-- partition key SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
FROM raw_events_first FROM raw_events_first
WHERE user_id IN (SELECT value_1 WHERE user_id IN (SELECT value_1
FROM raw_events_second); FROM raw_events_second);
$Q$);
-- same as above but slightly more complex -- same as above but slightly more complex
-- since it also includes subquery in FROM as well -- since it also includes subquery in FROM as well
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT f2.id FROM SELECT f2.id FROM
@ -1124,9 +1180,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT value_1 WHERE f.id IN (SELECT value_1
FROM raw_events_second); FROM raw_events_second);
$Q$);
-- some more semi-anti join tests -- some more semi-anti join tests
SET client_min_messages TO DEBUG2;
-- join in where -- join in where
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1136,7 +1195,12 @@ WHERE user_id IN (SELECT raw_events_second.user_id
FROM raw_events_second, raw_events_first FROM raw_events_second, raw_events_first
WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200);
RESET client_min_messages;
-- we cannot push this down since it is NOT IN -- we cannot push this down since it is NOT IN
-- we use repartition insert/select instead
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
@ -1144,7 +1208,9 @@ FROM raw_events_first
WHERE user_id NOT IN (SELECT raw_events_second.user_id WHERE user_id NOT IN (SELECT raw_events_second.user_id
FROM raw_events_second, raw_events_first FROM raw_events_second, raw_events_first
WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200);
$Q$);
SET client_min_messages TO DEBUG2;
-- safe to push down -- safe to push down
INSERT INTO raw_events_second INSERT INTO raw_events_second
@ -1195,8 +1261,12 @@ WHERE NOT EXISTS (SELECT 1
GROUP BY GROUP BY
outer_most.id; outer_most.id;
RESET client_min_messages;
-- cannot push down since the f.id IN is matched with value_1 -- cannot push down since the f.id IN is matched with value_1
-- we use repartition insert/select instead
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT user_id SELECT user_id
@ -1224,6 +1294,9 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT value_1 WHERE f.id IN (SELECT value_1
FROM raw_events_second)); FROM raw_events_second));
$Q$);
SET client_min_messages TO DEBUG2;
-- same as above, but this time it is safe to push down since -- same as above, but this time it is safe to push down since
-- f.id IN is matched with user_id -- f.id IN is matched with user_id
@ -1255,6 +1328,8 @@ ON (f.id = f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
RESET client_min_messages;
-- cannot push down since top level user_id is matched with NOT IN -- cannot push down since top level user_id is matched with NOT IN
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)

View File

@ -153,8 +153,9 @@ UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
RESET client_min_messages; RESET client_min_messages;
-- Following query is not supported since error checks of the subquery pushdown planner -- Following query is supported by using repartition join for the insert/select
-- and insert select planner have not been unified. It should work after unifying them. SELECT coordinator_plan($Q$
EXPLAIN (costs off)
WITH cte AS ( WITH cte AS (
SELECT SELECT
col_1, col_2 col_1, col_2
@ -167,6 +168,7 @@ SELECT
source_table_1.col_2 source_table_1.col_2
FROM cte, source_table_1 FROM cte, source_table_1
WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING; WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING;
$Q$);
SET citus.enable_cte_inlining TO true; SET citus.enable_cte_inlining TO true;

View File

@ -416,29 +416,40 @@ ORDER BY user_lastseen DESC;
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------
-- not pushable since partition key is NOT IN -- not pushable since partition key is NOT IN. Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id NOT IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
$Q$);
-- not pushable since partition key is not selected from the second subquery -- not pushable since partition key is not selected from the second subquery.
-- Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT value_1 FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
$Q$);
-- not pushable since second subquery does not return bare partition key -- not pushable since second subquery does not return bare partition key.
-- Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third (user_id) INSERT INTO agg_results_third (user_id)
SELECT DISTINCT user_id SELECT DISTINCT user_id
FROM users_table FROM users_table
WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20) WHERE user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 10 AND value_1 <= 20)
AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40) AND user_id IN (SELECT 3 * user_id FROM users_table WHERE value_1 >= 30 AND value_1 <= 40)
AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60); AND user_id IN (SELECT user_id FROM users_table WHERE value_1 >= 50 AND value_1 <= 60);
$Q$);
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------
@ -568,7 +579,9 @@ INSERT INTO agg_results_third(user_id, value_2_agg)
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------
-- not pushable due to NOT IN -- not pushable due to NOT IN. Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -579,8 +592,12 @@ And user_id NOT in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
$Q$);
-- not pushable since we're not selecting the partition key -- not pushable since we're not selecting the partition key.
-- Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -591,9 +608,12 @@ And user_id in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
$Q$);
-- not pushable since we're not selecting the partition key -- not pushable since we're not selecting the partition key
-- from the events table -- from the events table. Use repartition insert/select.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id) INSERT INTO agg_results_third(user_id)
Select user_id Select user_id
From events_table From events_table
@ -604,6 +624,7 @@ And event_type in
From users_table From users_table
Where value_1 = 15 Where value_1 = 15
And value_2 > 25); And value_2 > 25);
$Q$);
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------
@ -611,23 +632,33 @@ And event_type in
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------
-- not pushable due to NOT IN -- not pushable due to NOT IN. Use pull to coordinator instead.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) WHERE user_id NOT IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
$Q$);
-- not pushable due to not selecting the partition key -- not pushable due to not selecting the partition key. Use pull to coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505) WHERE user_id IN (SELECT value_2 from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
$Q$);
-- not pushable due to not comparing user id from the events table -- not pushable due to not comparing user id from the events table.
-- Use pull to coordinator.
SELECT coordinator_plan($Q$
EXPLAIN (costs off)
INSERT INTO agg_results_third(user_id, value_1_agg) INSERT INTO agg_results_third(user_id, value_1_agg)
SELECT user_id, event_type FROM events_table SELECT user_id, event_type FROM events_table
WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505) WHERE event_type IN (SELECT user_id from events_table WHERE event_type > 500 and event_type < 505)
GROUP BY user_id, event_type; GROUP BY user_id, event_type;
$Q$);
------------------------------------ ------------------------------------
------------------------------------ ------------------------------------

View File

@ -56,9 +56,14 @@ WHERE
AND distributed_table.tenant_id = some_tenants.tenant_id AND distributed_table.tenant_id = some_tenants.tenant_id
AND EXISTS (SELECT * FROM ids_to_delete); AND EXISTS (SELECT * FROM ids_to_delete);
-- this query errors out since we've some hard SET client_min_messages TO WARNING;
-- errors in the INSERT ... SELECT pushdown
-- which prevent falling back to recursive planning -- this query falls back to repartitioned insert/select since we've some hard
-- errors in the INSERT ... SELECT pushdown which prevent falling back to
-- recursive planning
SELECT * FROM
coordinator_plan($Q$
EXPLAIN (costs off)
WITH ids_to_upsert AS WITH ids_to_upsert AS
( (
SELECT tenant_id FROM distributed_table WHERE dept > 7 SELECT tenant_id FROM distributed_table WHERE dept > 7
@ -67,6 +72,10 @@ INSERT INTO distributed_table
SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table
WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id
ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; ON CONFLICT (tenant_id) DO UPDATE SET dept = 8;
$Q$) s
WHERE s LIKE '%INSERT/SELECT method%';
SET client_min_messages TO DEBUG1;
-- the following query is very similar to the above one -- the following query is very similar to the above one
-- but this time the query is pulled to coordinator since -- but this time the query is pulled to coordinator since