From 85745b46d5e98d5b5fa667a9bc30f5bd42ba9ad6 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 28 Mar 2023 18:47:59 +0300 Subject: [PATCH] Add initial sql support for distributed tables that don't have a shard key (#6773/#6822) Enable router planner and a limited version of INSERT .. SELECT planner for the queries that reference colocated null shard key tables. * SELECT / UPDATE / DELETE / MERGE are supported as long as the query is a router query. * INSERT .. SELECT is supported as long as it only references colocated null shard key tables. Note that this is not only limited to distributed INSERT .. SELECT but also covers a limited set of query types that require pull-to-coordinator, e.g., due to LIMIT clause, generate_series() etc. ... (Ideally distributed INSERT .. SELECT could handle such queries too, e.g., when we're only referencing tables that don't have a shard key, but today this is not the case. See https://github.com/citusdata/citus/pull/6773#discussion_r1140130562.) --- .../distributed/planner/distributed_planner.c | 32 + .../planner/fast_path_router_planner.c | 17 +- .../planner/insert_select_planner.c | 92 +- .../distributed/planner/merge_planner.c | 5 + .../planner/multi_logical_planner.c | 16 +- .../planner/multi_router_planner.c | 24 +- src/include/distributed/distributed_planner.h | 13 +- .../distributed/multi_logical_planner.h | 1 + .../distributed/multi_router_planner.h | 1 + src/test/regress/citus_tests/config.py | 6 - .../regress/expected/create_null_dist_key.out | 1 + src/test/regress/expected/merge.out | 147 ++ .../regress/expected/query_null_dist_key.out | 1796 +++++++++++++++++ src/test/regress/multi_1_schedule | 1 + src/test/regress/sql/merge.sql | 112 + src/test/regress/sql/query_null_dist_key.sql | 1132 +++++++++++ 16 files changed, 3375 insertions(+), 21 deletions(-) create mode 100644 src/test/regress/expected/query_null_dist_key.out create mode 100644 src/test/regress/sql/query_null_dist_key.sql diff --git 
a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 38962b333..50509baea 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1025,6 +1025,17 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina { return distributedPlan; } + else if (ContainsNullDistKeyTable(originalQuery)) + { + /* + * We only support router queries if the query contains reference to + * a null-dist-key table. This temporary restriction will be removed + * once we support recursive planning for the queries that reference + * null-dist-key tables. + */ + WrapRouterErrorForNullDistKeyTable(distributedPlan->planningError); + RaiseDeferredError(distributedPlan->planningError, ERROR); + } else { RaiseDeferredError(distributedPlan->planningError, DEBUG2); @@ -2462,6 +2473,18 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) } +/* + * ContainsNullDistKeyTable returns true if given query contains reference + * to a null-dist-key table. + */ +bool +ContainsNullDistKeyTable(Query *query) +{ + RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query); + return rteListProperties->hasDistTableWithoutShardKey; +} + + /* * GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that * returns RTEListProperties for the rte list retrieved from query. 
@@ -2538,6 +2561,15 @@ GetRTEListProperties(List *rangeTableList) else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE)) { rteListProperties->hasDistributedTable = true; + + if (!HasDistributionKeyCacheEntry(cacheEntry)) + { + rteListProperties->hasDistTableWithoutShardKey = true; + } + else + { + rteListProperties->hasDistTableWithShardKey = true; + } } else { diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index ecb62478a..2be4a5626 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -212,6 +212,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) return false; } + /* + * If the table doesn't have a distribution column, we don't need to + * check anything further. + */ + Var *distributionKey = PartitionColumn(distributedTableId, 1); + if (!distributionKey) + { + return true; + } + /* WHERE clause should not be empty for distributed tables */ if (joinTree == NULL || (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals == @@ -220,13 +230,6 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) return false; } - /* if that's a reference table, we don't need to check anything further */ - Var *distributionKey = PartitionColumn(distributedTableId, 1); - if (!distributionKey) - { - return true; - } - /* convert list of expressions into expression tree for further processing */ quals = joinTree->quals; if (quals != NULL && IsA(quals, List)) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 53fe58cdb..175f6bc6f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -57,6 +57,7 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, PlannerRestrictionContext * 
plannerRestrictionContext, ParamListInfo boundParams); +static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -241,6 +242,12 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, RaiseDeferredError(deferredError, ERROR); } + /* + * We support a limited set of INSERT .. SELECT queries if the query + * references a null-dist-key table. + */ + ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery); + DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); @@ -260,6 +267,74 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, } +/* + * ErrorIfInsertSelectWithNullDistKeyNotSupported throws an error if given INSERT + * .. SELECT query references a null-dist-key table (as the target table or in + * the SELECT clause) and is unsupported. + * + * Such an INSERT .. SELECT query is supported as long as it only references + * a "colocated" set of null-dist-key tables, no other relation rte types. 
+ */ +static void +ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery) +{ + RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); + Query *subquery = subqueryRte->subquery; + RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery); + + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); + Oid targetRelationId = insertRte->relid; + if (!IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE) && + subqueryRteListProperties->hasDistTableWithoutShardKey) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a distributed table that " + "does not have a shard key when inserting into " + "a different table type"))); + } + else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) + { + if (subqueryRteListProperties->hasPostgresLocalTable || + subqueryRteListProperties->hasReferenceTable || + subqueryRteListProperties->hasCitusLocalTable || + subqueryRteListProperties->hasDistTableWithShardKey || + subqueryRteListProperties->hasMaterializedView) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from different table types " + "when inserting into a distributed table " + "that does not have a shard key"))); + } + + if (!subqueryRteListProperties->hasDistTableWithoutShardKey) + { + /* + * This means that the SELECT doesn't reference any Citus tables, + * Postgres tables or materialized views but references a function + * call, a values clause etc., or a cte from INSERT. + * + * In that case, we rely on the common restrictions enforced by the + * INSERT .. SELECT planners. 
+ */ + Assert(!NeedsDistributedPlanning(subquery)); + return; + } + + List *distributedRelationIdList = DistributedRelationIdList(subquery); + distributedRelationIdList = lappend_oid(distributedRelationIdList, + targetRelationId); + + if (!AllDistributedRelationsInListColocated(distributedRelationIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a non-colocated distributed " + "table when inserting into a distributed table " + "that does not have a shard key"))); + } + } +} + + /* * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed * INSERT ... SELECT queries which could consist of multiple tasks. @@ -379,6 +454,16 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, { RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + RTEListProperties *selectRteListProperties = + GetRTEListPropertiesForQuery(selectRte->subquery); + if (selectRteListProperties->hasDistTableWithoutShardKey) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a distributed table that " + "does not have a shard key when inserting into " + "a local table"))); + } + PrepareInsertSelectForCitusPlanner(insertSelectQuery); /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ @@ -717,10 +802,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, } else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "distributed INSERT ... 
SELECT cannot target a distributed " - "table with a null shard key", - NULL, NULL); + /* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */ } else { @@ -874,7 +956,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, */ RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery( copiedSubquery); - if (subqueryRteListProperties->hasDistributedTable) + if (subqueryRteListProperties->hasDistTableWithShardKey) { AddPartitionKeyNotNullFilterToSelect(copiedSubquery); } diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 5b39aeba6..930a44db8 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -509,6 +509,11 @@ InsertDistributionColumnMatchesSource(Oid targetRelationId, Query *query) return NULL; } + if (!HasDistributionKey(targetRelationId)) + { + return NULL; + } + bool foundDistributionColumn = false; MergeAction *action = NULL; foreach_ptr(action, query->mergeActionList) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index d9322bf5e..7732b6c5e 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -272,7 +272,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) if (!targetListOnPartitionColumn) { if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, - IsDistributedTableRTE)) + IsTableWithDistKeyRTE)) { targetListOnPartitionColumn = true; } @@ -379,6 +379,20 @@ IsReferenceTableRTE(Node *node) } +/* + * IsTableWithDistKeyRTE gets a node and returns true if the node + * is a range table relation entry that points to a distributed table + * that has a distribution column. 
+ */ +bool +IsTableWithDistKeyRTE(Node *node) +{ + Oid relationId = NodeTryGetRteRelid(node); + return relationId != InvalidOid && IsCitusTable(relationId) && + HasDistributionKey(relationId); +} + + /* * FullCompositeFieldList gets a composite field list, and checks if all fields * of composite type are used in the list. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 97c2cecf6..47d11172f 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -258,6 +258,22 @@ CreateModifyPlan(Query *originalQuery, Query *query, } +/* + * WrapRouterErrorForNullDistKeyTable wraps given planning error with a + * generic error message if given query references a distributed table + * that doesn't have a distribution key. + */ +void +WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError) +{ + planningError->detail = planningError->message; + planningError->message = pstrdup("queries that reference a distributed " + "table without a shard key can only " + "reference colocated distributed " + "tables or reference tables"); +} + + /* * CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query. * The returned plan is a router task that returns query results from a single worker. 
@@ -1870,6 +1886,11 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon */ if (IsMergeQuery(originalQuery)) { + if (ContainsNullDistKeyTable(originalQuery)) + { + WrapRouterErrorForNullDistKeyTable(*planningError); + } + RaiseDeferredError(*planningError, ERROR); } else @@ -3855,7 +3876,8 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree) CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(distributedTableId); - if (!HasDistributionKeyCacheEntry(modificationTableCacheEntry)) + if (!IsCitusTableTypeCacheEntry(modificationTableCacheEntry, + DISTRIBUTED_TABLE)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot router plan modification of a non-distributed table", diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 412859449..753504131 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -147,9 +147,19 @@ typedef struct RTEListProperties bool hasReferenceTable; bool hasCitusLocalTable; - /* includes hash, append and range partitioned tables */ + /* includes hash, null dist key, append and range partitioned tables */ bool hasDistributedTable; + /* + * Effectively, hasDistributedTable is equal to + * "hasDistTableWithShardKey || hasDistTableWithoutShardKey". + * + * We provide below two for the callers that want to know what kind of + * distributed tables that given query has references to. 
+ */ + bool hasDistTableWithShardKey; + bool hasDistTableWithoutShardKey; + /* union of hasReferenceTable, hasCitusLocalTable and hasDistributedTable */ bool hasCitusTable; @@ -243,6 +253,7 @@ extern int32 BlessRecordExpression(Expr *expr); extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, struct DistributedPlan *distributedPlan); +extern bool ContainsNullDistKeyTable(Query *query); extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 189170358..de4901ea2 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -200,6 +200,7 @@ extern bool IsCitusTableRTE(Node *node); extern bool IsDistributedOrReferenceTableRTE(Node *node); extern bool IsDistributedTableRTE(Node *node); extern bool IsReferenceTableRTE(Node *node); +extern bool IsTableWithDistKeyRTE(Node *node); extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte); extern bool ContainsReadIntermediateResultFunction(Node *node); extern bool ContainsReadIntermediateResultArrayFunction(Node *node); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a255fd520..40d92fead 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -36,6 +36,7 @@ extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); +extern void WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError); extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext, diff --git a/src/test/regress/citus_tests/config.py 
b/src/test/regress/citus_tests/config.py index cd1be125b..69cc5599c 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -237,12 +237,6 @@ class AllNullDistKeyDefaultConfig(CitusDefaultClusterConfig): # group 8 "function_create", "functions", - # group 9 - "merge_arbitrary_create", - "merge_arbitrary", - # group 10 - "arbitrary_configs_router_create", - "arbitrary_configs_router", # # ii) Skip the following test as it requires support for create_distributed_function. "nested_execution", diff --git a/src/test/regress/expected/create_null_dist_key.out b/src/test/regress/expected/create_null_dist_key.out index af6e66f62..43120a454 100644 --- a/src/test/regress/expected/create_null_dist_key.out +++ b/src/test/regress/expected/create_null_dist_key.out @@ -1803,6 +1803,7 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL; -- try a few simple queries at least to make sure that we don't crash BEGIN; INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; +ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key ROLLBACK; DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1; DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE; diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 2196d966d..fd82efa8c 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -3228,6 +3228,153 @@ WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported HINT: Consider using hash distribution instead +-- test merge with null shard key tables +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; +SET client_min_messages TO DEBUG2; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT 
create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +-- with a colocated table +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +-- with non-colocated null-dist-key table +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; +ERROR: For MERGE command, all the distributed tables must be colocated +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); +ERROR: For MERGE command, all the distributed tables must be colocated +-- with a distributed table 
+MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); +ERROR: For MERGE command, all the distributed tables must be colocated +MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: For MERGE command, all the distributed tables must be colocated +-- with a reference table +MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = reference_table.b; +ERROR: MERGE command is not supported on reference tables yet +MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: MERGE command is not supported on reference tables yet +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT 
VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: +DEBUG: Creating MERGE router plan +WITH cte AS ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +ERROR: For MERGE command, all the distributed tables must be colocated +WITH cte AS materialized ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +ERROR: For MERGE command, all the distributed tables must be colocated +SET client_min_messages TO WARNING; +DROP SCHEMA query_null_dist_key CASCADE; +RESET client_min_messages; +SET search_path TO merge_schema; DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server diff --git a/src/test/regress/expected/query_null_dist_key.out b/src/test/regress/expected/query_null_dist_key.out new file mode 100644 index 000000000..09413a3ea --- /dev/null +++ b/src/test/regress/expected/query_null_dist_key.out @@ -0,0 +1,1796 @@ +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; +SET citus.next_shard_id TO 1620000; +SET citus.shard_count TO 32; +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SET client_min_messages TO NOTICE; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(1, 8) i; +CREATE TABLE nullkey_c3_t1(a int, b int); +SELECT create_distributed_table('nullkey_c3_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c3_t1 SELECT i, i FROM generate_series(1, 8) i; +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +CREATE TABLE distributed_table(a int, b 
int); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); +INSERT INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551),( 5, 5, 'aruru', 11389), + (13, 3, 'aseyev', 2255),(15, 5, 'adversa', 3164), + (18, 8, 'assembly', 911),(19, 9, 'aubergiste', 4981), + (28, 8, 'aerophyte', 5454),(29, 9, 'amateur', 9524), + (42, 2, 'ausable', 15885),(43, 3, 'affixal', 12723), + (49, 9, 'anyone', 2681),(50, 10, 'anjanette', 19519); +SELECT create_distributed_table('articles_hash', null, colocate_with=>'none'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$query_null_dist_key.articles_hash$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_first', null, colocate_with=>'none', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_second', null, colocate_with=>'raw_events_first', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT create_distributed_table('agg_events', null, colocate_with=>'raw_events_first', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE users_ref_table (user_id int); +SELECT create_reference_table('users_ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO raw_events_first VALUES (1, '1970-01-01', 10, 100, 1000.1, 10000), (3, '1971-01-01', 30, 300, 3000.1, 30000), + (5, '1972-01-01', 50, 500, 5000.1, 50000), (2, '1973-01-01', 20, 200, 2000.1, 20000), + (4, '1974-01-01', 40, 400, 4000.1, 40000), (6, '1975-01-01', 60, 600, 6000.1, 60000); +CREATE TABLE modify_fast_path(key int, value_1 int, value_2 text); +SELECT create_distributed_table('modify_fast_path', 
null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE modify_fast_path_reference(key int, value_1 int, value_2 text); +SELECT create_reference_table('modify_fast_path_reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE bigserial_test (x int, y int, z bigserial); +SELECT create_distributed_table('bigserial_test', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE append_table (text_col text, a int); +SELECT create_distributed_table('append_table', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); +SET client_min_messages to DEBUG2; +-- simple insert +INSERT INTO nullkey_c1_t1 VALUES (1,2), (2,2), (3,4); +DEBUG: Creating router plan +INSERT INTO nullkey_c1_t2 VALUES (1,3), (3,4), (5,1), (6,2); +DEBUG: Creating router plan +INSERT INTO nullkey_c2_t1 VALUES (1,0), (2,5), (4,3), (5,2); +DEBUG: Creating router plan +INSERT INTO nullkey_c2_t2 VALUES (2,4), (3,2), (5,2), (7,4); +DEBUG: 
Creating router plan +-- simple select +SELECT * FROM nullkey_c1_t1 ORDER BY 1,2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + a | b +--------------------------------------------------------------------- + 1 | 1 + 1 | 2 + 2 | 2 + 2 | 2 + 3 | 3 + 3 | 4 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 +(11 rows) + +-- for update / share +SELECT * FROM modify_fast_path WHERE key = 1 FOR UPDATE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path WHERE key = 1 FOR SHARE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path FOR UPDATE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path FOR SHARE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +-- cartesian product with different table types +-- with other table types +SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 66 +(1 row) + +SELECT COUNT(*) FROM citus_local_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a 
shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- with a colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c1_t2; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 110 +(1 row) + +-- with a non-colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c2_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- First, show that nullkey_c1_t1 and nullkey_c3_t1 are not colocated. +SELECT + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c1_t1'::regclass) != + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c3_t1'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- Now verify that we can join them via router planner because it doesn't care +-- about whether two tables are colocated or not but physical location of shards +-- when citus.enable_non_colocated_router_query_pushdown is set to on. 
+SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +-- colocated join between null dist key tables +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 14 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count 
+--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t2 t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- non-colocated inner joins between null dist key tables +SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- non-colocated outer joins between null dist key tables +SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT * FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a) ORDER BY 1,2,3 OFFSET 3 LIMIT 4; +ERROR: queries that reference a distributed 
table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- join with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 8 +(1 row) + +WITH cte_1 AS + (SELECT * FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE) +SELECT COUNT(*) FROM cte_1; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 8 +(1 row) + +-- join with postgres / citus local tables +SELECT * FROM nullkey_c1_t1 JOIN 
postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- join with a distributed table +SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- outer joins with different table types +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN citus_local_table USING(a); +ERROR: queries that reference a 
distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table LEFT JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN citus_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 12 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner does not support append-partitioned tables. +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +-- lateral / semi / anti joins with different table types +-- with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count 
+--------------------------------------------------------------------- + 7 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM reference_table t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 
4 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- with a distributed table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM distributed_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 
t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- with postgres / citus local tables +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM citus_local_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM citus_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM citus_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM postgres_local_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM postgres_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- insert .. select +-- between two colocated null dist key tables +-- The target list of "distributed statement"s that we send to workers +-- differ(*) in Postgres versions < 15. For this reason, we temporarily +-- disable debug messages here and run the EXPLAIN'ed version of the +-- command. 
+-- +-- (*): < SELECT a, b > vs < SELECT table_name.a, table_name.b > +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on nullkey_c1_t1_1620000 citus_table_alias (actual rows=0 loops=1) + -> Seq Scan on nullkey_c1_t2_1620001 nullkey_c1_t2 (actual rows=10 loops=1) +(7 rows) + +SET client_min_messages TO DEBUG2; +-- between two non-colocated null dist key tables +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; +ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key +-- between a null dist key table and a table of different type +INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO reference_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that 
does not have a shard key when inserting into a different table type +INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a local table +-- test subquery +SELECT count(*) FROM +( + SELECT * FROM (SELECT * FROM nullkey_c1_t2) as subquery_inner +) AS subquery_top; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 10 +(1 row) + +-- test cte inlining +WITH cte_nullkey_c1_t1 AS (SELECT * FROM nullkey_c1_t1), + cte_postgres_local_table AS (SELECT * FROM postgres_local_table), + cte_distributed_table AS (SELECT * FROM distributed_table) +SELECT COUNT(*) FROM cte_distributed_table, cte_nullkey_c1_t1, cte_postgres_local_table +WHERE cte_nullkey_c1_t1.a > 3 AND cte_distributed_table.a < 5; +DEBUG: CTE cte_nullkey_c1_t1 is going to be inlined via distributed planning +DEBUG: CTE cte_postgres_local_table is going to be inlined via distributed planning +DEBUG: CTE cte_distributed_table is going to be inlined via distributed planning +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+-- test recursive ctes +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; +DEBUG: CTE level_0 is going to be inlined via distributed planning +DEBUG: CTE level_1 is going to be inlined via distributed planning +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 122 +(1 row) + +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive JOIN distributed_table ON (level_2_recursive.x = distributed_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; +DEBUG: CTE level_0 is going to be inlined via distributed planning +DEBUG: CTE level_1 is going to be inlined via distributed planning +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- grouping set +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)) + ORDER BY id, subtitle; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | subtitle | count +--------------------------------------------------------------------- + 42 | | 1 + | u | 1 +(2 rows) + +-- subquery in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM articles_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a +ORDER BY 1,2; +DEBUG: Creating router plan + name | special_price 
+--------------------------------------------------------------------- + adversa | 15 + aerophyte | 28 + affixal | 43 + altdorfer | 4 + amateur | 29 + anjanette | 50 + anyone | 49 + aruru | 5 + aseyev | 13 + assembly | 18 + aubergiste | 19 + ausable | 42 +(12 rows) + +-- test prepared statements +-- prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; +EXECUTE author_1_articles; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count 
+--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | 
title | word_count +--------------------------------------------------------------------- +(0 rows) + +PREPARE author_articles_update(int) AS + UPDATE articles_hash SET title = 'test' WHERE author_id = $1; +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +-- More tests with insert .. select. +-- +-- The target list of "distributed statement"s that we send to workers +-- might differ(*) in Postgres versions < 15 and they are reported when +-- "log level >= DEBUG2". For this reason, we set log level to DEBUG1 to +-- avoid reporting them. +-- +-- DEBUG1 still allows reporting the reason why given INSERT .. SELECT +-- query is not distributed / requires pull-to-coordinator. +SET client_min_messages TO DEBUG1; +INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; +DEBUG: volatile functions are not allowed in distributed INSERT ... 
SELECT queries +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Sequences cannot be used in router queries +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM +(SELECT + id +FROM (SELECT users_ref_table.user_id AS id + FROM raw_events_first, + users_ref_table + WHERE raw_events_first.user_id = users_ref_table.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +-- upsert with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; + user_id | value_1_agg +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 + 5 | 50 + 6 | 60 +(6 rows) + +-- using a left join +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; +-- using a full join +INSERT INTO agg_events (user_id, value_1_agg) +SELECT t1.user_id AS col1, + t2.user_id AS col2 +FROM raw_events_first t1 + FULL JOIN raw_events_second t2 + ON t1.user_id = t2.user_id; +-- using semi join +INSERT INTO 
raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT raw_events_second.user_id + FROM raw_events_second, raw_events_first + WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); +-- using lateral join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id =raw_events_first.user_id); +-- using inner join +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +-- We could relax distributed insert .. select checks to allow pushing +-- down more clauses down to the worker nodes when inserting into a single +-- shard by selecting from a colocated one. We might want to do something +-- like https://github.com/citusdata/citus/pull/6772. +-- +-- e.g., insert into null_shard_key_1/citus_local/reference +-- select * from null_shard_key_1/citus_local/reference limit 1 +-- +-- Below "limit / offset clause" test and some others are examples of this. +-- limit / offset clause +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1; +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; +DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +-- using a materialized cte +WITH cte AS MATERIALIZED + (SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id) +INSERT INTO agg_events (value_1_agg, user_id) +SELECT v1_agg, user_id FROM cte; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO raw_events_second + WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) + SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; +-- using a regular cte +WITH cte AS (SELECT * FROM raw_events_first) +INSERT INTO raw_events_second + SELECT user_id * 7000, time, value_1, value_2, value_3, value_4 FROM cte; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO raw_events_second + WITH cte AS (SELECT * FROM raw_events_first) + SELECT * FROM cte; +DEBUG: CTE cte is going to be inlined via distributed planning +INSERT INTO agg_events + WITH sub_cte AS (SELECT 1) + SELECT + raw_events_first.user_id, (SELECT * FROM sub_cte) + FROM + raw_events_first; +DEBUG: CTE sub_cte is going to be inlined via distributed planning +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- we still support complex joins via INSERT's cte list .. +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +INSERT INTO raw_events_second (user_id, value_1) + SELECT (a+5)*-1, b FROM cte; +DEBUG: distributed INSERT ... 
SELECT can only select from distributed tables +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: recursively planning left side of the right join since the outer side is a recurring rel +DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "distributed_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b) +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- .. but can't do so via via SELECT's cte list +INSERT INTO raw_events_second (user_id, value_1) +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) + SELECT (a+5)*-1, b FROM cte; +DEBUG: CTE cte is going to be inlined via distributed planning +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +-- using set operations +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) INTERSECT + (SELECT user_id FROM raw_events_first); +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +-- group by clause inside subquery +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM +(SELECT + id +FROM (SELECT raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); +-- group by clause inside lateral subquery +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + t1.user_id + FROM + raw_events_second t1 JOIN raw_events_second t2 on (t1.user_id = t2.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; +-- using aggregates +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; +-- using generate_series +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, s, s FROM generate_series(1, 5) s; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE SEQUENCE insert_select_test_seq; +-- nextval() expression in select's targetlist +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, nextval('insert_select_test_seq'), (random()*10)::int +FROM generate_series(100, 105) s; +DEBUG: distributed INSERT ... 
SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- non-immutable function +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: non-IMMUTABLE functions are not allowed in the RETURNING clause +SET client_min_messages TO DEBUG2; +-- update / delete +UPDATE nullkey_c1_t1 SET a = 1 WHERE b = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in UPDATE queries on distributed tables must not be VOLATILE +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +DELETE FROM nullkey_c1_t1 WHERE b = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 WHERE a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +-- simple update queries between different table types / colocated tables +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET b = 5 FROM 
nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE nullkey_c1_t1 SET b = 5 FROM citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +UPDATE nullkey_c1_t1 SET b = 5 FROM postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +UPDATE reference_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +ERROR: cannot perform select on a distributed table and modify a reference table +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = 
distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE citus_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +UPDATE postgres_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +-- simple delete queries between different table types / colocated tables +DELETE FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a 
distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +DELETE FROM nullkey_c1_t1 USING postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +DELETE FROM reference_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +ERROR: cannot perform select on a distributed table and modify a reference table +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex 
joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM citus_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +DELETE FROM postgres_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +-- slightly more complex update queries +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS materialized( + SELECT * FROM distributed_table +) +UPDATE nullkey_c1_t1 SET b = 5 FROM cte WHERE nullkey_c1_t1.b = cte.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE 
nullkey_c1_t1 SET b = 5 FROM reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 tx SET b = ( + SELECT nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN nullkey_c1_t1 ON (nullkey_c1_t1.a != nullkey_c1_t2.a) WHERE nullkey_c1_t1.a = tx.a ORDER BY 1 LIMIT 1 +); +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 tx SET b = t2.b FROM nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); +DEBUG: Creating router plan +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = NULL WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = 5 WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + ?column? | ?column? 
+--------------------------------------------------------------------- +(0 rows) + +UPDATE modify_fast_path + SET value_1 = 1 + FROM modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; +DEBUG: Creating router plan +PREPARE p1 (int, int, int) AS + UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; +EXECUTE p1(1,1,1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(2,2,2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(3,3,3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(4,4,4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(5,5,5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(6,6,6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(7,7,7); +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_update(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE 
prepared_zero_shard_update(7); +-- slightly more complex delete queries +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS materialized( + SELECT * FROM distributed_table +) +DELETE FROM nullkey_c1_t1 USING cte WHERE nullkey_c1_t1.b = cte.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 tx USING nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); +DEBUG: Creating router plan +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); +DEBUG: Creating router plan +DELETE FROM modify_fast_path WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 
* 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + ?column? | ?column? +--------------------------------------------------------------------- +(0 rows) + +DELETE FROM modify_fast_path + USING modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; +DEBUG: Creating router plan +PREPARE p2 (int, int, int) AS + DELETE FROM modify_fast_path WHERE key = ($2)*$1 AND value_1 = $3; +EXECUTE p2(1,1,1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(2,2,2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(3,3,3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(4,4,4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(5,5,5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(6,6,6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(7,7,7); +PREPARE prepared_zero_shard_delete(int) AS DELETE FROM modify_fast_path WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_delete(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(6); +DEBUG: 
Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(7); +-- test modifying ctes +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +SELECT * FROM cte; +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path; +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path_reference WHERE key IN (SELECT key FROM cte); +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM reference_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: cannot router plan modification of a non-distributed table +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t2 WHERE a IN (SELECT a FROM cte); +DEBUG: Creating router plan + a | b +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c2_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); +ERROR: queries 
that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- Below two queries fail very late when +-- citus.enable_non_colocated_router_query_pushdown is set to on. +SET citus.enable_non_colocated_router_query_pushdown TO ON; +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +ERROR: relation "query_null_dist_key.nullkey_c1_t1_1620000" does not exist +CONTEXT: while executing command on localhost:xxxxx +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +ERROR: relation "query_null_dist_key.nullkey_c1_t1_1620000" does not exist +CONTEXT: while executing command on localhost:xxxxx +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 
RETURNING * +) +UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1; +DEBUG: Creating router plan +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +DELETE FROM modify_fast_path WHERE key = 1; +DEBUG: Creating router plan +-- test window functions +SELECT + user_id, avg(avg(value_3)) OVER (PARTITION BY user_id, MIN(value_2)) +FROM + raw_events_first +GROUP BY + 1 +ORDER BY + 2 DESC NULLS LAST, 1 DESC; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + user_id | avg +--------------------------------------------------------------------- + 6 | 6000.1 + 5 | 5000.1 + 4 | 4000.1 + 3 | 3000.1 + 2 | 2000.1 + 1 | 1000.1 + 105 | + 104 | + 103 | + 102 | + 101 | + 100 | +(12 rows) + +SELECT + user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2)) +FROM ( + SELECT + DISTINCT us.user_id, us.value_2, us.value_1, random() as r1 + FROM + raw_events_first as us, raw_events_second + WHERE + us.user_id = raw_events_second.user_id + ORDER BY + user_id, value_2 + ) s +GROUP BY + 1, value_1 +ORDER BY + 2 DESC, 1; +DEBUG: Creating router plan + user_id | max +--------------------------------------------------------------------- + 1 | + 2 | + 3 | + 4 | + 5 | + 6 | + 6 | 60 + 5 | 50 + 4 | 40 + 3 | 30 + 2 | 20 + 1 | 10 + 5 | 5 + 4 | 4 + 3 | 3 + 2 | 2 + 1 | 1 +(17 rows) + +SELECT + DISTINCT ON (raw_events_second.user_id, rnk) raw_events_second.user_id, rank() OVER my_win AS rnk +FROM + raw_events_second, raw_events_first +WHERE + raw_events_first.user_id = raw_events_second.user_id +WINDOW + my_win AS (PARTITION BY raw_events_second.user_id, raw_events_first.value_1 ORDER BY raw_events_second.time DESC) +ORDER BY + rnk DESC, 1 DESC +LIMIT 10; +DEBUG: Creating router plan + user_id | rnk +--------------------------------------------------------------------- + 6 | 2 + 5 | 2 + 4 | 2 + 3 | 2 + 2 | 2 + 1 | 2 + 6 | 1 + 5 | 1 + 4 | 1 + 3 | 1 +(10 rows) + +SET client_min_messages TO ERROR; +DROP SCHEMA 
query_null_dist_key CASCADE; +SELECT citus_remove_node('localhost', :master_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 5d6fbb068..45adf469e 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -200,6 +200,7 @@ test: local_table_join test: local_dist_join_mixed test: citus_local_dist_joins test: recurring_outer_join +test: query_null_dist_key test: pg_dump # --------- diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 5b9190516..1fdc3a514 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -2051,6 +2051,118 @@ UPDATE SET val = dist_source.val WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); +-- test merge with null shard key tables + +CREATE SCHEMA query_null_dist_key; + +SET search_path TO query_null_dist_key; +SET client_min_messages TO DEBUG2; + +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; + +CREATE TABLE citus_local_table(a int, b int); +SELECT 
citus_add_local_table_to_metadata('citus_local_table'); +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +-- with a colocated table +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +-- with non-colocated null-dist-key table +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; + +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); + +-- with a distributed table +MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); + +MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- with a reference table +MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = reference_table.b; + +MERGE INTO reference_table USING nullkey_c1_t1 
ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; + +MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN DELETE; + +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; + +MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS materialized ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +SET client_min_messages TO WARNING; +DROP SCHEMA query_null_dist_key CASCADE; + +RESET client_min_messages; +SET search_path TO merge_schema; + DROP SERVER foreign_server CASCADE; DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; diff --git a/src/test/regress/sql/query_null_dist_key.sql b/src/test/regress/sql/query_null_dist_key.sql new file mode 100644 index 000000000..f5d1fe3fc --- /dev/null +++ b/src/test/regress/sql/query_null_dist_key.sql @@ -0,0 +1,1132 @@ +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; + +SET citus.next_shard_id TO 1620000; +SET 
citus.shard_count TO 32; + +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + +SET client_min_messages TO NOTICE; + +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(1, 8) i; + +CREATE TABLE nullkey_c3_t1(a int, b int); +SELECT create_distributed_table('nullkey_c3_t1', null, colocate_with=>'none'); +INSERT INTO nullkey_c3_t1 SELECT i, i FROM generate_series(1, 8) i; + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; + +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); + +INSERT 
INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551),( 5, 5, 'aruru', 11389), + (13, 3, 'aseyev', 2255),(15, 5, 'adversa', 3164), + (18, 8, 'assembly', 911),(19, 9, 'aubergiste', 4981), + (28, 8, 'aerophyte', 5454),(29, 9, 'amateur', 9524), + (42, 2, 'ausable', 15885),(43, 3, 'affixal', 12723), + (49, 9, 'anyone', 2681),(50, 10, 'anjanette', 19519); + +SELECT create_distributed_table('articles_hash', null, colocate_with=>'none'); + +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_first', null, colocate_with=>'none', distribution_type=>null); + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_second', null, colocate_with=>'raw_events_first', distribution_type=>null); + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT create_distributed_table('agg_events', null, colocate_with=>'raw_events_first', distribution_type=>null); + +CREATE TABLE users_ref_table (user_id int); +SELECT create_reference_table('users_ref_table'); + +INSERT INTO raw_events_first VALUES (1, '1970-01-01', 10, 100, 1000.1, 10000), (3, '1971-01-01', 30, 300, 3000.1, 30000), + (5, '1972-01-01', 50, 500, 5000.1, 50000), (2, '1973-01-01', 20, 200, 2000.1, 20000), + (4, '1974-01-01', 40, 400, 4000.1, 40000), (6, '1975-01-01', 60, 600, 6000.1, 60000); + +CREATE TABLE modify_fast_path(key int, value_1 int, value_2 text); +SELECT create_distributed_table('modify_fast_path', null); + +CREATE TABLE modify_fast_path_reference(key int, value_1 int, value_2 text); +SELECT create_reference_table('modify_fast_path_reference'); + +CREATE TABLE bigserial_test (x int, y int, z bigserial); +SELECT 
create_distributed_table('bigserial_test', null); + +CREATE TABLE append_table (text_col text, a int); +SELECT create_distributed_table('append_table', 'a', 'append'); +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset + +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +abc,234 +bcd,123 +bcd,234 +cde,345 +def,456 +efg,234 +\. + +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +abc,123 +efg,123 +hij,123 +hij,234 +ijk,1 +jkl,0 +\. + +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); + +SET client_min_messages to DEBUG2; + +-- simple insert +INSERT INTO nullkey_c1_t1 VALUES (1,2), (2,2), (3,4); +INSERT INTO nullkey_c1_t2 VALUES (1,3), (3,4), (5,1), (6,2); + +INSERT INTO nullkey_c2_t1 VALUES (1,0), (2,5), (4,3), (5,2); +INSERT INTO nullkey_c2_t2 VALUES (2,4), (3,2), (5,2), (7,4); + +-- simple select +SELECT * FROM nullkey_c1_t1 ORDER BY 1,2; + +-- for update / share +SELECT * FROM modify_fast_path WHERE key = 1 FOR UPDATE; +SELECT * FROM modify_fast_path WHERE key = 1 FOR SHARE; +SELECT * FROM modify_fast_path FOR UPDATE; +SELECT * FROM modify_fast_path FOR SHARE; + +-- cartesian product with different table types + +-- with other table types +SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM citus_local_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM postgres_local_table d1, nullkey_c1_t1; + +-- with a colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c1_t2; + +-- with a non-colocated null dist 
key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c2_t1; + +-- First, show that nullkey_c1_t1 and nullkey_c3_t1 are not colocated. +SELECT + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c1_t1'::regclass) != + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c3_t1'::regclass); + +-- Now verify that we can join them via router planner because it doesn't care +-- about whether two tables are colocated or not but physical location of shards +-- when citus.enable_non_colocated_router_query_pushdown is set to on. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); + +RESET citus.enable_non_colocated_router_query_pushdown; + +-- colocated join between null dist key tables +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN nullkey_c1_t2 USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t2 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); + +-- non-colocated inner joins between null dist key tables +SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3; + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM 
nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +-- non-colocated outer joins between null dist key tables +SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +SELECT * FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a) ORDER BY 1,2,3 OFFSET 3 LIMIT 4; + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); + +-- join with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a; + +WITH cte_1 AS + (SELECT * FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE) +SELECT COUNT(*) FROM cte_1; + +-- join with postgres / citus local tables +SELECT * FROM nullkey_c1_t1 JOIN postgres_local_table USING(a); +SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a); + +-- join with a distributed table +SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM distributed_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- outer joins with different table types +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a); +SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN citus_local_table USING(a); 
+SELECT COUNT(*) FROM citus_local_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN postgres_local_table USING(a); +SELECT COUNT(*) FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN citus_local_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a); + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; + +RESET citus.enable_non_colocated_router_query_pushdown; + +-- lateral / semi / anti joins with different table types + +-- with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM reference_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM reference_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM reference_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > 
t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- with a distributed table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM distributed_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +-- with postgres / citus local tables +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM citus_local_table t2 
WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM citus_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM postgres_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM postgres_local_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE 
t2.b = t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- insert .. select + +-- between two colocated null dist key tables + +-- The target list of "distributed statement"s that we send to workers +-- differ(*) in Postgres versions < 15. For this reason, we temporarily +-- disable debug messages here and run the EXPLAIN'ed version of the +-- command. +-- +-- (*): < SELECT a, b > vs < SELECT table_name.a, table_name.b > +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2; +SET client_min_messages TO DEBUG2; + +-- between two non-colocated null dist key tables +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; + +-- between a null dist key table and a table of different type +INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; + +INSERT INTO reference_table SELECT * FROM nullkey_c1_t1; +INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1; +INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1; +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1; + +-- test subquery +SELECT count(*) FROM +( + SELECT * FROM (SELECT * FROM nullkey_c1_t2) as subquery_inner +) AS subquery_top; + +-- test cte inlining +WITH cte_nullkey_c1_t1 AS (SELECT * FROM nullkey_c1_t1), + cte_postgres_local_table AS (SELECT * FROM postgres_local_table), + cte_distributed_table AS (SELECT * FROM distributed_table) +SELECT COUNT(*) FROM cte_distributed_table, cte_nullkey_c1_t1, cte_postgres_local_table +WHERE cte_nullkey_c1_t1.a > 3 AND cte_distributed_table.a 
< 5; + +-- test recursive ctes +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; + +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive JOIN distributed_table ON (level_2_recursive.x = distributed_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; + +-- grouping set +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)) + ORDER BY id, subtitle; + +-- subquery in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM articles_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a +ORDER BY 1,2; + +-- test prepared statements + +-- prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; + +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; + +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); + +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE 
author_articles(NULL); + +PREPARE author_articles_update(int) AS + UPDATE articles_hash SET title = 'test' WHERE author_id = $1; + +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); + +-- More tests with insert .. select. +-- +-- The target list of "distributed statement"s that we send to workers +-- might differ(*) in Postgres versions < 15 and they are reported when +-- "log level >= DEBUG2". For this reason, we set log level to DEBUG1 to +-- avoid reporting them. +-- +-- DEBUG1 still allows reporting the reason why given INSERT .. SELECT +-- query is not distributed / requires pull-to-coordinator. + +SET client_min_messages TO DEBUG1; + +INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; + +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM + +(SELECT + id +FROM (SELECT users_ref_table.user_id AS id + FROM raw_events_first, + users_ref_table + WHERE raw_events_first.user_id = users_ref_table.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); + +-- upsert with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; + +-- using a left join +INSERT INTO agg_events (user_id) +SELECT 
+ raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; + +-- using a full join +INSERT INTO agg_events (user_id, value_1_agg) +SELECT t1.user_id AS col1, + t2.user_id AS col2 +FROM raw_events_first t1 + FULL JOIN raw_events_second t2 + ON t1.user_id = t2.user_id; + +-- using semi join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT raw_events_second.user_id + FROM raw_events_second, raw_events_first + WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); + +-- using lateral join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id =raw_events_first.user_id); + +-- using inner join +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); + +-- We could relax distributed insert .. select checks to allow pushing +-- down more clauses down to the worker nodes when inserting into a single +-- shard by selecting from a colocated one. We might want to do something +-- like https://github.com/citusdata/citus/pull/6772. +-- +-- e.g., insert into null_shard_key_1/citus_local/reference +-- select * from null_shard_key_1/citus_local/reference limit 1 +-- +-- Below "limit / offset clause" test and some others are examples of this. 
+ +-- limit / offset clause +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1; +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; + +-- using a materialized cte +WITH cte AS MATERIALIZED + (SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id) +INSERT INTO agg_events (value_1_agg, user_id) +SELECT v1_agg, user_id FROM cte; + +INSERT INTO raw_events_second + WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) + SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; + +-- using a regular cte +WITH cte AS (SELECT * FROM raw_events_first) +INSERT INTO raw_events_second + SELECT user_id * 7000, time, value_1, value_2, value_3, value_4 FROM cte; + +INSERT INTO raw_events_second + WITH cte AS (SELECT * FROM raw_events_first) + SELECT * FROM cte; + +INSERT INTO agg_events + WITH sub_cte AS (SELECT 1) + SELECT + raw_events_first.user_id, (SELECT * FROM sub_cte) + FROM + raw_events_first; + +-- we still support complex joins via INSERT's cte list .. +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +INSERT INTO raw_events_second (user_id, value_1) + SELECT (a+5)*-1, b FROM cte; + +-- .. 
but can't do so via via SELECT's cte list +INSERT INTO raw_events_second (user_id, value_1) +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) + SELECT (a+5)*-1, b FROM cte; + +-- using set operations +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) INTERSECT + (SELECT user_id FROM raw_events_first); + +-- group by clause inside subquery +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM + +(SELECT + id +FROM (SELECT raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); + +-- group by clause inside lateral subquery +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + t1.user_id + FROM + raw_events_second t1 JOIN raw_events_second t2 on (t1.user_id = t2.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; + +-- using aggregates +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; + +-- using generate_series +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, s, s FROM generate_series(1, 5) s; + +CREATE SEQUENCE insert_select_test_seq; + +-- nextval() 
expression in select's targetlist +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, nextval('insert_select_test_seq'), (random()*10)::int +FROM generate_series(100, 105) s; + +-- non-immutable function +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; + +SET client_min_messages TO DEBUG2; + +-- update / delete + +UPDATE nullkey_c1_t1 SET a = 1 WHERE b = 5; +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = 5; +UPDATE nullkey_c1_t1 SET a = random(); +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = random(); + +DELETE FROM nullkey_c1_t1 WHERE b = 5; +DELETE FROM nullkey_c1_t1 WHERE a = random(); + +-- simple update queries between different table types / colocated tables +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +UPDATE nullkey_c1_t1 SET b = 5 FROM citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; + +UPDATE reference_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +UPDATE citus_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +UPDATE postgres_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; + +-- simple delete queries between different table types / colocated tables +DELETE 
FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +DELETE FROM nullkey_c1_t1 USING citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +DELETE FROM nullkey_c1_t1 USING postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; + +DELETE FROM reference_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +DELETE FROM citus_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +DELETE FROM postgres_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; + +-- slightly more complex update queries +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); + +WITH cte AS materialized( + SELECT * FROM distributed_table +) +UPDATE nullkey_c1_t1 SET b = 5 FROM cte WHERE nullkey_c1_t1.b = cte.a; + +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); + +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); + +UPDATE nullkey_c1_t1 tx SET b = ( + SELECT nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN nullkey_c1_t1 ON (nullkey_c1_t1.a != nullkey_c1_t2.a) WHERE nullkey_c1_t1.a = tx.a ORDER BY 1 LIMIT 1 +); + +UPDATE nullkey_c1_t1 tx SET 
b = t2.b FROM nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); + +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); + +UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; +UPDATE modify_fast_path SET value_1 = NULL WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +UPDATE modify_fast_path SET value_1 = 5 WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +UPDATE modify_fast_path + SET value_1 = 1 + FROM modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; + +PREPARE p1 (int, int, int) AS + UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; +EXECUTE p1(1,1,1); +EXECUTE p1(2,2,2); +EXECUTE p1(3,3,3); +EXECUTE p1(4,4,4); +EXECUTE p1(5,5,5); +EXECUTE p1(6,6,6); +EXECUTE p1(7,7,7); + +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); +EXECUTE prepared_zero_shard_update(3); +EXECUTE prepared_zero_shard_update(4); +EXECUTE prepared_zero_shard_update(5); +EXECUTE prepared_zero_shard_update(6); +EXECUTE prepared_zero_shard_update(7); + +-- slightly more complex delete queries +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); + +WITH cte AS materialized( + SELECT * FROM distributed_table +) +DELETE FROM nullkey_c1_t1 USING cte WHERE nullkey_c1_t1.b = cte.a; + +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); + +DELETE FROM nullkey_c1_t1 USING reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); + 
+DELETE FROM nullkey_c1_t1 tx USING nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); + +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); + +DELETE FROM modify_fast_path WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DELETE FROM modify_fast_path + USING modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; + +PREPARE p2 (int, int, int) AS + DELETE FROM modify_fast_path WHERE key = ($2)*$1 AND value_1 = $3; +EXECUTE p2(1,1,1); +EXECUTE p2(2,2,2); +EXECUTE p2(3,3,3); +EXECUTE p2(4,4,4); +EXECUTE p2(5,5,5); +EXECUTE p2(6,6,6); +EXECUTE p2(7,7,7); + +PREPARE prepared_zero_shard_delete(int) AS DELETE FROM modify_fast_path WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_delete(1); +EXECUTE prepared_zero_shard_delete(2); +EXECUTE prepared_zero_shard_delete(3); +EXECUTE prepared_zero_shard_delete(4); +EXECUTE prepared_zero_shard_delete(5); +EXECUTE prepared_zero_shard_delete(6); +EXECUTE prepared_zero_shard_delete(7); + +-- test modifying ctes + +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +SELECT * FROM cte; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path_reference WHERE key IN (SELECT key FROM cte); + +WITH cte AS ( + DELETE FROM reference_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t2 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * 
FROM nullkey_c2_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); + +-- Below two queries fail very late when +-- citus.enable_non_colocated_router_query_pushdown is set to on. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); + +RESET citus.enable_non_colocated_router_query_pushdown; + +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +DELETE FROM modify_fast_path WHERE key = 1; + +-- test window functions + +SELECT + user_id, avg(avg(value_3)) OVER (PARTITION BY user_id, MIN(value_2)) +FROM + raw_events_first +GROUP BY + 1 +ORDER BY + 2 DESC NULLS LAST, 1 DESC; + +SELECT + user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2)) +FROM ( + SELECT + DISTINCT us.user_id, us.value_2, us.value_1, random() as r1 + FROM + raw_events_first as us, raw_events_second + WHERE + us.user_id = raw_events_second.user_id + ORDER BY + user_id, value_2 + ) s +GROUP BY + 1, value_1 +ORDER BY + 2 DESC, 1; + +SELECT + DISTINCT ON (raw_events_second.user_id, rnk) raw_events_second.user_id, rank() OVER my_win AS rnk +FROM + raw_events_second, raw_events_first 
+WHERE + raw_events_first.user_id = raw_events_second.user_id +WINDOW + my_win AS (PARTITION BY raw_events_second.user_id, raw_events_first.value_1 ORDER BY raw_events_second.time DESC) +ORDER BY + rnk DESC, 1 DESC +LIMIT 10; + +SET client_min_messages TO ERROR; +DROP SCHEMA query_null_dist_key CASCADE; + +SELECT citus_remove_node('localhost', :master_port);