Merge pull request #3887 from citusdata/local-router-joins

Implement local table joins in router planner
pull/3839/head
Hadi Moshayedi 2020-06-12 18:45:13 -07:00 committed by GitHub
commit b090dcd530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 480 additions and 353 deletions

View File

@ -541,8 +541,12 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList); UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList);
/* fast path queries cannot have local tables */
bool hasLocalRelation = false;
List *placementList = List *placementList =
FindRouterWorkerList(shardIntervalList, shardsPresent, true); CreateTaskPlacementListForShardIntervals(shardIntervalList, shardsPresent, true,
hasLocalRelation);
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
if (shardsPresent) if (shardsPresent)

View File

@ -236,6 +236,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
return false; return false;
} }
if (!IsCitusTable(newRte->relid))
{
/* leave local tables as is */
return false;
}
/* /*
* Search for the restrictions associated with the RTE. There better be * Search for the restrictions associated with the RTE. There better be
* some, otherwise this query wouldn't be eligible as a router query. * some, otherwise this query wouldn't be eligible as a router query.

View File

@ -119,9 +119,6 @@ static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext( static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node);
static void UpdateReferenceTablesWithShard(List *rangeTableList);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue); Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
@ -147,24 +144,10 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
else if (CitusHasBeenLoaded()) else if (CitusHasBeenLoaded())
{ {
if (IsLocalReferenceTableJoin(parse, rangeTableList)) needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
if (needsDistributedPlanning)
{ {
/* fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
* For joins between reference tables and local tables, we replace
* reference table names with shard tables names in the query, so
* we can use the standard_planner for planning it locally.
*/
UpdateReferenceTablesWithShard(rangeTableList);
needsDistributedPlanning = false;
}
else
{
needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
if (needsDistributedPlanning)
{
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
}
} }
} }
@ -2309,148 +2292,3 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
boundParams); boundParams);
} }
} }
/*
 * IsLocalReferenceTableJoin returns true if the given query is a join between
 * reference tables and local tables that can be planned locally on the
 * coordinator, false otherwise.
 */
static bool
IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
{
	bool hasReferenceTable = false;
	bool hasLocalTable = false;

	/* NULL, not false: this is a pointer, not a boolean */
	ListCell *rangeTableCell = NULL;
	bool hasReferenceTableReplica = false;

	/*
	 * We only allow join between reference tables and local tables in the
	 * coordinator.
	 */
	if (!IsCoordinator())
	{
		return false;
	}

	/*
	 * All groups that have pg_dist_node entries, also have reference
	 * table replicas.
	 */
	PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &hasReferenceTableReplica);

	/*
	 * If reference table doesn't have replicas on the coordinator, we don't
	 * allow joins with local tables.
	 */
	if (!hasReferenceTableReplica)
	{
		return false;
	}

	/* only plain SELECTs (no modifications, no row marks) qualify */
	if (FindNodeCheck((Node *) parse, QueryIsNotSimpleSelect))
	{
		return false;
	}

	foreach(rangeTableCell, rangeTableList)
	{
		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

		/*
		 * Don't plan joins involving functions locally since we are not sure if
		 * they do distributed accesses or not, and defaulting to local planning
		 * might break transactional semantics.
		 *
		 * For example, access to the reference table in the function might go
		 * over a connection, but access to the same reference table outside
		 * the function will go over the current backend. The snapshot for the
		 * connection in the function is taken after the statement snapshot,
		 * so they can see two different views of data.
		 *
		 * Looking at gram.y, RTE_TABLEFUNC is used only for XMLTABLE() which
		 * is okay to be planned locally, so allowing that.
		 */
		if (rangeTableEntry->rtekind == RTE_FUNCTION)
		{
			return false;
		}

		if (rangeTableEntry->rtekind != RTE_RELATION)
		{
			continue;
		}

		/*
		 * We only allow local join for the relation kinds for which we can
		 * determine deterministically that access to them are local or distributed.
		 * For this reason, we don't allow non-materialized views.
		 */
		if (rangeTableEntry->relkind == RELKIND_VIEW)
		{
			return false;
		}

		if (!IsCitusTable(rangeTableEntry->relid))
		{
			hasLocalTable = true;
			continue;
		}

		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
			rangeTableEntry->relid);
		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
		{
			hasReferenceTable = true;
		}
		else
		{
			/* distributed tables disqualify the query from local planning */
			return false;
		}
	}

	/* local planning only pays off when both table kinds actually appear */
	return hasLocalTable && hasReferenceTable;
}
/*
 * QueryIsNotSimpleSelect returns true if node is a query which modifies or
 * marks for modifications.
 */
static bool
QueryIsNotSimpleSelect(Node *node)
{
	if (!IsA(node, Query))
	{
		return false;
	}

	Query *query = (Query *) node;

	/* anything other than a plain SELECT counts as modifying */
	if (query->commandType != CMD_SELECT)
	{
		return true;
	}

	/* SELECT ... FOR UPDATE/SHARE marks rows for modification */
	return query->rowMarks != NIL;
}
/*
 * UpdateReferenceTablesWithShard recursively replaces the reference table names
 * in the given range table list with the local shard table names.
 */
static void
UpdateReferenceTablesWithShard(List *rangeTableList)
{
	List *referenceTableRTEList = ExtractReferenceTableRTEList(rangeTableList);

	RangeTblEntry *referenceTableRte = NULL;
	foreach_ptr(referenceTableRte, referenceTableRTEList)
	{
		Oid localShardOid =
			GetReferenceTableLocalShardOid(referenceTableRte->relid);

		/* point the RTE at the coordinator-local shard relation */
		referenceTableRte->relid = localShardOid;

		/*
		 * Parser locks relations in addRangeTableEntry(). So we should lock the
		 * modified ones too.
		 */
		LockRelationOid(localShardOid, AccessShareLock);
	}
}

View File

@ -2646,8 +2646,8 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
Assert(anchorShardId != INVALID_SHARD_ID); Assert(anchorShardId != INVALID_SHARD_ID);
List *selectPlacementList = WorkersContainingAllShards(taskShardList); List *taskPlacementList = PlacementsForWorkersContainingAllShards(taskShardList);
if (list_length(selectPlacementList) == 0) if (list_length(taskPlacementList) == 0)
{ {
ereport(ERROR, (errmsg("cannot find a worker that has active placements for all " ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
"shards in the query"))); "shards in the query")));
@ -2683,7 +2683,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
subqueryTask->dependentTaskList = NULL; subqueryTask->dependentTaskList = NULL;
subqueryTask->anchorShardId = anchorShardId; subqueryTask->anchorShardId = anchorShardId;
subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->taskPlacementList = taskPlacementList;
subqueryTask->relationShardList = relationShardList; subqueryTask->relationShardList = relationShardList;
return subqueryTask; return subqueryTask;

View File

@ -45,6 +45,7 @@
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/query_pushdown_planning.h" #include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h" #include "distributed/query_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_restriction_equivalence.h" #include "distributed/relation_restriction_equivalence.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
@ -154,7 +155,7 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
static DeferredErrorMessage * MultiRouterPlannableQuery(Query *query); static DeferredErrorMessage * MultiRouterPlannableQuery(Query *query);
static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree); static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree);
static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query);
static ShardPlacement * CreateDummyPlacement(void); static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation);
static List * get_all_actual_clauses(List *restrictinfo_list); static List * get_all_actual_clauses(List *restrictinfo_list);
static int CompareInsertValuesByShardId(const void *leftElement, static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement); const void *rightElement);
@ -2021,6 +2022,8 @@ PlanRouterQuery(Query *originalQuery,
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
Const **partitionValueConst) Const **partitionValueConst)
{ {
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool isMultiShardQuery = false; bool isMultiShardQuery = false;
DeferredErrorMessage *planningError = NULL; DeferredErrorMessage *planningError = NULL;
bool shardsPresent = false; bool shardsPresent = false;
@ -2133,14 +2136,15 @@ PlanRouterQuery(Query *originalQuery,
/* we need anchor shard id for select queries with router planner */ /* we need anchor shard id for select queries with router planner */
uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList); uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList);
List *workerList = bool hasLocalRelation = relationRestrictionContext->hasLocalRelation;
FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent,
replacePrunedQueryWithDummy);
if (workerList == NIL) List *taskPlacementList =
CreateTaskPlacementListForShardIntervals(*prunedShardIntervalListList,
shardsPresent,
replacePrunedQueryWithDummy,
hasLocalRelation);
if (taskPlacementList == NIL)
{ {
ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"found no worker with all shard placements", "found no worker with all shard placements",
NULL, NULL); NULL, NULL);
@ -2157,39 +2161,74 @@ PlanRouterQuery(Query *originalQuery,
} }
*multiShardModifyQuery = false; *multiShardModifyQuery = false;
*placementList = workerList; *placementList = taskPlacementList;
*anchorShardId = shardId; *anchorShardId = shardId;
return planningError; return planningError;
} }
/*
* CreateTaskPlacementListForShardIntervals returns a list of shard placements
* on which it can access all shards in shardIntervalListList, which contains
* a list of shards for each relation in the query.
*
* If the query contains a local table then hasLocalRelation should be set to
* true. In that case, CreateTaskPlacementListForShardIntervals only returns
* a placement for the local node or an empty list if the shards cannot be
* accessed locally.
*
* If generateDummyPlacement is true and there are no shards that need to be
* accessed to answer the query (shardsPresent is false), then a single
* placement is returned that is either local or follows a round-robin policy.
* A typical example is a router query that only reads an intermediate result.
* This will happen on the coordinator, unless the user wants to balance the
* load by setting the citus.task_assignment_policy.
*/
List * List *
FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, CreateTaskPlacementListForShardIntervals(List *shardIntervalListList, bool shardsPresent,
bool replacePrunedQueryWithDummy) bool generateDummyPlacement,
bool hasLocalRelation)
{ {
List *workerList = NIL; List *placementList = NIL;
/*
* Determine the worker that has all shard placements if a shard placement found.
* If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will
* still run the query but the result will be empty. We create a dummy shard
* placement for the first active worker.
*/
if (shardsPresent) if (shardsPresent)
{ {
workerList = WorkersContainingAllShards(shardIntervalList); /*
} * Determine the workers that have all shard placements, if any.
else if (replacePrunedQueryWithDummy) */
{ List *shardPlacementList =
ShardPlacement *dummyPlacement = CreateDummyPlacement(); PlacementsForWorkersContainingAllShards(shardIntervalListList);
if (dummyPlacement != NULL)
if (hasLocalRelation)
{ {
workerList = lappend(workerList, dummyPlacement); ShardPlacement *taskPlacement = NULL;
/*
* If there is a local table, we only allow the local placement to
* be used. If there is none, we disallow the query.
*/
foreach_ptr(taskPlacement, shardPlacementList)
{
if (taskPlacement->groupId == GetLocalGroupId())
{
placementList = lappend(placementList, taskPlacement);
}
}
}
else
{
placementList = shardPlacementList;
} }
} }
else if (generateDummyPlacement)
{
ShardPlacement *dummyPlacement = CreateDummyPlacement(hasLocalRelation);
return workerList; placementList = list_make1(dummyPlacement);
}
return placementList;
} }
@ -2201,14 +2240,17 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
* *
* If round robin policy is set, the placement could be on any node in pg_dist_node. * If round robin policy is set, the placement could be on any node in pg_dist_node.
* Else, the local node is set for the placement. * Else, the local node is set for the placement.
*
* Queries can also involve local tables. In that case we always use the local
* node.
*/ */
static ShardPlacement * static ShardPlacement *
CreateDummyPlacement(void) CreateDummyPlacement(bool hasLocalRelation)
{ {
static uint32 zeroShardQueryRoundRobin = 0; static uint32 zeroShardQueryRoundRobin = 0;
ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement);
if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN && !hasLocalRelation)
{ {
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableWorkerNodeList();
if (workerNodeList == NIL) if (workerNodeList == NIL)
@ -2441,6 +2483,13 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(restrictionCell); (RelationRestriction *) lfirst(restrictionCell);
Oid relationId = relationRestriction->relationId; Oid relationId = relationRestriction->relationId;
if (!IsCitusTable(relationId))
{
/* ignore local tables for shard pruning purposes */
continue;
}
Index tableId = relationRestriction->index; Index tableId = relationRestriction->index;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
int shardCount = cacheEntry->shardIntervalArrayLength; int shardCount = cacheEntry->shardIntervalArrayLength;
@ -2537,22 +2586,18 @@ RelationPrunesToMultipleShards(List *relationShardList)
/* /*
* WorkersContainingSelectShards returns list of shard placements that contain all * PlacementsForWorkersContainingAllShards returns list of shard placements for workers
* shard intervals provided to the select query. It returns NIL if no placement * that contain all shard intervals in the given list of shard interval lists.
* exists. The caller should check if there are any shard intervals exist for
* placement check prior to calling this function.
*/ */
List * List *
WorkersContainingAllShards(List *prunedShardIntervalsList) PlacementsForWorkersContainingAllShards(List *shardIntervalListList)
{ {
ListCell *prunedShardIntervalCell = NULL;
bool firstShard = true; bool firstShard = true;
List *currentPlacementList = NIL; List *currentPlacementList = NIL;
List *shardIntervalList = NIL;
foreach(prunedShardIntervalCell, prunedShardIntervalsList) foreach_ptr(shardIntervalList, shardIntervalListList)
{ {
List *shardIntervalList = (List *) lfirst(prunedShardIntervalCell);
if (shardIntervalList == NIL) if (shardIntervalList == NIL)
{ {
continue; continue;
@ -3197,22 +3242,22 @@ MultiRouterPlannableQuery(Query *query)
NULL, NULL); NULL, NULL);
} }
bool hasLocalTable = false;
bool hasDistributedTable = false;
ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList); ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList);
foreach(rangeTableRelationCell, rangeTableRelationList) foreach(rangeTableRelationCell, rangeTableRelationList)
{ {
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableRelationCell); RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableRelationCell);
if (rte->rtekind == RTE_RELATION) if (rte->rtekind == RTE_RELATION)
{ {
/* only hash partitioned tables are supported */
Oid distributedTableId = rte->relid; Oid distributedTableId = rte->relid;
/* local tables are allowed if there are no distributed tables */
if (!IsCitusTable(distributedTableId)) if (!IsCitusTable(distributedTableId))
{ {
/* local tables cannot be read from workers */ hasLocalTable = true;
return DeferredError( continue;
ERRCODE_FEATURE_NOT_SUPPORTED,
"Local tables cannot be used in distributed queries.",
NULL, NULL);
} }
char partitionMethod = PartitionMethod(distributedTableId); char partitionMethod = PartitionMethod(distributedTableId);
@ -3225,6 +3270,11 @@ MultiRouterPlannableQuery(Query *query)
NULL, NULL); NULL, NULL);
} }
if (partitionMethod != DISTRIBUTE_BY_NONE)
{
hasDistributedTable = true;
}
/* /*
* Currently, we don't support tables with replication factor > 1, * Currently, we don't support tables with replication factor > 1,
* except reference tables with SELECT ... FOR UPDATE queries. It is * except reference tables with SELECT ... FOR UPDATE queries. It is
@ -3246,6 +3296,14 @@ MultiRouterPlannableQuery(Query *query)
} }
} }
/* local tables are not allowed if there are distributed tables */
if (hasLocalTable && hasDistributedTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Local tables cannot be used in distributed queries.",
NULL, NULL);
}
return ErrorIfQueryHasUnroutableModifyingCTE(query); return ErrorIfQueryHasUnroutableModifyingCTE(query);
} }

View File

@ -45,8 +45,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
Const **partitionValueConst); Const **partitionValueConst);
extern List * RelationShardListForShardIntervalList(List *shardIntervalList, extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
bool *shardsPresent); bool *shardsPresent);
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList,
bool replacePrunedQueryWithDummy); bool shardsPresent,
bool generateDummyPlacement,
bool hasLocalRelation);
extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved, extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
extern Const * ExtractInsertPartitionKeyValue(Query *query); extern Const * ExtractInsertPartitionKeyValue(Query *query);
@ -54,7 +56,7 @@ extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
restrictionContext, restrictionContext,
bool *multiShardQuery, bool *multiShardQuery,
Const **partitionValueConst); Const **partitionValueConst);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern List * PlacementsForWorkersContainingAllShards(List *shardIntervalListList);
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery, extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery,
bool multiShardQuery, bool multiShardQuery,
@ -74,7 +76,6 @@ extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery, extern void AddShardIntervalRestrictionToSelect(Query *subqery,
ShardInterval *shardInterval); ShardInterval *shardInterval);
extern bool UpdateOrDeleteQuery(Query *query); extern bool UpdateOrDeleteQuery(Query *query);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern uint64 GetAnchorShardId(List *relationShardList); extern uint64 GetAnchorShardId(List *relationShardList);
extern List * TargetShardIntervalForFastPathQuery(Query *query, extern List * TargetShardIntervalForFastPathQuery(Query *query,

View File

@ -231,6 +231,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
(1 row) (1 row)
SELECT * FROM ref JOIN local ON (a = x); SELECT * FROM ref JOIN local ON (a = x);
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
a | b | x | y a | b | x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -250,6 +251,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
TRUNCATE ref; TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
SELECT * FROM ref JOIN local ON (a = x); SELECT * FROM ref JOIN local ON (a = x);
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
a | b | x | y a | b | x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -268,6 +270,7 @@ INSERT INTO ref VALUES (1,2);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2)
INSERT INTO local VALUES (1,2); INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x); SELECT * FROM ref JOIN local ON (a = x);
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
a | b | x | y a | b | x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 2 | 1 | 2 1 | 2 | 1 | 2
@ -290,6 +293,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count count | x | y | a | b | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -299,6 +303,7 @@ NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co
TRUNCATE ref; TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
SELECT * FROM ref JOIN local ON (a = x); SELECT * FROM ref JOIN local ON (a = x);
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
a | b | x | y a | b | x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -309,6 +314,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count count | x | y | a | b | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -323,6 +329,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count count | x | y | a | b | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -337,6 +344,7 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count count | x | y | a | b | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -358,6 +366,7 @@ INSERT INTO ref VALUES (1,2);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2)
INSERT INTO local VALUES (1,2); INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x); SELECT * FROM ref JOIN local ON (a = x);
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
a | b | x | y a | b | x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 2 | 1 | 2 1 | 2 | 1 | 2
@ -369,14 +378,14 @@ WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETU
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count count | x | y | a | b | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
100 | 3 | 2 | 3 | 2 | 1 100 | 3 | 2 | 3 | 2 | 1
(1 row) (1 row)
-- issue #3801 -- joins between local tables and distributed tables are disallowed
SET citus.shard_replication_factor TO 2;
CREATE TABLE dist_table(a int); CREATE TABLE dist_table(a int);
ERROR: relation "dist_table" already exists ERROR: relation "dist_table" already exists
SELECT create_distributed_table('dist_table', 'a'); SELECT create_distributed_table('dist_table', 'a');
@ -389,6 +398,42 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
(1 row) (1 row)
INSERT INTO dist_table VALUES(1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503017 (a) VALUES (1)
SELECT * FROM local JOIN dist_table ON (a = x);
ERROR: relation local is not distributed
SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1;;
ERROR: relation local is not distributed
-- intermediate results are allowed
WITH cte_1 AS (SELECT * FROM dist_table LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true LIMIT '1'::bigint
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503020 dist_table WHERE true LIMIT '1'::bigint
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
a | b | x | y | a
---------------------------------------------------------------------
1 | 2 | 1 | 2 | 1
(1 row)
-- full router query with CTE and local
WITH cte_1 AS (SELECT * FROM ref LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
NOTICE: executing the command locally: WITH cte_1 AS (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503016 ref_1 LIMIT 1) SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
a | b | x | y | a | b
---------------------------------------------------------------------
1 | 2 | 1 | 2 | 1 | 2
(1 row)
DROP TABLE dist_table;
-- issue #3801
SET citus.shard_replication_factor TO 2;
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
BEGIN; BEGIN;
-- this will use perPlacementQueryStrings, make sure it works correctly with -- this will use perPlacementQueryStrings, make sure it works correctly with
-- copying task -- copying task
@ -400,10 +445,10 @@ CREATE TABLE dist_table1(a int);
-- this will use queryStringList, make sure it works correctly with -- this will use queryStringList, make sure it works correctly with
-- copying task -- copying task
SELECT create_distributed_table('dist_table1', 'a'); SELECT create_distributed_table('dist_table1', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -456,6 +456,7 @@ TRUNCATE TABLE reference_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE
TRUNCATE TABLE local_table; TRUNCATE TABLE local_table;
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table, local_shard_copy.local_table WHERE (reference_table.key OPERATOR(pg_catalog.=) local_table.key)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0

View File

@ -2,13 +2,11 @@ CREATE SCHEMA locally_execute_intermediate_results;
SET search_path TO locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results;
SET citus.log_intermediate_results TO TRUE; SET citus.log_intermediate_results TO TRUE;
SET citus.log_local_commands TO TRUE; SET citus.log_local_commands TO TRUE;
SET client_min_messages TO DEBUG1;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.next_shard_id TO 1580000; SET citus.next_shard_id TO 1580000;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming'; SET citus.replication_model TO 'streaming';
CREATE TABLE table_1 (key int, value text); CREATE TABLE table_1 (key int, value text);
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
SELECT create_distributed_table('table_1', 'key'); SELECT create_distributed_table('table_1', 'key');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -16,7 +14,6 @@ SELECT create_distributed_table('table_1', 'key');
(1 row) (1 row)
CREATE TABLE table_2 (key int, value text); CREATE TABLE table_2 (key int, value text);
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
SELECT create_distributed_table('table_2', 'key'); SELECT create_distributed_table('table_2', 'key');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -24,22 +21,24 @@ SELECT create_distributed_table('table_2', 'key');
(1 row) (1 row)
CREATE TABLE ref_table (key int, value text); CREATE TABLE ref_table (key int, value text);
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
SELECT create_reference_table('ref_table'); SELECT create_reference_table('ref_table');
create_reference_table create_reference_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
CREATE TABLE local_table (key int, value text);
-- load some data -- load some data
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text)
INSERT INTO local_table VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
-- prevent PG 11 - PG 12 outputs to diverge -- prevent PG 11 - PG 12 outputs to diverge
-- and have a lot more CTEs recursively planned for the -- and have a lot more CTEs recursively planned for the
-- sake of increasing the test coverage -- sake of increasing the test coverage
SET citus.enable_cte_inlining TO false; SET citus.enable_cte_inlining TO false;
SET client_min_messages TO DEBUG1;
-- the query cannot be executed locally, but still because of -- the query cannot be executed locally, but still because of
-- HAVING the intermediate result is written to local file as well -- HAVING the intermediate result is written to local file as well
WITH cte_1 AS (SELECT max(value) FROM table_1) WITH cte_1 AS (SELECT max(value) FROM table_1)
@ -462,6 +461,49 @@ NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT int
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
-- queries in which the last step has only CTEs can use local tables
WITH cte_1 AS (SELECT max(value) FROM table_1)
SELECT
count(*)
FROM
local_table
GROUP BY key
HAVING max(value) > (SELECT max FROM cte_1);
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
DEBUG: Subplan XXX_1 will be written to local file
NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
count
---------------------------------------------------------------------
1
1
(2 rows)
WITH cte_1 AS (SELECT max(value) FROM table_1),
cte_2 AS (SELECT * FROM table_2)
SELECT
count(*)
FROM
local_table
WHERE
key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1)
GROUP BY key
HAVING max(value) > (SELECT max FROM cte_1);
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be written to local file
NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1
NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
count
---------------------------------------------------------------------
1
1
(2 rows)
\c - - - :worker_1_port \c - - - :worker_1_port
-- now use the same queries on a worker -- now use the same queries on a worker
SET search_path TO locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results;

View File

@ -1298,10 +1298,9 @@ DELETE FROM summary_table WHERE (
SELECT 1 FROM pg_catalog.pg_statio_sys_sequences SELECT 1 FROM pg_catalog.pg_statio_sys_sequences
) = null; ) = null;
DELETE FROM summary_table WHERE ( DELETE FROM summary_table WHERE (
SELECT (select action_statement from information_schema.triggers) SELECT (select min(action_statement) from information_schema.triggers)
FROM pg_catalog.pg_statio_sys_sequences FROM pg_catalog.pg_statio_sys_sequences
) = null; ) = null;
ERROR: relation pg_namespace is not distributed
DELETE FROM summary_table WHERE id < ( DELETE FROM summary_table WHERE id < (
SELECT 0 FROM pg_dist_node SELECT 0 FROM pg_dist_node
); );

View File

@ -137,11 +137,15 @@ SET search_path TO mx_add_coordinator,public;
INSERT INTO ref VALUES (1), (2), (3); INSERT INTO ref VALUES (1), (2), (3);
UPDATE ref SET a = a + 1; UPDATE ref SET a = a + 1;
DELETE FROM ref WHERE a > 3; DELETE FROM ref WHERE a > 3;
-- Test we don't allow reference/local joins on mx workers -- Test we allow reference/local joins on mx workers
CREATE TABLE local_table (a int); CREATE TABLE local_table (a int);
INSERT INTO local_table VALUES (2), (4); INSERT INTO local_table VALUES (2), (4);
SELECT r.a FROM ref r JOIN local_table lt on r.a = lt.a; SELECT r.a FROM ref r JOIN local_table lt on r.a = lt.a;
ERROR: relation local_table is not distributed a
---------------------------------------------------------------------
2
(1 row)
\c - - - :master_port \c - - - :master_port
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
SELECT * FROM ref ORDER BY a; SELECT * FROM ref ORDER BY a;

View File

@ -825,7 +825,6 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, single_shard b FROM articles_hash a, single_shard b
WHERE a.author_id = 2 and a.author_id = b.author_id WHERE a.author_id = 2 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
DEBUG: Found no worker with all shard placements
DEBUG: found no worker with all shard placements DEBUG: found no worker with all shard placements
DEBUG: generating subplan XXX_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM public.articles_single_shard_hash DEBUG: generating subplan XXX_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM public.articles_single_shard_hash
DEBUG: Creating router plan DEBUG: Creating router plan
@ -1941,7 +1940,6 @@ DETAIL: distribution column value: 2
-- not router plannable -- not router plannable
SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id)
WHERE ar.author_id = 3; WHERE ar.author_id = 3;
DEBUG: Found no worker with all shard placements
DEBUG: found no worker with all shard placements DEBUG: found no worker with all shard placements
DEBUG: join prunable for intervals [1,10] and [11,30] DEBUG: join prunable for intervals [1,10] and [11,30]
DEBUG: join prunable for intervals [1,10] and [21,40] DEBUG: join prunable for intervals [1,10] and [21,40]

View File

@ -29,20 +29,6 @@ BEGIN
END LOOP; END LOOP;
RETURN; RETURN;
END; $$ language plpgsql; END; $$ language plpgsql;
-- Is a distributed plan?
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan TEXT;
BEGIN
FOR query_plan IN execute explain_commmand LOOP
IF query_plan LIKE '%Task Count:%'
THEN
RETURN TRUE;
END IF;
END LOOP;
RETURN FALSE;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster -- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$

View File

@ -99,20 +99,26 @@ SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
-- Join between reference tables and local tables -- Join between reference tables and local tables
CREATE TABLE local_table(a int); CREATE TABLE local_table(a int);
INSERT INTO local_table VALUES (2), (4), (7), (20); INSERT INTO local_table VALUES (2), (4), (7), (20);
EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; EXPLAIN (COSTS OFF) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Merge Join (cost=359.57..860.00 rows=32512 width=8) Custom Scan (Citus Adaptive)
Merge Cond: (local_table.a = numbers_8000001.a) Task Count: 1
-> Sort (cost=179.78..186.16 rows=2550 width=4) Tasks Shown: All
Sort Key: local_table.a -> Task
-> Seq Scan on local_table (cost=0.00..35.50 rows=2550 width=4) Node: host=localhost port=xxxxx dbname=regression
-> Sort (cost=179.78..186.16 rows=2550 width=4) -> Merge Join
Sort Key: numbers_8000001.a Merge Cond: (local_table.a = numbers.a)
-> Seq Scan on numbers_8000001 (cost=0.00..35.50 rows=2550 width=4) -> Sort
(8 rows) Sort Key: local_table.a
-> Seq Scan on local_table
-> Sort
Sort Key: numbers.a
-> Seq Scan on numbers_8000001 numbers
(13 rows)
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
a | a a | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
20 | 20 20 | 20
@ -123,6 +129,7 @@ SELECT lt.a, sq.a, sq.b
FROM local_table lt FROM local_table lt
JOIN squares sq ON sq.a > lt.a and sq.b > 90 JOIN squares sq ON sq.a > lt.a and sq.b > 90
ORDER BY 1,2,3; ORDER BY 1,2,3;
NOTICE: executing the command locally: SELECT lt.a, sq.a, sq.b FROM (replicate_ref_to_coordinator.local_table lt JOIN replicate_ref_to_coordinator.squares_8000000 sq ON (((sq.a OPERATOR(pg_catalog.>) lt.a) AND (sq.b OPERATOR(pg_catalog.>) 90)))) ORDER BY lt.a, sq.a, sq.b
a | a | b a | a | b
--------------------------------------------------------------------- ---------------------------------------------------------------------
2 | 10 | 100 2 | 10 | 100
@ -133,6 +140,7 @@ ORDER BY 1,2,3;
-- should work if in transaction block -- should work if in transaction block
BEGIN; BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
a | a a | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
20 | 20 20 | 20
@ -145,6 +153,9 @@ BEGIN
PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
END; END;
$$; $$;
NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers"
PL/pgSQL function inline_code_block line 3 at PERFORM
-- test plpgsql function -- test plpgsql function
CREATE FUNCTION test_reference_local_join_plpgsql_func() CREATE FUNCTION test_reference_local_join_plpgsql_func()
RETURNS void AS $$ RETURNS void AS $$
@ -160,6 +171,9 @@ SELECT test_reference_local_join_plpgsql_func();
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4) NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4)
CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)" CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)"
PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement
NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1"
PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM
ERROR: ERROR:
CONTEXT: PL/pgSQL function test_reference_local_join_plpgsql_func() line 6 at RAISE CONTEXT: PL/pgSQL function test_reference_local_join_plpgsql_func() line 6 at RAISE
SELECT sum(a) FROM local_table; SELECT sum(a) FROM local_table;
@ -180,6 +194,8 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql; $$ LANGUAGE sql;
CALL test_reference_local_join_proc(); CALL test_reference_local_join_proc();
NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
CONTEXT: SQL function "test_reference_local_join_proc" statement 1
CREATE SCHEMA s1; CREATE SCHEMA s1;
CREATE TABLE s1.ref(a int); CREATE TABLE s1.ref(a int);
SELECT create_reference_table('s1.ref'); SELECT create_reference_table('s1.ref');
@ -190,6 +206,7 @@ SELECT create_reference_table('s1.ref');
BEGIN; BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
NOTICE: executing the command locally: SELECT local_table.a, r.a FROM (replicate_ref_to_coordinator.local_table JOIN s1.ref_8000002 r(a) USING (a)) ORDER BY local_table.a
a | a a | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -199,6 +216,7 @@ BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT my_volatile_fn() r, a FROM local_table SELECT my_volatile_fn() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
NOTICE: executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, local_table.a FROM replicate_ref_to_coordinator.local_table) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((t1.a OPERATOR(pg_catalog.=) numbers.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -209,6 +227,7 @@ BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT my_volatile_fn() r, a FROM numbers SELECT my_volatile_fn() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
NOTICE: executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.local_table WHERE ((t1.a OPERATOR(pg_catalog.=) local_table.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -218,6 +237,7 @@ END;
BEGIN; BEGIN;
SELECT count(*) FROM local_table SELECT count(*) FROM local_table
WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a);
NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a)))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -227,6 +247,7 @@ END;
BEGIN; BEGIN;
SELECT count(*) FROM numbers SELECT count(*) FROM numbers
WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a);
NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.local_table WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a)))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -247,24 +268,54 @@ $$ LANGUAGE sql;
SELECT test_reference_local_join_func(); SELECT test_reference_local_join_func();
test_reference_local_join_func test_reference_local_join_func
--------------------------------------------------------------------- ---------------------------------------------------------------------
(2,2)
(20,20) (20,20)
(1 row) (2 rows)
-- shouldn't plan locally if modifications happen in CTEs, ... -- CTEs are allowed
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table; SELECT * FROM numbers, local_table;
ERROR: relation local_table is not distributed NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (1) RETURNING a
NOTICE: executing the command locally: SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table
a | a
---------------------------------------------------------------------
20 | 2
20 | 4
20 | 7
20 | 20
21 | 2
21 | 4
21 | 7
21 | 20
2 | 2
2 | 4
2 | 7
2 | 20
(12 rows)
WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR: relation local_table is not distributed NOTICE: executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1 FOR UPDATE OF numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a)))
-- but this should be fine a | a
---------------------------------------------------------------------
1 | 2
1 | 4
1 | 7
1 | 20
(4 rows)
WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
NOTICE: executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a)))
a | a a | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) 1 | 2
1 | 4
1 | 7
1 | 20
(4 rows)
-- shouldn't plan locally even if distributed table is in CTE or subquery -- shouldn't plan locally even if distributed table is in CTE or subquery
CREATE TABLE dist(a int); CREATE TABLE dist(a int);
@ -278,10 +329,13 @@ INSERT INTO dist VALUES (20),(30);
WITH t AS (SELECT *, my_volatile_fn() x FROM dist) WITH t AS (SELECT *, my_volatile_fn() x FROM dist)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR: relation local_table is not distributed ERROR: function replicate_ref_to_coordinator.my_volatile_fn() does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:xxxxx
-- test CTE being reference/local join for distributed query -- test CTE being reference/local join for distributed query
WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l) WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l)
SELECT a FROM t NATURAL JOIN dist; SELECT a FROM t NATURAL JOIN dist;
NOTICE: executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a))
a a
--------------------------------------------------------------------- ---------------------------------------------------------------------
20 20
@ -289,11 +343,21 @@ SELECT a FROM t NATURAL JOIN dist;
-- shouldn't error if FOR UPDATE/FOR SHARE -- shouldn't error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
ERROR: could not run distributed query with FOR UPDATE/SHARE commands NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR SHARE OF local_table FOR SHARE OF numbers
HINT: Consider using an equality filter on the distributed table's partition column. a | a
---------------------------------------------------------------------
2 | 2
20 | 20
(2 rows)
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
ERROR: could not run distributed query with FOR UPDATE/SHARE commands NOTICE: executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR UPDATE OF local_table FOR UPDATE OF numbers
HINT: Consider using an equality filter on the distributed table's partition column. a | a
---------------------------------------------------------------------
2 | 2
20 | 20
(2 rows)
-- --
-- Joins between reference tables and views shouldn't be planned locally. -- Joins between reference tables and views shouldn't be planned locally.
-- --
@ -313,66 +377,58 @@ SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS FALSE) EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a; SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
$Q$); $Q$);
coordinator_plan coordinator_plan
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Seq Scan on local_table
Filter: ((a >= 1) AND (a <= 10))
Task Count: 1 Task Count: 1
(5 rows) (2 rows)
DROP VIEW numbers_v, local_table_v; DROP VIEW numbers_v, local_table_v;
-- --
-- Joins between reference tables and materialized views are allowed to -- Joins between reference tables and materialized views are allowed to
-- be planned locally. -- be planned to be executed locally.
-- --
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10)) NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
REFRESH MATERIALIZED VIEW numbers_v; REFRESH MATERIALIZED VIEW numbers_v;
SELECT public.plan_is_distributed($Q$ NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10))
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
plan_is_distributed
---------------------------------------------------------------------
f
(1 row)
BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a)))
a | b | a a | b | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | 1
2 | 4 | 2 2 | 4 | 2
(1 row) (2 rows)
END;
-- --
-- Joins between reference tables, local tables, and function calls shouldn't -- Joins between reference tables, local tables, and function calls
-- be planned locally. -- are allowed
-- --
SELECT count(*) SELECT count(*)
FROM local_table a, numbers b, generate_series(1, 10) c FROM local_table a, numbers b, generate_series(1, 10) c
WHERE a.a = b.a AND a.a = c; WHERE a.a = b.a AND a.a = c;
ERROR: relation local_table is not distributed NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b, generate_series(1, 10) c(c) WHERE ((a.a OPERATOR(pg_catalog.=) b.a) AND (a.a OPERATOR(pg_catalog.=) c.c))
-- but it should be okay if the function call is not a data source count
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
$Q$);
plan_is_distributed
--------------------------------------------------------------------- ---------------------------------------------------------------------
f 1
(1 row) (1 row)
SELECT public.plan_is_distributed($Q$ -- and it should be okay if the function call is not a data source
EXPLAIN (COSTS FALSE) SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); NOTICE: executing the command locally: SELECT abs(a.a) AS abs FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a)
$Q$); abs
plan_is_distributed
--------------------------------------------------------------------- ---------------------------------------------------------------------
f 2
(1 row) 20
(2 rows)
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
NOTICE: executing the command locally: SELECT a.a FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a) ORDER BY (abs(a.a))
a
---------------------------------------------------------------------
2
20
(2 rows)
TRUNCATE local_table; TRUNCATE local_table;
TRUNCATE numbers; TRUNCATE numbers;
@ -384,6 +440,7 @@ NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator
ALTER TABLE numbers ADD COLUMN d int; ALTER TABLE numbers ADD COLUMN d int;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;')
SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a; SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a;
NOTICE: executing the command locally: SELECT local_table.a, numbers.d FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a, d) USING (a)) ORDER BY local_table.a
a | d a | d
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 |
@ -397,6 +454,7 @@ BEGIN;
INSERT INTO local_table VALUES (1), (2), (3); INSERT INTO local_table VALUES (1), (2), (3);
WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x) WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x)
SELECT a FROM t NATURAL JOIN dist ORDER BY a; SELECT a FROM t NATURAL JOIN dist ORDER BY a;
NOTICE: executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a)) ORDER BY n.a, (replicate_ref_to_coordinator.my_volatile_fn())
a a
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -409,6 +467,7 @@ NOTICE: executing the copy locally for shard xxxxx
INSERT INTO numbers SELECT * FROM numbers; INSERT INTO numbers SELECT * FROM numbers;
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers
SELECT COUNT(*) FROM local_table JOIN numbers using (a); SELECT COUNT(*) FROM local_table JOIN numbers using (a);
NOTICE: executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
6 6
@ -417,6 +476,7 @@ SELECT COUNT(*) FROM local_table JOIN numbers using (a);
UPDATE numbers SET a = a + 1; UPDATE numbers SET a = a + 1;
NOTICE: executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1)
SELECT COUNT(*) FROM local_table JOIN numbers using (a); SELECT COUNT(*) FROM local_table JOIN numbers using (a);
NOTICE: executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
4 4

View File

@ -218,18 +218,20 @@ DEBUG: Router planner cannot handle multi-shard select queries
(2 rows) (2 rows)
-- same query with subquery in where is wrapped in CTE -- same query with subquery in where is wrapped in CTE
SET citus.enable_cte_inlining TO off;
SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2; SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2;
DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: Local tables cannot be used in distributed queries.
DEBUG: generating subplan XXX_1 for CTE cte: SELECT b.x FROM recursive_set_local.test b UNION SELECT c.y FROM recursive_set_local.test c UNION SELECT d.y FROM recursive_set_local.local_test d
DEBUG: Local tables cannot be used in distributed queries. DEBUG: Local tables cannot be used in distributed queries.
DEBUG: generating subplan XXX_1 for subquery SELECT y FROM recursive_set_local.local_test d DEBUG: generating subplan XXX_1 for subquery SELECT y FROM recursive_set_local.local_test d
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_2 for subquery SELECT x FROM recursive_set_local.test b DEBUG: generating subplan XXX_2 for subquery SELECT x FROM recursive_set_local.test b
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_3 for subquery SELECT y FROM recursive_set_local.test c DEBUG: generating subplan XXX_3 for subquery SELECT y FROM recursive_set_local.test c
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(y integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(y integer)
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DEBUG: generating subplan XXX_4 for subquery SELECT intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(y integer) UNION SELECT intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(y integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_set_local.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT cte.x FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) cte)) ORDER BY x, y
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_set_local.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT cte.x FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) cte)) ORDER BY x, y
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
x | y x | y
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -237,7 +239,8 @@ DEBUG: Router planner cannot handle multi-shard select queries
2 | 2 2 | 2
(2 rows) (2 rows)
-- not supported since local table is joined with a set operation RESET citus.enable_cte_inlining;
-- supported since final step only has local table and intermediate result
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2; SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2;
DEBUG: Local tables cannot be used in distributed queries. DEBUG: Local tables cannot be used in distributed queries.
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
@ -249,8 +252,12 @@ DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) EXCEPT SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) EXCEPT SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT u.x, u.y, local_test.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN recursive_set_local.local_test USING (x)) ORDER BY u.x, u.y DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT u.x, u.y, local_test.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN recursive_set_local.local_test USING (x)) ORDER BY u.x, u.y
DEBUG: Local tables cannot be used in distributed queries. DEBUG: Creating router plan
ERROR: relation local_test is not distributed DEBUG: Plan is router executable
x | y | y
---------------------------------------------------------------------
(0 rows)
-- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join -- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join
SELECT * FROM ((SELECT * FROM local_test) INTERSECT (SELECT * FROM test ORDER BY x LIMIT 1)) u LEFT JOIN test USING (x) ORDER BY 1,2; SELECT * FROM ((SELECT * FROM local_test) INTERSECT (SELECT * FROM test ORDER BY x LIMIT 1)) u LEFT JOIN test USING (x) ORDER BY 1,2;
DEBUG: Local tables cannot be used in distributed queries. DEBUG: Local tables cannot be used in distributed queries.

View File

@ -4,6 +4,14 @@ SET search_path TO with_executors, public;
SET citus.enable_repartition_joins TO on; SET citus.enable_repartition_joins TO on;
CREATE TABLE with_executors.local_table (id int); CREATE TABLE with_executors.local_table (id int);
INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
CREATE TABLE ref_table (id int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO ref_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
-- CTEs should be able to use local queries -- CTEs should be able to use local queries
WITH cte AS ( WITH cte AS (
WITH local_cte AS ( WITH local_cte AS (
@ -284,11 +292,39 @@ SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_tabl
2 | 1 2 | 1
(1 row) (1 row)
-- CTEs should not be able to terminate (the last SELECT) in a local query -- CTEs should be able to terminate (the last SELECT) in a local query
WITH cte AS ( WITH cte AS (
SELECT * FROM users_table SELECT user_id FROM users_table
) )
SELECT count(*) FROM cte JOIN local_table ON (user_id = id); SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id);
min
---------------------------------------------------------------------
1
(1 row)
-- not if there are no distributed tables
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id);
ERROR: relation local_table is not distributed
-- unless the distributed table is part of a recursively planned subquery
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN (SELECT * FROM events_table OFFSET 0) e USING (user_id);
min
---------------------------------------------------------------------
1
(1 row)
-- joins between local and reference tables not allowed
-- since the coordinator is not in the metadata at this stage
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT count(*) FROM local_table JOIN ref_table USING (id)
WHERE id IN (SELECT * FROM cte);
ERROR: relation local_table is not distributed ERROR: relation local_table is not distributed
-- CTEs should be able to terminate a router query -- CTEs should be able to terminate a router query
WITH cte AS ( WITH cte AS (
@ -382,4 +418,6 @@ WHERE
users_table.user_id = cte_merge.u_id; users_table.user_id = cte_merge.u_id;
ERROR: Complex subqueries and CTEs are not supported when task_executor_type is set to 'task-tracker' ERROR: Complex subqueries and CTEs are not supported when task_executor_type is set to 'task-tracker'
DROP SCHEMA with_executors CASCADE; DROP SCHEMA with_executors CASCADE;
NOTICE: drop cascades to table local_table NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table local_table
drop cascades to table ref_table

View File

@ -155,6 +155,24 @@ SELECT * FROM ref JOIN local ON (a = x);
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
-- joins between local tables and distributed tables are disallowed
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table VALUES(1);
SELECT * FROM local JOIN dist_table ON (a = x);
SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1;;
-- intermediate results are allowed
WITH cte_1 AS (SELECT * FROM dist_table LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
-- full router query with CTE and local
WITH cte_1 AS (SELECT * FROM ref LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
DROP TABLE dist_table;
-- issue #3801 -- issue #3801
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
CREATE TABLE dist_table(a int); CREATE TABLE dist_table(a int);

View File

@ -2,7 +2,6 @@ CREATE SCHEMA locally_execute_intermediate_results;
SET search_path TO locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results;
SET citus.log_intermediate_results TO TRUE; SET citus.log_intermediate_results TO TRUE;
SET citus.log_local_commands TO TRUE; SET citus.log_local_commands TO TRUE;
SET client_min_messages TO DEBUG1;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.next_shard_id TO 1580000; SET citus.next_shard_id TO 1580000;
@ -18,16 +17,21 @@ SELECT create_distributed_table('table_2', 'key');
CREATE TABLE ref_table (key int, value text); CREATE TABLE ref_table (key int, value text);
SELECT create_reference_table('ref_table'); SELECT create_reference_table('ref_table');
CREATE TABLE local_table (key int, value text);
-- load some data -- load some data
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO local_table VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
-- prevent PG 11 - PG 12 outputs to diverge -- prevent PG 11 - PG 12 outputs to diverge
-- and have a lot more CTEs recursively planned for the -- and have a lot more CTEs recursively planned for the
-- sake of increasing the test coverage -- sake of increasing the test coverage
SET citus.enable_cte_inlining TO false; SET citus.enable_cte_inlining TO false;
SET client_min_messages TO DEBUG1;
-- the query cannot be executed locally, but still because of -- the query cannot be executed locally, but still because of
-- HAVING the intermediate result is written to local file as well -- HAVING the intermediate result is written to local file as well
WITH cte_1 AS (SELECT max(value) FROM table_1) WITH cte_1 AS (SELECT max(value) FROM table_1)
@ -233,6 +237,26 @@ SELECT * FROM
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
WHERE foo.key != bar.key; WHERE foo.key != bar.key;
-- queries in which the last step has only CTEs can use local tables
WITH cte_1 AS (SELECT max(value) FROM table_1)
SELECT
count(*)
FROM
local_table
GROUP BY key
HAVING max(value) > (SELECT max FROM cte_1);
WITH cte_1 AS (SELECT max(value) FROM table_1),
cte_2 AS (SELECT * FROM table_2)
SELECT
count(*)
FROM
local_table
WHERE
key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1)
GROUP BY key
HAVING max(value) > (SELECT max FROM cte_1);
\c - - - :worker_1_port \c - - - :worker_1_port
-- now use the same queries on a worker -- now use the same queries on a worker

View File

@ -874,7 +874,7 @@ DELETE FROM summary_table WHERE (
SELECT 1 FROM pg_catalog.pg_statio_sys_sequences SELECT 1 FROM pg_catalog.pg_statio_sys_sequences
) = null; ) = null;
DELETE FROM summary_table WHERE ( DELETE FROM summary_table WHERE (
SELECT (select action_statement from information_schema.triggers) SELECT (select min(action_statement) from information_schema.triggers)
FROM pg_catalog.pg_statio_sys_sequences FROM pg_catalog.pg_statio_sys_sequences
) = null; ) = null;

View File

@ -63,7 +63,7 @@ INSERT INTO ref VALUES (1), (2), (3);
UPDATE ref SET a = a + 1; UPDATE ref SET a = a + 1;
DELETE FROM ref WHERE a > 3; DELETE FROM ref WHERE a > 3;
-- Test we don't allow reference/local joins on mx workers -- Test we allow reference/local joins on mx workers
CREATE TABLE local_table (a int); CREATE TABLE local_table (a int);
INSERT INTO local_table VALUES (2), (4); INSERT INTO local_table VALUES (2), (4);

View File

@ -33,21 +33,6 @@ BEGIN
RETURN; RETURN;
END; $$ language plpgsql; END; $$ language plpgsql;
-- Is a distributed plan?
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan TEXT;
BEGIN
FOR query_plan IN execute explain_commmand LOOP
IF query_plan LIKE '%Task Count:%'
THEN
RETURN TRUE;
END IF;
END LOOP;
RETURN FALSE;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster -- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$

View File

@ -51,7 +51,7 @@ SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
CREATE TABLE local_table(a int); CREATE TABLE local_table(a int);
INSERT INTO local_table VALUES (2), (4), (7), (20); INSERT INTO local_table VALUES (2), (4), (7), (20);
EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; EXPLAIN (COSTS OFF) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
-- test non equijoin -- test non equijoin
@ -135,7 +135,7 @@ $$ LANGUAGE sql;
SELECT test_reference_local_join_func(); SELECT test_reference_local_join_func();
-- shouldn't plan locally if modifications happen in CTEs, ... -- CTEs are allowed
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table; SELECT * FROM numbers, local_table;
@ -143,7 +143,6 @@ WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
-- but this should be fine
WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
@ -186,37 +185,25 @@ DROP VIEW numbers_v, local_table_v;
-- --
-- Joins between reference tables and materialized views are allowed to -- Joins between reference tables and materialized views are allowed to
-- be planned locally. -- be planned to be executed locally.
-- --
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
REFRESH MATERIALIZED VIEW numbers_v; REFRESH MATERIALIZED VIEW numbers_v;
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
END;
-- --
-- Joins between reference tables, local tables, and function calls shouldn't -- Joins between reference tables, local tables, and function calls
-- be planned locally. -- are allowed
-- --
SELECT count(*) SELECT count(*)
FROM local_table a, numbers b, generate_series(1, 10) c FROM local_table a, numbers b, generate_series(1, 10) c
WHERE a.a = b.a AND a.a = c; WHERE a.a = b.a AND a.a = c;
-- but it should be okay if the function call is not a data source -- and it should be okay if the function call is not a data source
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a; SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
$Q$);
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
$Q$);
TRUNCATE local_table; TRUNCATE local_table;
TRUNCATE numbers; TRUNCATE numbers;

View File

@ -74,9 +74,11 @@ FROM
SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) ORDER BY 1,2; SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) ORDER BY 1,2;
-- same query with subquery in where is wrapped in CTE -- same query with subquery in where is wrapped in CTE
SET citus.enable_cte_inlining TO off;
SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2; SELECT * FROM test a WHERE x IN (WITH cte AS (SELECT x FROM test b UNION SELECT y FROM test c UNION SELECT y FROM local_test d) SELECT * FROM cte) ORDER BY 1,2;
RESET citus.enable_cte_inlining;
-- not supported since local table is joined with a set operation -- supported since final step only has local table and intermediate result
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2; SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN local_test USING (x) ORDER BY 1,2;
-- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join -- though we replace some queries including the local query, the intermediate result is on the outer part of an outer join

View File

@ -7,6 +7,10 @@ SET citus.enable_repartition_joins TO on;
CREATE TABLE with_executors.local_table (id int); CREATE TABLE with_executors.local_table (id int);
INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
CREATE TABLE ref_table (id int);
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
-- CTEs should be able to use local queries -- CTEs should be able to use local queries
WITH cte AS ( WITH cte AS (
WITH local_cte AS ( WITH local_cte AS (
@ -221,11 +225,31 @@ WITH cte AS (
SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_table.event_type ORDER BY 1, 2; SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_table.event_type ORDER BY 1, 2;
-- CTEs should not be able to terminate (the last SELECT) in a local query -- CTEs should be able to terminate (the last SELECT) in a local query
WITH cte AS ( WITH cte AS (
SELECT * FROM users_table SELECT user_id FROM users_table
) )
SELECT count(*) FROM cte JOIN local_table ON (user_id = id); SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id);
-- not if there are no distributed tables
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id);
-- unless the distributed table is part of a recursively planned subquery
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN (SELECT * FROM events_table OFFSET 0) e USING (user_id);
-- joins between local and reference tables not allowed
-- since the coordinator is not in the metadata at this stage
WITH cte AS (
SELECT user_id FROM users_table
)
SELECT count(*) FROM local_table JOIN ref_table USING (id)
WHERE id IN (SELECT * FROM cte);
-- CTEs should be able to terminate a router query -- CTEs should be able to terminate a router query
WITH cte AS ( WITH cte AS (