From 24feadc23013016450a731614c15a970b777b39d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 9 Jun 2020 14:58:51 +0200 Subject: [PATCH] Handle joins between local/reference/cte via router planner --- .../distributed/executor/citus_custom_scan.c | 5 +- .../distributed/planner/deparse_shard_query.c | 6 + .../distributed/planner/distributed_planner.c | 168 +----------------- .../planner/multi_router_planner.c | 86 +++++++-- .../distributed/multi_router_planner.h | 3 +- .../expected/coordinator_shouldhaveshards.out | 57 +++++- .../regress/expected/local_shard_copy.out | 1 + .../locally_execute_intermediate_results.out | 50 +++++- .../regress/expected/multi_modifications.out | 3 +- .../expected/multi_mx_add_coordinator.out | 8 +- ...licate_reference_tables_to_coordinator.out | 141 +++++++++++---- .../set_operation_and_local_tables.out | 19 +- src/test/regress/expected/with_executors.out | 46 ++++- .../sql/coordinator_shouldhaveshards.sql | 18 ++ .../locally_execute_intermediate_results.sql | 26 ++- src/test/regress/sql/multi_modifications.sql | 2 +- .../regress/sql/multi_mx_add_coordinator.sql | 2 +- ...licate_reference_tables_to_coordinator.sql | 9 +- .../sql/set_operation_and_local_tables.sql | 4 +- src/test/regress/sql/with_executors.sql | 30 +++- 20 files changed, 428 insertions(+), 256 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index da0323550..ba4c4836a 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -541,8 +541,11 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList); + /* fast path queries cannot have local tables */ + bool hasLocalRelation = false; + List *placementList = - FindRouterWorkerList(shardIntervalList, shardsPresent, true); + FindRouterWorkerList(shardIntervalList, shardsPresent, true, hasLocalRelation); uint64 shardId = INVALID_SHARD_ID; if (shardsPresent) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 137c506ec..eb1dc8d75 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -236,6 +236,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) return false; } + if (!IsCitusTable(newRte->relid)) + { + /* leave local tables as is */ + return false; + } + /* * Search for the restrictions associated with the RTE. There better be * some, otherwise this query wouldn't be eligible as a router query. diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 2b64773d3..f1a67759a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -119,9 +119,6 @@ static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void); static void ResetPlannerRestrictionContext( PlannerRestrictionContext *plannerRestrictionContext); -static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); -static bool QueryIsNotSimpleSelect(Node *node); -static void UpdateReferenceTablesWithShard(List *rangeTableList); static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, Node *distributionKeyValue); static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, @@ -147,24 +144,10 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } else if (CitusHasBeenLoaded()) { - if (IsLocalReferenceTableJoin(parse, rangeTableList)) + needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); + if (needsDistributedPlanning) { - /* - * For joins between reference tables and local tables, we replace - * reference table names with shard tables names in the query, so - * we can use the standard_planner for planning it locally. - */ - UpdateReferenceTablesWithShard(rangeTableList); - - needsDistributedPlanning = false; - } - else - { - needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); - if (needsDistributedPlanning) - { - fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); - } + fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); } } @@ -2309,148 +2292,3 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) boundParams); } } - - -/* - * IsLocalReferenceTableJoin returns if the given query is a join between - * reference tables and local tables. - */ -static bool -IsLocalReferenceTableJoin(Query *parse, List *rangeTableList) -{ - bool hasReferenceTable = false; - bool hasLocalTable = false; - ListCell *rangeTableCell = false; - - bool hasReferenceTableReplica = false; - - /* - * We only allow join between reference tables and local tables in the - * coordinator. - */ - if (!IsCoordinator()) - { - return false; - } - - /* - * All groups that have pg_dist_node entries, also have reference - * table replicas. - */ - PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &hasReferenceTableReplica); - - /* - * If reference table doesn't have replicas on the coordinator, we don't - * allow joins with local tables. - */ - if (!hasReferenceTableReplica) - { - return false; - } - - if (FindNodeCheck((Node *) parse, QueryIsNotSimpleSelect)) - { - return false; - } - - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* - * Don't plan joins involving functions locally since we are not sure if - * they do distributed accesses or not, and defaulting to local planning - * might break transactional semantics. - * - * For example, access to the reference table in the function might go - * over a connection, but access to the same reference table outside - * the function will go over the current backend. The snapshot for the - * connection in the function is taken after the statement snapshot, - * so they can see two different views of data. - * - * Looking at gram.y, RTE_TABLEFUNC is used only for XMLTABLE() which - * is okay to be planned locally, so allowing that. - */ - if (rangeTableEntry->rtekind == RTE_FUNCTION) - { - return false; - } - - if (rangeTableEntry->rtekind != RTE_RELATION) - { - continue; - } - - /* - * We only allow local join for the relation kinds for which we can - * determine deterministically that access to them are local or distributed. - * For this reason, we don't allow non-materialized views. - */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - return false; - } - - if (!IsCitusTable(rangeTableEntry->relid)) - { - hasLocalTable = true; - continue; - } - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( - rangeTableEntry->relid); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) - { - hasReferenceTable = true; - } - else - { - return false; - } - } - - return hasLocalTable && hasReferenceTable; -} - - -/* - * QueryIsNotSimpleSelect returns true if node is a query which modifies or - * marks for modifications. - */ -static bool -QueryIsNotSimpleSelect(Node *node) -{ - if (!IsA(node, Query)) - { - return false; - } - - Query *query = (Query *) node; - return (query->commandType != CMD_SELECT) || (query->rowMarks != NIL); -} - - -/* - * UpdateReferenceTablesWithShard recursively replaces the reference table names - * in the given range table list with the local shard table names. - */ -static void -UpdateReferenceTablesWithShard(List *rangeTableList) -{ - List *referenceTableRTEList = ExtractReferenceTableRTEList(rangeTableList); - - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, referenceTableRTEList) - { - Oid referenceTableLocalShardOid = GetReferenceTableLocalShardOid( - rangeTableEntry->relid); - - rangeTableEntry->relid = referenceTableLocalShardOid; - - /* - * Parser locks relations in addRangeTableEntry(). So we should lock the - * modified ones too. - */ - LockRelationOid(referenceTableLocalShardOid, AccessShareLock); - } -} diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 170dc526d..c1ef5df8e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -45,6 +45,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/query_pushdown_planning.h" #include "distributed/query_utils.h" +#include "distributed/reference_table_utils.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" @@ -154,7 +155,7 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static DeferredErrorMessage * MultiRouterPlannableQuery(Query *query); static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); -static ShardPlacement * CreateDummyPlacement(void); +static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation); static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); @@ -2021,6 +2022,8 @@ PlanRouterQuery(Query *originalQuery, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, Const **partitionValueConst) { + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; bool isMultiShardQuery = false; DeferredErrorMessage *planningError = NULL; bool shardsPresent = false; @@ -2133,9 +2136,11 @@ PlanRouterQuery(Query *originalQuery, /* we need anchor shard id for select queries with router planner */ uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList); + bool hasLocalRelation = relationRestrictionContext->hasLocalRelation; + List *workerList = FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent, - replacePrunedQueryWithDummy); + replacePrunedQueryWithDummy, hasLocalRelation); if (workerList == NIL) { @@ -2166,9 +2171,9 @@ PlanRouterQuery(Query *originalQuery, List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, - bool replacePrunedQueryWithDummy) + bool replacePrunedQueryWithDummy, bool hasLocalRelation) { - List *workerList = NIL; + List *placementList = NIL; /* * Determine the worker that has all shard placements if a shard placement found. @@ -2178,18 +2183,38 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, */ if (shardsPresent) { - workerList = WorkersContainingAllShards(shardIntervalList); + List *workerList = WorkersContainingAllShards(shardIntervalList); + + if (hasLocalRelation) + { + ShardPlacement *taskPlacement = NULL; + + /* + * If there is a local table, we only allow the local placement to + * be used. If there is none, we disallow the query. + */ + foreach_ptr(taskPlacement, workerList) + { + /* include only the local placement */ + if (taskPlacement->groupId == GetLocalGroupId()) + { + placementList = lappend(placementList, taskPlacement); + } + } + } + else + { + placementList = workerList; + } } else if (replacePrunedQueryWithDummy) { - ShardPlacement *dummyPlacement = CreateDummyPlacement(); - if (dummyPlacement != NULL) - { - workerList = lappend(workerList, dummyPlacement); - } + ShardPlacement *dummyPlacement = CreateDummyPlacement(hasLocalRelation); + + placementList = list_make1(dummyPlacement); } - return workerList; + return placementList; } @@ -2201,14 +2226,17 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, * * If round robin policy is set, the placement could be on any node in pg_dist_node. * Else, the local node is set for the placement. + * + * Queries can also involve local tables. In that case we always use the local + * node. */ static ShardPlacement * -CreateDummyPlacement(void) +CreateDummyPlacement(bool hasLocalRelation) { static uint32 zeroShardQueryRoundRobin = 0; ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement); - if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) + if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN && !hasLocalRelation) { List *workerNodeList = ActiveReadableWorkerNodeList(); if (workerNodeList == NIL) @@ -2441,6 +2469,13 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte RelationRestriction *relationRestriction = (RelationRestriction *) lfirst(restrictionCell); Oid relationId = relationRestriction->relationId; + + if (!IsCitusTable(relationId)) + { + /* ignore local tables for shard pruning purposes */ + continue; + } + Index tableId = relationRestriction->index; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); int shardCount = cacheEntry->shardIntervalArrayLength; @@ -3197,22 +3232,22 @@ MultiRouterPlannableQuery(Query *query) NULL, NULL); } + bool hasLocalTable = false; + bool hasDistributedTable = false; + ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList); foreach(rangeTableRelationCell, rangeTableRelationList) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableRelationCell); if (rte->rtekind == RTE_RELATION) { - /* only hash partitioned tables are supported */ Oid distributedTableId = rte->relid; + /* local tables are allowed if there are no distributed tables */ if (!IsCitusTable(distributedTableId)) { - /* local tables cannot be read from workers */ - return DeferredError( - ERRCODE_FEATURE_NOT_SUPPORTED, - "Local tables cannot be used in distributed queries.", - NULL, NULL); + hasLocalTable = true; + continue; } char partitionMethod = PartitionMethod(distributedTableId); @@ -3225,6 +3260,11 @@ MultiRouterPlannableQuery(Query *query) NULL, NULL); } + if (partitionMethod != DISTRIBUTE_BY_NONE) + { + hasDistributedTable = true; + } + /* * Currently, we don't support tables with replication factor > 1, * except reference tables with SELECT ... FOR UPDATE queries. It is @@ -3246,6 +3286,14 @@ MultiRouterPlannableQuery(Query *query) } } + /* local tables are allowed if there are no distributed tables */ + if (hasLocalTable && hasDistributedTable) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Local tables cannot be used in distributed queries.", + NULL, NULL); + } + return ErrorIfQueryHasUnroutableModifyingCTE(query); } diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index c534dc174..9dc5d9978 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -46,7 +46,8 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, extern List * RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPresent); extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, - bool replacePrunedQueryWithDummy); + bool replacePrunedQueryWithDummy, + bool hasLocalRelation); extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved, DeferredErrorMessage **planningError); extern Const * ExtractInsertPartitionKeyValue(Query *query); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 191f728c0..37ba87a51 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -231,6 +231,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) SELECT * FROM ref JOIN local ON (a = x); +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -250,6 +251,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -268,6 +270,7 @@ INSERT INTO ref VALUES (1,2); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 @@ -290,6 +293,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -299,6 +303,7 @@ NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -309,6 +314,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -323,6 +329,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -337,6 +344,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -358,6 +366,7 @@ INSERT INTO ref VALUES (1,2); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 @@ -369,14 +378,14 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 3 | 2 | 1 (1 row) --- issue #3801 -SET citus.shard_replication_factor TO 2; +-- joins between local tables and distributed tables are disallowed CREATE TABLE dist_table(a int); ERROR: relation "dist_table" already exists SELECT create_distributed_table('dist_table', 'a'); @@ -389,6 +398,42 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +INSERT INTO dist_table VALUES(1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503017 (a) VALUES (1) +SELECT * FROM local JOIN dist_table ON (a = x); +ERROR: relation local is not distributed +SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1;; +ERROR: relation local is not distributed +-- intermediate results are allowed +WITH cte_1 AS (SELECT * FROM dist_table LIMIT 1) +SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); +NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503020 dist_table WHERE true LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) + a | b | x | y | a +--------------------------------------------------------------------- + 1 | 2 | 1 | 2 | 1 +(1 row) + +-- full router query with CTE and local +WITH cte_1 AS (SELECT * FROM ref LIMIT 1) +SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); +NOTICE: executing the command locally: WITH cte_1 AS (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503016 ref_1 LIMIT 1) SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) + a | b | x | y | a | b +--------------------------------------------------------------------- + 1 | 2 | 1 | 2 | 1 | 2 +(1 row) + +DROP TABLE dist_table; +-- issue #3801 +SET citus.shard_replication_factor TO 2; +CREATE TABLE dist_table(a int); +SELECT create_distributed_table('dist_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + BEGIN; -- this will use perPlacementQueryStrings, make sure it works correctly with -- copying task @@ -400,10 +445,10 @@ CREATE TABLE dist_table1(a int); -- this will use queryStringList, make sure it works correctly with -- copying task SELECT create_distributed_table('dist_table1', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 3d201d27e..92c3f8ee8 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -456,6 +456,7 @@ TRUNCATE TABLE reference_table; NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE TRUNCATE TABLE local_table; SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table, local_shard_copy.local_table WHERE (reference_table.key OPERATOR(pg_catalog.=) local_table.key) count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out b/src/test/regress/expected/locally_execute_intermediate_results.out index b97baf836..406f203da 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -2,13 +2,11 @@ CREATE SCHEMA locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results; SET citus.log_intermediate_results TO TRUE; SET citus.log_local_commands TO TRUE; -SET client_min_messages TO DEBUG1; SET citus.shard_count TO 4; SET citus.next_shard_id TO 1580000; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; CREATE TABLE table_1 (key int, value text); -DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially SELECT create_distributed_table('table_1', 'key'); create_distributed_table --------------------------------------------------------------------- @@ -16,7 +14,6 @@ SELECT create_distributed_table('table_1', 'key'); (1 row) CREATE TABLE table_2 (key int, value text); -DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially SELECT create_distributed_table('table_2', 'key'); create_distributed_table --------------------------------------------------------------------- @@ -24,22 +21,24 @@ SELECT create_distributed_table('table_2', 'key'); (1 row) CREATE TABLE ref_table (key int, value text); -DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) +CREATE TABLE local_table (key int, value text); -- load some data INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) +INSERT INTO local_table VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); -- prevent PG 11 - PG 12 outputs to diverge -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage SET citus.enable_cte_inlining TO false; +SET client_min_messages TO DEBUG1; -- the query cannot be executed locally, but still because of -- HAVING the intermediate result is written to local file as well WITH cte_1 AS (SELECT max(value) FROM table_1) @@ -462,6 +461,49 @@ NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT int --------------------------------------------------------------------- (0 rows) +-- queries in which the last step has only CTEs can use local tables +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +local_table +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +local_table +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 +NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + \c - - - :worker_1_port -- now use the same queries on a worker SET search_path TO locally_execute_intermediate_results; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 92b3de6f0..e0b386084 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -1298,10 +1298,9 @@ DELETE FROM summary_table WHERE ( SELECT 1 FROM pg_catalog.pg_statio_sys_sequences ) = null; DELETE FROM summary_table WHERE ( - SELECT (select action_statement from information_schema.triggers) + SELECT (select min(action_statement) from information_schema.triggers) FROM pg_catalog.pg_statio_sys_sequences ) = null; -ERROR: relation pg_namespace is not distributed DELETE FROM summary_table WHERE id < ( SELECT 0 FROM pg_dist_node ); diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index bdf42af57..217018a89 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -137,11 +137,15 @@ SET search_path TO mx_add_coordinator,public; INSERT INTO ref VALUES (1), (2), (3); UPDATE ref SET a = a + 1; DELETE FROM ref WHERE a > 3; --- Test we don't allow reference/local joins on mx workers +-- Test we allow reference/local joins on mx workers CREATE TABLE local_table (a int); INSERT INTO local_table VALUES (2), (4); SELECT r.a FROM ref r JOIN local_table lt on r.a = lt.a; -ERROR: relation local_table is not distributed + a +--------------------------------------------------------------------- + 2 +(1 row) + \c - - - :master_port SET search_path TO mx_add_coordinator,public; SELECT * FROM ref ORDER BY a; diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index f8596eb52..5c792fddc 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -99,20 +99,26 @@ SELECT citus_table_is_visible('numbers_8000001'::regclass::oid); -- Join between reference tables and local tables CREATE TABLE local_table(a int); INSERT INTO local_table VALUES (2), (4), (7), (20); -EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; - QUERY PLAN +EXPLAIN (COSTS OFF) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; + QUERY PLAN --------------------------------------------------------------------- - Merge Join (cost=359.57..860.00 rows=32512 width=8) - Merge Cond: (local_table.a = numbers_8000001.a) - -> Sort (cost=179.78..186.16 rows=2550 width=4) - Sort Key: local_table.a - -> Seq Scan on local_table (cost=0.00..35.50 rows=2550 width=4) - -> Sort (cost=179.78..186.16 rows=2550 width=4) - Sort Key: numbers_8000001.a - -> Seq Scan on numbers_8000001 (cost=0.00..35.50 rows=2550 width=4) -(8 rows) + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge Join + Merge Cond: (local_table.a = numbers.a) + -> Sort + Sort Key: local_table.a + -> Seq Scan on local_table + -> Sort + Sort Key: numbers.a + -> Seq Scan on numbers_8000001 numbers +(13 rows) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a a | a --------------------------------------------------------------------- 20 | 20 @@ -123,6 +129,7 @@ SELECT lt.a, sq.a, sq.b FROM local_table lt JOIN squares sq ON sq.a > lt.a and sq.b > 90 ORDER BY 1,2,3; +NOTICE: executing the command locally: SELECT lt.a, sq.a, sq.b FROM (replicate_ref_to_coordinator.local_table lt JOIN replicate_ref_to_coordinator.squares_8000000 sq ON (((sq.a OPERATOR(pg_catalog.>) lt.a) AND (sq.b OPERATOR(pg_catalog.>) 90)))) ORDER BY lt.a, sq.a, sq.b a | a | b --------------------------------------------------------------------- 2 | 10 | 100 @@ -133,6 +140,7 @@ ORDER BY 1,2,3; -- should work if in transaction block BEGIN; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a a | a --------------------------------------------------------------------- 20 | 20 @@ -145,6 +153,9 @@ BEGIN PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; END; $$; +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) +CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers" +PL/pgSQL function inline_code_block line 3 at PERFORM -- test plpgsql function CREATE FUNCTION test_reference_local_join_plpgsql_func() RETURNS void AS $$ @@ -160,6 +171,9 @@ SELECT test_reference_local_join_plpgsql_func(); NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4) CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)" PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a +CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1" +PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM ERROR: CONTEXT: PL/pgSQL function test_reference_local_join_plpgsql_func() line 6 at RAISE SELECT sum(a) FROM local_table; @@ -180,6 +194,8 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; $$ LANGUAGE sql; CALL test_reference_local_join_proc(); +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a +CONTEXT: SQL function "test_reference_local_join_proc" statement 1 CREATE SCHEMA s1; CREATE TABLE s1.ref(a int); SELECT create_reference_table('s1.ref'); @@ -190,6 +206,7 @@ SELECT create_reference_table('s1.ref'); BEGIN; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; +NOTICE: executing the command locally: SELECT local_table.a, r.a FROM (replicate_ref_to_coordinator.local_table JOIN s1.ref_8000002 r(a) USING (a)) ORDER BY local_table.a a | a --------------------------------------------------------------------- (0 rows) @@ -199,6 +216,7 @@ BEGIN; WITH t1 AS ( SELECT my_volatile_fn() r, a FROM local_table ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; +NOTICE: executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, local_table.a FROM replicate_ref_to_coordinator.local_table) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((t1.a OPERATOR(pg_catalog.=) numbers.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5)) count --------------------------------------------------------------------- 0 @@ -209,6 +227,7 @@ BEGIN; WITH t1 AS ( SELECT my_volatile_fn() r, a FROM numbers ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; +NOTICE: executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.local_table WHERE ((t1.a OPERATOR(pg_catalog.=) local_table.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5)) count --------------------------------------------------------------------- 0 @@ -218,6 +237,7 @@ END; BEGIN; SELECT count(*) FROM local_table WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a); +NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a))) count --------------------------------------------------------------------- 1 @@ -227,6 +247,7 @@ END; BEGIN; SELECT count(*) FROM numbers WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a); +NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.local_table WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a))) count --------------------------------------------------------------------- 1 @@ -247,24 +268,54 @@ $$ LANGUAGE sql; SELECT test_reference_local_join_func(); test_reference_local_join_func --------------------------------------------------------------------- + (2,2) (20,20) -(1 row) +(2 rows) --- shouldn't plan locally if modifications happen in CTEs, ... +-- CTEs are allowed WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; -ERROR: relation local_table is not distributed +NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (1) RETURNING a +NOTICE: executing the command locally: SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table + a | a +--------------------------------------------------------------------- + 20 | 2 + 20 | 4 + 20 | 7 + 20 | 20 + 21 | 2 + 21 | 4 + 21 | 7 + 21 | 20 + 2 | 2 + 2 | 4 + 2 | 7 + 2 | 20 +(12 rows) + WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); -ERROR: relation local_table is not distributed --- but this should be fine +NOTICE: executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1 FOR UPDATE OF numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a))) + a | a +--------------------------------------------------------------------- + 1 | 2 + 1 | 4 + 1 | 7 + 1 | 20 +(4 rows) + WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); +NOTICE: executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a))) a | a --------------------------------------------------------------------- -(0 rows) + 1 | 2 + 1 | 4 + 1 | 7 + 1 | 20 +(4 rows) -- shouldn't plan locally even if distributed table is in CTE or subquery CREATE TABLE dist(a int); @@ -278,10 +329,13 @@ INSERT INTO dist VALUES (20),(30); WITH t AS (SELECT *, my_volatile_fn() x FROM dist) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); -ERROR: relation local_table is not distributed +ERROR: function replicate_ref_to_coordinator.my_volatile_fn() does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +CONTEXT: while executing command on localhost:xxxxx -- test CTE being reference/local join for distributed query WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l) SELECT a FROM t NATURAL JOIN dist; +NOTICE: executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a)) a --------------------------------------------------------------------- 20 @@ -289,11 +343,21 @@ SELECT a FROM t NATURAL JOIN dist; -- shouldn't error if FOR UPDATE/FOR SHARE SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; -ERROR: could not run distributed query with FOR UPDATE/SHARE commands -HINT: Consider using an equality filter on the distributed table's partition column. +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR SHARE OF local_table FOR SHARE OF numbers + a | a +--------------------------------------------------------------------- + 2 | 2 + 20 | 20 +(2 rows) + SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; -ERROR: could not run distributed query with FOR UPDATE/SHARE commands -HINT: Consider using an equality filter on the distributed table's partition column. +NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR UPDATE OF local_table FOR UPDATE OF numbers + a | a +--------------------------------------------------------------------- + 2 | 2 + 20 | 20 +(2 rows) + -- -- Joins between reference tables and views shouldn't be planned locally. -- @@ -313,14 +377,11 @@ SELECT public.coordinator_plan($Q$ EXPLAIN (COSTS FALSE) SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a; $Q$); - coordinator_plan + coordinator_plan --------------------------------------------------------------------- Custom Scan (Citus Adaptive) - -> Distributed Subplan XXX_1 - -> Seq Scan on local_table - Filter: ((a >= 1) AND (a <= 10)) Task Count: 1 -(5 rows) +(2 rows) DROP VIEW numbers_v, local_table_v; -- @@ -330,31 +391,39 @@ DROP VIEW numbers_v, local_table_v; CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10)) REFRESH MATERIALIZED VIEW numbers_v; +NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10)) SELECT public.plan_is_distributed($Q$ EXPLAIN (COSTS FALSE) SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; $Q$); plan_is_distributed --------------------------------------------------------------------- - f + t (1 row) BEGIN; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; +NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) a | b | a --------------------------------------------------------------------- + 1 | 1 | 1 2 | 4 | 2 -(1 row) +(2 rows) END; -- --- Joins between reference tables, local tables, and function calls shouldn't --- be planned locally. +-- Joins between reference tables, local tables, and function calls +-- are allowed -- SELECT count(*) FROM local_table a, numbers b, generate_series(1, 10) c WHERE a.a = b.a AND a.a = c; -ERROR: relation local_table is not distributed +NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b, generate_series(1, 10) c(c) WHERE ((a.a OPERATOR(pg_catalog.=) b.a) AND (a.a OPERATOR(pg_catalog.=) c.c)) + count +--------------------------------------------------------------------- + 1 +(1 row) + -- but it should be okay if the function call is not a data source SELECT public.plan_is_distributed($Q$ EXPLAIN (COSTS FALSE) @@ -362,7 +431,7 @@ SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a; $Q$); plan_is_distributed --------------------------------------------------------------------- - f + t (1 row) SELECT public.plan_is_distributed($Q$ @@ -371,7 +440,7 @@ SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); $Q$); plan_is_distributed --------------------------------------------------------------------- - f + t (1 row) TRUNCATE local_table; @@ -384,6 +453,7 @@ NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator ALTER TABLE numbers ADD COLUMN d int; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;') SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a; +NOTICE: executing the command locally: SELECT local_table.a, numbers.d FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a, d) USING (a)) ORDER BY local_table.a a | d --------------------------------------------------------------------- 1 | @@ -397,6 +467,7 @@ BEGIN; INSERT INTO local_table VALUES (1), (2), (3); WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x) SELECT a FROM t NATURAL JOIN dist ORDER BY a; +NOTICE: executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a)) ORDER BY n.a, (replicate_ref_to_coordinator.my_volatile_fn()) a --------------------------------------------------------------------- (0 rows) @@ -409,6 +480,7 @@ NOTICE: executing the copy locally for shard xxxxx INSERT INTO numbers SELECT * FROM numbers; NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers SELECT COUNT(*) FROM local_table JOIN numbers using (a); +NOTICE: executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) count --------------------------------------------------------------------- 6 @@ -417,6 +489,7 @@ SELECT COUNT(*) FROM local_table JOIN numbers using (a); UPDATE numbers SET a = a + 1; NOTICE: executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1) SELECT COUNT(*) FROM local_table JOIN numbers using (a); +NOTICE: executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) count --------------------------------------------------------------------- 4 diff --git a/src/test/regress/expected/set_operation_and_local_tables.out b/src/test/regress/expected/set_operation_and_local_tables.out index c0b9c27c9..e8242b032 100644 --- a/src/test/regress/expected/set_operation_and_local_tables.out +++ b/src/test/regress/expected/set_operation_and_local_tables.out @@ -218,18 +218,20 @@ DEBUG: Router planner cannot handle multi-shard select queries (2 rows) -- same query with subquery in where is wrapped in CTE +SET citus.enable_cte_inlining TO off; SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2; -DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: Local tables cannot be used in distributed queries. +DEBUG: generating subplan XXX_1 for CTE cte: SELECT b.x FROM recursive_set_local.test b UNION SELECT c.y FROM recursive_set_local.test c UNION SELECT d.y FROM recursive_set_local.local_test d DEBUG: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT y FROM recursive_set_local.local_test d DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_2 for subquery SELECT x FROM recursive_set_local.test b DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_3 for subquery SELECT y FROM recursive_set_local.test c +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(y integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(y integer) DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan XXX_4 for subquery SELECT intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(y integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(y integer) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_set_local.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT cte.x FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) cte)) ORDER BY x, y +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_set_local.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT cte.x FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) cte)) ORDER BY x, y DEBUG: Router planner cannot handle multi-shard select queries x | y --------------------------------------------------------------------- @@ -237,7 +239,8 @@ DEBUG: Router planner cannot handle multi-shard select queries 2 | 2 (2 rows) --- not supported since local table is joined with a set operation +RESET citus.enable_cte_inlining; +-- supported since final step only has local table and intermediate result SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2; DEBUG: Local tables cannot be used in distributed queries. DEBUG: Router planner cannot handle multi-shard select queries @@ -249,8 +252,12 @@ DEBUG: Creating router plan DEBUG: Plan is router executable DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) EXCEPT SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT u.x, u.y, local_test.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN recursive_set_local.local_test USING (x)) ORDER BY u.x, u.y -DEBUG: Local tables cannot be used in distributed queries. -ERROR: relation local_test is not distributed +DEBUG: Creating router plan +DEBUG: Plan is router executable + x | y | y +--------------------------------------------------------------------- +(0 rows) + -- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join SELECT * FROM ((SELECT * FROM local_test) INTERSECT (SELECT * FROM test ORDER BY x LIMIT 1)) u LEFT JOIN test USING (x) ORDER BY 1,2; DEBUG: Local tables cannot be used in distributed queries. diff --git a/src/test/regress/expected/with_executors.out b/src/test/regress/expected/with_executors.out index 5c1fb8317..b7ac8f0fd 100644 --- a/src/test/regress/expected/with_executors.out +++ b/src/test/regress/expected/with_executors.out @@ -4,6 +4,14 @@ SET search_path TO with_executors, public; SET citus.enable_repartition_joins TO on; CREATE TABLE with_executors.local_table (id int); INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); +CREATE TABLE ref_table (id int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); -- CTEs should be able to use local queries WITH cte AS ( WITH local_cte AS ( @@ -284,11 +292,39 @@ SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_tabl 2 | 1 (1 row) --- CTEs should not be able to terminate (the last SELECT) in a local query +-- CTEs should be able to terminate (the last SELECT) in a local query WITH cte AS ( - SELECT * FROM users_table + SELECT user_id FROM users_table ) -SELECT count(*) FROM cte JOIN local_table ON (user_id = id); +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id); + min +--------------------------------------------------------------------- + 1 +(1 row) + +-- not if there are no distributed tables +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id); +ERROR: relation local_table is not distributed +-- unless the distributed table is part of a recursively planned subquery +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN (SELECT * FROM events_table OFFSET 0) e USING (user_id); + min +--------------------------------------------------------------------- + 1 +(1 row) + +-- joins between local and reference tables not allowed +-- since the coordinator is not in the metadata at this stage +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT count(*) FROM local_table JOIN ref_table USING (id) +WHERE id IN (SELECT * FROM cte); ERROR: relation local_table is not distributed -- CTEs should be able to terminate a router query WITH cte AS ( @@ -382,4 +418,6 @@ WHERE users_table.user_id = cte_merge.u_id; ERROR: Complex subqueries and CTEs are not supported when task_executor_type is set to 'task-tracker' DROP SCHEMA with_executors CASCADE; -NOTICE: drop cascades to table local_table +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table local_table +drop cascades to table ref_table diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 566d54254..df88dd3b7 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -155,6 +155,24 @@ SELECT * FROM ref JOIN local ON (a = x); -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +-- joins between local tables and distributed tables are disallowed +CREATE TABLE dist_table(a int); +SELECT create_distributed_table('dist_table', 'a'); +INSERT INTO dist_table VALUES(1); + +SELECT * FROM local JOIN dist_table ON (a = x); +SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1;; + +-- intermediate results are allowed +WITH cte_1 AS (SELECT * FROM dist_table LIMIT 1) +SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); + +-- full router query with CTE and local +WITH cte_1 AS (SELECT * FROM ref LIMIT 1) +SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); + +DROP TABLE dist_table; + -- issue #3801 SET citus.shard_replication_factor TO 2; CREATE TABLE dist_table(a int); diff --git a/src/test/regress/sql/locally_execute_intermediate_results.sql b/src/test/regress/sql/locally_execute_intermediate_results.sql index e6c0e1949..136cffc2a 100644 --- a/src/test/regress/sql/locally_execute_intermediate_results.sql +++ b/src/test/regress/sql/locally_execute_intermediate_results.sql @@ -2,7 +2,6 @@ CREATE SCHEMA locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results; SET citus.log_intermediate_results TO TRUE; SET citus.log_local_commands TO TRUE; -SET client_min_messages TO DEBUG1; SET citus.shard_count TO 4; SET citus.next_shard_id TO 1580000; @@ -18,16 +17,21 @@ SELECT create_distributed_table('table_2', 'key'); CREATE TABLE ref_table (key int, value text); SELECT create_reference_table('ref_table'); +CREATE TABLE local_table (key int, value text); + -- load some data INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO local_table VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); -- prevent PG 11 - PG 12 outputs to diverge -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage SET citus.enable_cte_inlining TO false; +SET client_min_messages TO DEBUG1; + -- the query cannot be executed locally, but still because of -- HAVING the intermediate result is written to local file as well WITH cte_1 AS (SELECT max(value) FROM table_1) @@ -233,6 +237,26 @@ SELECT * FROM (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar WHERE foo.key != bar.key; +-- queries in which the last step has only CTEs can use local tables +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +local_table +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +local_table +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + \c - - - :worker_1_port -- now use the same queries on a worker diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 912198f69..3076aeeeb 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -874,7 +874,7 @@ DELETE FROM summary_table WHERE ( SELECT 1 FROM pg_catalog.pg_statio_sys_sequences ) = null; DELETE FROM summary_table WHERE ( - SELECT (select action_statement from information_schema.triggers) + SELECT (select min(action_statement) from information_schema.triggers) FROM pg_catalog.pg_statio_sys_sequences ) = null; diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql index c9f839ede..4b7821672 100644 --- a/src/test/regress/sql/multi_mx_add_coordinator.sql +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -63,7 +63,7 @@ INSERT INTO ref VALUES (1), (2), (3); UPDATE ref SET a = a + 1; DELETE FROM ref WHERE a > 3; --- Test we don't allow reference/local joins on mx workers +-- Test we allow reference/local joins on mx workers CREATE TABLE local_table (a int); INSERT INTO local_table VALUES (2), (4); diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index 420c24226..e31589926 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -51,7 +51,7 @@ SELECT citus_table_is_visible('numbers_8000001'::regclass::oid); CREATE TABLE local_table(a int); INSERT INTO local_table VALUES (2), (4), (7), (20); -EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; +EXPLAIN (COSTS OFF) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; -- test non equijoin @@ -135,7 +135,7 @@ $$ LANGUAGE sql; SELECT test_reference_local_join_func(); --- shouldn't plan locally if modifications happen in CTEs, ... +-- CTEs are allowed WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; @@ -143,7 +143,6 @@ WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); --- but this should be fine WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); @@ -200,8 +199,8 @@ SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; END; -- --- Joins between reference tables, local tables, and function calls shouldn't --- be planned locally. +-- Joins between reference tables, local tables, and function calls +-- are allowed -- SELECT count(*) FROM local_table a, numbers b, generate_series(1, 10) c diff --git a/src/test/regress/sql/set_operation_and_local_tables.sql b/src/test/regress/sql/set_operation_and_local_tables.sql index 4c87784f8..b00fd67d2 100644 --- a/src/test/regress/sql/set_operation_and_local_tables.sql +++ b/src/test/regress/sql/set_operation_and_local_tables.sql @@ -74,9 +74,11 @@ FROM SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) ORDER BY 1,2; -- same query with subquery in where is wrapped in CTE +SET citus.enable_cte_inlining TO off; SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2; +RESET citus.enable_cte_inlining; --- not supported since local table is joined with a set operation +-- supported since final step only has local table and intermediate result SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2; -- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join diff --git a/src/test/regress/sql/with_executors.sql b/src/test/regress/sql/with_executors.sql index 4a183cd96..67e0f4fa1 100644 --- a/src/test/regress/sql/with_executors.sql +++ b/src/test/regress/sql/with_executors.sql @@ -7,6 +7,10 @@ SET citus.enable_repartition_joins TO on; CREATE TABLE with_executors.local_table (id int); INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); +CREATE TABLE ref_table (id int); +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); + -- CTEs should be able to use local queries WITH cte AS ( WITH local_cte AS ( @@ -221,11 +225,31 @@ WITH cte AS ( SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_table.event_type ORDER BY 1, 2; --- CTEs should not be able to terminate (the last SELECT) in a local query +-- CTEs should be able to terminate (the last SELECT) in a local query WITH cte AS ( - SELECT * FROM users_table + SELECT user_id FROM users_table ) -SELECT count(*) FROM cte JOIN local_table ON (user_id = id); +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id); + +-- not if there are no distributed tables +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id); + +-- unless the distributed table is part of a recursively planned subquery +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN (SELECT * FROM events_table OFFSET 0) e USING (user_id); + +-- joins between local and reference tables not allowed +-- since the coordinator is not in the metadata at this stage +WITH cte AS ( + SELECT user_id FROM users_table +) +SELECT count(*) FROM local_table JOIN ref_table USING (id) +WHERE id IN (SELECT * FROM cte); -- CTEs should be able to terminate a router query WITH cte AS (