Merge branch 'main' into citus_pause_node

test_branch
Gürkan İndibay 2023-08-04 09:41:37 +03:00 committed by GitHub
commit 18c55a44a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 27 deletions

View File

@ -100,7 +100,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
Query *mergeQuery = Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition); copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery); RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
Oid targetRelationId = targetRte->relid; Oid targetRelationId = targetRte->relid;
bool hasReturning = distributedPlan->expectResults; bool hasReturning = distributedPlan->expectResults;
Query *sourceQuery = sourceRte->subquery; Query *sourceQuery = sourceRte->subquery;
@ -211,7 +211,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
Query *mergeQuery = Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition); copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery); RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
Query *sourceQuery = sourceRte->subquery; Query *sourceQuery = sourceRte->subquery;
Oid targetRelationId = targetRte->relid; Oid targetRelationId = targetRte->relid;
PlannedStmt *sourcePlan = PlannedStmt *sourcePlan =

View File

@ -57,6 +57,9 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query
* *
plannerRestrictionContext, plannerRestrictionContext,
Oid targetRelationId); Oid targetRelationId);
static bool MergeSourceHasRouterSelect(Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
resultRelationId, resultRelationId,
Query *query, Query *query,
@ -234,7 +237,7 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
ParamListInfo boundParams) ParamListInfo boundParams)
{ {
Query *mergeQuery = copyObject(originalQuery); Query *mergeQuery = copyObject(originalQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
ereport(DEBUG1, (errmsg("Creating MERGE repartition plan"))); ereport(DEBUG1, (errmsg("Creating MERGE repartition plan")));
@ -959,7 +962,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
if (ContainsFalseClause(restrictClauseList)) if (ContainsFalseClause(restrictClauseList) ||
JoinConditionIsOnFalse(relationRestriction->relOptInfo->joininfo))
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Routing query is not possible with " "Routing query is not possible with "
@ -1047,7 +1051,25 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"must be colocated", NULL, NULL); "must be colocated", NULL, NULL);
} }
DeferredErrorMessage *deferredError = DeferredErrorMessage *deferredError = NULL;
/*
* if the query goes to a single node ("router" in Citus' parlance),
* we don't need to go through certain SQL support and colocation checks.
*
* For PG16+, this is required as some of the outer JOINs are converted to
* "ON(true)" and filters are pushed down to the table scans. As
* DeferErrorIfUnsupportedSubqueryPushdown rely on JOIN filters, it will fail to
* detect the router case. However, we can still detect it by checking if
* the query is a router query as the router query checks the filters on
* the tables.
*/
if (!MergeSourceHasRouterSelect(query, plannerRestrictionContext))
{
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(query, DeferErrorIfUnsupportedSubqueryPushdown(query,
plannerRestrictionContext); plannerRestrictionContext);
if (deferredError) if (deferredError)
@ -1064,6 +1086,7 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"a join with USING causes an internal naming " "a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL); "conflict, use ON instead", NULL, NULL);
} }
}
deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId, deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId,
plannerRestrictionContext); plannerRestrictionContext);
@ -1080,6 +1103,36 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
} }
/*
* MergeSourceHasRouterSelect is a helper function that returns true of the source
* part of the merge query is a router query.
*/
static bool
MergeSourceHasRouterSelect(Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
Query *copiedQuery = copyObject(query);
RangeTblEntry *mergeSourceRte = ExtractMergeSourceRangeTableEntry(copiedQuery, true);
if (mergeSourceRte == NULL)
{
/*
* We might potentially support this case in the future, but for now,
* we don't support MERGE with JOIN in the source.
*/
return false;
}
ConvertSourceRTEIntoSubquery(copiedQuery, mergeSourceRte, plannerRestrictionContext);
Query *sourceQuery = mergeSourceRte->subquery;
DistributedPlan *distributedPlan = CreateRouterPlan(sourceQuery, sourceQuery,
plannerRestrictionContext);
return distributedPlan->planningError == NULL;
}
/* /*
* ErrorIfMergeQueryQualAndTargetListNotSupported does check for a MERGE command in the query, if it finds * ErrorIfMergeQueryQualAndTargetListNotSupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria * one, it will verify the below criteria
@ -1288,7 +1341,7 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
* table or source query in USING clause. * table or source query in USING clause.
*/ */
RangeTblEntry * RangeTblEntry *
ExtractMergeSourceRangeTableEntry(Query *query) ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk)
{ {
/* function is void for pre-15 versions of Postgres */ /* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15 #if PG_VERSION_NUM < PG_VERSION_15
@ -1301,7 +1354,10 @@ ExtractMergeSourceRangeTableEntry(Query *query)
List *fromList = query->jointree->fromlist; List *fromList = query->jointree->fromlist;
/* We should have only one RTE(MergeStmt->sourceRelation) in the from-list */ /*
* We should have only one RTE(MergeStmt->sourceRelation) in the from-list
* unless Postgres community changes the representation of merge.
*/
if (list_length(fromList) != 1) if (list_length(fromList) != 1)
{ {
ereport(ERROR, (errmsg("Unexpected source list in MERGE sql USING clause"))); ereport(ERROR, (errmsg("Unexpected source list in MERGE sql USING clause")));
@ -1315,6 +1371,8 @@ ExtractMergeSourceRangeTableEntry(Query *query)
* such joins. * such joins.
*/ */
if (reference->rtindex == 0) if (reference->rtindex == 0)
{
if (!joinSourceOk)
{ {
ereport(ERROR, (errmsg("Source is not an explicit query"), ereport(ERROR, (errmsg("Source is not an explicit query"),
errhint("Source query is a Join expression, " errhint("Source query is a Join expression, "
@ -1322,6 +1380,10 @@ ExtractMergeSourceRangeTableEntry(Query *query)
"FROM (..Join..)"))); "FROM (..Join..)")));
} }
return NULL;
}
Assert(reference->rtindex >= 1); Assert(reference->rtindex >= 1);
RangeTblEntry *subqueryRte = rt_fetch(reference->rtindex, query->rtable); RangeTblEntry *subqueryRte = rt_fetch(reference->rtindex, query->rtable);

View File

@ -295,7 +295,7 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition; Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);
/* /*
* Create a copy because ExplainOneQuery can modify the query, and later * Create a copy because ExplainOneQuery can modify the query, and later

View File

@ -396,7 +396,7 @@ ExtractSourceResultRangeTableEntry(Query *query)
{ {
if (IsMergeQuery(query)) if (IsMergeQuery(query))
{ {
return ExtractMergeSourceRangeTableEntry(query); return ExtractMergeSourceRangeTableEntry(query, false);
} }
else if (CheckInsertSelectQuery(query)) else if (CheckInsertSelectQuery(query))
{ {

View File

@ -30,7 +30,7 @@ extern bool IsLocalTableModification(Oid targetRelationId, Query *query,
extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es); struct ExplainState *es);
extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query); extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query);
extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query); extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk);
#endif /* MERGE_PLANNER_H */ #endif /* MERGE_PLANNER_H */

View File

@ -71,20 +71,24 @@ SET search_path TO schema_shard_table1;
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b> DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE> DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)> DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)> DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
SET search_path TO schema_shard_table2; SET search_path TO schema_shard_table2;
@ -195,6 +199,7 @@ WITH cte AS (
) )
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b; WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Creating router plan
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1_1) MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b> DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1_1) MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
WITH cte AS ( WITH cte AS (