diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 38962b333..50509baea 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1025,6 +1025,17 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina { return distributedPlan; } + else if (ContainsNullDistKeyTable(originalQuery)) + { + /* + * We only support router queries if the query contains reference to + * a null-dist-key table. This temporary restriction will be removed + * once we support recursive planning for the queries that reference + * null-dist-key tables. + */ + WrapRouterErrorForNullDistKeyTable(distributedPlan->planningError); + RaiseDeferredError(distributedPlan->planningError, ERROR); + } else { RaiseDeferredError(distributedPlan->planningError, DEBUG2); @@ -2462,6 +2473,18 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) } +/* + * ContainsNullDistKeyTable returns true if given query contains reference + * to a null-dist-key table. + */ +bool +ContainsNullDistKeyTable(Query *query) +{ + RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query); + return rteListProperties->hasDistTableWithoutShardKey; +} + + /* * GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that * returns RTEListProperties for the rte list retrieved from query. 
@@ -2538,6 +2561,15 @@ GetRTEListProperties(List *rangeTableList) else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE)) { rteListProperties->hasDistributedTable = true; + + if (!HasDistributionKeyCacheEntry(cacheEntry)) + { + rteListProperties->hasDistTableWithoutShardKey = true; + } + else + { + rteListProperties->hasDistTableWithShardKey = true; + } } else { diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index ecb62478a..2be4a5626 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -212,6 +212,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) return false; } + /* + * If the table doesn't have a distribution column, we don't need to + * check anything further. + */ + Var *distributionKey = PartitionColumn(distributedTableId, 1); + if (!distributionKey) + { + return true; + } + /* WHERE clause should not be empty for distributed tables */ if (joinTree == NULL || (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals == @@ -220,13 +230,6 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) return false; } - /* if that's a reference table, we don't need to check anything further */ - Var *distributionKey = PartitionColumn(distributedTableId, 1); - if (!distributionKey) - { - return true; - } - /* convert list of expressions into expression tree for further processing */ quals = joinTree->quals; if (quals != NULL && IsA(quals, List)) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 53fe58cdb..175f6bc6f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -57,6 +57,7 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, PlannerRestrictionContext * 
plannerRestrictionContext, ParamListInfo boundParams); +static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -241,6 +242,12 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, RaiseDeferredError(deferredError, ERROR); } + /* + * We support a limited set of INSERT .. SELECT queries if the query + * references a null-dist-key table. + */ + ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery); + DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); @@ -260,6 +267,74 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, } +/* + * ErrorIfInsertSelectWithNullDistKeyNotSupported throws an error if given INSERT + * .. SELECT query references a null-dist-key table (as the target table or in + * the SELECT clause) and is unsupported. + * + * Such an INSERT .. SELECT query is supported as long as it only references + * a "colocated" set of null-dist-key tables and no other relation rte types. 
+ */ +static void +ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery) +{ + RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); + Query *subquery = subqueryRte->subquery; + RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery); + + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); + Oid targetRelationId = insertRte->relid; + if (!IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE) && + subqueryRteListProperties->hasDistTableWithoutShardKey) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a distributed table that " + "does not have a shard key when inserting into " + "a different table type"))); + } + else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) + { + if (subqueryRteListProperties->hasPostgresLocalTable || + subqueryRteListProperties->hasReferenceTable || + subqueryRteListProperties->hasCitusLocalTable || + subqueryRteListProperties->hasDistTableWithShardKey || + subqueryRteListProperties->hasMaterializedView) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from different table types " + "when inserting into a distributed table " + "that does not have a shard key"))); + } + + if (!subqueryRteListProperties->hasDistTableWithoutShardKey) + { + /* + * This means that the SELECT doesn't reference any Citus tables, + * Postgres tables or materialized views but references a function + * call, a values clause etc., or a cte from INSERT. + * + * In that case, we rely on the common restrictions enforced by the + * INSERT .. SELECT planners. 
+ */ + Assert(!NeedsDistributedPlanning(subquery)); + return; + } + + List *distributedRelationIdList = DistributedRelationIdList(subquery); + distributedRelationIdList = lappend_oid(distributedRelationIdList, + targetRelationId); + + if (!AllDistributedRelationsInListColocated(distributedRelationIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a non-colocated distributed " + "table when inserting into a distributed table " + "that does not have a shard key"))); + } + } +} + + /* * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed * INSERT ... SELECT queries which could consist of multiple tasks. @@ -379,6 +454,16 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, { RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + RTEListProperties *selectRteListProperties = + GetRTEListPropertiesForQuery(selectRte->subquery); + if (selectRteListProperties->hasDistTableWithoutShardKey) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot select from a distributed table that " + "does not have a shard key when inserting into " + "a local table"))); + } + PrepareInsertSelectForCitusPlanner(insertSelectQuery); /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ @@ -717,10 +802,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, } else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE)) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "distributed INSERT ... 
SELECT cannot target a distributed " - "table with a null shard key", - NULL, NULL); + /* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */ } else { @@ -874,7 +956,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, */ RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery( copiedSubquery); - if (subqueryRteListProperties->hasDistributedTable) + if (subqueryRteListProperties->hasDistTableWithShardKey) { AddPartitionKeyNotNullFilterToSelect(copiedSubquery); } diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 5b39aeba6..930a44db8 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -509,6 +509,11 @@ InsertDistributionColumnMatchesSource(Oid targetRelationId, Query *query) return NULL; } + if (!HasDistributionKey(targetRelationId)) + { + return NULL; + } + bool foundDistributionColumn = false; MergeAction *action = NULL; foreach_ptr(action, query->mergeActionList) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index d9322bf5e..7732b6c5e 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -272,7 +272,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) if (!targetListOnPartitionColumn) { if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, - IsDistributedTableRTE)) + IsTableWithDistKeyRTE)) { targetListOnPartitionColumn = true; } @@ -379,6 +379,20 @@ IsReferenceTableRTE(Node *node) } +/* + * IsTableWithDistKeyRTE gets a node and returns true if the node + * is a range table relation entry that points to a distributed table + * that has a distribution column. 
+ */ +bool +IsTableWithDistKeyRTE(Node *node) +{ + Oid relationId = NodeTryGetRteRelid(node); + return relationId != InvalidOid && IsCitusTable(relationId) && + HasDistributionKey(relationId); +} + + /* * FullCompositeFieldList gets a composite field list, and checks if all fields * of composite type are used in the list. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 97c2cecf6..47d11172f 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -258,6 +258,22 @@ CreateModifyPlan(Query *originalQuery, Query *query, } +/* + * WrapRouterErrorForNullDistKeyTable wraps given planning error with a + * generic error message for queries that reference a distributed table + * without a distribution key, keeping the original message as the detail. + */ +void +WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError) +{ + planningError->detail = planningError->message; + planningError->message = pstrdup("queries that reference a distributed " + "table without a shard key can only " + "reference colocated distributed " + "tables or reference tables"); +} + + /* * CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query. * The returned plan is a router task that returns query results from a single worker. 
@@ -1870,6 +1886,11 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon */ if (IsMergeQuery(originalQuery)) { + if (ContainsNullDistKeyTable(originalQuery)) + { + WrapRouterErrorForNullDistKeyTable(*planningError); + } + RaiseDeferredError(*planningError, ERROR); } else @@ -3855,7 +3876,8 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree) CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(distributedTableId); - if (!HasDistributionKeyCacheEntry(modificationTableCacheEntry)) + if (!IsCitusTableTypeCacheEntry(modificationTableCacheEntry, + DISTRIBUTED_TABLE)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot router plan modification of a non-distributed table", diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 412859449..753504131 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -147,9 +147,19 @@ typedef struct RTEListProperties bool hasReferenceTable; bool hasCitusLocalTable; - /* includes hash, append and range partitioned tables */ + /* includes hash, null dist key, append and range partitioned tables */ bool hasDistributedTable; + /* + * Effectively, hasDistributedTable is equal to + * "hasDistTableWithShardKey || hasDistTableWithoutShardKey". + * + * We provide the two flags below for callers that want to know what kind of + * distributed tables the given query references. 
+ */ + bool hasDistTableWithShardKey; + bool hasDistTableWithoutShardKey; + /* union of hasReferenceTable, hasCitusLocalTable and hasDistributedTable */ bool hasCitusTable; @@ -243,6 +253,7 @@ extern int32 BlessRecordExpression(Expr *expr); extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, struct DistributedPlan *distributedPlan); +extern bool ContainsNullDistKeyTable(Query *query); extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 189170358..de4901ea2 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -200,6 +200,7 @@ extern bool IsCitusTableRTE(Node *node); extern bool IsDistributedOrReferenceTableRTE(Node *node); extern bool IsDistributedTableRTE(Node *node); extern bool IsReferenceTableRTE(Node *node); +extern bool IsTableWithDistKeyRTE(Node *node); extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte); extern bool ContainsReadIntermediateResultFunction(Node *node); extern bool ContainsReadIntermediateResultArrayFunction(Node *node); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a255fd520..40d92fead 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -36,6 +36,7 @@ extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); +extern void WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError); extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext, diff --git a/src/test/regress/citus_tests/config.py 
b/src/test/regress/citus_tests/config.py index cd1be125b..69cc5599c 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -237,12 +237,6 @@ class AllNullDistKeyDefaultConfig(CitusDefaultClusterConfig): # group 8 "function_create", "functions", - # group 9 - "merge_arbitrary_create", - "merge_arbitrary", - # group 10 - "arbitrary_configs_router_create", - "arbitrary_configs_router", # # ii) Skip the following test as it requires support for create_distributed_function. "nested_execution", diff --git a/src/test/regress/expected/create_null_dist_key.out b/src/test/regress/expected/create_null_dist_key.out index af6e66f62..43120a454 100644 --- a/src/test/regress/expected/create_null_dist_key.out +++ b/src/test/regress/expected/create_null_dist_key.out @@ -1803,6 +1803,7 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL; -- try a few simple queries at least to make sure that we don't crash BEGIN; INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; +ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key ROLLBACK; DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1; DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE; diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 2196d966d..fd82efa8c 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -3228,6 +3228,153 @@ WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported HINT: Consider using hash distribution instead +-- test merge with null shard key tables +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; +SET client_min_messages TO DEBUG2; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT 
create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +-- with a colocated table +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +-- with non-colocated null-dist-key table +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; +ERROR: For MERGE command, all the distributed tables must be colocated +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); +ERROR: For MERGE command, all the distributed tables must be colocated +-- with a distributed table 
+MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); +ERROR: For MERGE command, all the distributed tables must be colocated +MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: For MERGE command, all the distributed tables must be colocated +-- with a reference table +MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = reference_table.b; +ERROR: MERGE command is not supported on reference tables yet +MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: MERGE command is not supported on reference tables yet +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; +ERROR: MERGE command is not supported with combination of distributed/local tables yet +MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT 
VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: +DEBUG: Creating MERGE router plan +WITH cte AS ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +ERROR: For MERGE command, all the distributed tables must be colocated +WITH cte AS materialized ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +ERROR: For MERGE command, all the distributed tables must be colocated +SET client_min_messages TO WARNING; +DROP SCHEMA query_null_dist_key CASCADE; +RESET client_min_messages; +SET search_path TO merge_schema; DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server diff --git a/src/test/regress/expected/query_null_dist_key.out b/src/test/regress/expected/query_null_dist_key.out new file mode 100644 index 000000000..09413a3ea --- /dev/null +++ b/src/test/regress/expected/query_null_dist_key.out @@ -0,0 +1,1796 @@ +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; +SET citus.next_shard_id TO 1620000; +SET citus.shard_count TO 32; +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SET client_min_messages TO NOTICE; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(1, 8) i; +CREATE TABLE nullkey_c3_t1(a int, b int); +SELECT create_distributed_table('nullkey_c3_t1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO nullkey_c3_t1 SELECT i, i FROM generate_series(1, 8) i; +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +CREATE TABLE distributed_table(a int, b 
int); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); +INSERT INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551),( 5, 5, 'aruru', 11389), + (13, 3, 'aseyev', 2255),(15, 5, 'adversa', 3164), + (18, 8, 'assembly', 911),(19, 9, 'aubergiste', 4981), + (28, 8, 'aerophyte', 5454),(29, 9, 'amateur', 9524), + (42, 2, 'ausable', 15885),(43, 3, 'affixal', 12723), + (49, 9, 'anyone', 2681),(50, 10, 'anjanette', 19519); +SELECT create_distributed_table('articles_hash', null, colocate_with=>'none'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$query_null_dist_key.articles_hash$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_first', null, colocate_with=>'none', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_second', null, colocate_with=>'raw_events_first', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT create_distributed_table('agg_events', null, colocate_with=>'raw_events_first', distribution_type=>null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE users_ref_table (user_id int); +SELECT create_reference_table('users_ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO raw_events_first VALUES (1, '1970-01-01', 10, 100, 1000.1, 10000), (3, '1971-01-01', 30, 300, 3000.1, 30000), + (5, '1972-01-01', 50, 500, 5000.1, 50000), (2, '1973-01-01', 20, 200, 2000.1, 20000), + (4, '1974-01-01', 40, 400, 4000.1, 40000), (6, '1975-01-01', 60, 600, 6000.1, 60000); +CREATE TABLE modify_fast_path(key int, value_1 int, value_2 text); +SELECT create_distributed_table('modify_fast_path', 
null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE modify_fast_path_reference(key int, value_1 int, value_2 text); +SELECT create_reference_table('modify_fast_path_reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE bigserial_test (x int, y int, z bigserial); +SELECT create_distributed_table('bigserial_test', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE append_table (text_col text, a int); +SELECT create_distributed_table('append_table', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); +SET client_min_messages to DEBUG2; +-- simple insert +INSERT INTO nullkey_c1_t1 VALUES (1,2), (2,2), (3,4); +DEBUG: Creating router plan +INSERT INTO nullkey_c1_t2 VALUES (1,3), (3,4), (5,1), (6,2); +DEBUG: Creating router plan +INSERT INTO nullkey_c2_t1 VALUES (1,0), (2,5), (4,3), (5,2); +DEBUG: Creating router plan +INSERT INTO nullkey_c2_t2 VALUES (2,4), (3,2), (5,2), (7,4); +DEBUG: 
Creating router plan +-- simple select +SELECT * FROM nullkey_c1_t1 ORDER BY 1,2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + a | b +--------------------------------------------------------------------- + 1 | 1 + 1 | 2 + 2 | 2 + 2 | 2 + 3 | 3 + 3 | 4 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 +(11 rows) + +-- for update / share +SELECT * FROM modify_fast_path WHERE key = 1 FOR UPDATE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path WHERE key = 1 FOR SHARE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path FOR UPDATE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM modify_fast_path FOR SHARE; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +-- cartesian product with different table types +-- with other table types +SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 66 +(1 row) + +SELECT COUNT(*) FROM citus_local_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a 
shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table d1, nullkey_c1_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- with a colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c1_t2; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 110 +(1 row) + +-- with a non-colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c2_t1; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- First, show that nullkey_c1_t1 and nullkey_c3_t1 are not colocated. +SELECT + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c1_t1'::regclass) != + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c3_t1'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- Now verify that we can join them via router planner because it doesn't care +-- about whether two tables are colocated or not but physical location of shards +-- when citus.enable_non_colocated_router_query_pushdown is set to on. 
+SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +-- colocated join between null dist key tables +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 14 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count 
+--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t2 t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- non-colocated inner joins between null dist key tables +SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- non-colocated outer joins between null dist key tables +SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT * FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a) ORDER BY 1,2,3 OFFSET 3 LIMIT 4; +ERROR: queries that reference a distributed 
table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +-- join with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 8 +(1 row) + +WITH cte_1 AS + (SELECT * FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE) +SELECT COUNT(*) FROM cte_1; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 8 +(1 row) + +-- join with postgres / citus local tables +SELECT * FROM nullkey_c1_t1 JOIN 
postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- join with a distributed table +SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- outer joins with different table types +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN citus_local_table USING(a); +ERROR: queries that reference a 
distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table LEFT JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN citus_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 12 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner does not support append-partitioned tables. +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +-- lateral / semi / anti joins with different table types +-- with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count 
+--------------------------------------------------------------------- + 7 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM reference_table t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM reference_table t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 
4 +(1 row) + +SELECT COUNT(*) FROM reference_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- with a distributed table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM distributed_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM distributed_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 
t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- with postgres / citus local tables +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM citus_local_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM citus_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM citus_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM postgres_local_table t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM postgres_local_table t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +SELECT COUNT(*) FROM postgres_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. +-- insert .. select +-- between two colocated null dist key tables +-- The target list of "distributed statement"s that we send to workers +-- differ(*) in Postgres versions < 15. For this reason, we temporarily +-- disable debug messages here and run the EXPLAIN'ed version of the +-- command. 
+-- +-- (*): < SELECT a, b > vs < SELECT table_name.a, table_name.b > +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on nullkey_c1_t1_1620000 citus_table_alias (actual rows=0 loops=1) + -> Seq Scan on nullkey_c1_t2_1620001 nullkey_c1_t2 (actual rows=10 loops=1) +(7 rows) + +SET client_min_messages TO DEBUG2; +-- between two non-colocated null dist key tables +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; +ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key +-- between a null dist key table and a table of different type +INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +INSERT INTO reference_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that 
does not have a shard key when inserting into a different table type +INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a different table type +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1; +ERROR: cannot select from a distributed table that does not have a shard key when inserting into a local table +-- test subquery +SELECT count(*) FROM +( + SELECT * FROM (SELECT * FROM nullkey_c1_t2) as subquery_inner +) AS subquery_top; +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 10 +(1 row) + +-- test cte inlining +WITH cte_nullkey_c1_t1 AS (SELECT * FROM nullkey_c1_t1), + cte_postgres_local_table AS (SELECT * FROM postgres_local_table), + cte_distributed_table AS (SELECT * FROM distributed_table) +SELECT COUNT(*) FROM cte_distributed_table, cte_nullkey_c1_t1, cte_postgres_local_table +WHERE cte_nullkey_c1_t1.a > 3 AND cte_distributed_table.a < 5; +DEBUG: CTE cte_nullkey_c1_t1 is going to be inlined via distributed planning +DEBUG: CTE cte_postgres_local_table is going to be inlined via distributed planning +DEBUG: CTE cte_distributed_table is going to be inlined via distributed planning +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Local tables cannot be used in distributed queries. 
+-- test recursive ctes +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; +DEBUG: CTE level_0 is going to be inlined via distributed planning +DEBUG: CTE level_1 is going to be inlined via distributed planning +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 122 +(1 row) + +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive JOIN distributed_table ON (level_2_recursive.x = distributed_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; +DEBUG: CTE level_0 is going to be inlined via distributed planning +DEBUG: CTE level_1 is going to be inlined via distributed planning +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- grouping set +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)) + ORDER BY id, subtitle; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | subtitle | count +--------------------------------------------------------------------- + 42 | | 1 + | u | 1 +(2 rows) + +-- subquery in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM articles_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a +ORDER BY 1,2; +DEBUG: Creating router plan + name | special_price 
+--------------------------------------------------------------------- + adversa | 15 + aerophyte | 28 + affixal | 43 + altdorfer | 4 + amateur | 29 + anjanette | 50 + anyone | 49 + aruru | 5 + aseyev | 13 + assembly | 18 + aubergiste | 19 + ausable | 42 +(12 rows) + +-- test prepared statements +-- prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; +EXECUTE author_1_articles; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_1_articles; + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count 
+--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | title | word_count +--------------------------------------------------------------------- +(0 rows) + +EXECUTE author_articles(NULL); + id | author_id | 
title | word_count +--------------------------------------------------------------------- +(0 rows) + +PREPARE author_articles_update(int) AS + UPDATE articles_hash SET title = 'test' WHERE author_id = $1; +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE author_articles_update(NULL); +-- More tests with insert .. select. +-- +-- The target list of "distributed statement"s that we send to workers +-- might differ(*) in Postgres versions < 15 and they are reported when +-- "log level >= DEBUG2". For this reason, we set log level to DEBUG1 to +-- avoid reporting them. +-- +-- DEBUG1 still allows reporting the reason why given INSERT .. SELECT +-- query is not distributed / requires pull-to-coordinator. +SET client_min_messages TO DEBUG1; +INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; +DEBUG: volatile functions are not allowed in distributed INSERT ... 
SELECT queries +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Sequences cannot be used in router queries +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM +(SELECT + id +FROM (SELECT users_ref_table.user_id AS id + FROM raw_events_first, + users_ref_table + WHERE raw_events_first.user_id = users_ref_table.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +-- upsert with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; + user_id | value_1_agg +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 + 5 | 50 + 6 | 60 +(6 rows) + +-- using a left join +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; +-- using a full join +INSERT INTO agg_events (user_id, value_1_agg) +SELECT t1.user_id AS col1, + t2.user_id AS col2 +FROM raw_events_first t1 + FULL JOIN raw_events_second t2 + ON t1.user_id = t2.user_id; +-- using semi join +INSERT INTO 
raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT raw_events_second.user_id + FROM raw_events_second, raw_events_first + WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); +-- using lateral join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id =raw_events_first.user_id); +-- using inner join +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +-- We could relax distributed insert .. select checks to allow pushing +-- down more clauses down to the worker nodes when inserting into a single +-- shard by selecting from a colocated one. We might want to do something +-- like https://github.com/citusdata/citus/pull/6772. +-- +-- e.g., insert into null_shard_key_1/citus_local/reference +-- select * from null_shard_key_1/citus_local/reference limit 1 +-- +-- Below "limit / offset clause" test and some others are examples of this. +-- limit / offset clause +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1; +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; +DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +-- using a materialized cte +WITH cte AS MATERIALIZED + (SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id) +INSERT INTO agg_events (value_1_agg, user_id) +SELECT v1_agg, user_id FROM cte; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO raw_events_second + WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) + SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; +-- using a regular cte +WITH cte AS (SELECT * FROM raw_events_first) +INSERT INTO raw_events_second + SELECT user_id * 7000, time, value_1, value_2, value_3, value_4 FROM cte; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO raw_events_second + WITH cte AS (SELECT * FROM raw_events_first) + SELECT * FROM cte; +DEBUG: CTE cte is going to be inlined via distributed planning +INSERT INTO agg_events + WITH sub_cte AS (SELECT 1) + SELECT + raw_events_first.user_id, (SELECT * FROM sub_cte) + FROM + raw_events_first; +DEBUG: CTE sub_cte is going to be inlined via distributed planning +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- we still support complex joins via INSERT's cte list .. +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +INSERT INTO raw_events_second (user_id, value_1) + SELECT (a+5)*-1, b FROM cte; +DEBUG: distributed INSERT ... 
SELECT can only select from distributed tables +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: recursively planning left side of the right join since the outer side is a recurring rel +DEBUG: recursively planning distributed relation "distributed_table" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "distributed_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM query_null_dist_key.distributed_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT "?column?" AS user_id, b AS value_1 FROM (SELECT ((cte.a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) '-1'::integer), cte.b FROM (SELECT reference_table.a, 1 AS b FROM ((SELECT distributed_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) distributed_table_1) distributed_table RIGHT JOIN query_null_dist_key.reference_table USING (a))) cte) citus_insert_select_subquery("?column?", b) +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- .. but can't do so via via SELECT's cte list +INSERT INTO raw_events_second (user_id, value_1) +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) + SELECT (a+5)*-1, b FROM cte; +DEBUG: CTE cte is going to be inlined via distributed planning +ERROR: cannot select from different table types when inserting into a distributed table that does not have a shard key +-- using set operations +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) INTERSECT + (SELECT user_id FROM raw_events_first); +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +-- group by clause inside subquery +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM +(SELECT + id +FROM (SELECT raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); +-- group by clause inside lateral subquery +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + t1.user_id + FROM + raw_events_second t1 JOIN raw_events_second t2 on (t1.user_id = t2.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; +-- using aggregates +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; +-- using generate_series +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, s, s FROM generate_series(1, 5) s; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE SEQUENCE insert_select_test_seq; +-- nextval() expression in select's targetlist +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, nextval('insert_select_test_seq'), (random()*10)::int +FROM generate_series(100, 105) s; +DEBUG: distributed INSERT ... 
SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- non-immutable function +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: non-IMMUTABLE functions are not allowed in the RETURNING clause +SET client_min_messages TO DEBUG2; +-- update / delete +UPDATE nullkey_c1_t1 SET a = 1 WHERE b = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in UPDATE queries on distributed tables must not be VOLATILE +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +DELETE FROM nullkey_c1_t1 WHERE b = 5; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 WHERE a = random(); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +-- simple update queries between different table types / colocated tables +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET b = 5 FROM 
nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE nullkey_c1_t1 SET b = 5 FROM citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +UPDATE nullkey_c1_t1 SET b = 5 FROM postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +UPDATE reference_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +ERROR: cannot perform select on a distributed table and modify a reference table +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = 
distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE citus_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +UPDATE postgres_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +-- simple delete queries between different table types / colocated tables +DELETE FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a 
distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +DELETE FROM nullkey_c1_t1 USING postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +DELETE FROM reference_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +ERROR: cannot perform select on a distributed table and modify a reference table +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex 
joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM citus_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: local table citus_local_table cannot be joined with these distributed tables +DELETE FROM postgres_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: relation postgres_local_table is not distributed +-- slightly more complex update queries +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS materialized( + SELECT * FROM distributed_table +) +UPDATE nullkey_c1_t1 SET b = 5 FROM cte WHERE nullkey_c1_t1.b = cte.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE 
nullkey_c1_t1 SET b = 5 FROM reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 tx SET b = ( + SELECT nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN nullkey_c1_t1 ON (nullkey_c1_t1.a != nullkey_c1_t2.a) WHERE nullkey_c1_t1.a = tx.a ORDER BY 1 LIMIT 1 +); +DEBUG: Creating router plan +UPDATE nullkey_c1_t1 tx SET b = t2.b FROM nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); +DEBUG: Creating router plan +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = NULL WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +UPDATE modify_fast_path SET value_1 = 5 WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + ?column? | ?column? 
+--------------------------------------------------------------------- +(0 rows) + +UPDATE modify_fast_path + SET value_1 = 1 + FROM modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; +DEBUG: Creating router plan +PREPARE p1 (int, int, int) AS + UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; +EXECUTE p1(1,1,1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(2,2,2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(3,3,3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(4,4,4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(5,5,5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(6,6,6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p1(7,7,7); +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_update(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_update(6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE 
prepared_zero_shard_update(7); +-- slightly more complex delete queries +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS materialized( + SELECT * FROM distributed_table +) +DELETE FROM nullkey_c1_t1 USING cte WHERE nullkey_c1_t1.b = cte.a; +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DELETE FROM nullkey_c1_t1 USING reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); +DEBUG: Creating router plan +DELETE FROM nullkey_c1_t1 tx USING nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); +DEBUG: Creating router plan +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); +DEBUG: Creating router plan +DELETE FROM modify_fast_path WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 
* 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + ?column? | ?column? +--------------------------------------------------------------------- +(0 rows) + +DELETE FROM modify_fast_path + USING modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; +DEBUG: Creating router plan +PREPARE p2 (int, int, int) AS + DELETE FROM modify_fast_path WHERE key = ($2)*$1 AND value_1 = $3; +EXECUTE p2(1,1,1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(2,2,2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(3,3,3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(4,4,4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(5,5,5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(6,6,6); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE p2(7,7,7); +PREPARE prepared_zero_shard_delete(int) AS DELETE FROM modify_fast_path WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_delete(1); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(2); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(3); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(4); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(6); +DEBUG: 
Distributed planning for a fast-path router query +DEBUG: Creating router plan +EXECUTE prepared_zero_shard_delete(7); +-- test modifying ctes +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +SELECT * FROM cte; +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path; +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path_reference WHERE key IN (SELECT key FROM cte); +DEBUG: Creating router plan + key | value_1 | value_2 +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM reference_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: cannot router plan modification of a non-distributed table +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t2 WHERE a IN (SELECT a FROM cte); +DEBUG: Creating router plan + a | b +--------------------------------------------------------------------- +(0 rows) + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c2_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: found no worker with all shard placements +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); +ERROR: queries 
that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: Router planner cannot handle multi-shard select queries +-- Below two queries fail very late when +-- citus.enable_non_colocated_router_query_pushdown is set to on. +SET citus.enable_non_colocated_router_query_pushdown TO ON; +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +ERROR: relation "query_null_dist_key.nullkey_c1_t1_1620000" does not exist +CONTEXT: while executing command on localhost:xxxxx +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +ERROR: relation "query_null_dist_key.nullkey_c1_t1_1620000" does not exist +CONTEXT: while executing command on localhost:xxxxx +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); +ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables +DETAIL: router planner does not support queries that reference non-colocated distributed tables +RESET citus.enable_non_colocated_router_query_pushdown; +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 
RETURNING * +) +UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1; +DEBUG: Creating router plan +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +DELETE FROM modify_fast_path WHERE key = 1; +DEBUG: Creating router plan +-- test window functions +SELECT + user_id, avg(avg(value_3)) OVER (PARTITION BY user_id, MIN(value_2)) +FROM + raw_events_first +GROUP BY + 1 +ORDER BY + 2 DESC NULLS LAST, 1 DESC; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + user_id | avg +--------------------------------------------------------------------- + 6 | 6000.1 + 5 | 5000.1 + 4 | 4000.1 + 3 | 3000.1 + 2 | 2000.1 + 1 | 1000.1 + 105 | + 104 | + 103 | + 102 | + 101 | + 100 | +(12 rows) + +SELECT + user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2)) +FROM ( + SELECT + DISTINCT us.user_id, us.value_2, us.value_1, random() as r1 + FROM + raw_events_first as us, raw_events_second + WHERE + us.user_id = raw_events_second.user_id + ORDER BY + user_id, value_2 + ) s +GROUP BY + 1, value_1 +ORDER BY + 2 DESC, 1; +DEBUG: Creating router plan + user_id | max +--------------------------------------------------------------------- + 1 | + 2 | + 3 | + 4 | + 5 | + 6 | + 6 | 60 + 5 | 50 + 4 | 40 + 3 | 30 + 2 | 20 + 1 | 10 + 5 | 5 + 4 | 4 + 3 | 3 + 2 | 2 + 1 | 1 +(17 rows) + +SELECT + DISTINCT ON (raw_events_second.user_id, rnk) raw_events_second.user_id, rank() OVER my_win AS rnk +FROM + raw_events_second, raw_events_first +WHERE + raw_events_first.user_id = raw_events_second.user_id +WINDOW + my_win AS (PARTITION BY raw_events_second.user_id, raw_events_first.value_1 ORDER BY raw_events_second.time DESC) +ORDER BY + rnk DESC, 1 DESC +LIMIT 10; +DEBUG: Creating router plan + user_id | rnk +--------------------------------------------------------------------- + 6 | 2 + 5 | 2 + 4 | 2 + 3 | 2 + 2 | 2 + 1 | 2 + 6 | 1 + 5 | 1 + 4 | 1 + 3 | 1 +(10 rows) + +SET client_min_messages TO ERROR; +DROP SCHEMA 
query_null_dist_key CASCADE; +SELECT citus_remove_node('localhost', :master_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 5d6fbb068..45adf469e 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -200,6 +200,7 @@ test: local_table_join test: local_dist_join_mixed test: citus_local_dist_joins test: recurring_outer_join +test: query_null_dist_key test: pg_dump # --------- diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 5b9190516..1fdc3a514 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -2051,6 +2051,118 @@ UPDATE SET val = dist_source.val WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); +-- test merge with null shard key tables + +CREATE SCHEMA query_null_dist_key; + +SET search_path TO query_null_dist_key; +SET client_min_messages TO DEBUG2; + +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; + +CREATE TABLE citus_local_table(a int, b int); +SELECT 
citus_add_local_table_to_metadata('citus_local_table'); +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +-- with a colocated table +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +-- with non-colocated null-dist-key table +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; + +MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); + +-- with a distributed table +MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); + +MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- with a reference table +MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = reference_table.b; + +MERGE INTO reference_table USING nullkey_c1_t1 
ON (nullkey_c1_t1.a = reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; + +MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) +WHEN MATCHED THEN DELETE; + +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; + +MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS materialized ( + SELECT * FROM distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +SET client_min_messages TO WARNING; +DROP SCHEMA query_null_dist_key CASCADE; + +RESET client_min_messages; +SET search_path TO merge_schema; + DROP SERVER foreign_server CASCADE; DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; diff --git a/src/test/regress/sql/query_null_dist_key.sql b/src/test/regress/sql/query_null_dist_key.sql new file mode 100644 index 000000000..f5d1fe3fc --- /dev/null +++ b/src/test/regress/sql/query_null_dist_key.sql @@ -0,0 +1,1132 @@ +CREATE SCHEMA query_null_dist_key; +SET search_path TO query_null_dist_key; + +SET citus.next_shard_id TO 1620000; +SET 
citus.shard_count TO 32; + +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + +SET client_min_messages TO NOTICE; + +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; + +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); +SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(1, 8) i; + +CREATE TABLE nullkey_c3_t1(a int, b int); +SELECT create_distributed_table('nullkey_c3_t1', null, colocate_with=>'none'); +INSERT INTO nullkey_c3_t1 SELECT i, i FROM generate_series(1, 8) i; + +CREATE TABLE reference_table(a int, b int); +SELECT create_reference_table('reference_table'); +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; + +CREATE TABLE distributed_table(a int, b int); +SELECT create_distributed_table('distributed_table', 'a'); +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; + +CREATE TABLE citus_local_table(a int, b int); +SELECT citus_add_local_table_to_metadata('citus_local_table'); +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; + +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); + +INSERT 
INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551),( 5, 5, 'aruru', 11389), + (13, 3, 'aseyev', 2255),(15, 5, 'adversa', 3164), + (18, 8, 'assembly', 911),(19, 9, 'aubergiste', 4981), + (28, 8, 'aerophyte', 5454),(29, 9, 'amateur', 9524), + (42, 2, 'ausable', 15885),(43, 3, 'affixal', 12723), + (49, 9, 'anyone', 2681),(50, 10, 'anjanette', 19519); + +SELECT create_distributed_table('articles_hash', null, colocate_with=>'none'); + +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_first', null, colocate_with=>'none', distribution_type=>null); + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT create_distributed_table('raw_events_second', null, colocate_with=>'raw_events_first', distribution_type=>null); + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT create_distributed_table('agg_events', null, colocate_with=>'raw_events_first', distribution_type=>null); + +CREATE TABLE users_ref_table (user_id int); +SELECT create_reference_table('users_ref_table'); + +INSERT INTO raw_events_first VALUES (1, '1970-01-01', 10, 100, 1000.1, 10000), (3, '1971-01-01', 30, 300, 3000.1, 30000), + (5, '1972-01-01', 50, 500, 5000.1, 50000), (2, '1973-01-01', 20, 200, 2000.1, 20000), + (4, '1974-01-01', 40, 400, 4000.1, 40000), (6, '1975-01-01', 60, 600, 6000.1, 60000); + +CREATE TABLE modify_fast_path(key int, value_1 int, value_2 text); +SELECT create_distributed_table('modify_fast_path', null); + +CREATE TABLE modify_fast_path_reference(key int, value_1 int, value_2 text); +SELECT create_reference_table('modify_fast_path_reference'); + +CREATE TABLE bigserial_test (x int, y int, z bigserial); +SELECT 
create_distributed_table('bigserial_test', null); + +CREATE TABLE append_table (text_col text, a int); +SELECT create_distributed_table('append_table', 'a', 'append'); +SELECT master_create_empty_shard('append_table') AS shardid1 \gset +SELECT master_create_empty_shard('append_table') AS shardid2 \gset +SELECT master_create_empty_shard('append_table') AS shardid3 \gset + +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); +abc,234 +bcd,123 +bcd,234 +cde,345 +def,456 +efg,234 +\. + +COPY append_table (text_col, a) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); +abc,123 +efg,123 +hij,123 +hij,234 +ijk,1 +jkl,0 +\. + +CREATE TABLE range_table(a int, b int); +SELECT create_distributed_table('range_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); +INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); + +SET client_min_messages to DEBUG2; + +-- simple insert +INSERT INTO nullkey_c1_t1 VALUES (1,2), (2,2), (3,4); +INSERT INTO nullkey_c1_t2 VALUES (1,3), (3,4), (5,1), (6,2); + +INSERT INTO nullkey_c2_t1 VALUES (1,0), (2,5), (4,3), (5,2); +INSERT INTO nullkey_c2_t2 VALUES (2,4), (3,2), (5,2), (7,4); + +-- simple select +SELECT * FROM nullkey_c1_t1 ORDER BY 1,2; + +-- for update / share +SELECT * FROM modify_fast_path WHERE key = 1 FOR UPDATE; +SELECT * FROM modify_fast_path WHERE key = 1 FOR SHARE; +SELECT * FROM modify_fast_path FOR UPDATE; +SELECT * FROM modify_fast_path FOR SHARE; + +-- cartesian product with different table types + +-- with other table types +SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM citus_local_table d1, nullkey_c1_t1; +SELECT COUNT(*) FROM postgres_local_table d1, nullkey_c1_t1; + +-- with a colocated null dist key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c1_t2; + +-- with a non-colocated null dist 
key table +SELECT COUNT(*) FROM nullkey_c1_t1 d1, nullkey_c2_t1; + +-- First, show that nullkey_c1_t1 and nullkey_c3_t1 are not colocated. +SELECT + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c1_t1'::regclass) != + (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'query_null_dist_key.nullkey_c3_t1'::regclass); + +-- Now verify that we can join them via router planner because it doesn't care +-- about whether two tables are colocated or not but physical location of shards +-- when citus.enable_non_colocated_router_query_pushdown is set to on. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); + +RESET citus.enable_non_colocated_router_query_pushdown; + +-- colocated join between null dist key tables +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN nullkey_c1_t2 USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t2 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t2 t2 WHERE t2.b > t1.a +); + +-- non-colocated inner joins between null dist key tables +SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3; + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM 
nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +-- non-colocated outer joins between null dist key tables +SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; +SELECT * FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a) ORDER BY 1,2,3 OFFSET 3 LIMIT 4; + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a +); + +-- join with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a; + +WITH cte_1 AS + (SELECT * FROM nullkey_c1_t1, reference_table WHERE nullkey_c1_t1.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE) +SELECT COUNT(*) FROM cte_1; + +-- join with postgres / citus local tables +SELECT * FROM nullkey_c1_t1 JOIN postgres_local_table USING(a); +SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a); + +-- join with a distributed table +SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM distributed_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- outer joins with different table types +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a); +SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN citus_local_table USING(a); 
+SELECT COUNT(*) FROM citus_local_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN postgres_local_table USING(a); +SELECT COUNT(*) FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN citus_local_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a); +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a); + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; + +RESET citus.enable_non_colocated_router_query_pushdown; + +-- lateral / semi / anti joins with different table types + +-- with a reference table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM reference_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM reference_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM reference_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM reference_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM reference_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > 
t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM reference_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- with a distributed table +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM distributed_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM distributed_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM distributed_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +-- with postgres / citus local tables +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM citus_local_table t2 
WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM citus_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM postgres_local_table t1 +LEFT JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM citus_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +LEFT JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE NOT EXISTS ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b IN ( + SELECT b+1 FROM postgres_local_table t2 WHERE t2.b = t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +WHERE t1.b NOT IN ( + SELECT a FROM postgres_local_table t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM nullkey_c1_t1 t1 +JOIN LATERAL ( + SELECT * FROM postgres_local_table t2 WHERE t2.b > t1.a +) q USING(a); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE EXISTS ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b IN ( + SELECT b+1 FROM nullkey_c1_t1 t2 WHERE 
t2.b = t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +WHERE t1.b NOT IN ( + SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +); + +SELECT COUNT(*) FROM postgres_local_table t1 +JOIN LATERAL ( + SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a +) q USING(a); + +-- insert .. select + +-- between two colocated null dist key tables + +-- The target list of "distributed statement"s that we send to workers +-- differ(*) in Postgres versions < 15. For this reason, we temporarily +-- disable debug messages here and run the EXPLAIN'ed version of the +-- command. +-- +-- (*): < SELECT a, b > vs < SELECT table_name.a, table_name.b > +SET client_min_messages TO WARNING; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE, VERBOSE FALSE) +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c1_t2; +SET client_min_messages TO DEBUG2; + +-- between two non-colocated null dist key tables +INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1; + +-- between a null dist key table and a table of different type +INSERT INTO nullkey_c1_t1 SELECT * FROM reference_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM distributed_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM citus_local_table; +INSERT INTO nullkey_c1_t1 SELECT * FROM postgres_local_table; + +INSERT INTO reference_table SELECT * FROM nullkey_c1_t1; +INSERT INTO distributed_table SELECT * FROM nullkey_c1_t1; +INSERT INTO citus_local_table SELECT * FROM nullkey_c1_t1; +INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1; + +-- test subquery +SELECT count(*) FROM +( + SELECT * FROM (SELECT * FROM nullkey_c1_t2) as subquery_inner +) AS subquery_top; + +-- test cte inlining +WITH cte_nullkey_c1_t1 AS (SELECT * FROM nullkey_c1_t1), + cte_postgres_local_table AS (SELECT * FROM postgres_local_table), + cte_distributed_table AS (SELECT * FROM distributed_table) +SELECT COUNT(*) FROM cte_distributed_table, cte_nullkey_c1_t1, cte_postgres_local_table +WHERE cte_nullkey_c1_t1.a > 3 AND cte_distributed_table.a 
< 5; + +-- test recursive ctes +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive RIGHT JOIN reference_table ON (level_2_recursive.x = reference_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; + +WITH level_0 AS ( + WITH level_1 AS ( + WITH RECURSIVE level_2_recursive(x) AS ( + VALUES (1) + UNION ALL + SELECT a + 1 FROM nullkey_c1_t1 JOIN level_2_recursive ON (a = x) WHERE a < 100 + ) + SELECT * FROM level_2_recursive JOIN distributed_table ON (level_2_recursive.x = distributed_table.a) + ) + SELECT * FROM level_1 +) +SELECT COUNT(*) FROM level_0; + +-- grouping set +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)) + ORDER BY id, subtitle; + +-- subquery in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM articles_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a +ORDER BY 1,2; + +-- test prepared statements + +-- prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; + +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; +EXECUTE author_1_articles; + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; + +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); +EXECUTE author_articles(1); + +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE author_articles(NULL); +EXECUTE 
author_articles(NULL); + +PREPARE author_articles_update(int) AS + UPDATE articles_hash SET title = 'test' WHERE author_id = $1; + +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); +EXECUTE author_articles_update(NULL); + +-- More tests with insert .. select. +-- +-- The target list of "distributed statement"s that we send to workers +-- might differ(*) in Postgres versions < 15 and they are reported when +-- "log level >= DEBUG2". For this reason, we set log level to DEBUG1 to +-- avoid reporting them. +-- +-- DEBUG1 still allows reporting the reason why given INSERT .. SELECT +-- query is not distributed / requires pull-to-coordinator. + +SET client_min_messages TO DEBUG1; + +INSERT INTO bigserial_test (x, y) SELECT x, y FROM bigserial_test; + +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM + +(SELECT + id +FROM (SELECT users_ref_table.user_id AS id + FROM raw_events_first, + users_ref_table + WHERE raw_events_first.user_id = users_ref_table.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); + +-- upsert with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; + +-- using a left join +INSERT INTO agg_events (user_id) +SELECT 
+ raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; + +-- using a full join +INSERT INTO agg_events (user_id, value_1_agg) +SELECT t1.user_id AS col1, + t2.user_id AS col2 +FROM raw_events_first t1 + FULL JOIN raw_events_second t2 + ON t1.user_id = t2.user_id; + +-- using semi join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT raw_events_second.user_id + FROM raw_events_second, raw_events_first + WHERE raw_events_second.user_id = raw_events_first.user_id AND raw_events_first.user_id = 200); + +-- using lateral join +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE NOT EXISTS (SELECT 1 + FROM raw_events_second + WHERE raw_events_second.user_id =raw_events_first.user_id); + +-- using inner join +INSERT INTO agg_events (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); + +-- We could relax distributed insert .. select checks to allow pushing +-- down more clauses down to the worker nodes when inserting into a single +-- shard by selecting from a colocated one. We might want to do something +-- like https://github.com/citusdata/citus/pull/6772. +-- +-- e.g., insert into null_shard_key_1/citus_local/reference +-- select * from null_shard_key_1/citus_local/reference limit 1 +-- +-- Below "limit / offset clause" test and some others are examples of this. 
+ +-- limit / offset clause +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LIMIT 1; +INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first OFFSET 1; + +-- using a materialized cte +WITH cte AS MATERIALIZED + (SELECT max(value_1)+1 as v1_agg, user_id FROM raw_events_first GROUP BY user_id) +INSERT INTO agg_events (value_1_agg, user_id) +SELECT v1_agg, user_id FROM cte; + +INSERT INTO raw_events_second + WITH cte AS MATERIALIZED (SELECT * FROM raw_events_first) + SELECT user_id * 1000, time, value_1, value_2, value_3, value_4 FROM cte; + +-- using a regular cte +WITH cte AS (SELECT * FROM raw_events_first) +INSERT INTO raw_events_second + SELECT user_id * 7000, time, value_1, value_2, value_3, value_4 FROM cte; + +INSERT INTO raw_events_second + WITH cte AS (SELECT * FROM raw_events_first) + SELECT * FROM cte; + +INSERT INTO agg_events + WITH sub_cte AS (SELECT 1) + SELECT + raw_events_first.user_id, (SELECT * FROM sub_cte) + FROM + raw_events_first; + +-- we still support complex joins via INSERT's cte list .. +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +INSERT INTO raw_events_second (user_id, value_1) + SELECT (a+5)*-1, b FROM cte; + +-- .. 
but can't do so via SELECT's cte list +INSERT INTO raw_events_second (user_id, value_1) +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) + SELECT (a+5)*-1, b FROM cte; + +-- using set operations +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) INTERSECT + (SELECT user_id FROM raw_events_first); + +-- group by clause inside subquery +INSERT INTO agg_events + (user_id) +SELECT f2.id FROM + +(SELECT + id +FROM (SELECT raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id ) AS foo) as f +INNER JOIN +(SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2 +ON (f.id = f2.id) +WHERE f.id IN (SELECT user_id + FROM raw_events_second); + +-- group by clause inside lateral subquery +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + t1.user_id + FROM + raw_events_second t1 JOIN raw_events_second t2 on (t1.user_id = t2.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; + +-- using aggregates +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; + +-- using generate_series +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, s, s FROM generate_series(1, 5) s; + +CREATE SEQUENCE insert_select_test_seq; + +-- nextval() 
expression in select's targetlist +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT s, nextval('insert_select_test_seq'), (random()*10)::int +FROM generate_series(100, 105) s; + +-- non-immutable function +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; + +SET client_min_messages TO DEBUG2; + +-- update / delete + +UPDATE nullkey_c1_t1 SET a = 1 WHERE b = 5; +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = 5; +UPDATE nullkey_c1_t1 SET a = random(); +UPDATE nullkey_c1_t1 SET a = 1 WHERE a = random(); + +DELETE FROM nullkey_c1_t1 WHERE b = 5; +DELETE FROM nullkey_c1_t1 WHERE a = random(); + +-- simple update queries between different table types / colocated tables +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +UPDATE nullkey_c1_t1 SET b = 5 FROM citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +UPDATE nullkey_c1_t1 SET b = 5 FROM postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; + +UPDATE reference_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +UPDATE distributed_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +UPDATE citus_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +UPDATE postgres_local_table SET b = 5 FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; + +-- simple delete queries between different table types / colocated tables +DELETE 
FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; +DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; +DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; +DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.a; +DELETE FROM nullkey_c1_t1 USING citus_local_table WHERE nullkey_c1_t1.b = citus_local_table.b; +DELETE FROM nullkey_c1_t1 USING postgres_local_table WHERE nullkey_c1_t1.b = postgres_local_table.b; + +DELETE FROM reference_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = reference_table.b; +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.b; +DELETE FROM distributed_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = distributed_table.a; +DELETE FROM citus_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = citus_local_table.b; +DELETE FROM postgres_local_table USING nullkey_c1_t1 WHERE nullkey_c1_t1.b = postgres_local_table.b; + +-- slightly more complex update queries +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); + +WITH cte AS materialized( + SELECT * FROM distributed_table +) +UPDATE nullkey_c1_t1 SET b = 5 FROM cte WHERE nullkey_c1_t1.b = cte.a; + +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); + +UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); + +UPDATE nullkey_c1_t1 tx SET b = ( + SELECT nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN nullkey_c1_t1 ON (nullkey_c1_t1.a != nullkey_c1_t2.a) WHERE nullkey_c1_t1.a = tx.a ORDER BY 1 LIMIT 1 +); + +UPDATE nullkey_c1_t1 tx SET 
b = t2.b FROM nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); + +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +UPDATE nullkey_c1_t1 SET b = 5 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); + +UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; +UPDATE modify_fast_path SET value_1 = NULL WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +UPDATE modify_fast_path SET value_1 = 5 WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +UPDATE modify_fast_path + SET value_1 = 1 + FROM modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; + +PREPARE p1 (int, int, int) AS + UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; +EXECUTE p1(1,1,1); +EXECUTE p1(2,2,2); +EXECUTE p1(3,3,3); +EXECUTE p1(4,4,4); +EXECUTE p1(5,5,5); +EXECUTE p1(6,6,6); +EXECUTE p1(7,7,7); + +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); +EXECUTE prepared_zero_shard_update(3); +EXECUTE prepared_zero_shard_update(4); +EXECUTE prepared_zero_shard_update(5); +EXECUTE prepared_zero_shard_update(6); +EXECUTE prepared_zero_shard_update(7); + +-- slightly more complex delete queries +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM distributed_table); + +WITH cte AS materialized( + SELECT * FROM distributed_table +) +DELETE FROM nullkey_c1_t1 USING cte WHERE nullkey_c1_t1.b = cte.a; + +WITH cte AS ( + SELECT reference_table.a AS a, 1 AS b + FROM distributed_table RIGHT JOIN reference_table USING (a) +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.b IN (SELECT b FROM cte); + +DELETE FROM nullkey_c1_t1 USING reference_table WHERE EXISTS ( + SELECT 1 FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a) WHERE nullkey_c1_t1.b IS NULL +); + 
+DELETE FROM nullkey_c1_t1 tx USING nullkey_c1_t1 t1 JOIN nullkey_c1_t2 t2 ON (t1.a = t2.a); + +WITH cte AS ( + SELECT * FROM nullkey_c1_t2 ORDER BY 1,2 LIMIT 10 +) +DELETE FROM nullkey_c1_t1 WHERE nullkey_c1_t1.a IN (SELECT b FROM cte); + +DELETE FROM modify_fast_path WHERE value_1 = 15 AND (key = 1 OR value_2 = 'citus'); +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DELETE FROM modify_fast_path + USING modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND + modify_fast_path_reference.key = 1; + +PREPARE p2 (int, int, int) AS + DELETE FROM modify_fast_path WHERE key = ($2)*$1 AND value_1 = $3; +EXECUTE p2(1,1,1); +EXECUTE p2(2,2,2); +EXECUTE p2(3,3,3); +EXECUTE p2(4,4,4); +EXECUTE p2(5,5,5); +EXECUTE p2(6,6,6); +EXECUTE p2(7,7,7); + +PREPARE prepared_zero_shard_delete(int) AS DELETE FROM modify_fast_path WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_delete(1); +EXECUTE prepared_zero_shard_delete(2); +EXECUTE prepared_zero_shard_delete(3); +EXECUTE prepared_zero_shard_delete(4); +EXECUTE prepared_zero_shard_delete(5); +EXECUTE prepared_zero_shard_delete(6); +EXECUTE prepared_zero_shard_delete(7); + +-- test modifying ctes + +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +SELECT * FROM cte; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +SELECT * FROM modify_fast_path_reference WHERE key IN (SELECT key FROM cte); + +WITH cte AS ( + DELETE FROM reference_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t2 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * 
FROM nullkey_c2_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * +) +SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); + +-- Below two queries fail very late when +-- citus.enable_non_colocated_router_query_pushdown is set to on. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE a IN (SELECT a FROM cte); + +WITH cte AS ( + DELETE FROM distributed_table WHERE a = 1 RETURNING * +) +SELECT * FROM nullkey_c1_t1 WHERE b IN (SELECT b FROM cte); + +RESET citus.enable_non_colocated_router_query_pushdown; + +WITH cte AS ( + UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1 RETURNING * +) +UPDATE modify_fast_path SET value_1 = value_1 + 1 WHERE key = 1; + +WITH cte AS ( + DELETE FROM modify_fast_path WHERE key = 1 RETURNING * +) +DELETE FROM modify_fast_path WHERE key = 1; + +-- test window functions + +SELECT + user_id, avg(avg(value_3)) OVER (PARTITION BY user_id, MIN(value_2)) +FROM + raw_events_first +GROUP BY + 1 +ORDER BY + 2 DESC NULLS LAST, 1 DESC; + +SELECT + user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2)) +FROM ( + SELECT + DISTINCT us.user_id, us.value_2, us.value_1, random() as r1 + FROM + raw_events_first as us, raw_events_second + WHERE + us.user_id = raw_events_second.user_id + ORDER BY + user_id, value_2 + ) s +GROUP BY + 1, value_1 +ORDER BY + 2 DESC, 1; + +SELECT + DISTINCT ON (raw_events_second.user_id, rnk) raw_events_second.user_id, rank() OVER my_win AS rnk +FROM + raw_events_second, raw_events_first 
+WHERE + raw_events_first.user_id = raw_events_second.user_id +WINDOW + my_win AS (PARTITION BY raw_events_second.user_id, raw_events_first.value_1 ORDER BY raw_events_second.time DESC) +ORDER BY + rnk DESC, 1 DESC +LIMIT 10; + +SET client_min_messages TO ERROR; +DROP SCHEMA query_null_dist_key CASCADE; + +SELECT citus_remove_node('localhost', :master_port);