mirror of https://github.com/citusdata/citus.git
Merge pull request #1320 from citusdata/prepared_update_delete
Support UPDATE/DELETE with parameterised partition column qualpull/1326/head
commit
acb84d9ca3
|
@ -100,8 +100,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
|
||||||
static char MostPermissiveVolatileFlag(char left, char right);
|
static char MostPermissiveVolatileFlag(char left, char right);
|
||||||
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
||||||
FromExpr *joinTree);
|
FromExpr *joinTree);
|
||||||
static Task * RouterModifyTask(Query *originalQuery, Query *query);
|
static Task * RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval);
|
||||||
static ShardInterval * TargetShardIntervalForModify(Query *query);
|
static ShardInterval * TargetShardIntervalForModify(Query *query,
|
||||||
|
DeferredErrorMessage **planningError);
|
||||||
static List * QueryRestrictList(Query *query);
|
static List * QueryRestrictList(Query *query);
|
||||||
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
|
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
|
||||||
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
||||||
|
@ -213,13 +214,25 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
|
|
||||||
if (modifyTask)
|
if (modifyTask)
|
||||||
{
|
{
|
||||||
|
ShardInterval *targetShardInterval = NULL;
|
||||||
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
|
||||||
/* FIXME: this should probably rather be inlined into CreateModifyPlan */
|
/* FIXME: this should probably rather be inlined into CreateModifyPlan */
|
||||||
multiPlan->planningError = ModifyQuerySupported(query);
|
planningError = ModifyQuerySupported(query);
|
||||||
if (multiPlan->planningError)
|
if (planningError != NULL)
|
||||||
{
|
{
|
||||||
|
multiPlan->planningError = planningError;
|
||||||
return multiPlan;
|
return multiPlan;
|
||||||
}
|
}
|
||||||
task = RouterModifyTask(originalQuery, query);
|
|
||||||
|
targetShardInterval = TargetShardIntervalForModify(query, &planningError);
|
||||||
|
if (planningError != NULL)
|
||||||
|
{
|
||||||
|
multiPlan->planningError = planningError;
|
||||||
|
return multiPlan;
|
||||||
|
}
|
||||||
|
|
||||||
|
task = RouterModifyTask(originalQuery, targetShardInterval);
|
||||||
Assert(task);
|
Assert(task);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1803,9 +1816,8 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
||||||
* shard-extended deparsed SQL to be run during execution.
|
* shard-extended deparsed SQL to be run during execution.
|
||||||
*/
|
*/
|
||||||
static Task *
|
static Task *
|
||||||
RouterModifyTask(Query *originalQuery, Query *query)
|
RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = TargetShardIntervalForModify(query);
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
StringInfo queryString = makeStringInfo();
|
StringInfo queryString = makeStringInfo();
|
||||||
|
@ -1851,11 +1863,12 @@ RouterModifyTask(Query *originalQuery, Query *query)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TargetShardIntervalForModify determines the single shard targeted by a provided
|
* TargetShardIntervalForModify determines the single shard targeted by a provided
|
||||||
* modify command. If no matching shards exist, or if the modification targets more
|
* modify command. If no matching shards exist, it throws an error. If the modification
|
||||||
* than one shard, this function raises an error depending on the command type.
|
* targets more than one shard, this function sets the deferred error and returns NULL,
|
||||||
|
* to handle cases in which we cannot prune down to one shard due to a parameter.
|
||||||
*/
|
*/
|
||||||
static ShardInterval *
|
static ShardInterval *
|
||||||
TargetShardIntervalForModify(Query *query)
|
TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
List *prunedShardList = NIL;
|
List *prunedShardList = NIL;
|
||||||
int prunedShardCount = 0;
|
int prunedShardCount = 0;
|
||||||
|
@ -1930,6 +1943,7 @@ TargetShardIntervalForModify(Query *query)
|
||||||
Oid relationId = cacheEntry->relationId;
|
Oid relationId = cacheEntry->relationId;
|
||||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
char *partitionKeyString = cacheEntry->partitionKeyString;
|
||||||
char *partitionColumnName = ColumnNameToColumn(relationId, partitionKeyString);
|
char *partitionColumnName = ColumnNameToColumn(relationId, partitionKeyString);
|
||||||
|
StringInfo errorMessage = makeStringInfo();
|
||||||
StringInfo errorHint = makeStringInfo();
|
StringInfo errorHint = makeStringInfo();
|
||||||
const char *targetCountType = NULL;
|
const char *targetCountType = NULL;
|
||||||
bool showHint = false;
|
bool showHint = false;
|
||||||
|
@ -1973,10 +1987,14 @@ TargetShardIntervalForModify(Query *query)
|
||||||
|
|
||||||
showHint = errorHint->len > 0;
|
showHint = errorHint->len > 0;
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
|
||||||
errmsg("cannot run %s command which targets %s shards",
|
commandName, targetCountType);
|
||||||
commandName, targetCountType),
|
|
||||||
showHint ? errhint("%s", errorHint->data) : 0));
|
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
errorMessage->data, NULL,
|
||||||
|
showHint ? errorHint->data : NULL);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (ShardInterval *) linitial(prunedShardList);
|
return (ShardInterval *) linitial(prunedShardList);
|
||||||
|
|
|
@ -917,10 +917,11 @@ SELECT partition_parameter_update(5, 51);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT partition_parameter_update(6, 61);
|
SELECT partition_parameter_update(6, 61);
|
||||||
ERROR: cannot run UPDATE command which targets multiple shards
|
partition_parameter_update
|
||||||
HINT: Consider using an equality filter on partition column "key" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
----------------------------
|
||||||
CONTEXT: SQL statement "UPDATE plpgsql_table SET value = $2 WHERE key = $1"
|
|
||||||
PL/pgSQL function partition_parameter_update(integer,integer) line 3 at SQL statement
|
(1 row)
|
||||||
|
|
||||||
CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$
|
CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$
|
||||||
BEGIN
|
BEGIN
|
||||||
UPDATE plpgsql_table SET value = $2 WHERE key = 0 AND value = $1;
|
UPDATE plpgsql_table SET value = $2 WHERE key = 0 AND value = $1;
|
||||||
|
@ -989,8 +990,8 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
|
||||||
4 | 41
|
4 | 41
|
||||||
5 | 51
|
5 | 51
|
||||||
5 | 51
|
5 | 51
|
||||||
6 | 60
|
6 | 61
|
||||||
6 |
|
6 | 61
|
||||||
(24 rows)
|
(24 rows)
|
||||||
|
|
||||||
-- check deletes
|
-- check deletes
|
||||||
|
@ -1031,10 +1032,11 @@ SELECT partition_parameter_delete(5, 51);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT partition_parameter_delete(6, 61);
|
SELECT partition_parameter_delete(6, 61);
|
||||||
ERROR: cannot run DELETE command which targets multiple shards
|
partition_parameter_delete
|
||||||
HINT: Consider using an equality filter on partition column "key" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
----------------------------
|
||||||
CONTEXT: SQL statement "DELETE FROM plpgsql_table WHERE key = $1 AND value = $2"
|
|
||||||
PL/pgSQL function partition_parameter_delete(integer,integer) line 3 at SQL statement
|
(1 row)
|
||||||
|
|
||||||
CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$
|
CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$
|
||||||
BEGIN
|
BEGIN
|
||||||
DELETE FROM plpgsql_table WHERE key = 0 AND value = $1;
|
DELETE FROM plpgsql_table WHERE key = 0 AND value = $1;
|
||||||
|
@ -1087,9 +1089,7 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
6 | 60
|
(6 rows)
|
||||||
6 |
|
|
||||||
(8 rows)
|
|
||||||
|
|
||||||
-- check whether we can handle execute parameters
|
-- check whether we can handle execute parameters
|
||||||
CREATE TABLE execute_parameter_test (key int, val date);
|
CREATE TABLE execute_parameter_test (key int, val date);
|
||||||
|
|
|
@ -776,8 +776,6 @@ EXECUTE prepared_partition_parameter_update(3, 31);
|
||||||
EXECUTE prepared_partition_parameter_update(4, 41);
|
EXECUTE prepared_partition_parameter_update(4, 41);
|
||||||
EXECUTE prepared_partition_parameter_update(5, 51);
|
EXECUTE prepared_partition_parameter_update(5, 51);
|
||||||
EXECUTE prepared_partition_parameter_update(6, 61);
|
EXECUTE prepared_partition_parameter_update(6, 61);
|
||||||
ERROR: cannot run UPDATE command which targets multiple shards
|
|
||||||
HINT: Consider using an equality filter on partition column "key" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
|
||||||
PREPARE prepared_non_partition_parameter_update(int, int) AS
|
PREPARE prepared_non_partition_parameter_update(int, int) AS
|
||||||
UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1;
|
UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1;
|
||||||
-- execute 6 times to trigger prepared statement usage
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
@ -813,8 +811,8 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
||||||
4 | 41
|
4 | 41
|
||||||
5 | 51
|
5 | 51
|
||||||
5 | 51
|
5 | 51
|
||||||
6 | 60
|
6 | 61
|
||||||
6 |
|
6 | 61
|
||||||
(24 rows)
|
(24 rows)
|
||||||
|
|
||||||
-- check deletes
|
-- check deletes
|
||||||
|
@ -826,8 +824,6 @@ EXECUTE prepared_partition_parameter_delete(3, 31);
|
||||||
EXECUTE prepared_partition_parameter_delete(4, 41);
|
EXECUTE prepared_partition_parameter_delete(4, 41);
|
||||||
EXECUTE prepared_partition_parameter_delete(5, 51);
|
EXECUTE prepared_partition_parameter_delete(5, 51);
|
||||||
EXECUTE prepared_partition_parameter_delete(6, 61);
|
EXECUTE prepared_partition_parameter_delete(6, 61);
|
||||||
ERROR: cannot run DELETE command which targets multiple shards
|
|
||||||
HINT: Consider using an equality filter on partition column "key" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
|
||||||
PREPARE prepared_non_partition_parameter_delete(int) AS
|
PREPARE prepared_non_partition_parameter_delete(int) AS
|
||||||
DELETE FROM prepare_table WHERE key = 0 AND value = $1;
|
DELETE FROM prepare_table WHERE key = 0 AND value = $1;
|
||||||
-- execute 6 times to trigger prepared statement usage
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
@ -847,9 +843,7 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
6 | 60
|
(6 rows)
|
||||||
6 |
|
|
||||||
(8 rows)
|
|
||||||
|
|
||||||
-- Testing parameters + function evaluation
|
-- Testing parameters + function evaluation
|
||||||
CREATE TABLE prepare_func_table (
|
CREATE TABLE prepare_func_table (
|
||||||
|
@ -999,3 +993,4 @@ EXECUTE countsome; -- no replanning
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
-- clean-up prepared statements
|
-- clean-up prepared statements
|
||||||
DEALLOCATE ALL;
|
DEALLOCATE ALL;
|
||||||
|
DROP TABLE prepare_table;
|
||||||
|
|
|
@ -253,7 +253,7 @@ SELECT * FROM temp_table ORDER BY key, value;
|
||||||
|
|
||||||
-- check deletes
|
-- check deletes
|
||||||
CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$
|
CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$
|
||||||
DELETE FROM prepare_table WHERE key = 0 AND value = $1;
|
DELETE FROM temp_table WHERE key = 0 AND value = $1;
|
||||||
$$ LANGUAGE SQL;
|
$$ LANGUAGE SQL;
|
||||||
-- execute 6 times to trigger prepared statement usage
|
-- execute 6 times to trigger prepared statement usage
|
||||||
SELECT non_partition_parameter_delete_sql(12);
|
SELECT non_partition_parameter_delete_sql(12);
|
||||||
|
@ -293,7 +293,7 @@ SELECT non_partition_parameter_delete_sql(62);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- check after deletes
|
-- check after deletes
|
||||||
SELECT * FROM prepare_table ORDER BY key, value;
|
SELECT * FROM temp_table ORDER BY key, value;
|
||||||
key | value
|
key | value
|
||||||
-----+-------
|
-----+-------
|
||||||
0 |
|
0 |
|
||||||
|
@ -302,9 +302,7 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
0 |
|
0 |
|
||||||
6 | 60
|
(6 rows)
|
||||||
6 |
|
|
||||||
(8 rows)
|
|
||||||
|
|
||||||
-- test running parameterized SQL function
|
-- test running parameterized SQL function
|
||||||
CREATE TABLE test_parameterized_sql(id integer, org_id integer);
|
CREATE TABLE test_parameterized_sql(id integer, org_id integer);
|
||||||
|
|
|
@ -529,3 +529,5 @@ EXECUTE countsome; -- no replanning
|
||||||
|
|
||||||
-- clean-up prepared statements
|
-- clean-up prepared statements
|
||||||
DEALLOCATE ALL;
|
DEALLOCATE ALL;
|
||||||
|
|
||||||
|
DROP TABLE prepare_table;
|
||||||
|
|
|
@ -111,7 +111,7 @@ SELECT * FROM temp_table ORDER BY key, value;
|
||||||
|
|
||||||
-- check deletes
|
-- check deletes
|
||||||
CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$
|
CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$
|
||||||
DELETE FROM prepare_table WHERE key = 0 AND value = $1;
|
DELETE FROM temp_table WHERE key = 0 AND value = $1;
|
||||||
$$ LANGUAGE SQL;
|
$$ LANGUAGE SQL;
|
||||||
|
|
||||||
-- execute 6 times to trigger prepared statement usage
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
@ -123,7 +123,7 @@ SELECT non_partition_parameter_delete_sql(52);
|
||||||
SELECT non_partition_parameter_delete_sql(62);
|
SELECT non_partition_parameter_delete_sql(62);
|
||||||
|
|
||||||
-- check after deletes
|
-- check after deletes
|
||||||
SELECT * FROM prepare_table ORDER BY key, value;
|
SELECT * FROM temp_table ORDER BY key, value;
|
||||||
|
|
||||||
-- test running parameterized SQL function
|
-- test running parameterized SQL function
|
||||||
CREATE TABLE test_parameterized_sql(id integer, org_id integer);
|
CREATE TABLE test_parameterized_sql(id integer, org_id integer);
|
||||||
|
|
Loading…
Reference in New Issue