Run update quals code only for queries with outer joins, update planners for error handling, update test cases.

onur-leftjoin_push-improvements
eaydingol 2025-08-13 02:06:34 +03:00
parent b8fd1e64ff
commit 13405e7871
18 changed files with 193 additions and 74 deletions

View File

@ -319,6 +319,35 @@ DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtinde
} }
/*
* UpdateWhereClauseForOuterJoinWalker walks over the query tree and
* updates the WHERE clause for outer joins satisfying feasibility conditions.
*/
bool
UpdateWhereClauseForOuterJoinWalker(Node *node, List *relationShardList)
{
if (node == NULL)
{
return false;
}
if (IsA(node, Query))
{
UpdateWhereClauseForOuterJoin((Query *) node, relationShardList);
return query_tree_walker((Query *) node, UpdateWhereClauseForOuterJoinWalker,
relationShardList, QTW_EXAMINE_RTES_BEFORE);
}
if (!IsA(node, RangeTblEntry))
{
return expression_tree_walker(node, UpdateWhereClauseForOuterJoinWalker,
relationShardList);
}
return false;
}
/* /*
* UpdateWhereClauseForOuterJoin * UpdateWhereClauseForOuterJoin
* *
@ -442,7 +471,6 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
/* want to look at all RTEs, even in subqueries, CTEs and such */ /* want to look at all RTEs, even in subqueries, CTEs and such */
if (IsA(node, Query)) if (IsA(node, Query))
{ {
UpdateWhereClauseForOuterJoin((Query *) node, relationShardList); /* TODO, check this again, we might want to skip this for fast path queries */
return query_tree_walker((Query *) node, UpdateRelationToShardNames, return query_tree_walker((Query *) node, UpdateRelationToShardNames,
relationShardList, QTW_EXAMINE_RTES_BEFORE); relationShardList, QTW_EXAMINE_RTES_BEFORE);
} }

View File

@ -75,17 +75,6 @@
#endif #endif
/* RouterPlanType is used to determine the router plan to invoke */
typedef enum RouterPlanType
{
INSERT_SELECT_INTO_CITUS_TABLE,
INSERT_SELECT_INTO_LOCAL_TABLE,
DML_QUERY,
SELECT_QUERY,
MERGE_QUERY,
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;
static List *plannerRestrictionContextList = NIL; static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */ int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1; static uint64 NextPlanId = 1;
@ -1097,7 +1086,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
* set_plan_references>add_rtes_to_flat_rtable>add_rte_to_flat_rtable. * set_plan_references>add_rtes_to_flat_rtable>add_rte_to_flat_rtable.
*/ */
List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery, List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery,
plannerRestrictionContext); plannerRestrictionContext,
routerPlan);
/* /*
* If subqueries were recursively planned then we need to replan the query * If subqueries were recursively planned then we need to replan the query

View File

@ -766,7 +766,8 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
{ {
/* first apply toplevel pushdown checks to SELECT query */ /* first apply toplevel pushdown checks to SELECT query */
error = error =
DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext); DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext,
true);
if (error) if (error)
{ {
return error; return error;

View File

@ -1149,7 +1149,8 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
{ {
deferredError = deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(query, DeferErrorIfUnsupportedSubqueryPushdown(query,
plannerRestrictionContext); plannerRestrictionContext,
true);
if (deferredError) if (deferredError)
{ {
ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning"))); ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning")));

View File

@ -168,13 +168,15 @@ static uint32 HashPartitionCount(void);
static Job * BuildJobTreeTaskList(Job *jobTree, static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables); Bitmapset *distributedTables,
bool *outerPartHasDistributedTable);
static void ErrorIfUnsupportedShardDistribution(Query *query); static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
uint32 taskId, uint32 taskId,
TaskType taskType, TaskType taskType,
bool modifyRequiresCoordinatorEvaluation, bool modifyRequiresCoordinatorEvaluation,
bool updateQualsForOuterJoin,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
static List * SqlTaskList(Job *job); static List * SqlTaskList(Job *job);
static bool DependsOnHashPartitionJob(Job *job); static bool DependsOnHashPartitionJob(Job *job);
@ -2248,6 +2250,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
} }
/* In the second loop, populate taskRequiredForShardIndex */ /* In the second loop, populate taskRequiredForShardIndex */
bool updateQualsForOuterJoin = false;
bool outerPartHasDistributedTable = false;
forboth_ptr(prunedShardList, prunedRelationShardList, forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList) relationRestriction, relationRestrictionContext->relationRestrictionList)
{ {
@ -2270,9 +2274,21 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
* the table is part of the non-outer side of the join and the outer side has a * the table is part of the non-outer side of the join and the outer side has a
* distributed table. * distributed table.
*/ */
if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex)) if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex,
&outerPartHasDistributedTable))
{ {
continue; if (outerPartHasDistributedTable)
{
/* we can skip the shards from this relation restriction */
continue;
}
else
{
/* The outer part does not include distributed tables, we can not skip shards.
* Also, we will possibly update the quals of the outer relation for recurring join push down, mark here.
*/
updateQualsForOuterJoin = true;
}
} }
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
@ -2305,6 +2321,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
taskIdIndex, taskIdIndex,
taskType, taskType,
modifyRequiresCoordinatorEvaluation, modifyRequiresCoordinatorEvaluation,
updateQualsForOuterJoin,
planningError); planningError);
if (*planningError != NULL) if (*planningError != NULL)
{ {
@ -2337,13 +2354,16 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
* RelationRestriction if the table accessed for this relation is * RelationRestriction if the table accessed for this relation is
* a) in an outer join * a) in an outer join
* b) on the inner part of said join * b) on the inner part of said join
* c) the outer part of the join has a distributed table *
* The function also sets outerPartHasDistributedTable if the outer part
* of the corresponding join has a distributed table.
* *
* The function returns true only if all three conditions above hold true. * The function returns true only if all three conditions above hold true.
*/ */
static bool static bool
IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables) Bitmapset *distributedTables,
bool *outerPartHasDistributedTable)
{ {
RestrictInfo *joinInfo = NULL; RestrictInfo *joinInfo = NULL;
foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo)
@ -2364,14 +2384,12 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
if (!isInOuter) if (!isInOuter)
{ {
/* this table is joined in the inner part of an outer join */ /* this table is joined in the inner part of an outer join */
/* check if the outer part has a distributed relation */ /* set if the outer part has a distributed relation */
bool outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, *outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids,
distributedTables); distributedTables);
if (outerPartHasDistributedTable)
{ /* this is an inner table of an outer join */
/* this is an inner table of an outer join with a distributed table */ return true;
return true;
}
} }
} }
@ -2487,6 +2505,7 @@ static Task *
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, uint32 taskId, RelationRestrictionContext *restrictionContext, uint32 taskId,
TaskType taskType, bool modifyRequiresCoordinatorEvaluation, TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
bool updateQualsForOuterJoin,
DeferredErrorMessage **planningError) DeferredErrorMessage **planningError)
{ {
Query *taskQuery = copyObject(originalQuery); Query *taskQuery = copyObject(originalQuery);
@ -2579,6 +2598,10 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
*/ */
UpdateRelationToShardNames((Node *) taskQuery, relationShardList); UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
if (updateQualsForOuterJoin)
{
UpdateWhereClauseForOuterJoinWalker((Node *) taskQuery, relationShardList);
}
/* /*
* Ands are made implicit during shard pruning, as predicate comparison and * Ands are made implicit during shard pruning, as predicate comparison and

View File

@ -1321,7 +1321,8 @@ MultiShardUpdateDeleteSupported(Query *originalQuery,
{ {
errorMessage = DeferErrorIfUnsupportedSubqueryPushdown( errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(
originalQuery, originalQuery,
plannerRestrictionContext); plannerRestrictionContext,
true);
} }
return errorMessage; return errorMessage;

View File

@ -88,7 +88,7 @@ static bool WindowPartitionOnDistributionColumn(Query *query);
static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree); static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree);
static RecurringTuplesType FromClauseRecurringTupleType(Query *queryTree); static RecurringTuplesType FromClauseRecurringTupleType(Query *queryTree);
static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext, bool plannerPhase);
static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree);
static DeferredErrorMessage * DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree, bool static DeferredErrorMessage * DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree, bool
lateral, lateral,
@ -536,9 +536,16 @@ SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree,
RaiseDeferredError(unsupportedQueryError, ERROR); RaiseDeferredError(unsupportedQueryError, ERROR);
} }
/*
* We reach here at the third step of the planning, thus we already checked for pushed down
* feasibility of recurring outer joins, at this step the unsupported outer join check should
* only generate an error when there is a lateral subquery.
*/
DeferredErrorMessage *subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown( DeferredErrorMessage *subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown(
originalQuery, originalQuery,
plannerRestrictionContext); plannerRestrictionContext,
false);
if (subqueryPushdownError != NULL) if (subqueryPushdownError != NULL)
{ {
RaiseDeferredError(subqueryPushdownError, ERROR); RaiseDeferredError(subqueryPushdownError, ERROR);
@ -561,7 +568,8 @@ SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree,
DeferredErrorMessage * DeferredErrorMessage *
DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext) plannerRestrictionContext,
bool plannerPhase)
{ {
bool outerMostQueryHasLimit = false; bool outerMostQueryHasLimit = false;
ListCell *subqueryCell = NULL; ListCell *subqueryCell = NULL;
@ -613,7 +621,8 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
return error; return error;
} }
error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext); error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext,
plannerPhase);
if (error) if (error)
{ {
return error; return error;
@ -771,12 +780,17 @@ FromClauseRecurringTupleType(Query *queryTree)
* DeferredErrorIfUnsupportedRecurringTuplesJoin returns a DeferredError if * DeferredErrorIfUnsupportedRecurringTuplesJoin returns a DeferredError if
* there exists a join between a recurring rel (such as reference tables * there exists a join between a recurring rel (such as reference tables
* and intermediate_results) and a non-recurring rel (such as distributed tables * and intermediate_results) and a non-recurring rel (such as distributed tables
* and subqueries that we can push-down to worker nodes) that can return an * and subqueries that we can push-down to worker nodes) when plannerPhase is
* incorrect result set due to recurring tuples coming from the recurring rel. * true, so that we try to recursively plan these joins.
* During recursive planning phase, we either replace those with recursive plans
* or leave them if it is safe to push-down.
* During the logical planning phase (plannerPhase is false), we only check if
* such queries have lateral subqueries.
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin( DeferredErrorIfUnsupportedRecurringTuplesJoin(
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext,
bool plannerPhase)
{ {
List *joinRestrictionList = List *joinRestrictionList =
plannerRestrictionContext->joinRestrictionContext->joinRestrictionList; plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;
@ -829,6 +843,16 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids)) if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
{ {
if (plannerPhase)
{
/*
* We have not yet tried to recursively plan this join, we should
* defer an error.
*/
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
break;
}
/* /*
* Inner side contains distributed rels but the outer side only * Inner side contains distributed rels but the outer side only
* contains recurring rels, might be an unsupported lateral outer * contains recurring rels, might be an unsupported lateral outer

View File

@ -110,6 +110,7 @@ struct RecursivePlanningContextInternal
List *subPlanList; List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext; PlannerRestrictionContext *plannerRestrictionContext;
bool restrictionEquivalenceCheck; bool restrictionEquivalenceCheck;
bool forceRecursivePlanning;
}; };
/* track depth of current recursive planner query */ /* track depth of current recursive planner query */
@ -214,7 +215,8 @@ static bool hasPseudoconstantQuals(
*/ */
List * List *
GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext,
RouterPlanType routerPlan)
{ {
RecursivePlanningContext context; RecursivePlanningContext context;
@ -228,6 +230,17 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
context.planId = planId; context.planId = planId;
context.subPlanList = NIL; context.subPlanList = NIL;
context.plannerRestrictionContext = plannerRestrictionContext; context.plannerRestrictionContext = plannerRestrictionContext;
context.forceRecursivePlanning = false;
/*
* Force recursive planning of recurring outer joins for these queries
* since the planning error from the previous step is generated prior to
* the actual planning attempt.
*/
if (routerPlan == DML_QUERY)
{
context.forceRecursivePlanning = true;
}
/* /*
* Calculating the distribution key equality upfront is a trade-off for us. * Calculating the distribution key equality upfront is a trade-off for us.
@ -756,7 +769,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
/* <recurring> left join <distributed> */ /* <recurring> left join <distributed> */
if (leftNodeRecurs && !rightNodeRecurs) if (leftNodeRecurs && !rightNodeRecurs)
{ {
if (chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, if (recursivePlanningContext->forceRecursivePlanning ||
chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr,
query)) query))
{ {
ereport(DEBUG1, (errmsg("recursively planning right side of " ereport(DEBUG1, (errmsg("recursively planning right side of "
@ -787,7 +801,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
/* <distributed> right join <recurring> */ /* <distributed> right join <recurring> */
if (!leftNodeRecurs && rightNodeRecurs) if (!leftNodeRecurs && rightNodeRecurs)
{ {
if (chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, if (recursivePlanningContext->forceRecursivePlanning ||
chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr,
query)) query))
{ {
ereport(DEBUG1, (errmsg("recursively planning left side of " ereport(DEBUG1, (errmsg("recursively planning left side of "

View File

@ -26,6 +26,7 @@
extern void RebuildQueryStrings(Job *workerJob); extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList); extern void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList);
extern bool UpdateWhereClauseForOuterJoinWalker(Node *node, List *relationShardList);
Node * DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int Node * DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int
outerRtIndex); outerRtIndex);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);

View File

@ -33,6 +33,18 @@
extern int PlannerLevel; extern int PlannerLevel;
/* RouterPlanType is used to determine the router plan to invoke */
typedef enum RouterPlanType
{
INSERT_SELECT_INTO_CITUS_TABLE,
INSERT_SELECT_INTO_LOCAL_TABLE,
DML_QUERY,
SELECT_QUERY,
MERGE_QUERY,
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;
typedef struct RelationRestrictionContext typedef struct RelationRestrictionContext
{ {
bool allReferenceTables; bool allReferenceTables;

View File

@ -37,11 +37,12 @@ extern MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
Query *queryTree, Query *queryTree,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query * extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(
originalQuery, Query *originalQuery,
PlannerRestrictionContext PlannerRestrictionContext
* *
plannerRestrictionContext); plannerRestrictionContext,
bool plannerPhase);
extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree,
bool bool
outerMostQueryHasLimit); outerMostQueryHasLimit);

View File

@ -16,6 +16,7 @@
#include "pg_version_constants.h" #include "pg_version_constants.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/relation_restriction_equivalence.h" #include "distributed/relation_restriction_equivalence.h"
@ -33,7 +34,8 @@ extern PlannerRestrictionContext * GetPlannerRestrictionContext(
RecursivePlanningContext *recursivePlanningContext); RecursivePlanningContext *recursivePlanningContext);
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext,
RouterPlanType routerPlan);
extern char * GenerateResultId(uint64 planId, uint32 subPlanId); extern char * GenerateResultId(uint64 planId, uint32 subPlanId);
extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList,
char *resultId); char *resultId);

View File

@ -760,15 +760,28 @@ DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_133000
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300001 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300001 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300002 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300002 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300003 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300003 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL)
SET client_min_messages to debug3;
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_second.user_id raw_events_second.user_id
FROM FROM
reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id;
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300004 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((reference_table.user_id IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), '-1073741825'::integer) OPERATOR(pg_catalog.<=) 0)))) DEBUG: no shard pruning constraints on raw_events_second found
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300005 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp('-1073741824'::integer, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), '-1'::integer) OPERATOR(pg_catalog.<=) 0))) DEBUG: shard count after pruning for raw_events_second: 4
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300006 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp(0, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), 1073741823) OPERATOR(pg_catalog.<=) 0))) DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table
DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300007 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp(1073741824, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), 2147483647) OPERATOR(pg_catalog.<=) 0))) DEBUG: no shard pruning constraints on raw_events_second found
DEBUG: shard count after pruning for raw_events_second: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on raw_events_second found
DEBUG: shard count after pruning for raw_events_second: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: performing repartitioned INSERT ... SELECT
DEBUG: partitioning SELECT query by column index 0 with name 'user_id'
SET client_min_messages to debug2;
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
@ -3372,17 +3385,21 @@ $$);
Task Count: 1 Task Count: 1
(4 rows) (4 rows)
-- verify that insert select can be pushed down when we have reference table in outside of outer join. -- verify that insert select cannot be pushed down when we have reference table in outside of outer join in a chained-join.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true); EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true);
$$); $$);
coordinator_plan coordinator_plan
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus INSERT ... SELECT)
Task Count: 4 INSERT/SELECT method: pull to coordinator
(2 rows) -> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
(6 rows)
-- verify that insert select can be pushed down when we have reference table in outside of left join. -- verify that insert select can be pushed down when we have reference table in outside of outer join.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id); EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id);
$$); $$);
@ -3394,7 +3411,7 @@ $$);
Task Count: 4 Task Count: 4
(4 rows) (4 rows)
-- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-distribution column. -- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-partition column.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2; EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2;
$$); $$);

View File

@ -152,9 +152,9 @@ SELECT
FROM FROM
users_table RIGHT JOIN users_reference_table USING (user_id) users_table RIGHT JOIN users_reference_table USING (user_id)
WHERE WHERE
users_reference_table.value_2 IN (users_reference_table.value_2, random()*0) IN
(SELECT (SELECT
value_2 value_2, 0
FROM FROM
events_table events_table
WHERE WHERE

View File

@ -1996,8 +1996,7 @@ WITH cte AS (
) )
SELECT (a+5)*2, b FROM cte; SELECT (a+5)*2, b FROM cte;
DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: cannot push down this subquery DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table
DETAIL: Distinct on columns without partition column is currently unsupported
DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT reference_table.a, 1 AS b FROM (query_single_shard_table.distributed_table RIGHT JOIN query_single_shard_table.reference_table USING (a)) DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT reference_table.a, 1 AS b FROM (query_single_shard_table.distributed_table RIGHT JOIN query_single_shard_table.reference_table USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) 2) AS user_id, b AS value_1 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) 2) AS user_id, b AS value_1 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator

View File

@ -1820,8 +1820,10 @@ BEGIN;
) q ) q
WHERE dist_5.a = q.a WHERE dist_5.a = q.a
RETURNING *; RETURNING *;
DEBUG: generating subplan XXX_1 for subquery SELECT t1.a, t1.b FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT DISTINCT ON (t2.a) t2.a, t2.b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a))) ORDER BY t2.a, t2.b) t3 USING (a)) DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM recurring_outer_join.dist_5 USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) q WHERE (dist_5.a OPERATOR(pg_catalog.=) q.a) RETURNING dist_5.a, dist_5.b, q.a, q.b DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT ON (a) a, b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a))) ORDER BY a, b
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM recurring_outer_join.dist_5 USING (SELECT t1.a, t1.b FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3 USING (a))) q WHERE (dist_5.a OPERATOR(pg_catalog.=) q.a) RETURNING dist_5.a, dist_5.b, q.a, q.b
a | b | a | b a | b | a | b
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 10 | 1 | 11 1 | 10 | 1 | 11
@ -1849,8 +1851,10 @@ BEGIN;
USING (a) USING (a)
) )
RETURNING *; RETURNING *;
DEBUG: generating subplan XXX_1 for subquery SELECT t1.a FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT t2.a, t2.b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a)))) t3 USING (a)) DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recurring_outer_join.dist_5 SET b = 10 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) RETURNING a, b DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recurring_outer_join.dist_5 SET b = 10 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT t1.a FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3 USING (a)))) RETURNING a, b
a | b a | b
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 10 1 | 10
@ -1871,8 +1875,7 @@ BEGIN;
FROM ref_1 t1 FROM ref_1 t1
LEFT JOIN dist_1 t2 LEFT JOIN dist_1 t2
ON (t1.a = t2.a); ON (t1.a = t2.a);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: performing repartitioned INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT
ROLLBACK; ROLLBACK;
-- INSERT .. SELECT: pull to coordinator -- INSERT .. SELECT: pull to coordinator
@ -1883,8 +1886,7 @@ BEGIN;
FROM ref_1 t1 FROM ref_1 t1
LEFT JOIN dist_1 t2 LEFT JOIN dist_1 t2
ON (t1.b = t2.b); ON (t1.b = t2.b);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "dist_1" "t2" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: recursively planning distributed relation "dist_1" "t2" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist_1" "t2" to a subquery DEBUG: Wrapping relation "dist_1" "t2" to a subquery

View File

@ -577,12 +577,14 @@ FROM
FROM FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id;
SET client_min_messages to debug3;
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_second.user_id raw_events_second.user_id
FROM FROM
reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id;
SET client_min_messages to debug2;
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
@ -2397,17 +2399,17 @@ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id, (SELECT id FROM ref_table_1 WHERE id = 1) FROM ref_table_1; EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id, (SELECT id FROM ref_table_1 WHERE id = 1) FROM ref_table_1;
$$); $$);
-- verify that insert select can be pushed down when we have reference table in outside of outer join. -- verify that insert select cannot be pushed down when we have reference table in outside of outer join in a chained-join.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true); EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true);
$$); $$);
-- verify that insert select can be pushed down when we have reference table in outside of left join. -- verify that insert select can be pushed down when we have reference table in outside of outer join.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id); EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id);
$$); $$);
-- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-distribution column. -- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-partition column.
SELECT coordinator_plan($$ SELECT coordinator_plan($$
EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2; EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2;
$$); $$);

View File

@ -132,9 +132,9 @@ SELECT
FROM FROM
users_table RIGHT JOIN users_reference_table USING (user_id) users_table RIGHT JOIN users_reference_table USING (user_id)
WHERE WHERE
users_reference_table.value_2 IN (users_reference_table.value_2, random()*0) IN
(SELECT (SELECT
value_2 value_2, 0
FROM FROM
events_table events_table
WHERE WHERE