mirror of https://github.com/citusdata/citus.git
Rearrange the common code into a newfunction to facilitate the multiple checks of the same conditions in a multi-modify MERGE statement
parent
a7689c3f8d
commit
d7b499929c
|
@ -121,7 +121,6 @@ static void CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan,
|
||||||
Query *query,
|
Query *query,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
static Oid ResultRelationOidForQuery(Query *query);
|
|
||||||
static bool IsTidColumn(Node *node);
|
static bool IsTidColumn(Node *node);
|
||||||
static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
|
static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
|
||||||
multiShardQuery,
|
multiShardQuery,
|
||||||
|
@ -180,7 +179,12 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||||
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
|
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
|
||||||
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
|
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
|
||||||
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
|
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
|
||||||
|
static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,
|
||||||
|
FromExpr *joinTree,
|
||||||
|
Node *quals,
|
||||||
|
List *targetList,
|
||||||
|
CmdType commandType,
|
||||||
|
List *returningList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateRouterPlan attempts to create a router executor plan for the given
|
* CreateRouterPlan attempts to create a router executor plan for the given
|
||||||
|
@ -445,7 +449,7 @@ ModifyQueryResultRelationId(Query *query)
|
||||||
* ResultRelationOidForQuery returns the OID of the relation this is modified
|
* ResultRelationOidForQuery returns the OID of the relation this is modified
|
||||||
* by a given query.
|
* by a given query.
|
||||||
*/
|
*/
|
||||||
static Oid
|
Oid
|
||||||
ResultRelationOidForQuery(Query *query)
|
ResultRelationOidForQuery(Query *query)
|
||||||
{
|
{
|
||||||
RangeTblEntry *resultRTE = rt_fetch(query->resultRelation, query->rtable);
|
RangeTblEntry *resultRTE = rt_fetch(query->resultRelation, query->rtable);
|
||||||
|
@ -512,6 +516,161 @@ IsTidColumn(Node *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TargetlistAndFunctionsSupported implements a subset of what ModifyPartialQuerySupported
|
||||||
|
* checks, that subset being checking what functions are allowed, if we are
|
||||||
|
* updating distribution column, etc.
|
||||||
|
* Note: This subset of checks are repeated for each MERGE modify action.
|
||||||
|
*/
|
||||||
|
static DeferredErrorMessage *
|
||||||
|
TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals,
|
||||||
|
List *targetList,
|
||||||
|
CmdType commandType, List *returningList)
|
||||||
|
{
|
||||||
|
uint32 rangeTableId = 1;
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
|
||||||
|
if (IsCitusTable(resultRelationId))
|
||||||
|
{
|
||||||
|
partitionColumn = PartitionColumn(resultRelationId, rangeTableId);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
|
||||||
|
bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */
|
||||||
|
ListCell *targetEntryCell = NULL;
|
||||||
|
|
||||||
|
foreach(targetEntryCell, targetList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
|
|
||||||
|
/* skip resjunk entries: UPDATE adds some for ctid, etc. */
|
||||||
|
if (targetEntry->resjunk)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool targetEntryPartitionColumn = false;
|
||||||
|
AttrNumber targetColumnAttrNumber = InvalidAttrNumber;
|
||||||
|
|
||||||
|
/* reference tables do not have partition column */
|
||||||
|
if (partitionColumn == NULL)
|
||||||
|
{
|
||||||
|
targetEntryPartitionColumn = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (commandType == CMD_UPDATE)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Note that it is not possible to give an alias to
|
||||||
|
* UPDATE table SET ...
|
||||||
|
*/
|
||||||
|
if (targetEntry->resname)
|
||||||
|
{
|
||||||
|
targetColumnAttrNumber = get_attnum(resultRelationId,
|
||||||
|
targetEntry->resname);
|
||||||
|
if (targetColumnAttrNumber == partitionColumn->varattno)
|
||||||
|
{
|
||||||
|
targetEntryPartitionColumn = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE &&
|
||||||
|
FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
|
||||||
|
CitusIsVolatileFunction))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"functions used in UPDATE queries on distributed "
|
||||||
|
"tables must not be VOLATILE",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE && targetEntryPartitionColumn &&
|
||||||
|
TargetEntryChangesValue(targetEntry, partitionColumn,
|
||||||
|
joinTree))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"modifying the partition value of rows is not "
|
||||||
|
"allowed",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE &&
|
||||||
|
MasterIrreducibleExpression((Node *) targetEntry->expr,
|
||||||
|
&hasVarArgument, &hasBadCoalesce))
|
||||||
|
{
|
||||||
|
Assert(hasVarArgument || hasBadCoalesce);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
|
||||||
|
NodeIsFieldStore))
|
||||||
|
{
|
||||||
|
/* DELETE cannot do field indirection already */
|
||||||
|
Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"inserting or modifying composite type fields is not "
|
||||||
|
"supported", NULL,
|
||||||
|
"Use the column name to insert or update the composite "
|
||||||
|
"type as a single value");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (joinTree != NULL)
|
||||||
|
{
|
||||||
|
if (FindNodeMatchingCheckFunction((Node *) quals,
|
||||||
|
CitusIsVolatileFunction))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"functions used in the WHERE/ON/WHEN clause of modification "
|
||||||
|
"queries on distributed tables must not be VOLATILE",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
else if (MasterIrreducibleExpression(quals, &hasVarArgument,
|
||||||
|
&hasBadCoalesce))
|
||||||
|
{
|
||||||
|
Assert(hasVarArgument || hasBadCoalesce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasVarArgument)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"STABLE functions used in UPDATE queries "
|
||||||
|
"cannot be called with column references",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasBadCoalesce)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"non-IMMUTABLE functions are not allowed in CASE or "
|
||||||
|
"COALESCE statements",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (contain_mutable_functions((Node *) returningList))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"non-IMMUTABLE functions are not allowed in the "
|
||||||
|
"RETURNING clause",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (quals != NULL &&
|
||||||
|
nodeTag(quals) == T_CurrentOfExpr)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot run DML queries with cursors", NULL,
|
||||||
|
NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks,
|
* ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks,
|
||||||
* that subset being what's necessary to check modifying CTEs for.
|
* that subset being what's necessary to check modifying CTEs for.
|
||||||
|
@ -620,148 +779,21 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
||||||
|
|
||||||
Oid resultRelationId = ModifyQueryResultRelationId(queryTree);
|
Oid resultRelationId = ModifyQueryResultRelationId(queryTree);
|
||||||
*distributedTableIdOutput = resultRelationId;
|
*distributedTableIdOutput = resultRelationId;
|
||||||
uint32 rangeTableId = 1;
|
|
||||||
|
|
||||||
Var *partitionColumn = NULL;
|
|
||||||
if (IsCitusTable(resultRelationId))
|
|
||||||
{
|
|
||||||
partitionColumn = PartitionColumn(resultRelationId, rangeTableId);
|
|
||||||
}
|
|
||||||
commandType = queryTree->commandType;
|
commandType = queryTree->commandType;
|
||||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||||
commandType == CMD_DELETE)
|
commandType == CMD_DELETE)
|
||||||
{
|
{
|
||||||
bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
|
deferredError =
|
||||||
bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */
|
TargetlistAndFunctionsSupported(resultRelationId,
|
||||||
FromExpr *joinTree = queryTree->jointree;
|
queryTree->jointree,
|
||||||
ListCell *targetEntryCell = NULL;
|
queryTree->jointree->quals,
|
||||||
|
queryTree->targetList,
|
||||||
foreach(targetEntryCell, queryTree->targetList)
|
commandType,
|
||||||
|
queryTree->returningList);
|
||||||
|
if (deferredError)
|
||||||
{
|
{
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
return deferredError;
|
||||||
|
|
||||||
/* skip resjunk entries: UPDATE adds some for ctid, etc. */
|
|
||||||
if (targetEntry->resjunk)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool targetEntryPartitionColumn = false;
|
|
||||||
AttrNumber targetColumnAttrNumber = InvalidAttrNumber;
|
|
||||||
|
|
||||||
/* reference tables do not have partition column */
|
|
||||||
if (partitionColumn == NULL)
|
|
||||||
{
|
|
||||||
targetEntryPartitionColumn = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (commandType == CMD_UPDATE)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Note that it is not possible to give an alias to
|
|
||||||
* UPDATE table SET ...
|
|
||||||
*/
|
|
||||||
if (targetEntry->resname)
|
|
||||||
{
|
|
||||||
targetColumnAttrNumber = get_attnum(resultRelationId,
|
|
||||||
targetEntry->resname);
|
|
||||||
if (targetColumnAttrNumber == partitionColumn->varattno)
|
|
||||||
{
|
|
||||||
targetEntryPartitionColumn = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE &&
|
|
||||||
FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
|
|
||||||
CitusIsVolatileFunction))
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"functions used in UPDATE queries on distributed "
|
|
||||||
"tables must not be VOLATILE",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE && targetEntryPartitionColumn &&
|
|
||||||
TargetEntryChangesValue(targetEntry, partitionColumn,
|
|
||||||
queryTree->jointree))
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"modifying the partition value of rows is not "
|
|
||||||
"allowed",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE &&
|
|
||||||
MasterIrreducibleExpression((Node *) targetEntry->expr,
|
|
||||||
&hasVarArgument, &hasBadCoalesce))
|
|
||||||
{
|
|
||||||
Assert(hasVarArgument || hasBadCoalesce);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
|
|
||||||
NodeIsFieldStore))
|
|
||||||
{
|
|
||||||
/* DELETE cannot do field indirection already */
|
|
||||||
Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"inserting or modifying composite type fields is not "
|
|
||||||
"supported", NULL,
|
|
||||||
"Use the column name to insert or update the composite "
|
|
||||||
"type as a single value");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (joinTree != NULL)
|
|
||||||
{
|
|
||||||
if (FindNodeMatchingCheckFunction((Node *) joinTree->quals,
|
|
||||||
CitusIsVolatileFunction))
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"functions used in the WHERE clause of modification "
|
|
||||||
"queries on distributed tables must not be VOLATILE",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument,
|
|
||||||
&hasBadCoalesce))
|
|
||||||
{
|
|
||||||
Assert(hasVarArgument || hasBadCoalesce);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasVarArgument)
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"STABLE functions used in UPDATE queries "
|
|
||||||
"cannot be called with column references",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasBadCoalesce)
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"non-IMMUTABLE functions are not allowed in CASE or "
|
|
||||||
"COALESCE statements",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (contain_mutable_functions((Node *) queryTree->returningList))
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"non-IMMUTABLE functions are not allowed in the "
|
|
||||||
"RETURNING clause",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queryTree->jointree->quals != NULL &&
|
|
||||||
nodeTag(queryTree->jointree->quals) == T_CurrentOfExpr)
|
|
||||||
{
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"cannot run DML queries with cursors", NULL,
|
|
||||||
NULL);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamLi
|
||||||
boundParams);
|
boundParams);
|
||||||
extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
|
extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
|
||||||
extern bool JoinConditionIsOnFalse(List *relOptInfo);
|
extern bool JoinConditionIsOnFalse(List *relOptInfo);
|
||||||
|
extern Oid ResultRelationOidForQuery(Query *query);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||||
|
|
|
@ -177,7 +177,7 @@ INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:
|
||||||
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
||||||
-- commands with mutable functions in their quals
|
-- commands with mutable functions in their quals
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
||||||
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
|
||||||
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
||||||
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
||||||
|
|
|
@ -95,7 +95,7 @@ INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:
|
||||||
INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
||||||
-- commands with mutable functions in their quals
|
-- commands with mutable functions in their quals
|
||||||
DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000);
|
DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000);
|
||||||
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
|
||||||
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
||||||
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
||||||
DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
||||||
|
|
|
@ -674,7 +674,7 @@ UPDATE users_test_table
|
||||||
SET value_2 = 5
|
SET value_2 = 5
|
||||||
FROM events_test_table
|
FROM events_test_table
|
||||||
WHERE users_test_table.user_id = events_test_table.user_id * random();
|
WHERE users_test_table.user_id = events_test_table.user_id * random();
|
||||||
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
|
||||||
UPDATE users_test_table
|
UPDATE users_test_table
|
||||||
SET value_2 = 5 * random()
|
SET value_2 = 5 * random()
|
||||||
FROM events_test_table
|
FROM events_test_table
|
||||||
|
|
Loading…
Reference in New Issue