Add support for more pushable / non-pushable insert .. select queries with null-shard-key tables (#6823)

* Add support for dist insert select by selecting from a reference
table.
  
  This was the only pushable insert .. select case that
  #6773 didn't cover.

* For the cases where we insert into a Citus table but the INSERT ..
SELECT
  query cannot be pushed down, allow pull-to-coordinator when possible.

  Remove the checks that we had at the very beginning of
  CreateInsertSelectPlanInternal so that we can try insert .. select via
  pull-to-coordinator for the cases where we cannot push-down the insert
  .. select query. What we support via pull-to-coordinator is still
  limited due to lacking of logical planner support for SELECT queries,
but this commit at least allows using pull-to-coordinator for the cases
  where the select query can be planned via router planner, without
  limiting ourselves to restrictive top-level checks.

  Also introduce some additional restrictions into
CreateDistributedInsertSelectPlan for the cases it was missing to check
  for null-shard-key tables. Indeed, it would make more sense to have
those checks for distributed tables in general, via separate PRs against
  main branch. See https://github.com/citusdata/citus/pull/6817.

* Add support for inserting into a Postgres table.
pull/6867/head
Onur Tirtir 2023-04-24 13:19:22 +03:00
parent 85745b46d5
commit 39b7711527
7 changed files with 1484 additions and 128 deletions

View File

@ -57,7 +57,6 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
PlannerRestrictionContext *
plannerRestrictionContext,
ParamListInfo boundParams);
static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery);
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -242,12 +241,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
RaiseDeferredError(deferredError, ERROR);
}
/*
* We support a limited set of INSERT .. SELECT queries if the query
* references a null-dist-key table.
*/
ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery);
DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext);
@ -267,74 +260,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
}
/*
* ErrorIfInsertSelectWithNullDistKeyNotSupported throws an error if given INSERT
* .. SELECT query references a null-dist-key table (as the target table or in
* the SELECT clause) and is unsupported.
*
* Such an INSERT .. SELECT query is supported as long as the it only references
* a "colocated" set of null-dist-key tables, no other relation rte types.
*/
static void
ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery)
{
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Query *subquery = subqueryRte->subquery;
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
Oid targetRelationId = insertRte->relid;
if (!IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE) &&
subqueryRteListProperties->hasDistTableWithoutShardKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a distributed table that "
"does not have a shard key when inserting into "
"a different table type")));
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
if (subqueryRteListProperties->hasPostgresLocalTable ||
subqueryRteListProperties->hasReferenceTable ||
subqueryRteListProperties->hasCitusLocalTable ||
subqueryRteListProperties->hasDistTableWithShardKey ||
subqueryRteListProperties->hasMaterializedView)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from different table types "
"when inserting into a distributed table "
"that does not have a shard key")));
}
if (!subqueryRteListProperties->hasDistTableWithoutShardKey)
{
/*
* This means that the SELECT doesn't reference any Citus tables,
* Postgres tables or materialized views but references a function
* call, a values claue etc., or a cte from INSERT.
*
* In that case, we rely on the common restrictions enforced by the
* INSERT .. SELECT planners.
*/
Assert(!NeedsDistributedPlanning(subquery));
return;
}
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);
if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a non-colocated distributed "
"table when inserting into a distributed table "
"that does not have a shard key")));
}
}
}
/*
* CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
* INSERT ... SELECT queries which could consist of multiple tasks.
@ -454,16 +379,6 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RTEListProperties *selectRteListProperties =
GetRTEListPropertiesForQuery(selectRte->subquery);
if (selectRteListProperties->hasDistTableWithoutShardKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a distributed table that "
"does not have a shard key when inserting into "
"a local table")));
}
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
@ -800,10 +715,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
/* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */
}
else
{
/*
@ -819,25 +730,49 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
"table", NULL, NULL);
}
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
if (!HasDistributionKey(targetRelationId) ||
subqueryRteListProperties->hasDistTableWithoutShardKey)
{
return error;
/*
* XXX: Better to check this regardless of the fact that the target table
* has a distribution column or not.
*/
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);
if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot reference a "
"distributed table without a shard key together "
"with non-colocated distributed tables",
NULL, NULL);
}
}
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
if (HasDistributionKey(targetRelationId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
{
return error;
}
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
}
}
@ -1626,6 +1561,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);
/*
* Today it's not possible to generate a distributed plan for a SELECT
* having more than one tasks if it references a null-shard-key table.
* This is because, we don't support queries beyond router planner
* if the query references a null-shard-key table.
*
* For this reason, right now we don't expect an INSERT .. SELECT
* query to go through the repartitioned INSERT .. SELECT logic if the
* SELECT query references a null-shard-key table.
*/
Assert(!repartitioned ||
!GetRTEListPropertiesForQuery(selectQueryCopy)->hasDistTableWithoutShardKey);
distributedPlan->insertSelectQuery = insertSelectQuery;
distributedPlan->selectPlanForInsertSelect = selectPlan;
distributedPlan->insertSelectMethod = repartitioned ?

View File

@ -1803,7 +1803,6 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL;
-- try a few simple queries at least to make sure that we don't crash
BEGIN;
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key
ROLLBACK;
DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1;
DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE;

View File

@ -0,0 +1,814 @@
CREATE SCHEMA insert_select_null_dist_key;
SET search_path TO insert_select_null_dist_key;
SET citus.next_shard_id TO 1820000;
SET citus.shard_count TO 32;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO NOTICE;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE nullkey_c2_t1(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c1_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c1_t2(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c2_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE TABLE postgres_local_table(a int, b int);
CREATE FUNCTION reload_tables() RETURNS void AS $$
BEGIN
SET LOCAL client_min_messages TO WARNING;
TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1,
distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i;
INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
END;
$$ LANGUAGE plpgsql;
SELECT reload_tables();
reload_tables
---------------------------------------------------------------------
(1 row)
CREATE TABLE append_table (a int, b int);
SELECT create_distributed_table('append_table', 'a', 'append');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_empty_shard('append_table') AS shardid1 \gset
SELECT master_create_empty_shard('append_table') AS shardid2 \gset
SELECT master_create_empty_shard('append_table') AS shardid3 \gset
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1);
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2);
CREATE TABLE range_table(a int, b int);
SELECT create_distributed_table('range_table', 'a', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}');
INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50);
CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1;
SET client_min_messages TO DEBUG2;
-- Test inserting into a distributed table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
-- use a distributed table that is colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: INSERT ... SELECT into an append-distributed table is not supported
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
DEBUG: Router planner cannot handle multi-shard select queries
avg | avg
---------------------------------------------------------------------
4.2105263157894737 | 4.2105263157894737
(1 row)
TRUNCATE distributed_table_c1_t1;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a reference table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a colocated null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
-- use a distributed table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.0428571428571429 | 4.0428571428571429
(1 row)
TRUNCATE reference_table;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a citus local table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated null-shard-key table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.4333333333333333 | 4.4333333333333333
(1 row)
TRUNCATE citus_local_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a null-shard-key table by selecting from a combination of
-- different table types, together with or without null-shard-key tables.
-- use a postgres local table
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a citus local table
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table;
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7;
DEBUG: distributed INSERT ... SELECT cannot select from a local table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a non-colocated null-shard-key table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a materialized view
INSERT INTO nullkey_c1_t1 SELECT * FROM matview;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use append / range distributed tables
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM append_table;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
5.8611111111111111 | 13.9305555555555556
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
3.8750000000000000 | 3.8750000000000000
(1 row)
TRUNCATE nullkey_c1_t1, nullkey_c2_t1;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a local table by selecting from a combination of
-- different table types, together with or without null-shard-key tables.
INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: Creating router plan
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2;
DEBUG: Creating router plan
WITH cte_1 AS (
DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING *
)
INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL;
DEBUG: Creating router plan
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
avg | avg
---------------------------------------------------------------------
5.0000000000000000 | 5.0000000000000000
(1 row)
TRUNCATE postgres_local_table;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b)
)
INSERT INTO distributed_table_c1_t1
SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: CTE cte_2 is going to be inlined via distributed planning
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT * FROM nullkey_c1_t2 WHERE EXISTS (
SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a
)
ORDER BY 1,2 OFFSET 1 LIMIT 4
)
INSERT INTO distributed_table_c1_t1
SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: CTE cte_2 is going to be inlined via distributed planning
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1
) t2
ON t1.b < t2.b;
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b)
WITH cte AS (
SELECT a, b,
(SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1,
(SELECT a FROM reference_table WHERE b = t.b) AS d2
FROM nullkey_c1_t1 t
)
SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL;
DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO citus_local_table (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
CROSS JOIN (
SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1
) t2;
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM reference_table t1
LEFT JOIN (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM nullkey_c1_t1
) t2 ON t1.b = t2.b
WHERE t2.rn > 0;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT rn, b
FROM (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM distributed_table_c2_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.rn > 2;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT sum_val, b
FROM (
SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val
FROM nullkey_c1_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.sum_val > 2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
DEBUG: DISTINCT ON (non-partition column) clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Similarly, we could push down the following query as well. see
-- https://github.com/citusdata/citus/pull/6831.
INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c2_t1
SELECT t2.a, t2.b
FROM nullkey_c1_t1 AS t2
JOIN reference_table AS t3 ON (t2.a = t3.a)
WHERE NOT EXISTS (
SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM nullkey_c1_t1 AS t1
WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
JOIN (
SELECT t2.a FROM (
SELECT a FROM nullkey_c1_t1
UNION
SELECT a FROM nullkey_c1_t2
) AS t2
) AS t3 ON t1.a = t3.a;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Temporaryly reduce the verbosity to avoid noise
-- in the output of the next query.
SET client_min_messages TO DEBUG1;
INSERT INTO nullkey_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
WHERE t1.a IN (
SELECT t2.a FROM (
SELECT t3.a FROM (
SELECT a FROM distributed_table_c1_t1 WHERE b > 4
) AS t3
JOIN (
SELECT a FROM distributed_table_c1_t2 WHERE b < 7
) AS t4 ON t3.a = t4.a
) AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT t3.a FROM ((SELECT distributed_table_c1_t1.a FROM insert_select_null_dist_key.distributed_table_c1_t1 WHERE (distributed_table_c1_t1.b OPERATOR(pg_catalog.>) 4)) t3 JOIN (SELECT distributed_table_c1_t2.a FROM insert_select_null_dist_key.distributed_table_c1_t2 WHERE (distributed_table_c1_t2.b OPERATOR(pg_catalog.<) 7)) t4 ON ((t3.a OPERATOR(pg_catalog.=) t4.a)))) t2
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_null_dist_key.reference_table t1 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)))
DEBUG: Collecting INSERT ... SELECT results on coordinator
SET client_min_messages TO DEBUG2;
-- test upsert with plain INSERT query
CREATE TABLE upsert_test_1
(
unique_col int UNIQUE,
other_col int,
third_col int
);
DEBUG: CREATE TABLE / UNIQUE will create implicit index "upsert_test_1_unique_col_key" for table "upsert_test_1"
SELECT create_distributed_table('upsert_test_1', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE upsert_test_2(key int primary key, value text);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_2_pkey" for table "upsert_test_2"
SELECT create_distributed_table('upsert_test_2', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text;
DEBUG: Creating router plan
INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2.value::int * 3)::text;
DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 VALUES (3, 5, 7);
DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int
DO UPDATE SET other_col = 5;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2));
DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2"
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3"
SELECT create_distributed_table('upsert_test_3', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *;
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
1 | 1 | 1
(1 row)
INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *;
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
5 | 2 | default_value
(1 row)
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *;
key_1 | key_2 | value
---------------------------------------------------------------------
7 | 5 | harcoded_text_value
(1 row)
SET client_min_messages TO DEBUG2;
-- test upsert with INSERT .. SELECT queries
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = upsert_test_1.other_col + 1;
-- Fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO DEBUG2;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
SELECT reload_tables();
DEBUG: function does not have co-located tables
reload_tables
---------------------------------------------------------------------
(1 row)
ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a);
DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "nullkey_c1_t1_pkey" for table "nullkey_c1_t1"
DEBUG: verifying table "nullkey_c1_t1"
ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b);
DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "distributed_table_c1_t1_pkey" for table "distributed_table_c1_t1"
DEBUG: verifying table "distributed_table_c1_t1"
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
DEBUG: distributed statement: INSERT INTO insert_select_null_dist_key.nullkey_c1_t1_1820000 AS t1 (a, b) SELECT t3.a, t3.b FROM (insert_select_null_dist_key.nullkey_c1_t2_1820001 t2 JOIN insert_select_null_dist_key.reference_table_1820003 t3 ON ((t2.a OPERATOR(pg_catalog.=) t3.a))) ON CONFLICT(a) DO UPDATE SET a = (t1.a OPERATOR(pg_catalog.+) 10)
SET client_min_messages TO DEBUG1;
INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b)
DO UPDATE SET b = t1.b + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- This also fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a)
DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO DEBUG2;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1;
DEBUG: Router planner cannot handle multi-shard select queries
avg | avg
---------------------------------------------------------------------
5.0000000000000000 | 9.2857142857142857
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c1_t1;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
7.5000000000000000 | 4.1666666666666667
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c1_t2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.5000000000000000 | 4.5000000000000000
(1 row)
SELECT * FROM upsert_test_1 ORDER BY unique_col;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
unique_col | other_col | third_col
---------------------------------------------------------------------
3 | 6 | 7
(1 row)
SELECT * FROM upsert_test_2 ORDER BY key;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
key | value
---------------------------------------------------------------------
1 | 15
(1 row)
SELECT * FROM upsert_test_3 ORDER BY key_1, key_2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
1 | 1 | 1
5 | 2 | default_value
7 | 5 | harcoded_text_value
(3 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_null_dist_key CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -827,24 +827,55 @@ INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2;
SET client_min_messages TO DEBUG2;
-- between two non-colocated null dist key tables
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- between a null dist key table and a table of different type
SET client_min_messages TO WARNING;
EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE)
INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table;
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on nullkey_c1_t1_1620000 citus_table_alias (actual rows=0 loops=1)
-> Seq Scan on reference_table_1620005 reference_table (actual rows=6 loops=1)
(7 rows)
SET client_min_messages TO DEBUG2;
INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table;
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table;
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table;
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT * FROM nullkey_c1_t1;
ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1;
ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1;
ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1;
ERROR: cannot select from a distributed table that does not have a shard key when inserting into a local table
DEBUG: Creating router plan
-- test subquery
SELECT count(*) FROM
(
@ -873,7 +904,7 @@ WITH level_0 AS (
WITH RECURSIVE level_2_recursive(x) AS (
VALUES (1)
UNION ALL
SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100
SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 2
)
SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a)
)
@ -885,7 +916,7 @@ DEBUG: CTE level_1 is going to be inlined via distributed planning
DEBUG: Creating router plan
count
---------------------------------------------------------------------
122
106
(1 row)
WITH level_0 AS (
@ -1095,6 +1126,9 @@ INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test;
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Sequences cannot be used in router queries
INSERT INTO bigserial_test (x, y) SELECT a, a FROM reference_table;
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
@ -1119,7 +1153,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id)
WHERE f.id IN (SELECT user_id
FROM raw_events_second);
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
-- upsert with returning
INSERT INTO agg_events AS ae
(
@ -1153,6 +1186,17 @@ SELECT
FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11;
INSERT INTO agg_events (user_id)
SELECT
users_ref_table.user_id
FROM
users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11;
INSERT INTO agg_events (user_id)
SELECT COALESCE(raw_events_first.user_id, users_ref_table.user_id)
FROM raw_events_first
RIGHT JOIN (users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id)
ON raw_events_first.user_id = users_ref_table.user_id;
-- using a full join
INSERT INTO agg_events (user_id, value_1_agg)
SELECT t1.user_id AS col1,
@ -1176,11 +1220,22 @@ FROM raw_events_first
WHERE NOT EXISTS (SELECT 1
FROM raw_events_second
WHERE raw_events_second.user_id =raw_events_first.user_id);
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM users_ref_table
WHERE NOT EXISTS (SELECT 1
FROM raw_events_second
WHERE raw_events_second.user_id = users_ref_table.user_id);
-- using inner join
INSERT INTO agg_events (user_id)
SELECT raw_events_first.user_id
FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
INSERT INTO agg_events (user_id)
SELECT raw_events_first.user_id
FROM raw_events_first INNER JOIN users_ref_table ON raw_events_first.user_id = users_ref_table.user_id
WHERE raw_events_first.value_1 IN (10, 11,12) OR users_ref_table.user_id IN (1,2,3,4);
-- We could relax distributed insert .. select checks to allow pushing
-- down more clauses down to the worker nodes when inserting into a single
-- shard by selecting from a colocated one. We might want to do something
@ -1197,6 +1252,9 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1;
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO agg_events (user_id) SELECT users_ref_table.user_id FROM users_ref_table LIMIT 1;
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- using a materialized cte
WITH cte AS MATERIALIZED
(SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id)
@ -1207,6 +1265,9 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO raw_events_second
WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first)
SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte;
INSERT INTO raw_events_second (user_id)
WITH cte AS MATERIALIZED (SELECT * FROM users_ref_table)
SELECT user_id FROM cte;
-- using a regular cte
WITH cte AS (SELECT * FROM raw_events_first)
INSERT INTO raw_events_second
@ -1229,7 +1290,7 @@ DEBUG: Subqueries without relations are not allowed in distributed INSERT ... S
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- we still support complex joins via INSERT's cte list ..
WITH cte AS (
SELECT reference_table.a AS a, 1 AS b
SELECT DISTINCT(reference_table.a) AS a, 1 AS b
FROM distributed_table RIGHT JOIN reference_table USING (a)
)
INSERT INTO raw_events_second (user_id, value_1)
@ -1240,17 +1301,23 @@ DEBUG: recursively planning left side of the right join since the outer side is
DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "distributed_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT DISTINCT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b)
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- .. but can't do so via via SELECT's cte list
-- .. and via SELECT's cte list too
INSERT INTO raw_events_second (user_id, value_1)
WITH cte AS (
SELECT reference_table.a AS a, 1 AS b
SELECT DISTINCT(reference_table.a) AS a, 1 AS b
FROM distributed_table RIGHT JOIN reference_table USING (a)
)
SELECT (a+5)*-1, b FROM cte;
SELECT (a+5)*2, b FROM cte;
DEBUG: CTE cte is going to be inlined via distributed planning
ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: recursively planning left side of the right join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "distributed_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) 2) AS user_id, b AS value_1 FROM (SELECT DISTINCT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- using set operations
INSERT INTO
raw_events_first(user_id)
@ -1258,6 +1325,12 @@ INSERT INTO
(SELECT user_id FROM raw_events_first);
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM users_ref_table) INTERSECT
(SELECT user_id FROM raw_events_first);
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- group by clause inside subquery
INSERT INTO agg_events
(user_id)
@ -1313,6 +1386,10 @@ SELECT SUM(value_3),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
INSERT INTO agg_events (value_3_agg, value_1_agg)
SELECT AVG(user_id), SUM(user_id)
FROM users_ref_table
GROUP BY user_id;
-- using generate_series
INSERT INTO raw_events_first (user_id, value_1, value_2)
SELECT s, s, s FROM generate_series(1, 5) s;

View File

@ -201,6 +201,7 @@ test: local_dist_join_mixed
test: citus_local_dist_joins
test: recurring_outer_join
test: query_null_dist_key
test: insert_select_null_dist_key
test: pg_dump
# ---------

View File

@ -0,0 +1,470 @@
CREATE SCHEMA insert_select_null_dist_key;
SET search_path TO insert_select_null_dist_key;
SET citus.next_shard_id TO 1820000;
SET citus.shard_count TO 32;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SET client_min_messages TO NOTICE;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
CREATE TABLE nullkey_c2_t1(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table_c1_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t1', 'a');
CREATE TABLE distributed_table_c1_t2(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t2', 'a');
CREATE TABLE distributed_table_c2_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none');
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
CREATE TABLE postgres_local_table(a int, b int);
CREATE FUNCTION reload_tables() RETURNS void AS $$
BEGIN
SET LOCAL client_min_messages TO WARNING;
TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1,
distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i;
INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
END;
$$ LANGUAGE plpgsql;
SELECT reload_tables();
CREATE TABLE append_table (a int, b int);
SELECT create_distributed_table('append_table', 'a', 'append');
SELECT master_create_empty_shard('append_table') AS shardid1 \gset
SELECT master_create_empty_shard('append_table') AS shardid2 \gset
SELECT master_create_empty_shard('append_table') AS shardid3 \gset
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1);
1, 40
2, 42
3, 44
4, 46
5, 48
\.
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2);
6, 50
7, 52
8, 54
9, 56
10, 58
\.
CREATE TABLE range_table(a int, b int);
SELECT create_distributed_table('range_table', 'a', 'range');
CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}');
INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50);
CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1;
SET client_min_messages TO DEBUG2;
-- Test inserting into a distributed table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5;
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table;
-- use a colocated null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
-- use a non-colocated null-shard-key table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
-- use a distributed table that is colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
-- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
-- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
TRUNCATE distributed_table_c1_t1;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
-- Test inserting into a reference table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table;
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview);
-- use a colocated null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
-- use a non-colocated null-shard-key table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
-- use a distributed table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
-- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2;
TRUNCATE reference_table;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
-- Test inserting into a citus local table by selecting from a combination of
-- different table types together with null-shard-key tables.
-- use a null-shard-key table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
-- use a colocated null-shard-key table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
-- use a distributed table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
-- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2;
TRUNCATE citus_local_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
-- Test inserting into a null-shard-key table by selecting from a combination of
-- different table types, together with or without null-shard-key tables.
-- use a postgres local table
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a);
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a);
-- use a citus local table
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table;
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7;
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a);
-- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
-- use a non-colocated null-shard-key table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
-- use a materialized view
INSERT INTO nullkey_c1_t1 SELECT * FROM matview;
INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a);
INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a);
-- use append / range distributed tables
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table;
INSERT INTO nullkey_c1_t1 SELECT * FROM append_table;
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2;
TRUNCATE nullkey_c1_t1, nullkey_c2_t1;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
-- Test inserting into a local table by selecting from a combination of
-- different table types, together with or without null-shard-key tables.
INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2;
WITH cte_1 AS (
DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING *
)
INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL;
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table;
SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
TRUNCATE postgres_local_table;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b)
)
INSERT INTO distributed_table_c1_t1
SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2;
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT * FROM nullkey_c1_t2 WHERE EXISTS (
SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a
)
ORDER BY 1,2 OFFSET 1 LIMIT 4
)
INSERT INTO distributed_table_c1_t1
SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1
) t2
ON t1.b < t2.b;
INSERT INTO distributed_table_c1_t1 (a, b)
WITH cte AS (
SELECT a, b,
(SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1,
(SELECT a FROM reference_table WHERE b = t.b) AS d2
FROM nullkey_c1_t1 t
)
SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL;
INSERT INTO citus_local_table (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
CROSS JOIN (
SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1
) t2;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM reference_table t1
LEFT JOIN (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM nullkey_c1_t1
) t2 ON t1.b = t2.b
WHERE t2.rn > 0;
INSERT INTO nullkey_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT rn, b
FROM (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM distributed_table_c2_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.rn > 2;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT sum_val, b
FROM (
SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val
FROM nullkey_c1_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.sum_val > 2;
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
-- Similarly, we could push down the following query as well. see
-- https://github.com/citusdata/citus/pull/6831.
INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1;
INSERT INTO nullkey_c2_t1
SELECT t2.a, t2.b
FROM nullkey_c1_t1 AS t2
JOIN reference_table AS t3 ON (t2.a = t3.a)
WHERE NOT EXISTS (
SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b
);
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM nullkey_c1_t1 AS t1
WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
);
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
JOIN (
SELECT t2.a FROM (
SELECT a FROM nullkey_c1_t1
UNION
SELECT a FROM nullkey_c1_t2
) AS t2
) AS t3 ON t1.a = t3.a;
-- Temporaryly reduce the verbosity to avoid noise
-- in the output of the next query.
SET client_min_messages TO DEBUG1;
INSERT INTO nullkey_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
WHERE t1.a IN (
SELECT t2.a FROM (
SELECT t3.a FROM (
SELECT a FROM distributed_table_c1_t1 WHERE b > 4
) AS t3
JOIN (
SELECT a FROM distributed_table_c1_t2 WHERE b < 7
) AS t4 ON t3.a = t4.a
) AS t2
);
SET client_min_messages TO DEBUG2;
-- test upsert with plain INSERT query
CREATE TABLE upsert_test_1
(
unique_col int UNIQUE,
other_col int,
third_col int
);
SELECT create_distributed_table('upsert_test_1', null);
CREATE TABLE upsert_test_2(key int primary key, value text);
SELECT create_distributed_table('upsert_test_2', null);
INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text;
INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2.value::int * 3)::text;
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
INSERT INTO upsert_test_1 VALUES (3, 5, 7);
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int
DO UPDATE SET other_col = 5;
CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2));
SELECT create_distributed_table('upsert_test_3', null);
INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *;
INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *;
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *;
SET client_min_messages TO DEBUG2;
-- test upsert with INSERT .. SELECT queries
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = upsert_test_1.other_col + 1;
-- Fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
SET client_min_messages TO DEBUG2;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
SELECT reload_tables();
ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a);
ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b);
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
SET client_min_messages TO DEBUG1;
INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b)
DO UPDATE SET b = t1.b + 10;
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
-- This also fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a)
DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3);
SET client_min_messages TO DEBUG2;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1;
SELECT avg(a), avg(b) FROM nullkey_c1_t1;
SELECT avg(a), avg(b) FROM nullkey_c1_t2;
SELECT * FROM upsert_test_1 ORDER BY unique_col;
SELECT * FROM upsert_test_2 ORDER BY key;
SELECT * FROM upsert_test_3 ORDER BY key_1, key_2;
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_null_dist_key CASCADE;
SELECT citus_remove_node('localhost', :master_port);

View File

@ -514,7 +514,11 @@ SET client_min_messages TO DEBUG2;
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
-- between a null dist key table and a table of different type
SET client_min_messages TO WARNING;
EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE)
INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table;
SET client_min_messages TO DEBUG2;
INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table;
INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table;
INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table;
@ -543,7 +547,7 @@ WITH level_0 AS (
WITH RECURSIVE level_2_recursive(x) AS (
VALUES (1)
UNION ALL
SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100
SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 2
)
SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a)
)
@ -638,6 +642,8 @@ SET client_min_messages TO DEBUG1;
INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test;
INSERT INTO bigserial_test (x, y) SELECT a, a FROM reference_table;
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
@ -689,6 +695,19 @@ FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11;
INSERT INTO agg_events (user_id)
SELECT
users_ref_table.user_id
FROM
users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11;
INSERT INTO agg_events (user_id)
SELECT COALESCE(raw_events_first.user_id, users_ref_table.user_id)
FROM raw_events_first
RIGHT JOIN (users_ref_table LEFT JOIN raw_events_second ON users_ref_table.user_id = raw_events_second.user_id)
ON raw_events_first.user_id = users_ref_table.user_id;
-- using a full join
INSERT INTO agg_events (user_id, value_1_agg)
SELECT t1.user_id AS col1,
@ -715,12 +734,25 @@ WHERE NOT EXISTS (SELECT 1
FROM raw_events_second
WHERE raw_events_second.user_id =raw_events_first.user_id);
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM users_ref_table
WHERE NOT EXISTS (SELECT 1
FROM raw_events_second
WHERE raw_events_second.user_id = users_ref_table.user_id);
-- using inner join
INSERT INTO agg_events (user_id)
SELECT raw_events_first.user_id
FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
INSERT INTO agg_events (user_id)
SELECT raw_events_first.user_id
FROM raw_events_first INNER JOIN users_ref_table ON raw_events_first.user_id = users_ref_table.user_id
WHERE raw_events_first.value_1 IN (10, 11,12) OR users_ref_table.user_id IN (1,2,3,4);
-- We could relax distributed insert .. select checks to allow pushing
-- down more clauses down to the worker nodes when inserting into a single
-- shard by selecting from a colocated one. We might want to do something
@ -734,6 +766,7 @@ WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1
-- limit / offset clause
INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1;
INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1;
INSERT INTO agg_events (user_id) SELECT users_ref_table.user_id FROM users_ref_table LIMIT 1;
-- using a materialized cte
WITH cte AS MATERIALIZED
@ -745,6 +778,10 @@ INSERT INTO raw_events_second
WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first)
SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte;
INSERT INTO raw_events_second (user_id)
WITH cte AS MATERIALIZED (SELECT * FROM users_ref_table)
SELECT user_id FROM cte;
-- using a regular cte
WITH cte AS (SELECT * FROM raw_events_first)
INSERT INTO raw_events_second
@ -763,19 +800,19 @@ INSERT INTO agg_events
-- we still support complex joins via INSERT's cte list ..
WITH cte AS (
SELECT reference_table.a AS a, 1 AS b
SELECT DISTINCT(reference_table.a) AS a, 1 AS b
FROM distributed_table RIGHT JOIN reference_table USING (a)
)
INSERT INTO raw_events_second (user_id, value_1)
SELECT (a+5)*-1, b FROM cte;
-- .. but can't do so via via SELECT's cte list
-- .. and via SELECT's cte list too
INSERT INTO raw_events_second (user_id, value_1)
WITH cte AS (
SELECT reference_table.a AS a, 1 AS b
SELECT DISTINCT(reference_table.a) AS a, 1 AS b
FROM distributed_table RIGHT JOIN reference_table USING (a)
)
SELECT (a+5)*-1, b FROM cte;
SELECT (a+5)*2, b FROM cte;
-- using set operations
INSERT INTO
@ -783,6 +820,11 @@ INSERT INTO
(SELECT user_id FROM raw_events_first) INTERSECT
(SELECT user_id FROM raw_events_first);
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM users_ref_table) INTERSECT
(SELECT user_id FROM raw_events_first);
-- group by clause inside subquery
INSERT INTO agg_events
(user_id)
@ -842,6 +884,11 @@ SELECT SUM(value_3),
FROM raw_events_first
GROUP BY user_id;
INSERT INTO agg_events (value_3_agg, value_1_agg)
SELECT AVG(user_id), SUM(user_id)
FROM users_ref_table
GROUP BY user_id;
-- using generate_series
INSERT INTO raw_events_first (user_id, value_1, value_2)
SELECT s, s, s FROM generate_series(1, 5) s;