mirror of https://github.com/citusdata/citus.git
Merge pull request #2297 from citusdata/fix/insert-select-onconflict
Fix insert ... select on conflict that allowed updates of distribution columnpull/2299/head
commit
6ae5fcd6c9
|
@ -176,6 +176,14 @@ CreateInsertSelectPlan(Query *originalQuery,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext)
|
||||||
{
|
{
|
||||||
DistributedPlan *distributedPlan = NULL;
|
DistributedPlan *distributedPlan = NULL;
|
||||||
|
DeferredErrorMessage *deferredError = NULL;
|
||||||
|
|
||||||
|
deferredError = ErrorIfOnConflictNotSupported(originalQuery);
|
||||||
|
if (deferredError != NULL)
|
||||||
|
{
|
||||||
|
/* raising the error as there is no possible solution for the unsupported on conflict statements */
|
||||||
|
RaiseDeferredError(deferredError, ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
|
distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
|
|
@ -530,12 +530,8 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
List *rangeTableList = NIL;
|
List *rangeTableList = NIL;
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
uint32 queryTableCount = 0;
|
uint32 queryTableCount = 0;
|
||||||
bool specifiesPartitionValue = false;
|
|
||||||
ListCell *setTargetCell = NULL;
|
|
||||||
List *onConflictSet = NIL;
|
|
||||||
Node *arbiterWhere = NULL;
|
|
||||||
Node *onConflictWhere = NULL;
|
|
||||||
CmdType commandType = queryTree->commandType;
|
CmdType commandType = queryTree->commandType;
|
||||||
|
DeferredErrorMessage *deferredError = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Here, we check if a recursively planned query tries to modify
|
* Here, we check if a recursively planned query tries to modify
|
||||||
|
@ -769,7 +765,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
TargetEntryChangesValue(targetEntry, partitionColumn,
|
TargetEntryChangesValue(targetEntry, partitionColumn,
|
||||||
queryTree->jointree))
|
queryTree->jointree))
|
||||||
{
|
{
|
||||||
specifiesPartitionValue = true;
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"modifying the partition value of rows is not "
|
||||||
|
"allowed",
|
||||||
|
NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE &&
|
if (commandType == CMD_UPDATE &&
|
||||||
|
@ -829,13 +828,46 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commandType == CMD_INSERT && queryTree->onConflict != NULL)
|
deferredError = ErrorIfOnConflictNotSupported(queryTree);
|
||||||
|
if (deferredError != NULL)
|
||||||
{
|
{
|
||||||
onConflictSet = queryTree->onConflict->onConflictSet;
|
return deferredError;
|
||||||
arbiterWhere = queryTree->onConflict->arbiterWhere;
|
|
||||||
onConflictWhere = queryTree->onConflict->onConflictWhere;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ErrorIfOnConflictNotSupprted returns an error if an INSERT query has an
|
||||||
|
* unsupported ON CONFLICT clause. In particular, changing the partition
|
||||||
|
* column value or using volatile functions is not allowed.
|
||||||
|
*/
|
||||||
|
DeferredErrorMessage *
|
||||||
|
ErrorIfOnConflictNotSupported(Query *queryTree)
|
||||||
|
{
|
||||||
|
Oid distributedTableId = InvalidOid;
|
||||||
|
uint32 rangeTableId = 1;
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
List *onConflictSet = NIL;
|
||||||
|
Node *arbiterWhere = NULL;
|
||||||
|
Node *onConflictWhere = NULL;
|
||||||
|
ListCell *setTargetCell = NULL;
|
||||||
|
bool specifiesPartitionValue = false;
|
||||||
|
|
||||||
|
CmdType commandType = queryTree->commandType;
|
||||||
|
if (commandType != CMD_INSERT || queryTree->onConflict == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
distributedTableId = ExtractFirstDistributedTableId(queryTree);
|
||||||
|
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||||
|
|
||||||
|
onConflictSet = queryTree->onConflict->onConflictSet;
|
||||||
|
arbiterWhere = queryTree->onConflict->arbiterWhere;
|
||||||
|
onConflictWhere = queryTree->onConflict->onConflictWhere;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* onConflictSet is expanded via expand_targetlist() on the standard planner.
|
* onConflictSet is expanded via expand_targetlist() on the standard planner.
|
||||||
* This ends up adding all the columns to the onConflictSet even if the user
|
* This ends up adding all the columns to the onConflictSet even if the user
|
||||||
|
|
|
@ -52,6 +52,7 @@ extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *orig
|
||||||
bool multiShardQuery,
|
bool multiShardQuery,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
extern DeferredErrorMessage * ErrorIfOnConflictNotSupported(Query *queryTree);
|
||||||
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
|
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
|
||||||
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
||||||
RelationRestrictionContext *oldContext);
|
RelationRestrictionContext *oldContext);
|
||||||
|
|
|
@ -2735,6 +2735,22 @@ EXECUTE prepared_recursive_insert_select;
|
||||||
EXECUTE prepared_recursive_insert_select;
|
EXECUTE prepared_recursive_insert_select;
|
||||||
EXECUTE prepared_recursive_insert_select;
|
EXECUTE prepared_recursive_insert_select;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- upsert with on conflict update distribution column is unsupported
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET user_id = 42
|
||||||
|
RETURNING user_id, value_1_agg;
|
||||||
|
ERROR: modifying the partition value of rows is not allowed
|
||||||
-- wrap in a transaction to improve performance
|
-- wrap in a transaction to improve performance
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DROP TABLE coerce_events;
|
DROP TABLE coerce_events;
|
||||||
|
|
|
@ -2104,6 +2104,22 @@ EXECUTE prepared_recursive_insert_select;
|
||||||
EXECUTE prepared_recursive_insert_select;
|
EXECUTE prepared_recursive_insert_select;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- upsert with on conflict update distribution column is unsupported
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET user_id = 42
|
||||||
|
RETURNING user_id, value_1_agg;
|
||||||
|
|
||||||
-- wrap in a transaction to improve performance
|
-- wrap in a transaction to improve performance
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DROP TABLE coerce_events;
|
DROP TABLE coerce_events;
|
||||||
|
|
Loading…
Reference in New Issue