From 39b7711527aeb48af150a85f29e83a7ab778714e Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 24 Apr 2023 13:19:22 +0300 Subject: [PATCH] Add support for more pushable / non-pushable insert .. select queries with null-shard-key tables (#6823) * Add support for dist insert select by selecting from a reference table. This was the only pushable insert .. select case that #6773 didn't cover. * For the cases where we insert into a Citus table but the INSERT .. SELECT query cannot be pushed down, allow pull-to-coordinator when possible. Remove the checks that we had at the very beginning of CreateInsertSelectPlanInternal so that we can try insert .. select via pull-to-coordinator for the cases where we cannot push-down the insert .. select query. What we support via pull-to-coordinator is still limited due to lacking of logical planner support for SELECT queries, but this commit at least allows using pull-to-coordinator for the cases where the select query can be planned via router planner, without limiting ourselves to restrictive top-level checks. Also introduce some additional restrictions into CreateDistributedInsertSelectPlan for the cases it was missing to check for null-shard-key tables. Indeed, it would make more sense to have those checks for distributed tables in general, via separate PRs against main branch. See https://github.com/citusdata/citus/pull/6817. * Add support for inserting into a Postgres table. --- .../planner/insert_select_planner.c | 156 ++-- .../regress/expected/create_null_dist_key.out | 1 - .../expected/insert_select_null_dist_key.out | 814 ++++++++++++++++++ .../regress/expected/query_null_dist_key.out | 113 ++- src/test/regress/multi_1_schedule | 1 + .../sql/insert_select_null_dist_key.sql | 470 ++++++++++ src/test/regress/sql/query_null_dist_key.sql | 57 +- 7 files changed, 1484 insertions(+), 128 deletions(-) create mode 100644 src/test/regress/expected/insert_select_null_dist_key.out create mode 100644 src/test/regress/sql/insert_select_null_dist_key.sql diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 175f6bc6f..62c0e8d68 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -57,7 +57,6 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, PlannerRestrictionContext * plannerRestrictionContext, ParamListInfo boundParams); -static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -242,12 +241,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, RaiseDeferredError(deferredError, ERROR); } - /* - * We support a limited set of INSERT .. SELECT queries if the query - * references a null-dist-key table. - */ - ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery); - DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); @@ -267,74 +260,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, } -/* - * ErrorIfInsertSelectWithNullDistKeyNotSupported throws an error if given INSERT - * .. SELECT query references a null-dist-key table (as the target table or in - * the SELECT clause) and is unsupported. - * - * Such an INSERT .. SELECT query is supported as long as the it only references - * a "colocated" set of null-dist-key tables, no other relation rte types. - */ -static void -ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery) -{ - RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); - Query *subquery = subqueryRte->subquery; - RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery); - - RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); - Oid targetRelationId = insertRte->relid; - if (!IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE) && - subqueryRteListProperties->hasDistTableWithoutShardKey) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot select from a distributed table that " - "does not have a shard key when inserting into " - "a different table type"))); - } - else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) - { - if (subqueryRteListProperties->hasPostgresLocalTable || - subqueryRteListProperties->hasReferenceTable || - subqueryRteListProperties->hasCitusLocalTable || - subqueryRteListProperties->hasDistTableWithShardKey || - subqueryRteListProperties->hasMaterializedView) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot select from different table types " - "when inserting into a distributed table " - "that does not have a shard key"))); - } - - if (!subqueryRteListProperties->hasDistTableWithoutShardKey) - { - /* - * This means that the SELECT doesn't reference any Citus tables, - * Postgres tables or materialized views but references a function - * call, a values claue etc., or a cte from INSERT. - * - * In that case, we rely on the common restrictions enforced by the - * INSERT .. SELECT planners. - */ - Assert(!NeedsDistributedPlanning(subquery)); - return; - } - - List *distributedRelationIdList = DistributedRelationIdList(subquery); - distributedRelationIdList = lappend_oid(distributedRelationIdList, - targetRelationId); - - if (!AllDistributedRelationsInListColocated(distributedRelationIdList)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot select from a non-colocated distributed " - "table when inserting into a distributed table " - "that does not have a shard key"))); - } - } -} - - /* * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed * INSERT ... SELECT queries which could consist of multiple tasks. @@ -454,16 +379,6 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, { RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - RTEListProperties *selectRteListProperties = - GetRTEListPropertiesForQuery(selectRte->subquery); - if (selectRteListProperties->hasDistTableWithoutShardKey) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot select from a distributed table that " - "does not have a shard key when inserting into " - "a local table"))); - } - PrepareInsertSelectForCitusPlanner(insertSelectQuery); /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ @@ -800,10 +715,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, NULL, NULL); } } - else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) - { - /* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */ - } else { /* @@ -819,25 +730,49 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, "table", NULL, NULL); } - /* ensure that INSERT's partition column comes from SELECT's partition column */ - error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte, - &selectPartitionColumnTableId); - if (error) + if (!HasDistributionKey(targetRelationId) || + subqueryRteListProperties->hasDistTableWithoutShardKey) { - return error; + /* + * XXX: Better to check this regardless of the fact that the target table + * has a distribution column or not. + */ + List *distributedRelationIdList = DistributedRelationIdList(subquery); + distributedRelationIdList = lappend_oid(distributedRelationIdList, + targetRelationId); + + if (!AllDistributedRelationsInListColocated(distributedRelationIdList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT cannot reference a " + "distributed table without a shard key together " + "with non-colocated distributed tables", + NULL, NULL); + } } - /* - * We expect partition column values come from colocated tables. Note that we - * skip this check from the reference table case given that all reference tables - * are already (and by default) co-located. - */ - if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) + if (HasDistributionKey(targetRelationId)) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT target table and the source relation of the SELECT partition " - "column value must be colocated in distributed INSERT ... SELECT", - NULL, NULL); + /* ensure that INSERT's partition column comes from SELECT's partition column */ + error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte, + &selectPartitionColumnTableId); + if (error) + { + return error; + } + + /* + * We expect partition column values come from colocated tables. Note that we + * skip this check from the reference table case given that all reference tables + * are already (and by default) co-located. + */ + if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT target table and the source relation of the SELECT partition " + "column value must be colocated in distributed INSERT ... SELECT", + NULL, NULL); + } } } @@ -1626,6 +1561,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && IsSupportedRedistributionTarget(targetRelationId); + /* + * Today it's not possible to generate a distributed plan for a SELECT + * having more than one tasks if it references a null-shard-key table. + * This is because, we don't support queries beyond router planner + * if the query references a null-shard-key table. + * + * For this reason, right now we don't expect an INSERT .. SELECT + * query to go through the repartitioned INSERT .. SELECT logic if the + * SELECT query references a null-shard-key table. + */ + Assert(!repartitioned || + !GetRTEListPropertiesForQuery(selectQueryCopy)->hasDistTableWithoutShardKey); + distributedPlan->insertSelectQuery = insertSelectQuery; distributedPlan->selectPlanForInsertSelect = selectPlan; distributedPlan->insertSelectMethod = repartitioned ? diff --git a/src/test/regress/expected/create_null_dist_key.out b/src/test/regress/expected/create_null_dist_key.out index 43120a454..af6e66f62 100644 --- a/src/test/regress/expected/create_null_dist_key.out +++ b/src/test/regress/expected/create_null_dist_key.out @@ -1803,7 +1803,6 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL; -- try a few simple queries at least to make sure that we don't crash BEGIN; INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; -ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key ROLLBACK; DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1; DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE; diff --git a/src/test/regress/expected/insert_select_null_dist_key.out b/src/test/regress/expected/insert_select_null_dist_key.out new file mode 100644 index 000000000..b5391063c --- /dev/null +++ b/src/test/regress/expected/insert_select_null_dist_key.out @@ -0,0 +1,814 @@ +CREATE SCHEMA insert_select_null_dist_key; +SET search_path TO insert_select_null_dist_key; +SET citus.next_shard_id TO 1820000; +SET citus.shard_count TO 32; +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SET client_min_messages TO NOTICE; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE nullkey_c2_t1(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_c1_t1(a int, b int); +SELECT create_distributed_table('distributed_table_c1_t1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_c1_t2(a int, b int); +SELECT create_distributed_table('distributed_table_c1_t2', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_c2_t1(a int, b int); +SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE postgres_local_table(a int, b int); +CREATE FUNCTION reload_tables() RETURNS void AS $$ + BEGIN + SET LOCAL client_min_messages TO WARNING; + + TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1, + distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table; + + INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; + INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; + INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; + INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; + INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i; + INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i; + INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + END; +$$ LANGUAGE plpgsql; +SELECT reload_tables(); + reload_tables +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE append_table (a int, b int); +SELECT create_distributed_table('append_table', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset +COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); +CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1; +SET client_min_messages TO DEBUG2; +-- Test inserting into a distributed table by selecting from a combination of +-- different table types together with null-shard-key tables. +-- use a null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a reference table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a colocated null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a non-colocated null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- use a distributed table that is colocated with the target table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a distributed table that is not colocated with the target table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- use a citus local table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a postgres local table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use append / range distributed tables +INSERT INTO range_table SELECT * FROM nullkey_c1_t1; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO append_table SELECT * FROM nullkey_c1_t1; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: INSERT ... SELECT into an append-distributed table is not supported +SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2; +DEBUG: Router planner cannot handle multi-shard select queries + avg | avg +--------------------------------------------------------------------- + 4.2105263157894737 | 4.2105263157894737 +(1 row) + +TRUNCATE distributed_table_c1_t1; +INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Test inserting into a reference table by selecting from a combination of +-- different table types together with null-shard-key tables. +-- use a null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a reference table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a colocated null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a non-colocated null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- use a distributed table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a citus local table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a postgres local table +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 4.0428571428571429 | 4.0428571428571429 +(1 row) + +TRUNCATE reference_table; +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Test inserting into a citus local table by selecting from a combination of +-- different table types together with null-shard-key tables. +-- use a null-shard-key table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a reference table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a colocated null-shard-key table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a distributed table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- use a citus local table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a postgres local table +INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 4.4333333333333333 | 4.4333333333333333 +(1 row) + +TRUNCATE citus_local_table; +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Test inserting into a null-shard-key table by selecting from a combination of +-- different table types, together with or without null-shard-key tables. +-- use a postgres local table +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from a local table +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a citus local table +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table; +DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7; +DEBUG: distributed INSERT ... SELECT cannot select from a local table +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use a distributed table +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- use a non-colocated null-shard-key table +INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- use a materialized view +INSERT INTO nullkey_c1_t1 SELECT * FROM matview; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a); +DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a); +DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- use append / range distributed tables +INSERT INTO nullkey_c1_t1 SELECT * FROM range_table; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 SELECT * FROM append_table; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Router planner does not support append-partitioned tables. +DEBUG: Collecting INSERT ... SELECT results on coordinator +SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 5.8611111111111111 | 13.9305555555555556 +(1 row) + +SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 3.8750000000000000 | 3.8750000000000000 +(1 row) + +TRUNCATE nullkey_c1_t1, nullkey_c2_t1; +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Test inserting into a local table by selecting from a combination of +-- different table types, together with or without null-shard-key tables. +INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +DEBUG: Creating router plan +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2; +DEBUG: Creating router plan +WITH cte_1 AS ( + DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING * +) +INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL; +DEBUG: Creating router plan +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2; + avg | avg +--------------------------------------------------------------------- + 5.0000000000000000 | 5.0000000000000000 +(1 row) + +TRUNCATE postgres_local_table; +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +-- Try slightly more complex queries. +WITH cte_1 AS ( + SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) +), +cte_2 AS ( + SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b) +) +INSERT INTO distributed_table_c1_t1 +SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: CTE cte_1 is going to be inlined via distributed planning +DEBUG: CTE cte_2 is going to be inlined via distributed planning +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +WITH cte_1 AS ( + SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) +), +cte_2 AS ( + SELECT * FROM nullkey_c1_t2 WHERE EXISTS ( + SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a + ) + ORDER BY 1,2 OFFSET 1 LIMIT 4 +) +INSERT INTO distributed_table_c1_t1 +SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: CTE cte_1 is going to be inlined via distributed planning +DEBUG: CTE cte_2 is going to be inlined via distributed planning +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1 +) t2 +ON t1.b < t2.b; +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 (a, b) +WITH cte AS ( + SELECT a, b, + (SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1, + (SELECT a FROM reference_table WHERE b = t.b) AS d2 + FROM nullkey_c1_t1 t +) +SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL; +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO citus_local_table (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +CROSS JOIN ( + SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1 +) t2; +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM reference_table t1 +LEFT JOIN ( + SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn + FROM nullkey_c1_t1 +) t2 ON t1.b = t2.b +WHERE t2.rn > 0; +DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT rn, b + FROM ( + SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn + FROM distributed_table_c2_t1 + ) q +) t2 ON t1.b = t2.b +WHERE t2.rn > 2; +DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT sum_val, b + FROM ( + SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val + FROM nullkey_c1_t1 + ) q +) t2 ON t1.b = t2.b +WHERE t2.sum_val > 2; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive +-- about pushing down queries with DISTINCT ON clause even if the table +-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752. +INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2; +DEBUG: DISTINCT ON (non-partition column) clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Similarly, we could push down the following query as well. see +-- https://github.com/citusdata/citus/pull/6831. +INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1; +DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c2_t1 +SELECT t2.a, t2.b +FROM nullkey_c1_t1 AS t2 +JOIN reference_table AS t3 ON (t2.a = t3.a) +WHERE NOT EXISTS ( + SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b +); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO distributed_table_c1_t1 +SELECT t1.a, t1.b +FROM nullkey_c1_t1 AS t1 +WHERE t1.a NOT IN ( + SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2 +); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +INSERT INTO distributed_table_c1_t1 +SELECT t1.a, t1.b +FROM reference_table AS t1 +JOIN ( + SELECT t2.a FROM ( + SELECT a FROM nullkey_c1_t1 + UNION + SELECT a FROM nullkey_c1_t2 + ) AS t2 +) AS t3 ON t1.a = t3.a; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Temporaryly reduce the verbosity to avoid noise +-- in the output of the next query. +SET client_min_messages TO DEBUG1; +INSERT INTO nullkey_c1_t1 +SELECT t1.a, t1.b +FROM reference_table AS t1 +WHERE t1.a IN ( + SELECT t2.a FROM ( + SELECT t3.a FROM ( + SELECT a FROM distributed_table_c1_t1 WHERE b > 4 + ) AS t3 + JOIN ( + SELECT a FROM distributed_table_c1_t2 WHERE b < 7 + ) AS t4 ON t3.a = t4.a + ) AS t2 +); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT t3.a FROM ((SELECT distributed_table_c1_t1.a FROM insert_select_null_dist_key.distributed_table_c1_t1 WHERE (distributed_table_c1_t1.b OPERATOR(pg_catalog.>) 4)) t3 JOIN (SELECT distributed_table_c1_t2.a FROM insert_select_null_dist_key.distributed_table_c1_t2 WHERE (distributed_table_c1_t2.b OPERATOR(pg_catalog.<) 7)) t4 ON ((t3.a OPERATOR(pg_catalog.=) t4.a)))) t2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_null_dist_key.reference_table t1 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) +DEBUG: Collecting INSERT ... SELECT results on coordinator +SET client_min_messages TO DEBUG2; +-- test upsert with plain INSERT query +CREATE TABLE upsert_test_1 +( + unique_col int UNIQUE, + other_col int, + third_col int +); +DEBUG: CREATE TABLE / UNIQUE will create implicit index "upsert_test_1_unique_col_key" for table "upsert_test_1" +SELECT create_distributed_table('upsert_test_1', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE upsert_test_2(key int primary key, value text); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_2_pkey" for table "upsert_test_2" +SELECT create_distributed_table('upsert_test_2', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key) + DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text; +DEBUG: Creating router plan +INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key) + DO UPDATE SET value = (upsert_test_2.value::int * 3)::text; +DEBUG: Creating router plan +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = random()::int; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE +INSERT INTO upsert_test_1 VALUES (3, 5, 7); +DEBUG: Creating router plan +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int + DO UPDATE SET other_col = 5; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE +CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2)); +DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2" +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3" +SELECT create_distributed_table('upsert_test_3', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *; +DEBUG: Creating router plan + key_1 | key_2 | value +--------------------------------------------------------------------- + 1 | 1 | 1 +(1 row) + +INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *; +DEBUG: Creating router plan + key_1 | key_2 | value +--------------------------------------------------------------------- + 5 | 2 | default_value +(1 row) + +SET client_min_messages TO DEBUG1; +INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *; + key_1 | key_2 | value +--------------------------------------------------------------------- + 7 | 5 | harcoded_text_value +(1 row) + +SET client_min_messages TO DEBUG2; +-- test upsert with INSERT .. SELECT queries +SET client_min_messages TO DEBUG1; +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = upsert_test_1.other_col + 1; +-- Fails due to https://github.com/citusdata/citus/issues/6826. +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: while executing command on localhost:xxxxx +SET client_min_messages TO DEBUG2; +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = random()::int; +ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; +ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE +SELECT reload_tables(); +DEBUG: function does not have co-located tables + reload_tables +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a); +DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "nullkey_c1_t1_pkey" for table "nullkey_c1_t1" +DEBUG: verifying table "nullkey_c1_t1" +ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b); +DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "distributed_table_c1_t1_pkey" for table "distributed_table_c1_t1" +DEBUG: verifying table "distributed_table_c1_t1" +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) + DO UPDATE SET a = t1.a + 10; +DEBUG: distributed statement: INSERT INTO insert_select_null_dist_key.nullkey_c1_t1_1820000 AS t1 (a, b) SELECT t3.a, t3.b FROM (insert_select_null_dist_key.nullkey_c1_t2_1820001 t2 JOIN insert_select_null_dist_key.reference_table_1820003 t3 ON ((t2.a OPERATOR(pg_catalog.=) t3.a))) ON CONFLICT(a) DO UPDATE SET a = (t1.a OPERATOR(pg_catalog.+) 10) +SET client_min_messages TO DEBUG1; +INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b) + DO UPDATE SET b = t1.b + 10; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) + DO UPDATE SET a = t1.a + 10; +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- This also fails due to https://github.com/citusdata/citus/issues/6826. +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a) + DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3); +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: while executing command on localhost:xxxxx +SET client_min_messages TO DEBUG2; +SELECT avg(a), avg(b) FROM distributed_table_c1_t1; +DEBUG: Router planner cannot handle multi-shard select queries + avg | avg +--------------------------------------------------------------------- + 5.0000000000000000 | 9.2857142857142857 +(1 row) + +SELECT avg(a), avg(b) FROM nullkey_c1_t1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 7.5000000000000000 | 4.1666666666666667 +(1 row) + +SELECT avg(a), avg(b) FROM nullkey_c1_t2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + avg | avg +--------------------------------------------------------------------- + 4.5000000000000000 | 4.5000000000000000 +(1 row) + +SELECT * FROM upsert_test_1 ORDER BY unique_col; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + unique_col | other_col | third_col +--------------------------------------------------------------------- + 3 | 6 | 7 +(1 row) + +SELECT * FROM upsert_test_2 ORDER BY key; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value +--------------------------------------------------------------------- + 1 | 15 +(1 row) + +SELECT * FROM upsert_test_3 ORDER BY key_1, key_2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key_1 | key_2 | value +--------------------------------------------------------------------- + 1 | 1 | 1 + 5 | 2 | default_value + 7 | 5 | harcoded_text_value +(3 rows) + +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_null_dist_key CASCADE; +SELECT citus_remove_node('localhost', :master_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/query_null_dist_key.out b/src/test/regress/expected/query_null_dist_key.out index 09413a3ea..09907a99b 100644 --- a/src/test/regress/expected/query_null_dist_key.out +++ b/src/test/regress/expected/query_null_dist_key.out @@ -827,24 +827,55 @@ INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2; SET client_min_messages TO DEBUG2; -- between two non-colocated null dist key tables INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; -ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator -- between a null dist key table and a table of different type +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on nullkey_c1_t1_1620000 citus_table_alias (actual rows=0 loops=1) + -> Seq Scan on reference_table_1620005 reference_table (actual rows=6 loops=1) +(7 rows) + +SET client_min_messages TO DEBUG2; INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT * FROM nullkey_c1_t1; -ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1; -ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1; -ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1; -ERROR: cannot select from a distributed table that does not have a shard key when inserting into a local table +DEBUG: Creating router plan -- test subquery SELECT count(*) FROM ( @@ -873,7 +904,7 @@ WITH level_0 AS ( WITH RECURSIVE level_2_recursive(x) AS ( VALUES (1) UNION ALL - SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 2 ) SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) ) @@ -885,7 +916,7 @@ DEBUG: CTE level_1 is going to be inlined via distributed planning DEBUG: Creating router plan count --------------------------------------------------------------------- - 122 + 106 (1 row) WITH level_0 AS ( @@ -1095,6 +1126,9 @@ INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DETAIL: Sequences cannot be used in router queries +INSERT INTO bigserial_test (x, y) SELECT a, a FROM reference_table; +DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO agg_events (user_id) SELECT f2.id FROM @@ -1119,7 +1153,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second); -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key -- upsert with returning INSERT INTO agg_events AS ae ( @@ -1153,6 +1186,17 @@ SELECT FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; +INSERT INTO agg_events (user_id) +SELECT + users_ref_table.user_id +FROM + users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; +INSERT INTO agg_events (user_id) +SELECT COALESCE(raw_events_first.user_id, users_ref_table.user_id) +FROM raw_events_first + RIGHT JOIN (users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id) + ON raw_events_first.user_id = users_ref_table.user_id; -- using a full join INSERT INTO agg_events (user_id, value_1_agg) SELECT t1.user_id AS col1, @@ -1176,11 +1220,22 @@ FROM raw_events_first WHERE NOT EXISTS (SELECT 1 FROM raw_events_second WHERE raw_events_second.user_id =raw_events_first.user_id); +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM users_ref_table +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id = users_ref_table.user_id); -- using inner join INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN users_ref_table ON raw_events_first.user_id = users_ref_table.user_id +WHERE raw_events_first.value_1 IN (10, 11,12) OR users_ref_table.user_id IN (1,2,3,4); -- We could relax distributed insert .. select checks to allow pushing -- down more clauses down to the worker nodes when inserting into a single -- shard by selecting from a colocated one. We might want to do something @@ -1197,6 +1252,9 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO agg_events (user_id) SELECT users_ref_table.user_id FROM users_ref_table LIMIT 1; +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator -- using a materialized cte WITH cte AS MATERIALIZED (SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id) @@ -1207,6 +1265,9 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO raw_events_second WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; +INSERT INTO raw_events_second (user_id) + WITH cte AS MATERIALIZED (SELECT * FROM users_ref_table) + SELECT user_id FROM cte; -- using a regular cte WITH cte AS (SELECT * FROM raw_events_first) INSERT INTO raw_events_second @@ -1229,7 +1290,7 @@ DEBUG: Subqueries without relations are not allowed in distributed INSERT ... S DEBUG: Collecting INSERT ... SELECT results on coordinator -- we still support complex joins via INSERT's cte list .. WITH cte AS ( - SELECT reference_table.a AS a, 1 AS b + SELECT DISTINCT(reference_table.a) AS a, 1 AS b FROM distributed_table RIGHT JOIN reference_table USING (a) ) INSERT INTO raw_events_second (user_id, value_1) @@ -1240,17 +1301,23 @@ DEBUG: recursively planning left side of the right join since the outer side is DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "distributed_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT DISTINCT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b) DEBUG: Collecting INSERT ... SELECT results on coordinator --- .. but can't do so via via SELECT's cte list +-- .. and via SELECT's cte list too INSERT INTO raw_events_second (user_id, value_1) WITH cte AS ( - SELECT reference_table.a AS a, 1 AS b + SELECT DISTINCT(reference_table.a) AS a, 1 AS b FROM distributed_table RIGHT JOIN reference_table USING (a) ) - SELECT (a+5)*-1, b FROM cte; + SELECT (a+5)*2, b FROM cte; DEBUG: CTE cte is going to be inlined via distributed planning -ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables +DEBUG: recursively planning left side of the right join since the outer side is a recurring rel +DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "distributed_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) 2) AS user_id, b AS value_1 FROM (SELECT DISTINCT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte +DEBUG: Collecting INSERT ... SELECT results on coordinator -- using set operations INSERT INTO raw_events_first(user_id) @@ -1258,6 +1325,12 @@ INSERT INTO (SELECT user_id FROM raw_events_first); DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM users_ref_table) INTERSECT + (SELECT user_id FROM raw_events_first); +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator -- group by clause inside subquery INSERT INTO agg_events (user_id) @@ -1313,6 +1386,10 @@ SELECT SUM(value_3), Avg(value_2) FROM raw_events_first GROUP BY user_id; +INSERT INTO agg_events (value_3_agg, value_1_agg) +SELECT AVG(user_id), SUM(user_id) +FROM users_ref_table +GROUP BY user_id; -- using generate_series INSERT INTO raw_events_first (user_id, value_1, value_2) SELECT s, s, s FROM generate_series(1, 5) s; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 45adf469e..9163a5864 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -201,6 +201,7 @@ test: local_dist_join_mixed test: citus_local_dist_joins test: recurring_outer_join test: query_null_dist_key +test: insert_select_null_dist_key test: pg_dump # --------- diff --git a/src/test/regress/sql/insert_select_null_dist_key.sql b/src/test/regress/sql/insert_select_null_dist_key.sql new file mode 100644 index 000000000..29454b0c1 --- /dev/null +++ b/src/test/regress/sql/insert_select_null_dist_key.sql @@ -0,0 +1,470 @@ +CREATE SCHEMA insert_select_null_dist_key; +SET search_path TO insert_select_null_dist_key; + +SET citus.next_shard_id TO 1820000; +SET citus.shard_count TO 32; + +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + +SET client_min_messages TO NOTICE; + +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + +CREATE TABLE nullkey_c2_t1(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + +CREATE TABLE distributed_table_c1_t1(a int, b int); +SELECT create_distributed_table('distributed_table_c1_t1', 'a'); + +CREATE TABLE distributed_table_c1_t2(a int, b int); +SELECT create_distributed_table('distributed_table_c1_t2', 'a'); + +CREATE TABLE distributed_table_c2_t1(a int, b int); +SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none'); + +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + +CREATE TABLE postgres_local_table(a int, b int); + +CREATE FUNCTION reload_tables() RETURNS void AS $$ + BEGIN + SET LOCAL client_min_messages TO WARNING; + + TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1, + distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table; + + INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; + INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; + INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; + INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; + INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i; + INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i; + INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + END; +$$ LANGUAGE plpgsql; + +SELECT reload_tables(); + +CREATE TABLE append_table (a int, b int); +SELECT create_distributed_table('append_table', 'a', 'append'); +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset + +COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +1, 40 +2, 42 +3, 44 +4, 46 +5, 48 +\. + +COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +6, 50 +7, 52 +8, 54 +9, 56 +10, 58 +\. + +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); + +CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1; + +SET client_min_messages TO DEBUG2; + +-- Test inserting into a distributed table by selecting from a combination of +-- different table types together with null-shard-key tables. + +-- use a null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; + +-- use a reference table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5; +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table; + +-- use a colocated null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a); +INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2; + +-- use a non-colocated null-shard-key table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); +INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1; + +-- use a distributed table that is colocated with the target table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); +INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; + +-- use a distributed table that is not colocated with the target table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a); + +-- use a citus local table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); + +-- use a postgres local table +INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); + +-- use append / range distributed tables +INSERT INTO range_table SELECT * FROM nullkey_c1_t1; +INSERT INTO append_table SELECT * FROM nullkey_c1_t1; + +SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2; +TRUNCATE distributed_table_c1_t1; +INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; + +-- Test inserting into a reference table by selecting from a combination of +-- different table types together with null-shard-key tables. + +-- use a null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; + +-- use a reference table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table; +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview); + +-- use a colocated null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); + +-- use a non-colocated null-shard-key table +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); + +-- use a distributed table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); +INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; + +-- use a citus local table +INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); + +-- use a postgres local table +INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); + +SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2; +TRUNCATE reference_table; +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + +-- Test inserting into a citus local table by selecting from a combination of +-- different table types together with null-shard-key tables. + +-- use a null-shard-key table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; + +-- use a reference table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); + +-- use a colocated null-shard-key table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); + +-- use a distributed table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); + +-- use a citus local table +INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); + +-- use a postgres local table +INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); + +SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2; +TRUNCATE citus_local_table; +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + +-- Test inserting into a null-shard-key table by selecting from a combination of +-- different table types, together with or without null-shard-key tables. + +-- use a postgres local table +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table; +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a); +INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a); + +-- use a citus local table +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table; +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7; +INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a); + +-- use a distributed table +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2; +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a); +INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a); + +-- use a non-colocated null-shard-key table +INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a); + +-- use a materialized view +INSERT INTO nullkey_c1_t1 SELECT * FROM matview; +INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a); +INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a); + +-- use append / range distributed tables +INSERT INTO nullkey_c1_t1 SELECT * FROM range_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM append_table; + +SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2; +SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2; +TRUNCATE nullkey_c1_t1, nullkey_c2_t1; +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; + +-- Test inserting into a local table by selecting from a combination of +-- different table types, together with or without null-shard-key tables. + +INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); + +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2; + +WITH cte_1 AS ( + DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING * +) +INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL; + +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table; + +SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2; +TRUNCATE postgres_local_table; +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +-- Try slightly more complex queries. + +WITH cte_1 AS ( + SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) +), +cte_2 AS ( + SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b) +) +INSERT INTO distributed_table_c1_t1 +SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2; + +WITH cte_1 AS ( + SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) +), +cte_2 AS ( + SELECT * FROM nullkey_c1_t2 WHERE EXISTS ( + SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a + ) + ORDER BY 1,2 OFFSET 1 LIMIT 4 +) +INSERT INTO distributed_table_c1_t1 +SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table; + +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1 +) t2 +ON t1.b < t2.b; + +INSERT INTO distributed_table_c1_t1 (a, b) +WITH cte AS ( + SELECT a, b, + (SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1, + (SELECT a FROM reference_table WHERE b = t.b) AS d2 + FROM nullkey_c1_t1 t +) +SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL; + +INSERT INTO citus_local_table (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +CROSS JOIN ( + SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1 +) t2; + +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM reference_table t1 +LEFT JOIN ( + SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn + FROM nullkey_c1_t1 +) t2 ON t1.b = t2.b +WHERE t2.rn > 0; + +INSERT INTO nullkey_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT rn, b + FROM ( + SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn + FROM distributed_table_c2_t1 + ) q +) t2 ON t1.b = t2.b +WHERE t2.rn > 2; + +INSERT INTO distributed_table_c1_t1 (a, b) +SELECT t1.a, t2.b +FROM nullkey_c1_t1 t1 +JOIN ( + SELECT sum_val, b + FROM ( + SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val + FROM nullkey_c1_t1 + ) q +) t2 ON t1.b = t2.b +WHERE t2.sum_val > 2; + +-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive +-- about pushing down queries with DISTINCT ON clause even if the table +-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752. +INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2; + +-- Similarly, we could push down the following query as well. see +-- https://github.com/citusdata/citus/pull/6831. +INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1; + +INSERT INTO nullkey_c2_t1 +SELECT t2.a, t2.b +FROM nullkey_c1_t1 AS t2 +JOIN reference_table AS t3 ON (t2.a = t3.a) +WHERE NOT EXISTS ( + SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b +); + +INSERT INTO distributed_table_c1_t1 +SELECT t1.a, t1.b +FROM nullkey_c1_t1 AS t1 +WHERE t1.a NOT IN ( + SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2 +); + +INSERT INTO distributed_table_c1_t1 +SELECT t1.a, t1.b +FROM reference_table AS t1 +JOIN ( + SELECT t2.a FROM ( + SELECT a FROM nullkey_c1_t1 + UNION + SELECT a FROM nullkey_c1_t2 + ) AS t2 +) AS t3 ON t1.a = t3.a; + +-- Temporaryly reduce the verbosity to avoid noise +-- in the output of the next query. +SET client_min_messages TO DEBUG1; + +INSERT INTO nullkey_c1_t1 +SELECT t1.a, t1.b +FROM reference_table AS t1 +WHERE t1.a IN ( + SELECT t2.a FROM ( + SELECT t3.a FROM ( + SELECT a FROM distributed_table_c1_t1 WHERE b > 4 + ) AS t3 + JOIN ( + SELECT a FROM distributed_table_c1_t2 WHERE b < 7 + ) AS t4 ON t3.a = t4.a + ) AS t2 +); + +SET client_min_messages TO DEBUG2; + +-- test upsert with plain INSERT query + +CREATE TABLE upsert_test_1 +( + unique_col int UNIQUE, + other_col int, + third_col int +); +SELECT create_distributed_table('upsert_test_1', null); + +CREATE TABLE upsert_test_2(key int primary key, value text); +SELECT create_distributed_table('upsert_test_2', null); + +INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key) + DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text; + +INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key) + DO UPDATE SET value = (upsert_test_2.value::int * 3)::text; + +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); + +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = random()::int; + +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) + DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; + +INSERT INTO upsert_test_1 VALUES (3, 5, 7); + +INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int + DO UPDATE SET other_col = 5; + +CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2)); +SELECT create_distributed_table('upsert_test_3', null); + +INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *; +INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *; + +SET client_min_messages TO DEBUG1; +INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *; +SET client_min_messages TO DEBUG2; + +-- test upsert with INSERT .. SELECT queries + +SET client_min_messages TO DEBUG1; +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = upsert_test_1.other_col + 1; +-- Fails due to https://github.com/citusdata/citus/issues/6826. +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); +SET client_min_messages TO DEBUG2; + +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = random()::int; + +INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) + DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; + +SELECT reload_tables(); + +ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a); +ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b); + +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) + DO UPDATE SET a = t1.a + 10; + +SET client_min_messages TO DEBUG1; +INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b) + DO UPDATE SET b = t1.b + 10; +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) + DO UPDATE SET a = t1.a + 10; +-- This also fails due to https://github.com/citusdata/citus/issues/6826. +INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a) + DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3); +SET client_min_messages TO DEBUG2; + +SELECT avg(a), avg(b) FROM distributed_table_c1_t1; +SELECT avg(a), avg(b) FROM nullkey_c1_t1; +SELECT avg(a), avg(b) FROM nullkey_c1_t2; +SELECT * FROM upsert_test_1 ORDER BY unique_col; +SELECT * FROM upsert_test_2 ORDER BY key; +SELECT * FROM upsert_test_3 ORDER BY key_1, key_2; + +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_null_dist_key CASCADE; + +SELECT citus_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/query_null_dist_key.sql b/src/test/regress/sql/query_null_dist_key.sql index f5d1fe3fc..02eac5c80 100644 --- a/src/test/regress/sql/query_null_dist_key.sql +++ b/src/test/regress/sql/query_null_dist_key.sql @@ -514,7 +514,11 @@ SET client_min_messages TO DEBUG2; INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; -- between a null dist key table and a table of different type +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; +SET client_min_messages TO DEBUG2; + INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; @@ -543,7 +547,7 @@ WITH level_0 AS ( WITH RECURSIVE level_2_recursive(x) AS ( VALUES (1) UNION ALL - SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 2 ) SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) ) @@ -638,6 +642,8 @@ SET client_min_messages TO DEBUG1; INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; +INSERT INTO bigserial_test (x, y) SELECT a, a FROM reference_table; + INSERT INTO agg_events (user_id) SELECT f2.id FROM @@ -689,6 +695,19 @@ FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; +INSERT INTO agg_events (user_id) +SELECT + users_ref_table.user_id +FROM + users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; + +INSERT INTO agg_events (user_id) +SELECT COALESCE(raw_events_first.user_id, users_ref_table.user_id) +FROM raw_events_first + RIGHT JOIN (users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id) + ON raw_events_first.user_id = users_ref_table.user_id; + -- using a full join INSERT INTO agg_events (user_id, value_1_agg) SELECT t1.user_id AS col1, @@ -715,12 +734,25 @@ WHERE NOT EXISTS (SELECT 1 FROM raw_events_second WHERE raw_events_second.user_id =raw_events_first.user_id); +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM users_ref_table +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id = users_ref_table.user_id); + -- using inner join INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN users_ref_table ON raw_events_first.user_id = users_ref_table.user_id +WHERE raw_events_first.value_1 IN (10, 11,12) OR users_ref_table.user_id IN (1,2,3,4); + -- We could relax distributed insert .. select checks to allow pushing -- down more clauses down to the worker nodes when inserting into a single -- shard by selecting from a colocated one. We might want to do something @@ -734,6 +766,7 @@ WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1 -- limit / offset clause INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1; INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; +INSERT INTO agg_events (user_id) SELECT users_ref_table.user_id FROM users_ref_table LIMIT 1; -- using a materialized cte WITH cte AS MATERIALIZED @@ -745,6 +778,10 @@ INSERT INTO raw_events_second WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; +INSERT INTO raw_events_second (user_id) + WITH cte AS MATERIALIZED (SELECT * FROM users_ref_table) + SELECT user_id FROM cte; + -- using a regular cte WITH cte AS (SELECT * FROM raw_events_first) INSERT INTO raw_events_second @@ -763,19 +800,19 @@ INSERT INTO agg_events -- we still support complex joins via INSERT's cte list .. WITH cte AS ( - SELECT reference_table.a AS a, 1 AS b + SELECT DISTINCT(reference_table.a) AS a, 1 AS b FROM distributed_table RIGHT JOIN reference_table USING (a) ) INSERT INTO raw_events_second (user_id, value_1) SELECT (a+5)*-1, b FROM cte; --- .. but can't do so via via SELECT's cte list +-- .. and via SELECT's cte list too INSERT INTO raw_events_second (user_id, value_1) WITH cte AS ( - SELECT reference_table.a AS a, 1 AS b + SELECT DISTINCT(reference_table.a) AS a, 1 AS b FROM distributed_table RIGHT JOIN reference_table USING (a) ) - SELECT (a+5)*-1, b FROM cte; + SELECT (a+5)*2, b FROM cte; -- using set operations INSERT INTO @@ -783,6 +820,11 @@ INSERT INTO (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM users_ref_table) INTERSECT + (SELECT user_id FROM raw_events_first); + -- group by clause inside subquery INSERT INTO agg_events (user_id) @@ -842,6 +884,11 @@ SELECT SUM(value_3), FROM raw_events_first GROUP BY user_id; +INSERT INTO agg_events (value_3_agg, value_1_agg) +SELECT AVG(user_id), SUM(user_id) +FROM users_ref_table +GROUP BY user_id; + -- using generate_series INSERT INTO raw_events_first (user_id, value_1, value_2) SELECT s, s, s FROM generate_series(1, 5) s;