From c4d3927956dddda6bb42c5b38619f0e23b20d5ff Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Fri, 11 Dec 2020 17:25:22 +0300 Subject: [PATCH] Not allow local table updates with remote citus local tables --- .../planner/local_distributed_join_planner.c | 6 +- .../planner/multi_router_planner.c | 92 ++++++++++++++++++- .../planner/query_colocation_checker.c | 8 +- .../distributed/planner/recursive_planning.c | 83 ++--------------- .../citus_local_tables_queries_mx.out | 20 ++-- 5 files changed, 111 insertions(+), 98 deletions(-) diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index c1c3fb048..8828fe255 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -370,7 +370,7 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, { List *uniqueIndexColumnNos = indexColumns->indexColumnNos; if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos, - uniqueIndexColumnNos)) + uniqueIndexColumnNos)) { return true; } @@ -480,7 +480,6 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { - /* we're only interested in tables */ if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { @@ -488,6 +487,7 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, } int rteIdentity = GetRTEIdentity(rangeTableEntry); + /* result relation cannot converted to a subquery */ if (resultRTEIdentity == rteIdentity) { @@ -500,7 +500,7 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, { continue; } - + RangeTableEntryDetails *rangeTableEntryDetails = palloc0(sizeof(RangeTableEntryDetails)); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 788d0f486..768e9b171 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -177,6 +177,10 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); +static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); +static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); +static bool IsTableLocallyAccessible(Oid relationId); + /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -522,7 +526,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } CmdType commandType = queryTree->commandType; - deferredError = DeferErrorIfModifyView(queryTree); + deferredError = DeferErrorIfUnsupportedLocalTableJoin(queryTree->rtable); if (deferredError != NULL) { return deferredError; @@ -614,7 +618,6 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } - Oid resultRelationId = ModifyQueryResultRelationId(queryTree); *distributedTableIdOutput = resultRelationId; uint32 rangeTableId = 1; @@ -758,6 +761,91 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } +/* + * DeferErrorIfUnsupportedLocalTableJoin returns an error message + * if there is an unsupported join in the given range table list. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList) +{ + if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Modifying local tables with citus local tables is " + "supported only from the coordinator.", + NULL, + "Consider wrapping citus local table to a CTE, or subquery"); + } + return NULL; +} + + +/* + * ModifiesLocalTableWithRemoteCitusLocalTable returns true if a local + * table is modified with a remote citus local table. This could be a case with + * MX structure. + */ +static bool +ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList) +{ + bool containsLocalResultRelation = false; + bool containsRemoteCitusLocalTable = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + if (!IsRecursivelyPlannableRelation(rangeTableEntry)) + { + continue; + } + if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) + { + if (!IsTableLocallyAccessible(rangeTableEntry->relid)) + { + containsRemoteCitusLocalTable = true; + } + } + else if (!IsCitusTable(rangeTableEntry->relid)) + { + containsLocalResultRelation = true; + } + } + return containsLocalResultRelation && containsRemoteCitusLocalTable; +} + + +/* + * IsTableLocallyAccessible returns true if the given table + * can be accessed in local. + */ +static bool +IsTableLocallyAccessible(Oid relationId) +{ + if (!IsCitusTable(relationId)) + { + /* local tables are locally accessible */ + return true; + } + + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + return false; + } + + ShardInterval *shardInterval = linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + ShardPlacement *localShardPlacement = + ShardPlacementOnGroup(shardId, GetLocalGroupId()); + if (localShardPlacement != NULL) + { + /* the table has a placement on this node */ + return true; + } + return false; +} + + /* * NodeIsFieldStore returns true if given Node is a FieldStore object. */ diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index 864fd6dcb..018fc376e 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -262,7 +262,7 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker) * Note that the query returned by this function does not contain any filters or * projections. The returned query should be used cautiosly and it is mostly * designed for generating a stub query. - * + * * allTargetList will contain all columns for the given rteRelation but for the ones * that are not required, it will have NULL entries. */ @@ -304,9 +304,9 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes if (shouldAssignDummyNullColumn && !assignedDummyNullColumn) { - /* + /* * in case there is no required column, we assign one dummy NULL target entry - * to the subquery targetList so that it has at least one target. + * to the subquery targetList so that it has at least one target. * (targetlist should have at least one element) */ subquery->targetList = lappend(subquery->targetList, targetEntry); @@ -345,6 +345,7 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes return subquery; } + /* * MakeVarAttNosSequential changes the attribute numbers of the given targetList * to sequential numbers, [1, 2, 3] ... @@ -370,7 +371,6 @@ MakeVarAttNosSequential(List *targetList) } - /* * UnionRelationRestrictionLists merges two relation restriction lists * and returns a newly allocated list. The merged relation restriction diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 61477b709..5ac46d360 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -170,7 +170,6 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext * restrictionContext); -static bool IsTableLocallyAccessible(Oid relationId); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); static bool RecursivelyPlanSubquery(Query *subquery, @@ -193,7 +192,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, Const *resultIdConst, Oid functionOid, bool useBinaryCopyFormat); static void UpdateVarNosInNode(Query *query, Index newVarNo); -static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, List **joinRangeTableEntries); static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry, @@ -1468,12 +1466,13 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *copyRestrictionList = copyObject(restrictionList); Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList); subquery->jointree->quals = (Node *) andedBoundExpressions; + /* * Originally the quals were pointing to the RTE and its varno * was pointing to its index in rtable. However now we converted the RTE - * to a subquery and the quals should be pointing to that subquery, which - * is the only RTE in its rtable, hence we update the varnos so that they - * point to the subquery RTE. + * to a subquery and the quals should be pointing to that subquery, which + * is the only RTE in its rtable, hence we update the varnos so that they + * point to the subquery RTE. * Originally: rtable: [rte1, current_rte, rte3...] * Now: rtable: [rte1, subquery[current_rte], rte3...] --subquery[current_rte] refers to its rtable. */ @@ -1541,7 +1540,8 @@ GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry) static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList) { - List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(outerSubqueryTargetList); + List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList( + outerSubqueryTargetList); Query *outerSubquery = makeNode(Query); outerSubquery->commandType = CMD_SELECT; @@ -1616,76 +1616,7 @@ ContainsTableToBeConvertedToSubquery(List *rangeTableList) { return true; } - if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList)) - { - return true; - } - return false; -} - -/* - * ModifiesLocalTableWithRemoteCitusLocalTable returns true if a local - * table is modified with a remote citus local table. This could be a case with - * MX structure. - */ -static bool -ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList) -{ - bool containsLocalResultRelation = false; - bool containsRemoteCitusLocalTable = false; - - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, rangeTableList) - { - if (!IsRecursivelyPlannableRelation(rangeTableEntry)) - { - continue; - } - if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) - { - if (!IsTableLocallyAccessible(rangeTableEntry->relid)) - { - containsRemoteCitusLocalTable = true; - } - } - else if (!IsCitusTable(rangeTableEntry->relid)) - { - containsLocalResultRelation = true; - } - } - return containsLocalResultRelation && containsRemoteCitusLocalTable; -} - - -/* - * IsTableLocallyAccessible returns true if the given table - * can be accessed in local. - */ -static bool -IsTableLocallyAccessible(Oid relationId) -{ - if (!IsCitusTable(relationId)) - { - /* local tables are locally accessible */ - return true; - } - - List *shardIntervalList = LoadShardIntervalList(relationId); - if (list_length(shardIntervalList) != 1) - { - return false; - } - - ShardInterval *shardInterval = linitial(shardIntervalList); - uint64 shardId = shardInterval->shardId; - ShardPlacement *localShardPlacement = - ShardPlacementOnGroup(shardId, GetLocalGroupId()); - if (localShardPlacement != NULL) - { - /* the table has a placement on this node */ - return true; - } return false; } @@ -1862,7 +1793,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } - /* * If tupleDesc is NULL we have 2 different cases: * @@ -1912,7 +1842,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } - /* use the types in the function definition otherwise */ else { diff --git a/src/test/regress/expected/citus_local_tables_queries_mx.out b/src/test/regress/expected/citus_local_tables_queries_mx.out index 7791e57bc..ce61c7fdb 100644 --- a/src/test/regress/expected/citus_local_tables_queries_mx.out +++ b/src/test/regress/expected/citus_local_tables_queries_mx.out @@ -148,11 +148,7 @@ SELECT count(*) FROM ( SELECT * FROM (SELECT count(*) FROM citus_local_table, postgres_local_table) as subquery_inner ) as subquery_top; - count ---------------------------------------------------------------------- - 1 -(1 row) - +ERROR: direct joins between distributed and local tables are not supported -- should fail as we don't support direct joins between distributed/local tables SELECT count(*) FROM ( @@ -486,11 +482,7 @@ ERROR: could not run distributed query with FOR UPDATE/SHARE commands SELECT count(citus_local_table.b), count(postgres_local_table.a) FROM citus_local_table, postgres_local_table WHERE citus_local_table.a = postgres_local_table.b; - count | count ---------------------------------------------------------------------- - 6 | 6 -(1 row) - +ERROR: direct joins between distributed and local tables are not supported -- select for update is just OK SELECT * FROM citus_local_table ORDER BY 1,2 FOR UPDATE; a | b @@ -599,17 +591,21 @@ SELECT clear_and_init_test_tables(); DELETE FROM citus_local_table USING postgres_local_table WHERE citus_local_table.b = postgres_local_table.b; +ERROR: Modifying local tables with citus local tables is supported only from the coordinator. UPDATE citus_local_table SET b = 5 FROM postgres_local_table WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b; +ERROR: Modifying local tables with citus local tables is supported only from the coordinator. DELETE FROM postgres_local_table USING citus_local_table WHERE citus_local_table.b = postgres_local_table.b; +ERROR: Modifying local tables with citus local tables is supported only from the coordinator. UPDATE postgres_local_table SET b = 5 FROM citus_local_table WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b; +ERROR: Modifying local tables with citus local tables is supported only from the coordinator. -- no direct joins supported UPDATE distributed_table SET b = 6 @@ -682,7 +678,7 @@ SELECT count(*) FROM distributed_table WHERE b in (SELECT count FROM mat_view_4); count --------------------------------------------------------------------- - 1 + 0 (1 row) CREATE VIEW view_2 AS @@ -713,7 +709,7 @@ SELECT count(*) FROM view_3; SELECT count(*) FROM view_3, distributed_table; count --------------------------------------------------------------------- - 6 + 0 (1 row) ---------------------------------------------------------------------