From a174eb4f7b9f60413815deda48b92c9e4e861ecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Fri, 3 Jan 2020 12:15:22 +0000 Subject: [PATCH] Do not go through standard_planner() for INSERTs (#3348) That seems unnecessary. We already have the notion of FastPath queries, simply add it there. --- .../planner/fast_path_router_planner.c | 29 +- .../planner/insert_select_planner.c | 3 +- .../distributed/insert_select_planner.h | 1 + .../expected/local_shard_execution.out | 255 +++++++++++++++++- src/test/regress/expected/multi_upsert.out | 2 + .../regress/sql/local_shard_execution.sql | 85 ++++++ 6 files changed, 363 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 46e2c45e1..952f57459 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -35,6 +35,7 @@ #include "postgres.h" #include "distributed/distributed_planner.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_physical_planner.h" /* only to use some utility functions */ #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" @@ -52,6 +53,7 @@ #else #include "optimizer/clauses.h" #endif +#include "tcop/pquery.h" bool EnableFastPathRouterPlanner = true; @@ -95,7 +97,6 @@ FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) parse->jointree->quals = (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); - PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery); return result; @@ -126,7 +127,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse) /* there is only a single relation rte */ seqScanNode->scanrelid = 1; - plan->targetlist = copyObject(parse->targetList); + plan->targetlist = + copyObject(FetchStatementTargetList((Node *) parse)); + plan->qual = NULL; plan->lefttree = NULL; plan->righttree = NULL; @@ -139,6 +142,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) result->rtable = copyObject(parse->rtable); result->planTree = (Plan *) plan; + result->hasReturning = (parse->returningList != NIL); Oid relationId = ExtractFirstDistributedTableId(parse); result->relationOids = list_make1_oid(relationId); @@ -159,6 +163,8 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * key should only exists once in the WHERE clause. So basically, * SELECT ... FROM dist_table WHERE dist_key = X * - No returning for UPDATE/DELETE queries + * - All INSERT statements (including multi-row INSERTs) as long as the commands + * don't have any sublinks/CTEs etc */ bool FastPathRouterQuery(Query *query) @@ -171,14 +177,8 @@ FastPathRouterQuery(Query *query) return false; } - if (!(query->commandType == CMD_SELECT || query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE)) - { - return false; - } - /* - * We want to deal with only very simple select queries. Some of the + * We want to deal with only very simple queries. Some of the * checks might be too restrictive, still we prefer this way. */ if (query->cteList != NIL || query->returningList != NIL || @@ -188,6 +188,17 @@ FastPathRouterQuery(Query *query) return false; } + if (CheckInsertSelectQuery(query)) + { + /* we don't support INSERT..SELECT in the fast-path */ + return false; + } + else if (query->commandType == CMD_INSERT) + { + /* we don't need to do any further checks, all INSERTs are fast-path */ + return true; + } + /* make sure that the only range table in FROM clause */ if (list_length(query->rtable) != 1) { diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 1b872b874..b216a42d6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -70,7 +70,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, selectPartitionColumnTableId); static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); -static bool CheckInsertSelectQuery(Query *query); /* @@ -129,7 +128,7 @@ InsertSelectIntoLocalTable(Query *query) * This function is inspired from getInsertSelectQuery() on * rewrite/rewriteManip.c. */ -static bool +bool CheckInsertSelectQuery(Query *query) { CmdType commandType = query->commandType; diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 23a031059..5dfac3259 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -24,6 +24,7 @@ extern bool InsertSelectIntoDistributedTable(Query *query); +extern bool CheckInsertSelectQuery(Query *query); extern bool InsertSelectIntoLocalTable(Query *query); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 797f390c9..1a4626b6d 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -644,7 +644,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$ END; $$ LANGUAGE plpgsql; CALL only_local_execution_with_params(1); -LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES ($1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '29'::text CONTEXT: SQL statement "INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29'" PL/pgSQL function only_local_execution_with_params(integer) line 4 at SQL statement LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) @@ -905,6 +905,163 @@ LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_e (1 row) COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 6 | 11 | 21 | 7 | 1130 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 6 | 2928 | 21 | 7 | 292830 | 315 +(1 row) + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) + count +------- + 5 +(1 row) + +COMMIT; +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (11) +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(6,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +ROLLBACK; -- failures of local execution should rollback both the -- local execution and remote executions -- fail on a local execution @@ -1110,6 +1267,102 @@ LOG: executing the command locally: SELECT key, ser, ts, collection_id, value F (2 rows) COMMIT; +TRUNCATE collections_list; +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; +SELECT setval('collections_list_key_seq', 4); + setval +-------- + 4 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('5'::bigint, '3940649673949187'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 5 | 3940649673949187 +(1 row) + +SELECT setval('collections_list_key_seq', 5); + setval +-------- + 5 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('6'::bigint, '3940649673949188'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 6 | 3940649673949188 +(1 row) + +SELECT setval('collections_list_key_seq', 499); + setval +-------- + 499 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('500'::bigint, '3940649673949189'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 500 | 3940649673949189 +(1 row) + +SELECT setval('collections_list_key_seq', 700); + setval +-------- + 700 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('701'::bigint, '3940649673949190'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 701 | 3940649673949190 +(1 row) + +SELECT setval('collections_list_key_seq', 708); + setval +-------- + 708 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('709'::bigint, '3940649673949191'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 709 | 3940649673949191 +(1 row) + +SELECT setval('collections_list_key_seq', 709); + setval +-------- + 709 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('710'::bigint, '3940649673949192'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 710 | 3940649673949192 +(1 row) + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); + setval +-------- + 10 +(1 row) + +EXECUTE serial_prepared_local; + key | ser +-----+------------------ + 11 | 3940649673949193 +(1 row) + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index f05ecf242..c98fe9649 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -153,6 +153,7 @@ INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_ -- this errors out since there is no unique constraint on partition key INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:57637 -- create another table CREATE TABLE upsert_test_3 ( @@ -171,6 +172,7 @@ SELECT create_distributed_table('upsert_test_3', 'part_key', 'hash'); -- since there are no unique indexes, error-out INSERT INTO upsert_test_3 VALUES (1, 0) ON CONFLICT(part_key) DO UPDATE SET count = upsert_test_3.count + 1; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:57637 -- create another table CREATE TABLE upsert_test_4 ( diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index c17957172..9876046ed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -485,6 +485,65 @@ BEGIN; EXECUTE remote_prepare_param(1); COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +COMMIT; + +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; + +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; + +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + + EXECUTE local_multi_row_insert_prepare_params(1,6); + EXECUTE local_multi_row_insert_prepare_params(1,5); + EXECUTE local_multi_row_insert_prepare_params(6,5); + EXECUTE local_multi_row_insert_prepare_params(5,1); + EXECUTE local_multi_row_insert_prepare_params(5,6); + EXECUTE local_multi_row_insert_prepare_params(5,1); + + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +ROLLBACK; + + -- failures of local execution should rollback both the -- local execution and remote executions @@ -619,6 +678,32 @@ BEGIN; SELECT * FROM collections_list ORDER BY 1,2,3,4; COMMIT; + +TRUNCATE collections_list; + +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; + +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; + +SELECT setval('collections_list_key_seq', 4); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 5); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 499); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 700); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 708); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 709); +EXECUTE serial_prepared_local; + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); +EXECUTE serial_prepared_local; + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results