From b58665773ba22a563ada3071ea92d1ec70a701d2 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Wed, 3 May 2023 19:08:08 -0700 Subject: [PATCH] Move all pre-15-defined routines to the bottom of the file --- .../distributed/planner/merge_planner.c | 339 +++++++++--------- 1 file changed, 168 insertions(+), 171 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 930a44db8..86163e131 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -49,179 +49,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid Node *quals, List *targetList, CmdType commandType); -#endif -/* - * CreateMergePlan attempts to create a plan for the given MERGE SQL - * statement. If planning fails ->planningError is set to a description - * of the failure. - */ -DistributedPlan * -CreateMergePlan(Query *originalQuery, Query *query, - PlannerRestrictionContext *plannerRestrictionContext) -{ - DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - bool multiShardQuery = false; - Oid targetRelationId = ModifyQueryResultRelationId(originalQuery); - - Assert(originalQuery->commandType == CMD_MERGE); - Assert(OidIsValid(targetRelationId)); - - distributedPlan->targetRelationId = targetRelationId; - distributedPlan->modLevel = RowModifyLevelForQuery(query); - distributedPlan->planningError = MergeQuerySupported(targetRelationId, - originalQuery, - multiShardQuery, - plannerRestrictionContext); - - if (distributedPlan->planningError != NULL) - { - return distributedPlan; - } - - Job *job = RouterJob(originalQuery, plannerRestrictionContext, - &distributedPlan->planningError); - - if (distributedPlan->planningError != NULL) - { - return distributedPlan; - } - - ereport(DEBUG1, (errmsg("Creating MERGE router plan"))); - - distributedPlan->workerJob = job; - distributedPlan->combineQuery = NULL; - - /* MERGE doesn't support RETURNING clause */ - distributedPlan->expectResults = false; - distributedPlan->fastPathRouterPlan = - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; - - return distributedPlan; -} - - -/* - * MergeQuerySupported does check for a MERGE command in the query, if it finds - * one, it will verify the below criteria - * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables - * - Distributed tables requirements in ErrorIfDistTablesNotColocated - * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported - */ -DeferredErrorMessage * -MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* function is void for pre-15 versions of Postgres */ - #if PG_VERSION_NUM < PG_VERSION_15 - - return NULL; - - #else - - /* - * TODO: For now, we are adding an exception where any volatile or stable - * functions are not allowed in the MERGE query, but this will become too - * restrictive as this will prevent many useful and simple cases, such as, - * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without - * this restriction, we have a potential danger of some of the function(s) - * getting executed at the worker which will result in incorrect behavior. - */ - if (contain_mutable_functions((Node *) originalQuery)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not yet supported " - "in MERGE sql with distributed tables ", - NULL, NULL); - } - - List *rangeTableList = ExtractRangeTableEntryList(originalQuery); - - /* - * Fast path queries cannot have merge command, and we prevent the remaining here. - * In Citus we have limited support for MERGE, it's allowed only if all - * the tables(target, source or any CTE) tables are are local i.e. a - * combination of Citus local and Non-Citus tables (regular Postgres tables) - * or distributed tables with some restrictions, please see header of routine - * ErrorIfDistTablesNotColocated for details. - */ - DeferredErrorMessage *deferredError = - ErrorIfMergeHasUnsupportedTables(resultRelationId, - originalQuery, - rangeTableList, - plannerRestrictionContext); - if (deferredError) - { - /* MERGE's unsupported combination, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - - deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, - originalQuery->jointree, - originalQuery->jointree-> - quals, - originalQuery->targetList, - originalQuery->commandType); - if (deferredError) - { - return deferredError; - } - - /* - * MERGE is a special case where we have multiple modify statements - * within itself. Check each INSERT/UPDATE/DELETE individually. - */ - MergeAction *action = NULL; - foreach_ptr(action, originalQuery->mergeActionList) - { - Assert(originalQuery->returningList == NULL); - deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, - originalQuery->jointree, - action->qual, - action->targetList, - action->commandType); - if (deferredError) - { - /* MERGE's unsupported scenario, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - } - - deferredError = - InsertDistributionColumnMatchesSource(resultRelationId, originalQuery); - if (deferredError) - { - /* MERGE's unsupported scenario, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - - if (multiShardQuery) - { - deferredError = - DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, - plannerRestrictionContext); - if (deferredError) - { - return deferredError; - } - } - - if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "a join with USING causes an internal naming " - "conflict, use ON instead", NULL, NULL); - } - - return NULL; - - #endif -} - - -#if PG_VERSION_NUM >= PG_VERSION_15 - /* * ErrorIfDistTablesNotColocated Checks to see if * @@ -728,6 +557,174 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre #endif +/* + * MergeQuerySupported does check for a MERGE command in the query, if it finds + * one, it will verify the below criteria + * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables + * - Distributed tables requirements in ErrorIfDistTablesNotColocated + * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported + */ +DeferredErrorMessage * +MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* function is void for pre-15 versions of Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + + return NULL; + + #else + + /* + * TODO: For now, we are adding an exception where any volatile or stable + * functions are not allowed in the MERGE query, but this will become too + * restrictive as this will prevent many useful and simple cases, such as, + * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without + * this restriction, we have a potential danger of some of the function(s) + * getting executed at the worker which will result in incorrect behavior. + */ + if (contain_mutable_functions((Node *) originalQuery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not yet supported " + "in MERGE sql with distributed tables ", + NULL, NULL); + } + + List *rangeTableList = ExtractRangeTableEntryList(originalQuery); + + /* + * Fast path queries cannot have merge command, and we prevent the remaining here. + * In Citus we have limited support for MERGE, it's allowed only if all + * the tables(target, source or any CTE) tables are are local i.e. a + * combination of Citus local and Non-Citus tables (regular Postgres tables) + * or distributed tables with some restrictions, please see header of routine + * ErrorIfDistTablesNotColocated for details. + */ + DeferredErrorMessage *deferredError = + ErrorIfMergeHasUnsupportedTables(resultRelationId, + originalQuery, + rangeTableList, + plannerRestrictionContext); + if (deferredError) + { + /* MERGE's unsupported combination, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree-> + quals, + originalQuery->targetList, + originalQuery->commandType); + if (deferredError) + { + return deferredError; + } + + /* + * MERGE is a special case where we have multiple modify statements + * within itself. Check each INSERT/UPDATE/DELETE individually. + */ + MergeAction *action = NULL; + foreach_ptr(action, originalQuery->mergeActionList) + { + Assert(originalQuery->returningList == NULL); + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + } + + deferredError = + InsertDistributionColumnMatchesSource(resultRelationId, originalQuery); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + if (multiShardQuery) + { + deferredError = + DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + if (deferredError) + { + return deferredError; + } + } + + if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "a join with USING causes an internal naming " + "conflict, use ON instead", NULL, NULL); + } + + return NULL; + + #endif +} + + +/* + * CreateMergePlan attempts to create a plan for the given MERGE SQL + * statement. If planning fails ->planningError is set to a description + * of the failure. + */ +DistributedPlan * +CreateMergePlan(Query *originalQuery, Query *query, + PlannerRestrictionContext *plannerRestrictionContext) +{ + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + bool multiShardQuery = false; + Oid targetRelationId = ModifyQueryResultRelationId(originalQuery); + + Assert(originalQuery->commandType == CMD_MERGE); + Assert(OidIsValid(targetRelationId)); + + distributedPlan->targetRelationId = targetRelationId; + distributedPlan->modLevel = RowModifyLevelForQuery(query); + distributedPlan->planningError = MergeQuerySupported(targetRelationId, + originalQuery, + multiShardQuery, + plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + Job *job = RouterJob(originalQuery, plannerRestrictionContext, + &distributedPlan->planningError); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + ereport(DEBUG1, (errmsg("Creating MERGE router plan"))); + + distributedPlan->workerJob = job; + distributedPlan->combineQuery = NULL; + + /* MERGE doesn't support RETURNING clause */ + distributedPlan->expectResults = false; + distributedPlan->fastPathRouterPlan = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; + + return distributedPlan; +} + + /* * IsLocalTableModification returns true if the table modified is a Postgres table. * We do not support recursive planning for MERGE yet, so we could have a join