Add single-shard router Merge command support (#7088)

Similar to https://github.com/citusdata/citus/pull/7077.

As PG 16+ has changed the join restriction information for certain outer
joins, MERGE is also impacted given that is is also underlying an outer
join.

See #7077 for the details.
pull/7100/head
Önder Kalacı 2023-08-04 08:16:29 +03:00 committed by GitHub
parent 0d503dd5ac
commit 4ae3982d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 27 deletions

View File

@ -100,7 +100,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
Oid targetRelationId = targetRte->relid;
bool hasReturning = distributedPlan->expectResults;
Query *sourceQuery = sourceRte->subquery;
@ -211,7 +211,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
Query *sourceQuery = sourceRte->subquery;
Oid targetRelationId = targetRte->relid;
PlannedStmt *sourcePlan =

View File

@ -57,6 +57,9 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query
*
plannerRestrictionContext,
Oid targetRelationId);
static bool MergeSourceHasRouterSelect(Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
resultRelationId,
Query *query,
@ -234,7 +237,7 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
ParamListInfo boundParams)
{
Query *mergeQuery = copyObject(originalQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
ereport(DEBUG1, (errmsg("Creating MERGE repartition plan")));
@ -959,7 +962,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
if (ContainsFalseClause(restrictClauseList))
if (ContainsFalseClause(restrictClauseList) ||
JoinConditionIsOnFalse(relationRestriction->relOptInfo->joininfo))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Routing query is not possible with "
@ -1047,22 +1051,41 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"must be colocated", NULL, NULL);
}
DeferredErrorMessage *deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(query,
plannerRestrictionContext);
if (deferredError)
{
ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning")));
return deferredError;
}
DeferredErrorMessage *deferredError = NULL;
if (HasDangerousJoinUsing(query->rtable, (Node *) query->jointree))
/*
* if the query goes to a single node ("router" in Citus' parlance),
* we don't need to go through certain SQL support and colocation checks.
*
* For PG16+, this is required as some of the outer JOINs are converted to
* "ON(true)" and filters are pushed down to the table scans. As
* DeferErrorIfUnsupportedSubqueryPushdown rely on JOIN filters, it will fail to
* detect the router case. However, we can still detect it by checking if
* the query is a router query as the router query checks the filters on
* the tables.
*/
if (!MergeSourceHasRouterSelect(query, plannerRestrictionContext))
{
ereport(DEBUG1, (errmsg(
"Query has ambigious joins, merge is not pushable, try repartitioning")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(query,
plannerRestrictionContext);
if (deferredError)
{
ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning")));
return deferredError;
}
if (HasDangerousJoinUsing(query->rtable, (Node *) query->jointree))
{
ereport(DEBUG1, (errmsg(
"Query has ambigious joins, merge is not pushable, try repartitioning")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
}
}
deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId,
@ -1080,6 +1103,36 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
}
/*
* MergeSourceHasRouterSelect is a helper function that returns true of the source
* part of the merge query is a router query.
*/
static bool
MergeSourceHasRouterSelect(Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
Query *copiedQuery = copyObject(query);
RangeTblEntry *mergeSourceRte = ExtractMergeSourceRangeTableEntry(copiedQuery, true);
if (mergeSourceRte == NULL)
{
/*
* We might potentially support this case in the future, but for now,
* we don't support MERGE with JOIN in the source.
*/
return false;
}
ConvertSourceRTEIntoSubquery(copiedQuery, mergeSourceRte, plannerRestrictionContext);
Query *sourceQuery = mergeSourceRte->subquery;
DistributedPlan *distributedPlan = CreateRouterPlan(sourceQuery, sourceQuery,
plannerRestrictionContext);
return distributedPlan->planningError == NULL;
}
/*
* ErrorIfMergeQueryQualAndTargetListNotSupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria
@ -1288,7 +1341,7 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
* table or source query in USING clause.
*/
RangeTblEntry *
ExtractMergeSourceRangeTableEntry(Query *query)
ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
@ -1301,7 +1354,10 @@ ExtractMergeSourceRangeTableEntry(Query *query)
List *fromList = query->jointree->fromlist;
/* We should have only one RTE(MergeStmt->sourceRelation) in the from-list */
/*
* We should have only one RTE(MergeStmt->sourceRelation) in the from-list
* unless Postgres community changes the representation of merge.
*/
if (list_length(fromList) != 1)
{
ereport(ERROR, (errmsg("Unexpected source list in MERGE sql USING clause")));
@ -1316,12 +1372,18 @@ ExtractMergeSourceRangeTableEntry(Query *query)
*/
if (reference->rtindex == 0)
{
ereport(ERROR, (errmsg("Source is not an explicit query"),
errhint("Source query is a Join expression, "
"try converting into a query as SELECT * "
"FROM (..Join..)")));
if (!joinSourceOk)
{
ereport(ERROR, (errmsg("Source is not an explicit query"),
errhint("Source query is a Join expression, "
"try converting into a query as SELECT * "
"FROM (..Join..)")));
}
return NULL;
}
Assert(reference->rtindex >= 1);
RangeTblEntry *subqueryRte = rt_fetch(reference->rtindex, query->rtable);

View File

@ -295,7 +295,7 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
/*
* Create a copy because ExplainOneQuery can modify the query, and later

View File

@ -396,7 +396,7 @@ ExtractSourceResultRangeTableEntry(Query *query)
{
if (IsMergeQuery(query))
{
return ExtractMergeSourceRangeTableEntry(query);
return ExtractMergeSourceRangeTableEntry(query, false);
}
else if (CheckInsertSelectQuery(query))
{

View File

@ -30,7 +30,7 @@ extern bool IsLocalTableModification(Oid targetRelationId, Query *query,
extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es);
extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query);
extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query);
extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk);
#endif /* MERGE_PLANNER_H */

View File

@ -71,20 +71,24 @@ SET search_path TO schema_shard_table1;
SET client_min_messages TO DEBUG2;
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
SET search_path TO schema_shard_table2;
@ -195,6 +199,7 @@ WITH cte AS (
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1_1) MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: Creating MERGE router plan
WITH cte AS (