fix insert select complex query pushdown

fix-noncolocated-insert-select-pushdown
aykutbozkurt 2023-05-09 21:17:04 +03:00
parent 07b8cd2634
commit 0668246a8a
8 changed files with 186 additions and 115 deletions

View File

@ -730,27 +730,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
"table", NULL, NULL);
}
if (!HasDistributionKey(targetRelationId) ||
subqueryRteListProperties->hasSingleShardDistTable)
{
/*
* XXX: Better to check this regardless of the fact that the target table
* has a distribution column or not.
*/
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);
if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot reference a "
"distributed table without a shard key together "
"with non-colocated distributed tables",
NULL, NULL);
}
}
if (HasDistributionKey(targetRelationId))
{
/* ensure that INSERT's partition column comes from SELECT's partition column */
@ -760,22 +739,22 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
{
return error;
}
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
}
}
/* All tables in source list and target table should be colocated. */
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);
if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target relation and all source relations of the "
"SELECT must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
return NULL;
}

View File

@ -549,7 +549,7 @@ SELECT create_distributed_table('target_table', 'a');
INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i;
SET client_min_messages TO DEBUG1;
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
RESET client_min_messages;
SELECT * FROM target_table ORDER BY a;
@ -622,40 +622,40 @@ INSERT INTO target_table
WHERE a BETWEEN $1 AND $2 GROUP BY a;
SET client_min_messages TO DEBUG1;
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
RESET client_min_messages;
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
@ -680,25 +680,25 @@ INSERT INTO target_table
WHERE a=$1 GROUP BY a;
SET client_min_messages TO DEBUG1;
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
@ -760,10 +760,10 @@ WITH r AS (
INSERT INTO target_table SELECT * FROM source_table RETURNING *
)
INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT source_table.a, source_table.b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
DEBUG: Router planner cannot handle multi-shard select queries
@ -1013,7 +1013,7 @@ SELECT create_distributed_table('target_table', 'a');
INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
SET client_min_messages TO DEBUG2;
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'a'
@ -1047,7 +1047,7 @@ EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2
SET client_min_messages TO DEBUG2;
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
@ -1217,7 +1217,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'c1'

View File

@ -549,7 +549,7 @@ SELECT create_distributed_table('target_table', 'a');
INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i;
SET client_min_messages TO DEBUG1;
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
RESET client_min_messages;
SELECT * FROM target_table ORDER BY a;
@ -622,40 +622,40 @@ INSERT INTO target_table
WHERE a BETWEEN $1 AND $2 GROUP BY a;
SET client_min_messages TO DEBUG1;
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(0, 2);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
EXECUTE insert_plan(2, 4);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: performing repartitioned INSERT ... SELECT
RESET client_min_messages;
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
@ -680,25 +680,25 @@ INSERT INTO target_table
WHERE a=$1 GROUP BY a;
SET client_min_messages TO DEBUG1;
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
EXECUTE insert_plan(0);
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
@ -760,10 +760,10 @@ WITH r AS (
INSERT INTO target_table SELECT * FROM source_table RETURNING *
)
INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
DEBUG: Router planner cannot handle multi-shard select queries
@ -1013,7 +1013,7 @@ SELECT create_distributed_table('target_table', 'a');
INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
SET client_min_messages TO DEBUG2;
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'a'
@ -1047,7 +1047,7 @@ EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2
SET client_min_messages TO DEBUG2;
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
@ -1217,7 +1217,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'c1'

View File

@ -118,21 +118,25 @@ SET client_min_messages TO DEBUG2;
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table;
@ -141,11 +145,13 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
@ -158,7 +164,8 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
@ -167,25 +174,27 @@ ERROR: queries that reference a distributed table without a shard key can only
DETAIL: found no worker with all shard placements
-- use a distributed table that is colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a citus local table
@ -200,12 +209,14 @@ ERROR: queries that reference a distributed table without a shard key can only
DETAIL: Local tables cannot be used in distributed queries.
-- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: INSERT ... SELECT into an append-distributed table is not supported
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
@ -373,20 +384,20 @@ ERROR: queries that reference a distributed table without a shard key can only
DETAIL: Local tables cannot be used in distributed queries.
-- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a non-colocated single-shard table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a materialized view
@ -407,7 +418,7 @@ DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM append_table;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
@ -505,7 +516,9 @@ WITH cte AS (
)
SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL;
DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO citus_local_table (a, b)
@ -553,7 +566,8 @@ JOIN (
) q
) t2 ON t1.b = t2.b
WHERE t2.sum_val > 2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
@ -578,7 +592,7 @@ JOIN reference_table AS t3 ON (t2.a = t3.a)
WHERE NOT EXISTS (
SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1
@ -587,7 +601,8 @@ FROM nullkey_c1_t1 AS t1
WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1
@ -619,7 +634,7 @@ WHERE t1.a IN (
) AS t4 ON t3.a = t4.a
) AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT t3.a FROM ((SELECT distributed_table_c1_t1.a FROM insert_select_single_shard_table.distributed_table_c1_t1 WHERE (distributed_table_c1_t1.b OPERATOR(pg_catalog.>) 4)) t3 JOIN (SELECT distributed_table_c1_t2.a FROM insert_select_single_shard_table.distributed_table_c1_t2 WHERE (distributed_table_c1_t2.b OPERATOR(pg_catalog.<) 7)) t4 ON ((t3.a OPERATOR(pg_catalog.=) t4.a)))) t2
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_single_shard_table.reference_table t1 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)))
DEBUG: Collecting INSERT ... SELECT results on coordinator
@ -739,16 +754,17 @@ DEBUG: distributed statement: INSERT INTO insert_select_single_shard_table.null
SET client_min_messages TO DEBUG1;
INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b)
DO UPDATE SET b = t1.b + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- This also fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a)
DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.

View File

@ -3242,5 +3242,32 @@ returning text_col_1;
string
(1 row)
CREATE TABLE dist_table_3(
dist_col bigint,
int_col integer
);
SELECT create_distributed_table('dist_table_3', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- dist_table_2 and dist_table_3 are non-colocated source tables. Pulling the SELECT part to coordinator is also not possible.
-- Citus would not be able to handle this complex insert select.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_3 USING(dist_col);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
CREATE TABLE dist_table_4(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_table_4', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Even if target table distribution column is colocated with dist_table_2's distributed column, source tables dist_table_2 and dist_table_4
-- are non-colocated. Hence, SELECT part of the query should be pulled to coordinator.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_4 ON dist_table_2.dist_col = dist_table_4.int_col;
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -3242,5 +3242,32 @@ returning text_col_1;
string
(1 row)
CREATE TABLE dist_table_3(
dist_col bigint,
int_col integer
);
SELECT create_distributed_table('dist_table_3', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- dist_table_2 and dist_table_3 are non-colocated source tables. Pulling the SELECT part to coordinator is also not possible.
-- Citus would not be able to handle this complex insert select.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_3 USING(dist_col);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
CREATE TABLE dist_table_4(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_table_4', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Even if target table distribution column is colocated with dist_table_2's distributed column, source tables dist_table_2 and dist_table_4
-- are non-colocated. Hence, SELECT part of the query should be pulled to coordinator.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_4 ON dist_table_2.dist_col = dist_table_4.int_col;
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -827,7 +827,7 @@ INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2;
SET client_min_messages TO DEBUG2;
-- between two non-colocated single-shard tables
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
@ -848,7 +848,7 @@ INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table;
SET client_min_messages TO DEBUG2;
INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table;
@ -865,7 +865,8 @@ DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
@ -1311,7 +1312,7 @@ WITH cte AS (
)
SELECT (a+5)*2, b FROM cte;
DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: recursively planning left side of the right join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "distributed_table" to a subquery

View File

@ -2341,5 +2341,26 @@ join dist_table_2 t2 using (dist_col)
limit 1
returning text_col_1;
CREATE TABLE dist_table_3(
dist_col bigint,
int_col integer
);
SELECT create_distributed_table('dist_table_3', 'dist_col');
-- dist_table_2 and dist_table_3 are non-colocated source tables. Pulling the SELECT part to coordinator is also not possible.
-- Citus would not be able to handle this complex insert select.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_3 USING(dist_col);
CREATE TABLE dist_table_4(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_table_4', 'dist_col');
-- Even if target table distribution column is colocated with dist_table_2's distributed column, source tables dist_table_2 and dist_table_4
-- are non-colocated. Hence, SELECT part of the query should be pulled to coordinator.
INSERT INTO dist_table_1 SELECT dist_table_2.dist_col FROM dist_table_2 JOIN dist_table_4 ON dist_table_2.dist_col = dist_table_4.int_col;
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;