Enable logical planner for single-shard tables (#6950)

* Enable using logical planner for single-shard tables

* Improve non-colocated table error in physical planner

* Favor distributed tables over reference tables when choosing anchor shard
pull/6974/head
Onur Tirtir 2023-06-08 10:57:23 +03:00 committed by GitHub
parent b569d53a0c
commit fa8870217d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2505 additions and 390 deletions

View File

@ -1025,17 +1025,6 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
{ {
return distributedPlan; return distributedPlan;
} }
else if (ContainsSingleShardTable(originalQuery))
{
/*
* We only support router queries if the query contains reference to
* a single-shard table. This temporary restriction will be removed
* once we support recursive planning for the queries that reference
* single-shard tables.
*/
WrapRouterErrorForSingleShardTable(distributedPlan->planningError);
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
else else
{ {
RaiseDeferredError(distributedPlan->planningError, DEBUG2); RaiseDeferredError(distributedPlan->planningError, DEBUG2);

View File

@ -1406,17 +1406,15 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
IsSupportedRedistributionTarget(targetRelationId); IsSupportedRedistributionTarget(targetRelationId);
/* /*
* Today it's not possible to generate a distributed plan for a SELECT * It's not possible to generate a distributed plan for a SELECT
* having more than one task if it references a single-shard table. * having more than one task if it references a single-shard table.
* This is because, we don't support queries beyond router planner
* if the query references a single-shard table.
* *
* For this reason, right now we don't expect an INSERT .. SELECT * For this reason, right now we don't expect an INSERT .. SELECT
* query to go through the repartitioned INSERT .. SELECT logic if the * query to go through the repartitioned INSERT .. SELECT logic if the
* SELECT query references a single-shard table. * SELECT query references a single-shard table.
*/ */
Assert(!repartitioned || Assert(!repartitioned ||
!GetRTEListPropertiesForQuery(selectQueryCopy)->hasSingleShardDistTable); !ContainsSingleShardTable(selectQueryCopy));
distributedPlan->modifyQueryViaCoordinatorOrRepartition = insertSelectQuery; distributedPlan->modifyQueryViaCoordinatorOrRepartition = insertSelectQuery;
distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition = selectPlan; distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition = selectPlan;

View File

@ -1028,7 +1028,8 @@ ErrorHintRequired(const char *errorHint, Query *queryTree)
{ {
continue; continue;
} }
else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
{ {
int colocationId = TableColocationId(relationId); int colocationId = TableColocationId(relationId);
colocationIdList = list_append_unique_int(colocationIdList, colocationId); colocationIdList = list_append_unique_int(colocationIdList, colocationId);

View File

@ -2367,7 +2367,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
ListCell *relationIdCell = NULL; ListCell *relationIdCell = NULL;
uint32 relationIndex = 0; uint32 relationIndex = 0;
uint32 rangeDistributedRelationCount = 0; uint32 rangeDistributedRelationCount = 0;
uint32 hashDistributedRelationCount = 0; uint32 hashDistOrSingleShardRelCount = 0;
uint32 appendDistributedRelationCount = 0; uint32 appendDistributedRelationCount = 0;
foreach(relationIdCell, relationIdList) foreach(relationIdCell, relationIdList)
@ -2379,9 +2379,10 @@ ErrorIfUnsupportedShardDistribution(Query *query)
nonReferenceRelations = lappend_oid(nonReferenceRelations, nonReferenceRelations = lappend_oid(nonReferenceRelations,
relationId); relationId);
} }
else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
{ {
hashDistributedRelationCount++; hashDistOrSingleShardRelCount++;
nonReferenceRelations = lappend_oid(nonReferenceRelations, nonReferenceRelations = lappend_oid(nonReferenceRelations,
relationId); relationId);
} }
@ -2396,7 +2397,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
} }
} }
if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) if ((rangeDistributedRelationCount > 0) && (hashDistOrSingleShardRelCount > 0))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"), errmsg("cannot push down this subquery"),
@ -2410,7 +2411,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
errdetail("A query including both range and append " errdetail("A query including both range and append "
"partitioned relations are unsupported"))); "partitioned relations are unsupported")));
} }
else if ((appendDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) else if ((appendDistributedRelationCount > 0) && (hashDistOrSingleShardRelCount > 0))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"), errmsg("cannot push down this subquery"),
@ -2439,8 +2440,9 @@ ErrorIfUnsupportedShardDistribution(Query *query)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"), errmsg("cannot push down this subquery"),
errdetail("Shards of relations in subquery need to " errdetail("%s and %s are not colocated",
"have 1-to-1 shard partitioning"))); get_rel_name(firstTableRelationId),
get_rel_name(currentRelationId))));
} }
} }
} }
@ -2813,15 +2815,15 @@ AnchorRangeTableId(List *rangeTableList)
* have the most number of shards, we have a draw. * have the most number of shards, we have a draw.
*/ */
List *baseTableIdList = BaseRangeTableIdList(rangeTableList); List *baseTableIdList = BaseRangeTableIdList(rangeTableList);
List *anchorTableIdList = AnchorRangeTableIdList(rangeTableList, baseTableIdList); List *anchorTableRTIList = AnchorRangeTableIdList(rangeTableList, baseTableIdList);
ListCell *anchorTableIdCell = NULL; ListCell *anchorTableIdCell = NULL;
int anchorTableIdCount = list_length(anchorTableIdList); int anchorTableIdCount = list_length(anchorTableRTIList);
Assert(anchorTableIdCount > 0); Assert(anchorTableIdCount > 0);
if (anchorTableIdCount == 1) if (anchorTableIdCount == 1)
{ {
anchorRangeTableId = (uint32) linitial_int(anchorTableIdList); anchorRangeTableId = (uint32) linitial_int(anchorTableRTIList);
return anchorRangeTableId; return anchorRangeTableId;
} }
@ -2829,7 +2831,7 @@ AnchorRangeTableId(List *rangeTableList)
* If more than one table has the most number of shards, we break the draw * If more than one table has the most number of shards, we break the draw
* by comparing table sizes and picking the table with the largest size. * by comparing table sizes and picking the table with the largest size.
*/ */
foreach(anchorTableIdCell, anchorTableIdList) foreach(anchorTableIdCell, anchorTableRTIList)
{ {
uint32 anchorTableId = (uint32) lfirst_int(anchorTableIdCell); uint32 anchorTableId = (uint32) lfirst_int(anchorTableIdCell);
RangeTblEntry *tableEntry = rt_fetch(anchorTableId, rangeTableList); RangeTblEntry *tableEntry = rt_fetch(anchorTableId, rangeTableList);
@ -2857,7 +2859,7 @@ AnchorRangeTableId(List *rangeTableList)
if (anchorRangeTableId == 0) if (anchorRangeTableId == 0)
{ {
/* all tables have the same shard count and size 0, pick the first */ /* all tables have the same shard count and size 0, pick the first */
anchorRangeTableId = (uint32) linitial_int(anchorTableIdList); anchorRangeTableId = (uint32) linitial_int(anchorTableRTIList);
} }
return anchorRangeTableId; return anchorRangeTableId;
@ -2898,7 +2900,7 @@ BaseRangeTableIdList(List *rangeTableList)
static List * static List *
AnchorRangeTableIdList(List *rangeTableList, List *baseRangeTableIdList) AnchorRangeTableIdList(List *rangeTableList, List *baseRangeTableIdList)
{ {
List *anchorTableIdList = NIL; List *anchorTableRTIList = NIL;
uint32 maxShardCount = 0; uint32 maxShardCount = 0;
ListCell *baseRangeTableIdCell = NULL; ListCell *baseRangeTableIdCell = NULL;
@ -2908,25 +2910,46 @@ AnchorRangeTableIdList(List *rangeTableList, List *baseRangeTableIdList)
return baseRangeTableIdList; return baseRangeTableIdList;
} }
uint32 referenceTableRTI = 0;
foreach(baseRangeTableIdCell, baseRangeTableIdList) foreach(baseRangeTableIdCell, baseRangeTableIdList)
{ {
uint32 baseRangeTableId = (uint32) lfirst_int(baseRangeTableIdCell); uint32 baseRangeTableId = (uint32) lfirst_int(baseRangeTableIdCell);
RangeTblEntry *tableEntry = rt_fetch(baseRangeTableId, rangeTableList); RangeTblEntry *tableEntry = rt_fetch(baseRangeTableId, rangeTableList);
List *shardList = LoadShardList(tableEntry->relid);
Oid citusTableId = tableEntry->relid;
if (IsCitusTableType(citusTableId, REFERENCE_TABLE))
{
referenceTableRTI = baseRangeTableId;
continue;
}
List *shardList = LoadShardList(citusTableId);
uint32 shardCount = (uint32) list_length(shardList); uint32 shardCount = (uint32) list_length(shardList);
if (shardCount > maxShardCount) if (shardCount > maxShardCount)
{ {
anchorTableIdList = list_make1_int(baseRangeTableId); anchorTableRTIList = list_make1_int(baseRangeTableId);
maxShardCount = shardCount; maxShardCount = shardCount;
} }
else if (shardCount == maxShardCount) else if (shardCount == maxShardCount)
{ {
anchorTableIdList = lappend_int(anchorTableIdList, baseRangeTableId); anchorTableRTIList = lappend_int(anchorTableRTIList, baseRangeTableId);
} }
} }
return anchorTableIdList; /*
* We favor distributed tables over reference tables as anchor tables. But
* in case we cannot find any distributed tables, we let a reference table be
* the anchor table. For now, we cannot see a query that might require this, but we
* want to be backward compatible.
*/
if (list_length(anchorTableRTIList) == 0)
{
return referenceTableRTI > 0 ? list_make1_int(referenceTableRTI) : NIL;
}
return anchorTableRTIList;
} }

View File

@ -258,22 +258,6 @@ CreateModifyPlan(Query *originalQuery, Query *query,
} }
/*
* WrapRouterErrorForSingleShardTable wraps given planning error with a
* generic error message if given query references a distributed table
* that doesn't have a distribution key.
*/
void
WrapRouterErrorForSingleShardTable(DeferredErrorMessage *planningError)
{
planningError->detail = planningError->message;
planningError->message = pstrdup("queries that reference a distributed "
"table without a shard key can only "
"reference colocated distributed "
"tables or reference tables");
}
/* /*
* CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query. * CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query.
* The returned plan is a router task that returns query results from a single worker. * The returned plan is a router task that returns query results from a single worker.
@ -1886,11 +1870,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/ */
if (IsMergeQuery(originalQuery)) if (IsMergeQuery(originalQuery))
{ {
if (ContainsSingleShardTable(originalQuery))
{
WrapRouterErrorForSingleShardTable(*planningError);
}
RaiseDeferredError(*planningError, ERROR); RaiseDeferredError(*planningError, ERROR);
} }
else else

View File

@ -168,11 +168,10 @@ AnchorRte(Query *subquery)
{ {
Oid relationId = currentRte->relid; Oid relationId = currentRte->relid;
if (IsCitusTable(relationId) && !HasDistributionKey(relationId)) if (!IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{ {
/* /*
* Non-distributed tables should not be the anchor rte since they * We're not interested in non distributed relations.
* don't have distribution key.
*/ */
continue; continue;
} }

View File

@ -36,7 +36,6 @@ extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern void WrapRouterErrorForSingleShardTable(DeferredErrorMessage *planningError);
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,

View File

@ -211,34 +211,14 @@ class AllSingleShardTableDefaultConfig(CitusDefaultClusterConfig):
super().__init__(arguments) super().__init__(arguments)
self.all_null_dist_key = True self.all_null_dist_key = True
self.skip_tests += [ self.skip_tests += [
# i) Skip the following tests because they require SQL support beyond # One of the distributed functions created in "function_create"
# router planner / supporting more DDL command types. # requires setting a distribution column, which cannot be the
# # case with single shard tables.
# group 1
"dropped_columns_create_load",
"dropped_columns_1",
# group 2
"distributed_planning_create_load",
"distributed_planning",
# group 4
"views_create",
"views",
# group 5
"intermediate_result_pruning_create",
"intermediate_result_pruning_queries_1",
"intermediate_result_pruning_queries_2",
# group 6
"local_dist_join_load",
"local_dist_join",
"arbitrary_configs_recurring_outer_join",
# group 7
"sequences_create",
"sequences",
# group 8
"function_create", "function_create",
"functions", "functions",
# # In "nested_execution", one of the tests that query
# ii) Skip the following test as it requires support for create_distributed_function. # "dist_query_single_shard" table acts differently when the table
# has a single shard. This is explained with a comment in the test.
"nested_execution", "nested_execution",
] ]

View File

@ -909,15 +909,18 @@ ALTER TABLE "NULL_!_dist_key"."nullKeyTable.1!?!90123456789012345678901234567890
ERROR: referenced table "local_table_for_fkey" must be a distributed table or a reference table ERROR: referenced table "local_table_for_fkey" must be a distributed table or a reference table
DETAIL: To enforce foreign keys, the referencing and referenced rows need to be stored on the same node. DETAIL: To enforce foreign keys, the referencing and referenced rows need to be stored on the same node.
HINT: You could use SELECT create_reference_table('local_table_for_fkey') to replicate the referenced table to all nodes or consider dropping the foreign key HINT: You could use SELECT create_reference_table('local_table_for_fkey') to replicate the referenced table to all nodes or consider dropping the foreign key
-- Normally, we support foreign keys from Postgres tables to distributed -- foreign key from a local table
-- tables assuming that the user will soon distribute the local table too
-- anyway. However, this is not the case for single-shard tables before
-- we improve SQL support.
ALTER TABLE local_table_for_fkey ALTER TABLE local_table_for_fkey
ADD CONSTRAINT fkey_from_dummy_local FOREIGN KEY (a) REFERENCES "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id); ADD CONSTRAINT fkey_from_dummy_local FOREIGN KEY (a) REFERENCES "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id);
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables SELECT create_distributed_table('local_table_for_fkey', null, colocate_with=>'none');
DETAIL: Local tables cannot be used in distributed queries. ERROR: cannot create foreign key constraint since relations are not colocated or not referencing a reference table
CONTEXT: SQL statement "SELECT fk."a" FROM ONLY "create_single_shard_table"."local_table_for_fkey" fk LEFT OUTER JOIN "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234" pk ON ( pk."id" OPERATOR(pg_catalog.=) fk."a") WHERE pk."id" IS NULL AND (fk."a" IS NOT NULL)" DETAIL: A distributed table can only have foreign keys if it is referencing another colocated hash distributed table or a reference table
SELECT create_distributed_table('local_table_for_fkey', null, colocate_with=>'"NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- foreign key to a citus local table, errors out -- foreign key to a citus local table, errors out
CREATE TABLE citus_local_table_for_fkey (a INT PRIMARY KEY); CREATE TABLE citus_local_table_for_fkey (a INT PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('citus_local_table_for_fkey'); SELECT citus_add_local_table_to_metadata('citus_local_table_for_fkey');
@ -1128,7 +1131,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2); INSERT INTO referencing_table VALUES (1, 2);
-- fails -- fails
INSERT INTO referencing_table VALUES (2, 2); INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730098" ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730099"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ROLLBACK; ROLLBACK;
@ -1174,7 +1177,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2); INSERT INTO referencing_table VALUES (1, 2);
-- fails -- fails
INSERT INTO referencing_table VALUES (2, 2); INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730134" ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730135"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ROLLBACK; ROLLBACK;
@ -1292,8 +1295,8 @@ SELECT result, success FROM run_command_on_workers($$
$$); $$);
result | success result | success
--------------------------------------------------------------------- ---------------------------------------------------------------------
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730151" | f ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730152" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730151" | f ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730152" | f
(2 rows) (2 rows)
DROP TABLE referencing_table, referenced_table; DROP TABLE referencing_table, referenced_table;
@ -1308,8 +1311,8 @@ SELECT create_distributed_table('self_fkey_test', NULL, distribution_type=>null)
INSERT INTO self_fkey_test VALUES (1, 1); -- ok INSERT INTO self_fkey_test VALUES (1, 1); -- ok
INSERT INTO self_fkey_test VALUES (2, 3); -- fails INSERT INTO self_fkey_test VALUES (2, 3); -- fails
ERROR: insert or update on table "self_fkey_test_1730152" violates foreign key constraint "self_fkey_test_b_fkey_1730152" ERROR: insert or update on table "self_fkey_test_1730153" violates foreign key constraint "self_fkey_test_b_fkey_1730153"
DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730152". DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730153".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
-- similar foreign key tests but this time create the referencing table later on -- similar foreign key tests but this time create the referencing table later on
-- referencing table is a single-shard table -- referencing table is a single-shard table
@ -1333,7 +1336,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2); INSERT INTO referencing_table VALUES (1, 2);
-- fails -- fails
INSERT INTO referencing_table VALUES (2, 2); INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730154" ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730155"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ROLLBACK; ROLLBACK;
@ -1356,7 +1359,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (2, 1); INSERT INTO referencing_table VALUES (2, 1);
-- fails -- fails
INSERT INTO referencing_table VALUES (1, 2); INSERT INTO referencing_table VALUES (1, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730156" ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730157"
DETAIL: Key (a, b)=(1, 2) is not present in table "referenced_table_xxxxxxx". DETAIL: Key (a, b)=(1, 2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ROLLBACK; ROLLBACK;
@ -1463,7 +1466,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2); INSERT INTO referencing_table VALUES (1, 2);
-- fails -- fails
INSERT INTO referencing_table VALUES (2, 2); INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730197" ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730198"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ROLLBACK; ROLLBACK;

View File

@ -147,10 +147,20 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a); INSERT INTO distributed_table_c1_t1 SELECT COALESCE(nullkey_c1_t1.a, 1), nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "matview" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.matview WHERE true
DEBUG: recursively planning left side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "nullkey_c1_t1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "nullkey_c1_t1" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE(nullkey_c1_t1.a, 1) AS a, nullkey_c1_t1.b FROM ((SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1_1) nullkey_c1_t1 FULL JOIN (SELECT matview_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) matview_1) matview USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2; INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
@ -160,45 +170,65 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: router planner does not support queries that reference non-colocated distributed tables
DETAIL: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery
DETAIL: nullkey_c1_t2 and nullkey_c2_t1 are not colocated
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1; INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: router planner does not support queries that reference non-colocated distributed tables
DETAIL: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query
-- use a distributed table that is colocated with the target table DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c2_t1
DEBUG: Creating router plan
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) UNION SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_insert_select_subquery
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
-- use a distributed table that is colocated with the target table, with repartition joins enabled
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables -- use a distributed table that is not colocated with the target table, with repartition joins enabled
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a postgres local table -- use a postgres local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use append / range distributed tables -- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1; INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
@ -209,13 +239,13 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO append_table SELECT * FROM nullkey_c1_t1; INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: INSERT ... SELECT into an append-distributed table is not supported
DETAIL: INSERT ... SELECT into an append-distributed table is not supported ERROR: INSERT ... SELECT into an append-distributed table is not supported
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2; SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
4.2105263157894737 | 4.2105263157894737 4.3421052631578947 | 4.5277777777777778
(1 row) (1 row)
TRUNCATE distributed_table_c1_t1; TRUNCATE distributed_table_c1_t1;
@ -246,8 +276,11 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview); INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.matview
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 LEFT JOIN insert_select_single_shard_table.reference_table USING (b)) WHERE (nullkey_c1_t2.b OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated single-shard table -- use a colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
@ -260,41 +293,52 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated single-shard table -- use a non-colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: router planner does not support queries that reference non-colocated distributed tables
DETAIL: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery
DETAIL: nullkey_c1_t2 and nullkey_c2_t1 are not colocated
-- use a distributed table -- use a distributed table
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a postgres local table -- use a postgres local table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2; SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
4.0428571428571429 | 4.0428571428571429 4.3063063063063063 | 4.3063063063063063
(1 row) (1 row)
TRUNCATE reference_table; TRUNCATE reference_table;
@ -320,26 +364,39 @@ DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is a
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table -- use a distributed table
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a postgres local table -- use a postgres local table
INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2; SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
4.4333333333333333 | 4.4333333333333333 4.5270270270270270 | 4.5270270270270270
(1 row) (1 row)
TRUNCATE citus_local_table; TRUNCATE citus_local_table;
@ -358,8 +415,18 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a); INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.postgres_local_table WHERE true
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "nullkey_c1_t1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "nullkey_c1_t1" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT a FROM insert_select_single_shard_table.nullkey_c1_t1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT postgres_local_table.a, postgres_local_table.b FROM ((SELECT postgres_local_table_1.a, postgres_local_table_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) postgres_local_table_1) postgres_local_table LEFT JOIN (SELECT nullkey_c1_t1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) nullkey_c1_t1_1) nullkey_c1_t1 USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a citus local table -- use a citus local table
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table; INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table;
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
@ -372,8 +439,14 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a); INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.citus_local_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT citus_local_table.a, citus_local_table.b FROM ((SELECT citus_local_table_1.a, citus_local_table_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table_1) citus_local_table JOIN insert_select_single_shard_table.nullkey_c1_t1 USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table -- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2; INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
@ -383,10 +456,13 @@ INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1
DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a); INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Collecting INSERT ... SELECT results on coordinator
DETAIL: router planner does not support queries that reference non-colocated distributed tables RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a non-colocated single-shard table -- use a non-colocated single-shard table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a); INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table
@ -402,8 +478,12 @@ DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a); INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a);
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "matview" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.matview WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT q.a, q.b FROM ((SELECT reference_table.a, reference_table.b FROM (insert_select_single_shard_table.reference_table JOIN insert_select_single_shard_table.nullkey_c1_t1 USING (a))) q JOIN (SELECT matview_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) matview_1) matview USING (a))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use append / range distributed tables -- use append / range distributed tables
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table; INSERT INTO nullkey_c1_t1 SELECT * FROM range_table;
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
@ -416,9 +496,9 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2; SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
5.8611111111111111 | 13.9305555555555556 5.6971153846153846 | 8.4903846153846154
(1 row) (1 row)
SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2; SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2;
@ -426,7 +506,7 @@ DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
3.8750000000000000 | 3.8750000000000000 3.9864864864864865 | 3.9864864864864865
(1 row) (1 row)
TRUNCATE nullkey_c1_t1, nullkey_c2_t1; TRUNCATE nullkey_c1_t1, nullkey_c2_t1;
@ -448,8 +528,15 @@ WITH cte_1 AS (
INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL; INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL;
DEBUG: Creating router plan DEBUG: Creating router plan
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table; INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Local tables cannot be used in distributed queries.
DETAIL: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.postgres_local_table
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1
DEBUG: Creating router plan
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) EXCEPT SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_insert_select_subquery
DEBUG: Creating router plan
SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2; SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
avg | avg avg | avg
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -459,6 +546,7 @@ SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
TRUNCATE postgres_local_table; TRUNCATE postgres_local_table;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries. -- Try slightly more complex queries.
SET client_min_messages TO DEBUG1;
WITH cte_1 AS ( WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
), ),
@ -470,8 +558,12 @@ SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USIN
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: CTE cte_2 is going to be inlined via distributed planning DEBUG: CTE cte_2 is going to be inlined via distributed planning
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Wrapping relation "postgres_local_table" to a subquery
DETAIL: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.postgres_local_table WHERE true
DEBUG: generating subplan XXX_2 for subquery SELECT nullkey_c1_t1.a, reference_table.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN insert_select_single_shard_table.reference_table USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT cte_1.a, cte_1.b FROM (((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN (SELECT reference_table.a, postgres_local_table.b FROM ((SELECT NULL::integer AS a, postgres_local_table_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) postgres_local_table_1) postgres_local_table LEFT JOIN insert_select_single_shard_table.reference_table USING (b))) cte_2 USING (a)) JOIN insert_select_single_shard_table.distributed_table_c1_t2 USING (a)) ORDER BY cte_1.a, cte_1.b) citus_insert_select_subquery
DEBUG: performing repartitioned INSERT ... SELECT
SET client_min_messages TO DEBUG2;
WITH cte_1 AS ( WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
), ),
@ -521,8 +613,13 @@ CROSS JOIN (
SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1 SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1
) t2; ) t2;
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: router planner does not support queries that reference non-colocated distributed tables
DETAIL: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.nullkey_c2_t1 ORDER BY b LIMIT 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 CROSS JOIN (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t2)
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b) INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b SELECT t1.a, t2.b
FROM reference_table t1 FROM reference_table t1
@ -547,8 +644,12 @@ JOIN (
) t2 ON t1.b = t2.b ) t2 ON t1.b = t2.b
WHERE t2.rn > 2; WHERE t2.rn > 2;
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: router planner does not support queries that reference non-colocated distributed tables
DETAIL: router planner does not support queries that reference non-colocated distributed tables DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT b, row_number() OVER (ORDER BY b DESC) AS rn FROM insert_select_single_shard_table.distributed_table_c2_t1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 JOIN (SELECT q.rn, q.b FROM (SELECT intermediate_result.b, intermediate_result.rn FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer, rn bigint)) q) t2 ON ((t1.b OPERATOR(pg_catalog.=) t2.b))) WHERE (t2.rn OPERATOR(pg_catalog.>) 2)
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b) INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1 FROM nullkey_c1_t1 t1
@ -567,9 +668,6 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Temporaryly reduce the verbosity to avoid noise -- Temporaryly reduce the verbosity to avoid noise
-- in the output of the next query. -- in the output of the next query.
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2; INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
-- Similarly, we could push down the following query as well. see -- Similarly, we could push down the following query as well. see
@ -597,8 +695,12 @@ WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2 SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
); );
DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: Router planner cannot handle multi-shard select queries
DETAIL: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT a FROM insert_select_single_shard_table.distributed_table_c1_t2 t2
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 t1 WHERE (NOT (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))))
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b SELECT t1.a, t1.b
FROM reference_table AS t1 FROM reference_table AS t1
@ -664,23 +766,30 @@ INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key)
DEBUG: Creating router plan DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: subqueries are not supported within INSERT queries
DETAIL: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM insert_select_single_shard_table.upsert_test_1
DEBUG: Plan XXX query after replacing subqueries and CTEs: INSERT INTO insert_select_single_shard_table.upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT(unique_col) DO UPDATE SET other_col = (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))
DEBUG: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int; DO UPDATE SET other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
DETAIL: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 VALUES (3, 5, 7); INSERT INTO upsert_test_1 VALUES (3, 5, 7);
DEBUG: Creating router plan DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int
DO UPDATE SET other_col = 5; DO UPDATE SET other_col = 5;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables DEBUG: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2)); CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2));
DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2" DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2"
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3" DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3"

View File

@ -152,7 +152,7 @@ ORDER BY 1;
-- Full outer join with different distribution column types, should error out -- Full outer join with different distribution column types, should error out
SELECT * FROM test_table_1 full join test_table_2 using(id); SELECT * FROM test_table_1 full join test_table_2 using(id);
ERROR: cannot push down this subquery ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning DETAIL: test_table_1 and test_table_2 are not colocated
-- Test when the non-distributed column has the value of NULL -- Test when the non-distributed column has the value of NULL
INSERT INTO test_table_1 VALUES(7, NULL); INSERT INTO test_table_1 VALUES(7, NULL);
INSERT INTO test_table_2 VALUES(7, NULL); INSERT INTO test_table_2 VALUES(7, NULL);

View File

@ -725,7 +725,7 @@ SET value_2 = 5
FROM events_test_table_2 FROM events_test_table_2
WHERE users_test_table.user_id = events_test_table_2.user_id; WHERE users_test_table.user_id = events_test_table_2.user_id;
ERROR: cannot push down this subquery ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning DETAIL: users_test_table and events_test_table_2 are not colocated
-- Should error out due to multiple row return from subquery, but we can not get this information within -- Should error out due to multiple row return from subquery, but we can not get this information within
-- subquery pushdown planner. This query will be sent to worker with recursive planner. -- subquery pushdown planner. This query will be sent to worker with recursive planner.
\set VERBOSITY terse \set VERBOSITY terse

File diff suppressed because it is too large Load Diff

View File

@ -627,13 +627,13 @@ CREATE TABLE local_table_for_fkey (a INT PRIMARY KEY);
ALTER TABLE "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789" ALTER TABLE "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"
ADD CONSTRAINT fkey_to_dummy_local FOREIGN KEY (id) REFERENCES local_table_for_fkey(a); ADD CONSTRAINT fkey_to_dummy_local FOREIGN KEY (id) REFERENCES local_table_for_fkey(a);
-- Normally, we support foreign keys from Postgres tables to distributed -- foreign key from a local table
-- tables assuming that the user will soon distribute the local table too
-- anyway. However, this is not the case for single-shard tables before
-- we improve SQL support.
ALTER TABLE local_table_for_fkey ALTER TABLE local_table_for_fkey
ADD CONSTRAINT fkey_from_dummy_local FOREIGN KEY (a) REFERENCES "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id); ADD CONSTRAINT fkey_from_dummy_local FOREIGN KEY (a) REFERENCES "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id);
SELECT create_distributed_table('local_table_for_fkey', null, colocate_with=>'none');
SELECT create_distributed_table('local_table_for_fkey', null, colocate_with=>'"NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"');
-- foreign key to a citus local table, errors out -- foreign key to a citus local table, errors out
CREATE TABLE citus_local_table_for_fkey (a INT PRIMARY KEY); CREATE TABLE citus_local_table_for_fkey (a INT PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('citus_local_table_for_fkey'); SELECT citus_add_local_table_to_metadata('citus_local_table_for_fkey');

View File

@ -98,22 +98,28 @@ INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM
-- use a colocated single-shard table -- use a colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a); INSERT INTO distributed_table_c1_t1 SELECT COALESCE(nullkey_c1_t1.a, 1), nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2; INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
-- use a non-colocated single-shard table -- use a non-colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1; INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
-- use a distributed table that is colocated with the target table SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
-- use a distributed table that is colocated with the target table, with repartition joins enabled
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
-- use a distributed table that is not colocated with the target table -- use a distributed table that is not colocated with the target table, with repartition joins enabled
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
@ -148,11 +154,18 @@ INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
-- use a distributed table -- use a distributed table
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
@ -176,7 +189,11 @@ INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullk
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
-- use a distributed table -- use a distributed table
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a citus local table -- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
@ -204,7 +221,12 @@ INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM c
-- use a distributed table -- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2; INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a); INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a); INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
RESET citus.enable_repartition_joins;
SET client_min_messages TO DEBUG2;
-- use a non-colocated single-shard table -- use a non-colocated single-shard table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a); INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
@ -244,6 +266,8 @@ INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries. -- Try slightly more complex queries.
SET client_min_messages TO DEBUG1;
WITH cte_1 AS ( WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
), ),
@ -253,6 +277,8 @@ cte_2 AS (
INSERT INTO distributed_table_c1_t1 INSERT INTO distributed_table_c1_t1
SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2; SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2;
SET client_min_messages TO DEBUG2;
WITH cte_1 AS ( WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
), ),
@ -326,9 +352,6 @@ WHERE t2.sum_val > 2;
-- in the output of the next query. -- in the output of the next query.
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2; INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;

View File

@ -109,6 +109,44 @@ SELECT create_distributed_table('range_table', 'a', 'range');
CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}');
INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50);
\set users_table_data_file :abs_srcdir '/data/users_table.data'
\set events_table_data_file :abs_srcdir '/data/events_table.data'
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
SELECT create_distributed_table('users_table', null, colocate_with=>'none');
\set client_side_copy_command '\\copy users_table FROM ' :'users_table_data_file' ' WITH CSV;'
:client_side_copy_command
CREATE TABLE non_colocated_users_table (id int, value int);
SELECT create_distributed_table('non_colocated_users_table', null, colocate_with => 'none');
INSERT INTO non_colocated_users_table (id, value) VALUES(1, 2),(2, 3),(3,4);
CREATE TABLE colocated_events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
SELECT create_distributed_table('colocated_events_table', null, colocate_with=>'users_table');
\set client_side_copy_command '\\copy colocated_events_table FROM ' :'events_table_data_file' ' WITH CSV;'
:client_side_copy_command
CREATE TABLE non_colocated_events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
SELECT create_distributed_table('non_colocated_events_table', null, colocate_with=>'non_colocated_users_table');
\set client_side_copy_command '\\copy non_colocated_events_table FROM ' :'events_table_data_file' ' WITH CSV;'
:client_side_copy_command
CREATE TABLE users_table_local AS SELECT * FROM users_table;
CREATE TABLE colocated_users_table (id int, value int);
SELECT create_distributed_table('colocated_users_table', null, colocate_with => 'users_table');
INSERT INTO colocated_users_table (id, value) VALUES(1, 2),(2, 3),(3,4);
CREATE TABLE users_reference_table (like users_table including all);
SELECT create_reference_table('users_reference_table');
CREATE TABLE events_reference_table (like colocated_events_table including all);
SELECT create_reference_table('events_reference_table');
CREATE FUNCTION func() RETURNS TABLE (id int, value int) AS $$
SELECT 1, 2
$$ LANGUAGE SQL;
SET client_min_messages to DEBUG2; SET client_min_messages to DEBUG2;
-- simple insert -- simple insert
@ -155,9 +193,13 @@ SET citus.enable_non_colocated_router_query_pushdown TO ON;
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a);
SET citus.enable_non_colocated_router_query_pushdown TO OFF; SET citus.enable_non_colocated_router_query_pushdown TO OFF;
SET citus.enable_repartition_joins TO ON;
SET client_min_messages TO DEBUG1;
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 JOIN nullkey_c3_t1 USING(a);
SET client_min_messages TO DEBUG2;
SET citus.enable_repartition_joins TO OFF;
RESET citus.enable_non_colocated_router_query_pushdown; RESET citus.enable_non_colocated_router_query_pushdown;
-- colocated join between single-shard tables -- colocated join between single-shard tables
@ -191,13 +233,23 @@ WHERE t1.b NOT IN (
); );
-- non-colocated inner joins between single-shard tables -- non-colocated inner joins between single-shard tables
SET client_min_messages to DEBUG1;
SET citus.enable_repartition_joins TO ON;
SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3; SELECT * FROM nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3;
SELECT * FROM (SELECT * FROM nullkey_c1_t1) nullkey_c1_t1 JOIN nullkey_c2_t1 USING(a) ORDER BY 1,2,3;
SELECT * FROM nullkey_c2_t1 JOIN (SELECT * FROM nullkey_c1_t1) nullkey_c1_t1 USING(a) ORDER BY 1,2,3;
SELECT COUNT(*) FROM nullkey_c1_t1 t1 SELECT COUNT(*) FROM nullkey_c1_t1 t1
JOIN LATERAL ( JOIN LATERAL (
SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a SELECT * FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a
) q USING(a); ) q USING(a);
SET citus.enable_repartition_joins TO OFF;
SET client_min_messages to DEBUG2;
-- non-colocated outer joins between single-shard tables -- non-colocated outer joins between single-shard tables
SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; SELECT * FROM nullkey_c1_t1 LEFT JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4;
SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4; SELECT * FROM nullkey_c1_t1 FULL JOIN nullkey_c2_t2 USING(a) ORDER BY 1,2,3 LIMIT 4;
@ -234,22 +286,57 @@ WITH cte_1 AS
SELECT COUNT(*) FROM cte_1; SELECT COUNT(*) FROM cte_1;
-- join with postgres / citus local tables -- join with postgres / citus local tables
SELECT * FROM nullkey_c1_t1 JOIN postgres_local_table USING(a); SELECT * FROM nullkey_c1_t1 JOIN postgres_local_table USING(a) ORDER BY 1,2,3;
SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a); SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a) ORDER BY 1,2,3;
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT * FROM nullkey_c1_t1 JOIN citus_local_table USING(a) ORDER BY 1,2,3;
RESET citus.local_table_join_policy;
-- join with a distributed table -- join with a distributed table
SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a);
SET citus.enable_repartition_joins TO ON;
SET client_min_messages TO DEBUG1;
SELECT * FROM distributed_table d1 JOIN nullkey_c1_t1 USING(a) ORDER BY 1,2,3;
SELECT * FROM (SELECT * FROM distributed_table) d1 JOIN nullkey_c1_t1 USING(a) ORDER BY 1,2,3;
SELECT * FROM nullkey_c1_t1 JOIN (SELECT * FROM distributed_table) d1 USING(a) ORDER BY 1,2,3;
SELECT * FROM distributed_table d1 JOIN (SELECT * FROM nullkey_c1_t1) nullkey_c1_t1 USING(a) ORDER BY 1,2,3;
SELECT * FROM (SELECT * FROM nullkey_c1_t1) nullkey_c1_t1 JOIN distributed_table d1 USING(a) ORDER BY 1,2,3;
-- test joins with non-colocated distributed tables, by using subqueries
SELECT * FROM nullkey_c1_t1 t1 JOIN (SELECT * FROM distributed_table) t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t2) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM (SELECT * FROM nullkey_c1_t1) t1 JOIN nullkey_c2_t1 t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t2) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM distributed_table t1 JOIN (SELECT * FROM nullkey_c1_t1) t2 USING (a) JOIN (SELECT b as a FROM distributed_table) t3 USING (a) ORDER BY 1,2,3 LIMIT 1;
SELECT * FROM (SELECT * FROM nullkey_c2_t1) t1 JOIN nullkey_c1_t1 t2 USING (a) JOIN (SELECT * FROM nullkey_c2_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM nullkey_c1_t1 t1 JOIN (SELECT * FROM distributed_table) t2 USING (a) JOIN (SELECT * FROM distributed_table) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM (SELECT * FROM nullkey_c1_t1) t1 JOIN nullkey_c2_t1 t2 USING (a) JOIN (SELECT * FROM nullkey_c2_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM distributed_table t1 JOIN (SELECT * FROM nullkey_c1_t1) t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM (SELECT * FROM nullkey_c2_t1) t1 JOIN nullkey_c1_t1 t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM nullkey_c1_t1 t1 JOIN (SELECT * FROM nullkey_c1_t1) t2 USING (a) JOIN distributed_table t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM nullkey_c1_t1 t1 JOIN nullkey_c1_t1 t2 USING (a) JOIN nullkey_c2_t1 t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM (SELECT * FROM distributed_table) t1 JOIN distributed_table t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT * FROM (SELECT * FROM nullkey_c2_t1) t1 JOIN nullkey_c2_t1 t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t1) t3 USING (a) ORDER BY 1,2,3,4 LIMIT 1;
SELECT COUNT(*) FROM nullkey_c1_t1 t1 SELECT COUNT(*) FROM nullkey_c1_t1 t1
JOIN LATERAL ( JOIN LATERAL (
SELECT * FROM distributed_table t2 WHERE t2.b > t1.a SELECT * FROM distributed_table t2 WHERE t2.b > t1.a
) q USING(a); ) q USING(a);
SELECT COUNT(*) FROM nullkey_c1_t1 t1
JOIN LATERAL (
SELECT *, random() FROM distributed_table t2 WHERE t2.b > t1.a
) q USING(a);
SELECT COUNT(*) FROM distributed_table t1 SELECT COUNT(*) FROM distributed_table t1
JOIN LATERAL ( JOIN LATERAL (
SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a
) q USING(a); ) q USING(a);
SET client_min_messages TO DEBUG2;
SET citus.enable_repartition_joins TO OFF;
-- outer joins with different table types -- outer joins with different table types
SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 LEFT JOIN reference_table USING(a);
SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a); SELECT COUNT(*) FROM reference_table LEFT JOIN nullkey_c1_t1 USING(a);
@ -264,17 +351,27 @@ SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN citus_local_table USING(a);
SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN postgres_local_table USING(a);
SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 FULL JOIN reference_table USING(a);
SET citus.enable_repartition_joins TO ON;
SET client_min_messages TO DEBUG1;
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 JOIN append_table USING(a);
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a); SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a);
SET client_min_messages TO DEBUG2;
SET citus.enable_repartition_joins TO OFF;
SET citus.enable_non_colocated_router_query_pushdown TO ON; SET citus.enable_non_colocated_router_query_pushdown TO ON;
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20;
SET citus.enable_non_colocated_router_query_pushdown TO OFF; SET citus.enable_non_colocated_router_query_pushdown TO OFF;
SET citus.enable_repartition_joins TO ON;
SET client_min_messages TO DEBUG1;
SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20; SELECT COUNT(*) FROM nullkey_c1_t1 JOIN range_table USING(a) WHERE range_table.a = 20;
SET client_min_messages TO DEBUG2;
SET citus.enable_repartition_joins TO OFF;
RESET citus.enable_non_colocated_router_query_pushdown; RESET citus.enable_non_colocated_router_query_pushdown;
-- lateral / semi / anti joins with different table types -- lateral / semi / anti joins with different table types
@ -412,6 +509,17 @@ JOIN LATERAL (
SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a SELECT * FROM citus_local_table t2 WHERE t2.b > t1.a
) q USING(a); ) q USING(a);
-- The following and a few other tests in this file unnecessarily go through
-- recursive planning. This is because we recursively plan distributed tables
-- when they are referred to in the inner side of an outer join, if the outer
-- side is a recurring rel. In the future, we can optimize this so that we
-- skip recursively planning the single-shard table, because such a join
-- wouldn't result in returning recurring tuples.
--
-- And specifically for the tests that contain a sublink (as below), things
-- get even more interesting. We try to recursively plan the single-shard
-- table but we cannot do so due to the sublink. However, the final query
-- can go through the router planner and hence is supported.
SELECT COUNT(*) FROM citus_local_table t1 SELECT COUNT(*) FROM citus_local_table t1
LEFT JOIN LATERAL ( LEFT JOIN LATERAL (
SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a
@ -578,6 +686,33 @@ SELECT a.title AS name, (SELECT a2.id FROM articles_hash a2 WHERE a.id = a2.id
AS special_price FROM articles_hash a AS special_price FROM articles_hash a
ORDER BY 1,2; ORDER BY 1,2;
-- test having clause
-- Each query groups a table by b and filters the groups with a scalar
-- subquery in HAVING; only the table(s) read by that subquery change.
-- NOTE(review): the c1/c2 infix in the nullkey_* names presumably identifies
-- the colocation group; the table definitions are outside this section.

-- HAVING subquery reads nullkey_c1_t2.
SELECT COUNT(*), b FROM nullkey_c1_t1 GROUP BY 2
HAVING (SELECT COUNT(*) FROM nullkey_c1_t2) > 0
ORDER BY 1,2;
-- HAVING subquery reads nullkey_c2_t1.
SELECT COUNT(*), b FROM nullkey_c1_t1 GROUP BY 2
HAVING (SELECT COUNT(*) FROM nullkey_c2_t1) > 0
ORDER BY 1,2;
-- HAVING subquery reads distributed_table.
SELECT COUNT(*), b FROM nullkey_c1_t1 GROUP BY 2
HAVING (SELECT COUNT(*) FROM distributed_table) > 0
ORDER BY 1,2;
-- Correlated HAVING subquery (t2.b > t4.b) that joins nullkey_c1_t1 with two
-- subqueries over nullkey_c1_t2.
SELECT COUNT(*), b FROM nullkey_c1_t1 t4 GROUP BY 2
HAVING (
SELECT COUNT(*) FROM nullkey_c1_t1 t1 JOIN (SELECT * FROM nullkey_c1_t2) t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t2) t3 USING (a)
WHERE t2.b > t4.b
) > 5
ORDER BY 1,2;
-- Same shape, but the outer query and the t2 subquery read distributed_table
-- instead.
SELECT COUNT(*), b FROM distributed_table t4 GROUP BY 2
HAVING (
SELECT COUNT(*) FROM nullkey_c1_t1 t1 JOIN (SELECT * FROM distributed_table) t2 USING (a) JOIN (SELECT * FROM nullkey_c1_t2) t3 USING (a)
WHERE t2.b > t4.b
) > 5
ORDER BY 1,2;
-- test prepared statements -- test prepared statements
-- prepare queries can be router plannable -- prepare queries can be router plannable
@ -1170,5 +1305,391 @@ ORDER BY
rnk DESC, 1 DESC rnk DESC, 1 DESC
LIMIT 10; LIMIT 10;
-- more tests with ctes and subqueries
-- CTEs are recursively planned, and subquery foo is also recursively planned.
-- Then the final plan becomes a router plan.
WITH cte AS MATERIALIZED (
WITH local_cte AS MATERIALIZED (
SELECT * FROM users_table_local
),
dist_cte AS MATERIALIZED (
SELECT user_id FROM colocated_events_table
)
SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id
)
SELECT count(*)
FROM cte,
(
SELECT DISTINCT users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND event_type IN (1,2,3,4)
ORDER BY 1 DESC LIMIT 5
) AS foo
WHERE foo.user_id = cte.user_id;
-- CTEs are colocated, route entire query.
WITH cte1 AS (
SELECT * FROM users_table WHERE user_id = 1
), cte2 AS (
SELECT * FROM colocated_events_table WHERE user_id = 1
)
SELECT cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type
FROM cte1, cte2
ORDER BY cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type
LIMIT 5;
-- CTEs aren't colocated, CTEs become intermediate results.
-- cte1 reads users_table and cte2 reads non_colocated_events_table; both are
-- MATERIALIZED and then cross-joined in the outer query.
-- NOTE(review): the select list outputs cte2.user_id twice while ORDER BY
-- sorts on cte2.event_type; the colocated variant of this query selects
-- cte2.event_type as its fourth column. This looks like a typo, but changing
-- it would alter the expected output -- confirm before touching it.
WITH cte1 AS MATERIALIZED (
SELECT * FROM users_table WHERE user_id = 1
), cte2 AS MATERIALIZED (
SELECT * FROM non_colocated_events_table WHERE user_id = 6
)
SELECT cte1.user_id, cte1.value_1, cte2.user_id, cte2.user_id
FROM cte1, cte2
ORDER BY cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type
LIMIT 5;
-- users_table & colocated_users_table are colocated, route entire query.
WITH cte1 AS (
SELECT * FROM users_table WHERE user_id = 1
)
UPDATE colocated_users_table dt SET value = cte1.value_1
FROM cte1 WHERE cte1.user_id = dt.id AND dt.id = 1;
-- users_table & non_colocated_users_table are not colocated, so the CTE is recursively planned.
WITH cte1 AS (
SELECT * FROM users_table WHERE user_id = 1
)
UPDATE non_colocated_users_table dt SET value = cte1.value_1
FROM cte1 WHERE cte1.user_id = dt.id AND dt.id = 1;
-- None of the relations are colocated, CTEs become intermediate results.
WITH cte1 AS MATERIALIZED (
SELECT * FROM users_table WHERE user_id = 1
), cte2 AS MATERIALIZED (
SELECT * FROM non_colocated_events_table WHERE user_id = 6
)
UPDATE non_colocated_users_table dt SET value = cte1.value_1 + cte2.event_type
FROM cte1, cte2 WHERE cte1.user_id = dt.id AND dt.id = 1;
-- Volatile function calls should not be routed.
WITH cte1 AS MATERIALIZED (SELECT id, value FROM func())
UPDATE colocated_users_table dt SET value = cte1.value
FROM cte1 WHERE dt.id = 1;
-- CTEs are recursively planned, and subquery foo is also recursively planned.
WITH cte AS MATERIALIZED (
WITH local_cte AS MATERIALIZED (
SELECT * FROM users_table_local
),
dist_cte AS MATERIALIZED (
SELECT user_id FROM colocated_events_table
)
SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id
)
SELECT count(*)
FROM
cte,
(
SELECT DISTINCT users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND event_type IN (1,2,3,4)
ORDER BY 1 DESC LIMIT 5
) AS foo, colocated_events_table
WHERE foo.user_id = cte.user_id AND colocated_events_table.user_id = cte.user_id;
-- CTEs are replaced and subquery in WHERE is also replaced.
WITH cte AS MATERIALIZED (
WITH local_cte AS MATERIALIZED (
SELECT * FROM users_table_local
),
dist_cte AS MATERIALIZED (
SELECT user_id FROM colocated_events_table
)
SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id
)
SELECT DISTINCT cte.user_id
FROM users_table, cte
WHERE users_table.user_id = cte.user_id AND
users_table.user_id IN (
SELECT DISTINCT value_2 FROM users_table WHERE value_1 >= 1 AND value_1 <= 20 ORDER BY 1 LIMIT 5
)
ORDER BY 1 DESC;
-- Subquery in WHERE clause is planned recursively due to the recurring table
-- in FROM clause.
WITH cte AS MATERIALIZED (
WITH local_cte AS MATERIALIZED (
SELECT * FROM users_table_local
),
dist_cte AS MATERIALIZED (
SELECT user_id FROM colocated_events_table
)
SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id
)
SELECT DISTINCT cte.user_id
FROM cte
WHERE cte.user_id IN (SELECT DISTINCT user_id FROM users_table WHERE value_1 >= 1 AND value_1 <= 20)
ORDER BY 1 DESC;
-- The CTE is inside a subquery and the final query becomes a router
-- query.
SELECT
user_id
FROM
(
WITH cte AS MATERIALIZED (
SELECT DISTINCT users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND
event_type IN (1,2,3,4)
)
SELECT * FROM cte ORDER BY 1 DESC
) AS foo
ORDER BY 1 DESC;
-- CTEs inside a deeper subquery, and also the subquery that contains the CTE, are
-- recursively planned.
SELECT DISTINCT bar.user_id
FROM
(
WITH cte AS MATERIALIZED (
SELECT DISTINCT users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND event_type IN (1,2,3,4)
)
SELECT * FROM cte ORDER BY 1 DESC
) AS foo,
(
SELECT users_table.user_id, some_events.event_type
FROM
users_table,
(
WITH cte AS MATERIALIZED (
SELECT event_type, users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND value_1 IN (1,2)
) SELECT * FROM cte ORDER BY 1 DESC
) AS some_events
WHERE users_table.user_id = some_events.user_id AND event_type IN (1,2,3,4)
ORDER BY 2,1 LIMIT 2
) AS bar
WHERE foo.user_id = bar.user_id
ORDER BY 1 DESC LIMIT 5;
-- Recursively plan subqueries inside the CTEs that contain LIMIT and OFFSET.
WITH cte AS MATERIALIZED (
WITH local_cte AS MATERIALIZED (
SELECT * FROM users_table_local
),
dist_cte AS MATERIALIZED (
SELECT
user_id
FROM
colocated_events_table,
(SELECT DISTINCT value_2 FROM users_table OFFSET 0) as foo
WHERE
colocated_events_table.user_id = foo.value_2 AND
colocated_events_table.user_id IN (SELECT DISTINCT value_1 FROM users_table ORDER BY 1 LIMIT 3)
)
SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id
)
SELECT count(*)
FROM
cte,
(
SELECT DISTINCT users_table.user_id
FROM users_table, colocated_events_table
WHERE users_table.user_id = colocated_events_table.user_id AND event_type IN (1,2,3,4)
ORDER BY 1 DESC LIMIT 5
) AS foo
WHERE foo.user_id = cte.user_id;
-- more tests with sublinks and subqueries in targetlist
SELECT event_type, (SELECT e.value_2 FROM users_reference_table WHERE user_id = 1 AND value_1 = 1), (SELECT e.value_2)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
SELECT event_type, (SELECT time FROM users_table WHERE user_id = e.user_id ORDER BY time LIMIT 1)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
SELECT event_type, (SELECT max(time) FROM users_table WHERE user_id = e.value_2)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
SELECT event_type, (SELECT max(time) FROM users_table)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
WITH cte_1 AS (SELECT max(time) FROM users_table)
SELECT event_type, (SELECT * FROM cte_1)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
WITH cte_1 AS (SELECT max(time) FROM users_table)
SELECT event_type, (SELECT * FROM cte_1 LIMIT 1)
FROM non_colocated_events_table e
ORDER BY 1,2 LIMIT 1;
WITH cte_1 AS (SELECT max(time) m FROM users_table)
SELECT count(*), (SELECT * FROM cte_1 c1 join cte_1 c2 using (m))
FROM non_colocated_events_table e
GROUP BY 2
ORDER BY 1,2 LIMIT 1;
WITH cte_1 AS (SELECT min(user_id) u, max(time) m FROM users_table)
SELECT count(*), (SELECT max(time) FROM users_table WHERE user_id = cte_1.u GROUP BY user_id)
FROM cte_1
GROUP BY 2
ORDER BY 1,2 LIMIT 1;
SELECT sum(e.user_id) + (SELECT max(value_3) FROM users_table WHERE user_id = e.user_id GROUP BY user_id)
FROM non_colocated_events_table e
GROUP BY e.user_id
ORDER BY 1 LIMIT 3;
SELECT e.user_id, sum((SELECT any_value(value_3) FROM users_reference_table WHERE user_id = e.user_id GROUP BY user_id)) OVER (PARTITION BY e.user_id)
FROM non_colocated_events_table e
ORDER BY 1, 2 LIMIT 3;
SELECT (SELECT (SELECT e.user_id + user_id) FROM users_table WHERE user_id = e.user_id GROUP BY user_id)
FROM non_colocated_events_table e
GROUP BY 1
ORDER BY 1 LIMIT 3;
SELECT (SELECT (SELECT e.user_id + user_id) FROM users_reference_table WHERE user_id = e.user_id GROUP BY user_id)
FROM non_colocated_events_table e
GROUP BY 1
ORDER BY 1 LIMIT 3;
WITH cte_1 AS (SELECT user_id FROM users_table ORDER BY 1 LIMIT 1)
SELECT (SELECT (SELECT e.user_id + user_id) FROM cte_1 WHERE user_id = e.user_id GROUP BY user_id)
FROM non_colocated_events_table e
GROUP BY 1
ORDER BY 1 LIMIT 3;
SELECT (SELECT (SELECT e.user_id + user_id) FROM (SELECT 1 AS user_id) s WHERE user_id = e.user_id GROUP BY user_id)
FROM non_colocated_events_table e
GROUP BY 1
ORDER BY 1 LIMIT 3;
CREATE TEMP VIEW view_1 AS (SELECT user_id, value_2 FROM users_table WHERE user_id = 1 AND value_1 = 1 ORDER BY 1,2);
SELECT (SELECT value_2 FROM view_1 WHERE user_id = e.user_id GROUP BY value_2)
FROM non_colocated_events_table e
GROUP BY 1
ORDER BY 1 LIMIT 3;
SELECT
user_id, count(*)
FROM
non_colocated_events_table e1
GROUP BY user_id
HAVING
count(*) > (SELECT count(*) FROM (SELECT
(SELECT sum(user_id) FROM users_table WHERE user_id = u1.user_id GROUP BY user_id)
FROM users_table u1
GROUP BY user_id) as foo) ORDER BY 1 DESC;
SELECT count(*) FROM (SELECT
(SELECT user_id FROM users_table WHERE user_id = u1.user_id FOR UPDATE)
FROM users_table u1
GROUP BY user_id) as foo;
-- test single hash repartition join
-- Turn on join-order logging and both repartition-join settings for the two
-- joins below, then undo those settings again.
SET citus.log_multi_join_order TO ON;
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
SET citus.enable_single_hash_repartition_joins TO ON;

-- single-shard table joined with a distributed table
SELECT COUNT(*)
FROM nullkey_c1_t1
JOIN distributed_table USING (a);

-- two single-shard tables
SELECT COUNT(*)
FROM nullkey_c1_t1
JOIN nullkey_c2_t2 USING (a);

RESET citus.log_multi_join_order;
SET client_min_messages TO DEBUG2;
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
SET client_min_messages TO DEBUG1;
SET citus.enable_repartition_joins TO ON;
SET citus.log_multi_join_order TO ON;
SELECT count(*), avg(avgsub.a)
FROM (
SELECT table_0.a
FROM reference_table AS table_0
INNER JOIN nullkey_c1_t1 AS table_1 USING (a)
INNER JOIN reference_table AS table_2 USING (a)
INNER JOIN nullkey_c2_t1 AS table_3 USING (a)
ORDER BY a LIMIT 7
) AS avgsub;
SET citus.enable_single_hash_repartition_joins TO ON;
-- We prefer dual-hash repartition join over single-hash repartition join
-- even if citus.enable_single_hash_repartition_joins is set to ON. This
-- happens because single-shard tables don't have a shard key.
SELECT count(*), avg(avgsub.a)
FROM (
SELECT table_0.a
FROM reference_table AS table_0
INNER JOIN nullkey_c1_t1 AS table_1 USING (a)
INNER JOIN reference_table AS table_2 USING (a)
INNER JOIN nullkey_c2_t1 AS table_3 USING (a)
ORDER BY a LIMIT 7
) AS avgsub;
RESET citus.enable_single_hash_repartition_joins;
SET client_min_messages TO DEBUG2;
RESET citus.enable_repartition_joins;
RESET citus.log_multi_join_order;
-- RIGHT JOIN whose preserved side (table_1) is built from a LIMIT 0 subquery:
-- table_2 is empty, so its inner join with table_4 is empty too and avgsub
-- ends up with no rows.
SELECT count(*), avg(avgsub.a)
FROM (
SELECT table_0.a
FROM nullkey_c1_t1 AS table_0
RIGHT JOIN (
SELECT table_2.a FROM (
SELECT table_3.a FROM nullkey_c2_t1 AS table_3
ORDER BY a LIMIT 0
) AS table_2
INNER JOIN nullkey_c2_t1 AS table_4 USING (a)
WHERE table_4.a < 8
) AS table_1 USING (a)
) AS avgsub;
-- test nested exec
-- dist_query_single_shard(p_key): returns, as bigint, the number of rows in
-- query_single_shard_table.nullkey_c1_t1 whose column a equals p_key.
-- It is called below from a query that itself reads nullkey_c1_t1, so the
-- function's SELECT runs while that outer query is being executed.
CREATE FUNCTION dist_query_single_shard(p_key int)
RETURNS bigint
LANGUAGE plpgsql AS $$
DECLARE
result bigint;
BEGIN
SELECT count(*) INTO result FROM query_single_shard_table.nullkey_c1_t1 WHERE a = p_key;
RETURN result;
END;
$$;
-- ref_query(): returns, as bigint, the number of rows in
-- query_single_shard_table.reference_table.
-- It is called below from the target list of a query on nullkey_c1_t1.
CREATE FUNCTION ref_query()
RETURNS bigint
LANGUAGE plpgsql AS $$
DECLARE
result bigint;
BEGIN
SELECT count(*) INTO result FROM query_single_shard_table.reference_table;
RETURN result;
END;
$$;
SELECT dist_query_single_shard(count(*)::int) FROM nullkey_c1_t1;
SELECT ref_query()+count(*) FROM nullkey_c1_t1;
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA query_single_shard_table CASCADE; DROP SCHEMA query_single_shard_table CASCADE;