Move all pre-15-defined routines to the bottom of the file

pull/6893/head
Teja Mupparti 2023-05-03 19:08:08 -07:00 committed by Teja Mupparti
parent 072ae44742
commit b58665773b
1 changed files with 168 additions and 171 deletions

View File

@ -49,179 +49,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
Node *quals,
List *targetList,
CmdType commandType);
#endif
/*
* CreateMergePlan attempts to create a plan for the given MERGE SQL
* statement. If planning fails ->planningError is set to a description
* of the failure.
*/
DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false;
Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
Assert(originalQuery->commandType == CMD_MERGE);
Assert(OidIsValid(targetRelationId));
distributedPlan->targetRelationId = targetRelationId;
distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = MergeQuerySupported(targetRelationId,
originalQuery,
multiShardQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
distributedPlan->workerJob = job;
distributedPlan->combineQuery = NULL;
/* MERGE doesn't support RETURNING clause */
distributedPlan->expectResults = false;
distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
return distributedPlan;
}
/*
* MergeQuerySupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria
* - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
* - Distributed tables requirements in ErrorIfDistTablesNotColocated
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
*/
DeferredErrorMessage *
MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
return NULL;
#else
/*
* TODO: For now, we are adding an exception where any volatile or stable
* functions are not allowed in the MERGE query, but this will become too
* restrictive as this will prevent many useful and simple cases, such as,
* INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
* this restriction, we have a potential danger of some of the function(s)
* getting executed at the worker which will result in incorrect behavior.
*/
if (contain_mutable_functions((Node *) originalQuery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"non-IMMUTABLE functions are not yet supported "
"in MERGE sql with distributed tables ",
NULL, NULL);
}
List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
/*
* Fast path queries cannot have merge command, and we prevent the remaining here.
* In Citus we have limited support for MERGE, it's allowed only if all
* the tables(target, source or any CTE) tables are are local i.e. a
* combination of Citus local and Non-Citus tables (regular Postgres tables)
* or distributed tables with some restrictions, please see header of routine
* ErrorIfDistTablesNotColocated for details.
*/
DeferredErrorMessage *deferredError =
ErrorIfMergeHasUnsupportedTables(resultRelationId,
originalQuery,
rangeTableList,
plannerRestrictionContext);
if (deferredError)
{
/* MERGE's unsupported combination, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->
quals,
originalQuery->targetList,
originalQuery->commandType);
if (deferredError)
{
return deferredError;
}
/*
* MERGE is a special case where we have multiple modify statements
* within itself. Check each INSERT/UPDATE/DELETE individually.
*/
MergeAction *action = NULL;
foreach_ptr(action, originalQuery->mergeActionList)
{
Assert(originalQuery->returningList == NULL);
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType);
if (deferredError)
{
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
}
deferredError =
InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
if (deferredError)
{
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
if (multiShardQuery)
{
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
}
}
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
}
return NULL;
#endif
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* ErrorIfDistTablesNotColocated Checks to see if
*
@ -728,6 +557,174 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre
#endif
/*
* MergeQuerySupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria
* - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
* - Distributed tables requirements in ErrorIfDistTablesNotColocated
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
*/
DeferredErrorMessage *
MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
return NULL;
#else
/*
* TODO: For now, we are adding an exception where any volatile or stable
* functions are not allowed in the MERGE query, but this will become too
* restrictive as this will prevent many useful and simple cases, such as,
* INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
* this restriction, we have a potential danger of some of the function(s)
* getting executed at the worker which will result in incorrect behavior.
*/
if (contain_mutable_functions((Node *) originalQuery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"non-IMMUTABLE functions are not yet supported "
"in MERGE sql with distributed tables ",
NULL, NULL);
}
List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
/*
* Fast path queries cannot have merge command, and we prevent the remaining here.
* In Citus we have limited support for MERGE, it's allowed only if all
* the tables(target, source or any CTE) tables are are local i.e. a
* combination of Citus local and Non-Citus tables (regular Postgres tables)
* or distributed tables with some restrictions, please see header of routine
* ErrorIfDistTablesNotColocated for details.
*/
DeferredErrorMessage *deferredError =
ErrorIfMergeHasUnsupportedTables(resultRelationId,
originalQuery,
rangeTableList,
plannerRestrictionContext);
if (deferredError)
{
/* MERGE's unsupported combination, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->
quals,
originalQuery->targetList,
originalQuery->commandType);
if (deferredError)
{
return deferredError;
}
/*
* MERGE is a special case where we have multiple modify statements
* within itself. Check each INSERT/UPDATE/DELETE individually.
*/
MergeAction *action = NULL;
foreach_ptr(action, originalQuery->mergeActionList)
{
Assert(originalQuery->returningList == NULL);
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType);
if (deferredError)
{
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
}
deferredError =
InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
if (deferredError)
{
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
if (multiShardQuery)
{
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
}
}
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
}
return NULL;
#endif
}
/*
* CreateMergePlan attempts to create a plan for the given MERGE SQL
* statement. If planning fails ->planningError is set to a description
* of the failure.
*/
DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false;
Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
Assert(originalQuery->commandType == CMD_MERGE);
Assert(OidIsValid(targetRelationId));
distributedPlan->targetRelationId = targetRelationId;
distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = MergeQuerySupported(targetRelationId,
originalQuery,
multiShardQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
distributedPlan->workerJob = job;
distributedPlan->combineQuery = NULL;
/* MERGE doesn't support RETURNING clause */
distributedPlan->expectResults = false;
distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
return distributedPlan;
}
/*
* IsLocalTableModification returns true if the table modified is a Postgres table.
* We do not support recursive planning for MERGE yet, so we could have a join