Not allow local table updates with remote citus local tables

pull/4358/head
Sait Talha Nisanci 2020-12-11 17:25:22 +03:00
parent f5dd5379b2
commit c4d3927956
5 changed files with 111 additions and 98 deletions

View File

@ -370,7 +370,7 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
{ {
List *uniqueIndexColumnNos = indexColumns->indexColumnNos; List *uniqueIndexColumnNos = indexColumns->indexColumnNos;
if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos, if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos,
uniqueIndexColumnNos)) uniqueIndexColumnNos))
{ {
return true; return true;
} }
@ -480,7 +480,6 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList) foreach_ptr(rangeTableEntry, rangeTableList)
{ {
/* we're only interested in tables */ /* we're only interested in tables */
if (!IsRecursivelyPlannableRelation(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{ {
@ -488,6 +487,7 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
} }
int rteIdentity = GetRTEIdentity(rangeTableEntry); int rteIdentity = GetRTEIdentity(rangeTableEntry);
/* result relation cannot converted to a subquery */ /* result relation cannot converted to a subquery */
if (resultRTEIdentity == rteIdentity) if (resultRTEIdentity == rteIdentity)
{ {

View File

@ -177,6 +177,10 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType TaskAssignmentPolicyType
taskAssignmentPolicy, taskAssignmentPolicy,
List *placementList); List *placementList);
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsTableLocallyAccessible(Oid relationId);
/* /*
* CreateRouterPlan attempts to create a router executor plan for the given * CreateRouterPlan attempts to create a router executor plan for the given
@ -522,7 +526,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
} }
CmdType commandType = queryTree->commandType; CmdType commandType = queryTree->commandType;
deferredError = DeferErrorIfModifyView(queryTree); deferredError = DeferErrorIfUnsupportedLocalTableJoin(queryTree->rtable);
if (deferredError != NULL) if (deferredError != NULL)
{ {
return deferredError; return deferredError;
@ -614,7 +618,6 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
} }
Oid resultRelationId = ModifyQueryResultRelationId(queryTree); Oid resultRelationId = ModifyQueryResultRelationId(queryTree);
*distributedTableIdOutput = resultRelationId; *distributedTableIdOutput = resultRelationId;
uint32 rangeTableId = 1; uint32 rangeTableId = 1;
@ -758,6 +761,91 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
} }
/*
* DeferErrorIfUnsupportedLocalTableJoin returns an error message
* if there is an unsupported join in the given range table list.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList)
{
if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Modifying local tables with citus local tables is "
"supported only from the coordinator.",
NULL,
"Consider wrapping citus local table to a CTE, or subquery");
}
return NULL;
}
/*
* ModifiesLocalTableWithRemoteCitusLocalTable returns true if a local
* table is modified with a remote citus local table. This could be a case with
* MX structure.
*/
static bool
ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList)
{
bool containsLocalResultRelation = false;
bool containsRemoteCitusLocalTable = false;
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{
continue;
}
if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE))
{
if (!IsTableLocallyAccessible(rangeTableEntry->relid))
{
containsRemoteCitusLocalTable = true;
}
}
else if (!IsCitusTable(rangeTableEntry->relid))
{
containsLocalResultRelation = true;
}
}
return containsLocalResultRelation && containsRemoteCitusLocalTable;
}
/*
* IsTableLocallyAccessible returns true if the given table
* can be accessed in local.
*/
static bool
IsTableLocallyAccessible(Oid relationId)
{
if (!IsCitusTable(relationId))
{
/* local tables are locally accessible */
return true;
}
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
return false;
}
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
if (localShardPlacement != NULL)
{
/* the table has a placement on this node */
return true;
}
return false;
}
/* /*
* NodeIsFieldStore returns true if given Node is a FieldStore object. * NodeIsFieldStore returns true if given Node is a FieldStore object.
*/ */

View File

@ -345,6 +345,7 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes
return subquery; return subquery;
} }
/* /*
* MakeVarAttNosSequential changes the attribute numbers of the given targetList * MakeVarAttNosSequential changes the attribute numbers of the given targetList
* to sequential numbers, [1, 2, 3] ... * to sequential numbers, [1, 2, 3] ...
@ -370,7 +371,6 @@ MakeVarAttNosSequential(List *targetList)
} }
/* /*
* UnionRelationRestrictionLists merges two relation restriction lists * UnionRelationRestrictionLists merges two relation restriction lists
* and returns a newly allocated list. The merged relation restriction * and returns a newly allocated list. The merged relation restriction

View File

@ -170,7 +170,6 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery,
static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
PlannerRestrictionContext * PlannerRestrictionContext *
restrictionContext); restrictionContext);
static bool IsTableLocallyAccessible(Oid relationId);
static bool ShouldRecursivelyPlanSetOperation(Query *query, static bool ShouldRecursivelyPlanSetOperation(Query *query,
RecursivePlanningContext *context); RecursivePlanningContext *context);
static bool RecursivelyPlanSubquery(Query *subquery, static bool RecursivelyPlanSubquery(Query *subquery,
@ -193,7 +192,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
Const *resultIdConst, Oid functionOid, Const *resultIdConst, Oid functionOid,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static void UpdateVarNosInNode(Query *query, Index newVarNo); static void UpdateVarNosInNode(Query *query, Index newVarNo);
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
List **joinRangeTableEntries); List **joinRangeTableEntries);
static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry, static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
@ -1468,6 +1466,7 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *copyRestrictionList = copyObject(restrictionList); List *copyRestrictionList = copyObject(restrictionList);
Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList); Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList);
subquery->jointree->quals = (Node *) andedBoundExpressions; subquery->jointree->quals = (Node *) andedBoundExpressions;
/* /*
* Originally the quals were pointing to the RTE and its varno * Originally the quals were pointing to the RTE and its varno
* was pointing to its index in rtable. However now we converted the RTE * was pointing to its index in rtable. However now we converted the RTE
@ -1541,7 +1540,8 @@ GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry)
static Query * static Query *
CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList) CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList)
{ {
List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(outerSubqueryTargetList); List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(
outerSubqueryTargetList);
Query *outerSubquery = makeNode(Query); Query *outerSubquery = makeNode(Query);
outerSubquery->commandType = CMD_SELECT; outerSubquery->commandType = CMD_SELECT;
@ -1616,76 +1616,7 @@ ContainsTableToBeConvertedToSubquery(List *rangeTableList)
{ {
return true; return true;
} }
if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList))
{
return true;
}
return false;
}
/*
* ModifiesLocalTableWithRemoteCitusLocalTable returns true if a local
* table is modified with a remote citus local table. This could be a case with
* MX structure.
*/
static bool
ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList)
{
bool containsLocalResultRelation = false;
bool containsRemoteCitusLocalTable = false;
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{
continue;
}
if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE))
{
if (!IsTableLocallyAccessible(rangeTableEntry->relid))
{
containsRemoteCitusLocalTable = true;
}
}
else if (!IsCitusTable(rangeTableEntry->relid))
{
containsLocalResultRelation = true;
}
}
return containsLocalResultRelation && containsRemoteCitusLocalTable;
}
/*
* IsTableLocallyAccessible returns true if the given table
* can be accessed in local.
*/
static bool
IsTableLocallyAccessible(Oid relationId)
{
if (!IsCitusTable(relationId))
{
/* local tables are locally accessible */
return true;
}
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
return false;
}
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
if (localShardPlacement != NULL)
{
/* the table has a placement on this node */
return true;
}
return false; return false;
} }
@ -1862,7 +1793,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
subquery->targetList = lappend(subquery->targetList, targetEntry); subquery->targetList = lappend(subquery->targetList, targetEntry);
} }
} }
/* /*
* If tupleDesc is NULL we have 2 different cases: * If tupleDesc is NULL we have 2 different cases:
* *
@ -1912,7 +1842,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
columnType = list_nth_oid(rangeTblFunction->funccoltypes, columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex); targetColumnIndex);
} }
/* use the types in the function definition otherwise */ /* use the types in the function definition otherwise */
else else
{ {

View File

@ -148,11 +148,7 @@ SELECT count(*) FROM
( (
SELECT * FROM (SELECT count(*) FROM citus_local_table, postgres_local_table) as subquery_inner SELECT * FROM (SELECT count(*) FROM citus_local_table, postgres_local_table) as subquery_inner
) as subquery_top; ) as subquery_top;
count ERROR: direct joins between distributed and local tables are not supported
---------------------------------------------------------------------
1
(1 row)
-- should fail as we don't support direct joins between distributed/local tables -- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM SELECT count(*) FROM
( (
@ -486,11 +482,7 @@ ERROR: could not run distributed query with FOR UPDATE/SHARE commands
SELECT count(citus_local_table.b), count(postgres_local_table.a) SELECT count(citus_local_table.b), count(postgres_local_table.a)
FROM citus_local_table, postgres_local_table FROM citus_local_table, postgres_local_table
WHERE citus_local_table.a = postgres_local_table.b; WHERE citus_local_table.a = postgres_local_table.b;
count | count ERROR: direct joins between distributed and local tables are not supported
---------------------------------------------------------------------
6 | 6
(1 row)
-- select for update is just OK -- select for update is just OK
SELECT * FROM citus_local_table ORDER BY 1,2 FOR UPDATE; SELECT * FROM citus_local_table ORDER BY 1,2 FOR UPDATE;
a | b a | b
@ -599,17 +591,21 @@ SELECT clear_and_init_test_tables();
DELETE FROM citus_local_table DELETE FROM citus_local_table
USING postgres_local_table USING postgres_local_table
WHERE citus_local_table.b = postgres_local_table.b; WHERE citus_local_table.b = postgres_local_table.b;
ERROR: Modifying local tables with citus local tables is supported only from the coordinator.
UPDATE citus_local_table UPDATE citus_local_table
SET b = 5 SET b = 5
FROM postgres_local_table FROM postgres_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b; WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
ERROR: Modifying local tables with citus local tables is supported only from the coordinator.
DELETE FROM postgres_local_table DELETE FROM postgres_local_table
USING citus_local_table USING citus_local_table
WHERE citus_local_table.b = postgres_local_table.b; WHERE citus_local_table.b = postgres_local_table.b;
ERROR: Modifying local tables with citus local tables is supported only from the coordinator.
UPDATE postgres_local_table UPDATE postgres_local_table
SET b = 5 SET b = 5
FROM citus_local_table FROM citus_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b; WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
ERROR: Modifying local tables with citus local tables is supported only from the coordinator.
-- no direct joins supported -- no direct joins supported
UPDATE distributed_table UPDATE distributed_table
SET b = 6 SET b = 6
@ -682,7 +678,7 @@ SELECT count(*) FROM distributed_table WHERE b in
(SELECT count FROM mat_view_4); (SELECT count FROM mat_view_4);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 0
(1 row) (1 row)
CREATE VIEW view_2 AS CREATE VIEW view_2 AS
@ -713,7 +709,7 @@ SELECT count(*) FROM view_3;
SELECT count(*) FROM view_3, distributed_table; SELECT count(*) FROM view_3, distributed_table;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
6 0
(1 row) (1 row)
--------------------------------------------------------------------- ---------------------------------------------------------------------