mirror of https://github.com/citusdata/citus.git
Support UPDATE/DELETE with parameterised partition column qual
parent
09c42481bb
commit
b8aa47e58f
|
@ -93,8 +93,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
|
|||
static char MostPermissiveVolatileFlag(char left, char right);
|
||||
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
||||
FromExpr *joinTree);
|
||||
static Task * RouterModifyTask(Query *originalQuery, Query *query);
|
||||
static ShardInterval * TargetShardIntervalForModify(Query *query);
|
||||
static Task * RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval);
|
||||
static ShardInterval * TargetShardIntervalForModify(Query *query,
|
||||
DeferredErrorMessage **planningError);
|
||||
static List * QueryRestrictList(Query *query);
|
||||
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
|
||||
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
||||
|
@ -203,13 +204,25 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
|||
|
||||
if (modifyTask)
|
||||
{
|
||||
ShardInterval *targetShardInterval = NULL;
|
||||
DeferredErrorMessage *planningError = NULL;
|
||||
|
||||
/* FIXME: this should probably rather be inlined into CreateModifyPlan */
|
||||
multiPlan->planningError = ModifyQuerySupported(query);
|
||||
if (multiPlan->planningError)
|
||||
planningError = ModifyQuerySupported(query);
|
||||
if (planningError != NULL)
|
||||
{
|
||||
multiPlan->planningError = planningError;
|
||||
return multiPlan;
|
||||
}
|
||||
task = RouterModifyTask(originalQuery, query);
|
||||
|
||||
targetShardInterval = TargetShardIntervalForModify(query, &planningError);
|
||||
if (planningError != NULL)
|
||||
{
|
||||
multiPlan->planningError = planningError;
|
||||
return multiPlan;
|
||||
}
|
||||
|
||||
task = RouterModifyTask(originalQuery, targetShardInterval);
|
||||
Assert(task);
|
||||
}
|
||||
else
|
||||
|
@ -1847,9 +1860,8 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
|||
* shard-extended deparsed SQL to be run during execution.
|
||||
*/
|
||||
static Task *
|
||||
RouterModifyTask(Query *originalQuery, Query *query)
|
||||
RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
|
||||
{
|
||||
ShardInterval *shardInterval = TargetShardIntervalForModify(query);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Oid distributedTableId = shardInterval->relationId;
|
||||
StringInfo queryString = makeStringInfo();
|
||||
|
@ -1895,11 +1907,12 @@ RouterModifyTask(Query *originalQuery, Query *query)
|
|||
|
||||
/*
|
||||
* TargetShardIntervalForModify determines the single shard targeted by a provided
|
||||
* modify command. If no matching shards exist, or if the modification targets more
|
||||
* than one shard, this function raises an error depending on the command type.
|
||||
* modify command. If no matching shards exist, it throws an error. If the modification
|
||||
* targets more than one shard, this function sets the deferred error and returns NULL,
|
||||
* to handle cases in which we cannot prune down to one shard due to a parameter.
|
||||
*/
|
||||
static ShardInterval *
|
||||
TargetShardIntervalForModify(Query *query)
|
||||
TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
|
||||
{
|
||||
List *prunedShardList = NIL;
|
||||
int prunedShardCount = 0;
|
||||
|
@ -1994,22 +2007,12 @@ TargetShardIntervalForModify(Query *query)
|
|||
"all shards satisfying delete criteria.");
|
||||
}
|
||||
|
||||
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"distributed modifications must target "
|
||||
"exactly one shard",
|
||||
errorDetail, errorHint->data);
|
||||
|
||||
if (errorDetail == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributed modifications must target exactly one "
|
||||
"shard"),
|
||||
errhint("%s", errorHint->data)));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributed modifications must target exactly one "
|
||||
"shard"),
|
||||
errdetail("%s", errorDetail),
|
||||
errhint("%s", errorHint->data)));
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return (ShardInterval *) linitial(prunedShardList);
|
||||
|
|
|
@ -917,11 +917,11 @@ SELECT partition_parameter_update(5, 51);
|
|||
(1 row)
|
||||
|
||||
SELECT partition_parameter_update(6, 61);
|
||||
ERROR: distributed modifications must target exactly one shard
|
||||
DETAIL: This command modifies all shards.
|
||||
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
|
||||
CONTEXT: SQL statement "UPDATE plpgsql_table SET value = $2 WHERE key = $1"
|
||||
PL/pgSQL function partition_parameter_update(integer,integer) line 3 at SQL statement
|
||||
partition_parameter_update
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$
|
||||
BEGIN
|
||||
UPDATE plpgsql_table SET value = $2 WHERE key = 0 AND value = $1;
|
||||
|
@ -990,8 +990,8 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
|
|||
4 | 41
|
||||
5 | 51
|
||||
5 | 51
|
||||
6 | 60
|
||||
6 |
|
||||
6 | 61
|
||||
6 | 61
|
||||
(24 rows)
|
||||
|
||||
-- check deletes
|
||||
|
@ -1032,11 +1032,11 @@ SELECT partition_parameter_delete(5, 51);
|
|||
(1 row)
|
||||
|
||||
SELECT partition_parameter_delete(6, 61);
|
||||
ERROR: distributed modifications must target exactly one shard
|
||||
DETAIL: This command modifies all shards.
|
||||
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
|
||||
CONTEXT: SQL statement "DELETE FROM plpgsql_table WHERE key = $1 AND value = $2"
|
||||
PL/pgSQL function partition_parameter_delete(integer,integer) line 3 at SQL statement
|
||||
partition_parameter_delete
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$
|
||||
BEGIN
|
||||
DELETE FROM plpgsql_table WHERE key = 0 AND value = $1;
|
||||
|
@ -1089,9 +1089,7 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
|
|||
0 |
|
||||
0 |
|
||||
0 |
|
||||
6 | 60
|
||||
6 |
|
||||
(8 rows)
|
||||
(6 rows)
|
||||
|
||||
-- check whether we can handle execute parameters
|
||||
CREATE TABLE execute_parameter_test (key int, val date);
|
||||
|
|
|
@ -776,9 +776,6 @@ EXECUTE prepared_partition_parameter_update(3, 31);
|
|||
EXECUTE prepared_partition_parameter_update(4, 41);
|
||||
EXECUTE prepared_partition_parameter_update(5, 51);
|
||||
EXECUTE prepared_partition_parameter_update(6, 61);
|
||||
ERROR: distributed modifications must target exactly one shard
|
||||
DETAIL: This command modifies all shards.
|
||||
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
|
||||
PREPARE prepared_non_partition_parameter_update(int, int) AS
|
||||
UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1;
|
||||
-- execute 6 times to trigger prepared statement usage
|
||||
|
@ -814,8 +811,8 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
|||
4 | 41
|
||||
5 | 51
|
||||
5 | 51
|
||||
6 | 60
|
||||
6 |
|
||||
6 | 61
|
||||
6 | 61
|
||||
(24 rows)
|
||||
|
||||
-- check deletes
|
||||
|
@ -827,9 +824,6 @@ EXECUTE prepared_partition_parameter_delete(3, 31);
|
|||
EXECUTE prepared_partition_parameter_delete(4, 41);
|
||||
EXECUTE prepared_partition_parameter_delete(5, 51);
|
||||
EXECUTE prepared_partition_parameter_delete(6, 61);
|
||||
ERROR: distributed modifications must target exactly one shard
|
||||
DETAIL: This command modifies all shards.
|
||||
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
|
||||
PREPARE prepared_non_partition_parameter_delete(int) AS
|
||||
DELETE FROM prepare_table WHERE key = 0 AND value = $1;
|
||||
-- execute 6 times to trigger prepared statement usage
|
||||
|
@ -849,9 +843,7 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
|||
0 |
|
||||
0 |
|
||||
0 |
|
||||
6 | 60
|
||||
6 |
|
||||
(8 rows)
|
||||
(6 rows)
|
||||
|
||||
-- verify placement state updates invalidate shard state
|
||||
--
|
||||
|
|
|
@ -315,9 +315,7 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
|||
0 |
|
||||
0 |
|
||||
0 |
|
||||
6 | 60
|
||||
6 |
|
||||
(8 rows)
|
||||
(6 rows)
|
||||
|
||||
DROP TABLE temp_table;
|
||||
-- clean-up functions
|
||||
|
|
Loading…
Reference in New Issue