From a174eb4f7b9f60413815deda48b92c9e4e861ecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Fri, 3 Jan 2020 12:15:22 +0000 Subject: [PATCH 1/3] Do not go through standard_planner() for INSERTs (#3348) That seems unnecessary. We already have the notion of FastPath queries, simply add it there. --- .../planner/fast_path_router_planner.c | 29 +- .../planner/insert_select_planner.c | 3 +- .../distributed/insert_select_planner.h | 1 + .../expected/local_shard_execution.out | 255 +++++++++++++++++- src/test/regress/expected/multi_upsert.out | 2 + .../regress/sql/local_shard_execution.sql | 85 ++++++ 6 files changed, 363 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 46e2c45e1..952f57459 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -35,6 +35,7 @@ #include "postgres.h" #include "distributed/distributed_planner.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_physical_planner.h" /* only to use some utility functions */ #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" @@ -52,6 +53,7 @@ #else #include "optimizer/clauses.h" #endif +#include "tcop/pquery.h" bool EnableFastPathRouterPlanner = true; @@ -95,7 +97,6 @@ FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) parse->jointree->quals = (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); - PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery); return result; @@ -126,7 +127,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse) /* there is only a single relation rte */ seqScanNode->scanrelid = 1; - plan->targetlist = copyObject(parse->targetList); + plan->targetlist = + copyObject(FetchStatementTargetList((Node *) parse)); + plan->qual = NULL; plan->lefttree = NULL; plan->righttree = NULL; @@ -139,6 +142,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) result->rtable = copyObject(parse->rtable); result->planTree = (Plan *) plan; + result->hasReturning = (parse->returningList != NIL); Oid relationId = ExtractFirstDistributedTableId(parse); result->relationOids = list_make1_oid(relationId); @@ -159,6 +163,8 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * key should only exists once in the WHERE clause. So basically, * SELECT ... FROM dist_table WHERE dist_key = X * - No returning for UPDATE/DELETE queries + * - All INSERT statements (including multi-row INSERTs) as long as the commands + * don't have any sublinks/CTEs etc */ bool FastPathRouterQuery(Query *query) @@ -171,14 +177,8 @@ FastPathRouterQuery(Query *query) return false; } - if (!(query->commandType == CMD_SELECT || query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE)) - { - return false; - } - /* - * We want to deal with only very simple select queries. Some of the + * We want to deal with only very simple queries. Some of the * checks might be too restrictive, still we prefer this way. */ if (query->cteList != NIL || query->returningList != NIL || @@ -188,6 +188,17 @@ FastPathRouterQuery(Query *query) return false; } + if (CheckInsertSelectQuery(query)) + { + /* we don't support INSERT..SELECT in the fast-path */ + return false; + } + else if (query->commandType == CMD_INSERT) + { + /* we don't need to do any further checks, all INSERTs are fast-path */ + return true; + } + /* make sure that the only range table in FROM clause */ if (list_length(query->rtable) != 1) { diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 1b872b874..b216a42d6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -70,7 +70,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, selectPartitionColumnTableId); static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); -static bool CheckInsertSelectQuery(Query *query); /* @@ -129,7 +128,7 @@ InsertSelectIntoLocalTable(Query *query) * This function is inspired from getInsertSelectQuery() on * rewrite/rewriteManip.c. */ -static bool +bool CheckInsertSelectQuery(Query *query) { CmdType commandType = query->commandType; diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 23a031059..5dfac3259 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -24,6 +24,7 @@ extern bool InsertSelectIntoDistributedTable(Query *query); +extern bool CheckInsertSelectQuery(Query *query); extern bool InsertSelectIntoLocalTable(Query *query); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 797f390c9..1a4626b6d 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -644,7 +644,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$ END; $$ LANGUAGE plpgsql; CALL only_local_execution_with_params(1); -LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES ($1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '29'::text CONTEXT: SQL statement "INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29'" PL/pgSQL function only_local_execution_with_params(integer) line 4 at SQL statement LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) @@ -905,6 +905,163 @@ LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_e (1 row) COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 6 | 11 | 21 | 7 | 1130 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +-----+-------+-----+----------+----------+---------- + 6 | 2928 | 21 | 7 | 292830 | 315 +(1 row) + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) + count +------- + 5 +(1 row) + +COMMIT; +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (11) +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(6,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +ROLLBACK; -- failures of local execution should rollback both the -- local execution and remote executions -- fail on a local execution @@ -1110,6 +1267,102 @@ LOG: executing the command locally: SELECT key, ser, ts, collection_id, value F (2 rows) COMMIT; +TRUNCATE collections_list; +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; +SELECT setval('collections_list_key_seq', 4); + setval +-------- + 4 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('5'::bigint, '3940649673949187'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 5 | 3940649673949187 +(1 row) + +SELECT setval('collections_list_key_seq', 5); + setval +-------- + 5 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('6'::bigint, '3940649673949188'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 6 | 3940649673949188 +(1 row) + +SELECT setval('collections_list_key_seq', 499); + setval +-------- + 499 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('500'::bigint, '3940649673949189'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 500 | 3940649673949189 +(1 row) + +SELECT setval('collections_list_key_seq', 700); + setval +-------- + 700 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('701'::bigint, '3940649673949190'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 701 | 3940649673949190 +(1 row) + +SELECT setval('collections_list_key_seq', 708); + setval +-------- + 708 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('709'::bigint, '3940649673949191'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 709 | 3940649673949191 +(1 row) + +SELECT setval('collections_list_key_seq', 709); + setval +-------- + 709 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('710'::bigint, '3940649673949192'::bigint, 0) RETURNING key, ser + key | ser +-----+------------------ + 710 | 3940649673949192 +(1 row) + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); + setval +-------- + 10 +(1 row) + +EXECUTE serial_prepared_local; + key | ser +-----+------------------ + 11 | 3940649673949193 +(1 row) + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index f05ecf242..c98fe9649 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -153,6 +153,7 @@ INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_ -- this errors out since there is no unique constraint on partition key INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:57637 -- create another table CREATE TABLE upsert_test_3 ( @@ -171,6 +172,7 @@ SELECT create_distributed_table('upsert_test_3', 'part_key', 'hash'); -- since there are no unique indexes, error-out INSERT INTO upsert_test_3 VALUES (1, 0) ON CONFLICT(part_key) DO UPDATE SET count = upsert_test_3.count + 1; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:57637 -- create another table CREATE TABLE upsert_test_4 ( diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index c17957172..9876046ed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -485,6 +485,65 @@ BEGIN; EXECUTE remote_prepare_param(1); COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +COMMIT; + +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; + +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; + +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + + EXECUTE local_multi_row_insert_prepare_params(1,6); + EXECUTE local_multi_row_insert_prepare_params(1,5); + EXECUTE local_multi_row_insert_prepare_params(6,5); + EXECUTE local_multi_row_insert_prepare_params(5,1); + EXECUTE local_multi_row_insert_prepare_params(5,6); + EXECUTE local_multi_row_insert_prepare_params(5,1); + + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +ROLLBACK; + + -- failures of local execution should rollback both the -- local execution and remote executions @@ -619,6 +678,32 @@ BEGIN; SELECT * FROM collections_list ORDER BY 1,2,3,4; COMMIT; + +TRUNCATE collections_list; + +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; + +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; + +SELECT setval('collections_list_key_seq', 4); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 5); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 499); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 700); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 708); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 709); +EXECUTE serial_prepared_local; + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); +EXECUTE serial_prepared_local; + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results From 0c70a5470eb2835ebc7285f934600f917aef1356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Fri, 3 Jan 2020 13:42:50 +0000 Subject: [PATCH 2/3] Allow RETURNING in fast-path queries (#3352) * Allow RETURNING in fast-path queries Because there is no specific reason for that. --- .../planner/fast_path_router_planner.c | 7 +-- .../expected/fast_path_router_modify.out | 55 ++++++++++++++----- .../regress/sql/fast_path_router_modify.sql | 8 ++- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 952f57459..3e0b3f20a 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -162,7 +162,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * and it should be ANDed with any other filters. Also, the distribution * key should only exists once in the WHERE clause. So basically, * SELECT ... FROM dist_table WHERE dist_key = X - * - No returning for UPDATE/DELETE queries * - All INSERT statements (including multi-row INSERTs) as long as the commands * don't have any sublinks/CTEs etc */ @@ -181,9 +180,9 @@ FastPathRouterQuery(Query *query) * We want to deal with only very simple queries. Some of the * checks might be too restrictive, still we prefer this way. */ - if (query->cteList != NIL || query->returningList != NIL || - query->hasSubLinks || query->setOperations != NULL || - query->hasTargetSRFs || query->hasModifyingCTE) + if (query->cteList != NIL || query->hasSubLinks || + query->setOperations != NULL || query->hasTargetSRFs || + query->hasModifyingCTE) { return false; } diff --git a/src/test/regress/expected/fast_path_router_modify.out b/src/test/regress/expected/fast_path_router_modify.out index 964c49629..80546438e 100644 --- a/src/test/regress/expected/fast_path_router_modify.out +++ b/src/test/regress/expected/fast_path_router_modify.out @@ -2,8 +2,8 @@ CREATE SCHEMA fast_path_router_modify; SET search_path TO fast_path_router_modify; SET citus.next_shard_id TO 1840000; -- all the tests in this file is intended for testing fast-path --- router planner, so we're explicitly enabling itin this file. --- We've bunch of other tests that triggers non-fast-path-router +-- router planner, so we're explicitly enabling itin this file. +-- We've bunch of other tests that triggers non-fast-path-router -- planner (note this is already true by default) SET citus.enable_fast_path_router_planner TO true; SET citus.shard_replication_factor TO 1; @@ -105,26 +105,55 @@ ERROR: modifying the partition value of rows is not allowed UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1; DEBUG: modifying the partition value of rows is not allowed ERROR: modifying the partition value of rows is not allowed --- returning is not supported via fast-path +-- returning is supported via fast-path +INSERT INTO modify_fast_path (key, value_1) VALUES (1,1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 1 DELETE FROM modify_fast_path WHERE key = 1 RETURNING *; +DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable DETAIL: distribution column value: 1 key | value_1 | value_2 -----+---------+--------- -(0 rows) + 1 | 1 | +(1 row) +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key; +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 2 + value_1 | key +---------+----- + 1 | 2 +(1 row) + +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 2 + ?column? | ?column? +----------+---------- + 15 | 16 +(1 row) + +-- still, non-immutable functions are not supported +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; +DEBUG: non-IMMUTABLE functions are not allowed in the RETURNING clause +ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause -- modifying ctes are not supported via fast-path WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2; DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries -DEBUG: generating subplan 18_1 for CTE t1: DELETE FROM fast_path_router_modify.modify_fast_path WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 22_1 for CTE t1: DELETE FROM fast_path_router_modify.modify_fast_path WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable DETAIL: distribution column value: 1 -DEBUG: generating subplan 18_2 for CTE t2: SELECT key, value_1, value_2 FROM fast_path_router_modify.modify_fast_path +DEBUG: generating subplan 22_2 for CTE t2: SELECT key, value_1, value_2 FROM fast_path_router_modify.modify_fast_path DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Plan 18 query after replacing subqueries and CTEs: SELECT key, value_1, value_2 FROM (SELECT intermediate_result.key, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('18_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value_1 integer, value_2 text)) t2 +DEBUG: Plan 22 query after replacing subqueries and CTEs: SELECT key, value_1, value_2 FROM (SELECT intermediate_result.key, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('22_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value_1 integer, value_2 text)) t2 DEBUG: Creating router plan DEBUG: Plan is router executable key | value_1 | value_2 @@ -193,12 +222,12 @@ DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -- joins are not supported via fast-path -UPDATE modify_fast_path - SET value_1 = 1 - FROM modify_fast_path_reference - WHERE - modify_fast_path.key = modify_fast_path_reference.key AND - modify_fast_path.key = 1 AND +UPDATE modify_fast_path + SET value_1 = 1 + FROM modify_fast_path_reference + WHERE + modify_fast_path.key = modify_fast_path_reference.key AND + modify_fast_path.key = 1 AND modify_fast_path_reference.key = 1; DEBUG: Creating router plan DEBUG: Plan is router executable diff --git a/src/test/regress/sql/fast_path_router_modify.sql b/src/test/regress/sql/fast_path_router_modify.sql index 98ddd476b..63ab5f3e7 100644 --- a/src/test/regress/sql/fast_path_router_modify.sql +++ b/src/test/regress/sql/fast_path_router_modify.sql @@ -53,8 +53,14 @@ UPDATE modify_fast_path SET key = 1::float WHERE key = 1; UPDATE modify_fast_path SET key = 2 WHERE key = 1; UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1; --- returning is not supported via fast-path +-- returning is supported via fast-path +INSERT INTO modify_fast_path (key, value_1) VALUES (1,1); DELETE FROM modify_fast_path WHERE key = 1 RETURNING *; +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key; +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; + +-- still, non-immutable functions are not supported +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; -- modifying ctes are not supported via fast-path WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2; From 566246ecd4cf02211f99bedbdea56a456b1f7f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 31 Dec 2019 18:16:17 +0000 Subject: [PATCH 3/3] End regression tests with ensure_no_intermediate_data_leak Also update tests to clean up jobs when they're directly testing job udfs --- .../ensure_no_intermediate_data_leak.out | 19 ++++++++----------- src/test/regress/expected/multi_multiuser.out | 15 +++++++++++---- .../expected/task_tracker_cleanup_job.out | 7 +++++++ .../regress/expected/worker_remove_files.out | 8 ++++++++ src/test/regress/failure_schedule | 3 +++ src/test/regress/multi_follower_schedule | 3 +++ src/test/regress/multi_mx_schedule | 3 +++ src/test/regress/multi_schedule | 5 +++++ .../regress/multi_task_tracker_extra_schedule | 4 ++++ .../sql/ensure_no_intermediate_data_leak.sql | 7 +++---- src/test/regress/sql/multi_multiuser.sql | 1 + .../regress/sql/task_tracker_cleanup_job.sql | 3 +++ src/test/regress/sql/worker_remove_files.sql | 5 +++++ src/test/regress/worker_schedule | 6 ++++++ 14 files changed, 70 insertions(+), 19 deletions(-) create mode 100644 src/test/regress/expected/worker_remove_files.out create mode 100644 src/test/regress/sql/worker_remove_files.sql diff --git a/src/test/regress/expected/ensure_no_intermediate_data_leak.out b/src/test/regress/expected/ensure_no_intermediate_data_leak.out index 4b9fb7a3a..2ed340294 100644 --- a/src/test/regress/expected/ensure_no_intermediate_data_leak.out +++ b/src/test/regress/expected/ensure_no_intermediate_data_leak.out @@ -1,20 +1,17 @@ ------ --- THIS TEST SHOULD IDEALLY BE EXECUTED AT THE END OF --- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE --- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR +-- THIS TEST SHOULD IDEALLY BE EXECUTED AT THE END OF +-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE +-- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR -- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND --- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR --- SO WE DISABLE THIS TEST ON WINDOWS +-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS ------ SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'; pg_ls_dir ----------- (0 rows) -SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$); - run_command_on_workers ------------------------- - (localhost,57637,t,"") - (localhost,57638,t,"") -(2 rows) +SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> ''; + nodename | nodeport | success | result +----------+----------+---------+-------- +(0 rows) diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index 28d7f114b..1276fa493 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -421,7 +421,7 @@ INSERT INTO full_access_user_schema.t1 VALUES (1),(2),(3); -- not allowed to create a table SELECT create_distributed_table('full_access_user_schema.t1', 'id'); ERROR: permission denied for schema full_access_user_schema -CONTEXT: while executing command on localhost:57638 +CONTEXT: while executing command on localhost:57637 RESET ROLE; SET ROLE usage_access; CREATE TYPE usage_access_type AS ENUM ('a', 'b'); @@ -672,7 +672,7 @@ ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_0000 -- different user should not be able to fetch partition file SET ROLE usage_access; SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); -WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.44518": No such file or directory +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.18110": No such file or directory CONTEXT: while executing command on localhost:57637 ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 -- only the user whom created the files should be able to fetch @@ -711,7 +711,7 @@ RESET ROLE; -- test that the super user is unable to read the contents of the intermediate file, -- although it does create the table SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); -WARNING: Task file "task_000001.43115" does not have expected suffix ".10" +WARNING: Task file "task_000001.18048" does not have expected suffix ".10" worker_merge_files_into_table ------------------------------- @@ -753,7 +753,7 @@ SELECT worker_merge_files_and_run_query(42, 1, 'CREATE TABLE task_000001_merge(merge_column_0 int)', 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' ); -WARNING: Task file "task_000001.43115" does not have expected suffix ".10" +WARNING: Task file "task_000001.18048" does not have expected suffix ".10" worker_merge_files_and_run_query ---------------------------------- @@ -811,6 +811,13 @@ SELECT count(*) FROM pg_merge_job_0042.task_000001; DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests RESET ROLE; \c - - - :master_port +SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$); + run_command_on_workers +------------------------ + (localhost,57637,t,"") + (localhost,57638,t,"") +(2 rows) + DROP SCHEMA full_access_user_schema CASCADE; NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table full_access_user_schema.t1 diff --git a/src/test/regress/expected/task_tracker_cleanup_job.out b/src/test/regress/expected/task_tracker_cleanup_job.out index 664084a77..4b5ade859 100644 --- a/src/test/regress/expected/task_tracker_cleanup_job.out +++ b/src/test/regress/expected/task_tracker_cleanup_job.out @@ -101,3 +101,10 @@ SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107'); ERROR: could not stat file "base/pgsql_job_cache/job_401010/task_801107": No such file or directory SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010'); ERROR: could not stat file "base/pgsql_job_cache/job_401010": No such file or directory +-- Also clean up worker_cleanup_job_schema_cache job +SELECT task_tracker_cleanup_job(2); + task_tracker_cleanup_job +-------------------------- + +(1 row) + diff --git a/src/test/regress/expected/worker_remove_files.out b/src/test/regress/expected/worker_remove_files.out new file mode 100644 index 000000000..0d2a80734 --- /dev/null +++ b/src/test/regress/expected/worker_remove_files.out @@ -0,0 +1,8 @@ +-- Clear job directory used by previous tests +\set JobId 201010 +SELECT task_tracker_cleanup_job(:JobId); + task_tracker_cleanup_job +-------------------------- + +(1 row) + diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 8212e1bdd..c410c678e 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -30,3 +30,6 @@ test: failure_savepoints test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_connection_establishment + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_follower_schedule b/src/test/regress/multi_follower_schedule index 9c0b63099..3e6efee6d 100644 --- a/src/test/regress/multi_follower_schedule +++ b/src/test/regress/multi_follower_schedule @@ -3,3 +3,6 @@ test: multi_follower_select_statements test: multi_follower_dml test: multi_follower_configure_followers test: multi_follower_task_tracker + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 726050b64..38d000eb4 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -42,3 +42,6 @@ test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain test: multi_mx_reference_table + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index fe8045e77..c65cc7d7a 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -313,3 +313,8 @@ test: distributed_procedure # deparsing logic tests # --------- test: multi_deparse_function multi_deparse_procedure + +# --------- +# test that no tests leaked intermediate results. This should always be last +# --------- +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 0e0bfd6d4..6cefdd098 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -110,3 +110,7 @@ test: multi_drop_extension # ---------- test: multi_schema_support +# ---------- +# test that no tests leaked intermediate results. This should always be last +# ---------- +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql index 614208f43..b265a46de 100644 --- a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql +++ b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql @@ -1,12 +1,11 @@ ------ -- THIS TEST SHOULD IDEALLY BE EXECUTED AT THE END OF --- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE +-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE -- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR -- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND --- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR --- SO WE DISABLE THIS TEST ON WINDOWS +-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS ------ SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'; -SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$); +SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> ''; diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 76ba3ee50..651e489e5 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -488,6 +488,7 @@ RESET ROLE; \c - - - :master_port +SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$); DROP SCHEMA full_access_user_schema CASCADE; DROP TABLE diff --git a/src/test/regress/sql/task_tracker_cleanup_job.sql b/src/test/regress/sql/task_tracker_cleanup_job.sql index 9a8b9af82..ffabf1eed 100644 --- a/src/test/regress/sql/task_tracker_cleanup_job.sql +++ b/src/test/regress/sql/task_tracker_cleanup_job.sql @@ -46,3 +46,6 @@ SELECT task_tracker_task_status(:JobId, :RunningTaskId); SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107'); SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010'); + +-- Also clean up worker_cleanup_job_schema_cache job +SELECT task_tracker_cleanup_job(2); diff --git a/src/test/regress/sql/worker_remove_files.sql b/src/test/regress/sql/worker_remove_files.sql new file mode 100644 index 000000000..9b3ce85ba --- /dev/null +++ b/src/test/regress/sql/worker_remove_files.sql @@ -0,0 +1,5 @@ +-- Clear job directory used by previous tests + +\set JobId 201010 + +SELECT task_tracker_cleanup_job(:JobId); diff --git a/src/test/regress/worker_schedule b/src/test/regress/worker_schedule index e3b3d7a51..031bdff90 100644 --- a/src/test/regress/worker_schedule +++ b/src/test/regress/worker_schedule @@ -19,6 +19,7 @@ test: worker_hash_partition worker_hash_partition_complex test: worker_merge_range_files worker_merge_hash_files test: worker_binary_data_partition worker_null_data_partition test: worker_check_invalid_arguments +test: worker_remove_files # ---------- # All task tracker tests use the following tables @@ -26,3 +27,8 @@ test: worker_check_invalid_arguments test: task_tracker_create_table test: task_tracker_assign_task task_tracker_partition_task test: task_tracker_cleanup_job + +# --------- +# test that no tests leaked intermediate results. This should always be last +# --------- +test: ensure_no_intermediate_data_leak