diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 46e2c45e1..3e0b3f20a 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -35,6 +35,7 @@ #include "postgres.h" #include "distributed/distributed_planner.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_physical_planner.h" /* only to use some utility functions */ #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" @@ -52,6 +53,7 @@ #else #include "optimizer/clauses.h" #endif +#include "tcop/pquery.h" bool EnableFastPathRouterPlanner = true; @@ -95,7 +97,6 @@ FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) parse->jointree->quals = (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); - PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery); return result; @@ -126,7 +127,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse) /* there is only a single relation rte */ seqScanNode->scanrelid = 1; - plan->targetlist = copyObject(parse->targetList); + plan->targetlist = + copyObject(FetchStatementTargetList((Node *) parse)); + plan->qual = NULL; plan->lefttree = NULL; plan->righttree = NULL; @@ -139,6 +142,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) result->rtable = copyObject(parse->rtable); result->planTree = (Plan *) plan; + result->hasReturning = (parse->returningList != NIL); Oid relationId = ExtractFirstDistributedTableId(parse); result->relationOids = list_make1_oid(relationId); @@ -158,7 +162,8 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * and it should be ANDed with any other filters. Also, the distribution * key should only exists once in the WHERE clause. So basically, * SELECT ... FROM dist_table WHERE dist_key = X - * - No returning for UPDATE/DELETE queries + * - All INSERT statements (including multi-row INSERTs) as long as the commands + * don't have any sublinks/CTEs etc */ bool FastPathRouterQuery(Query *query) @@ -171,22 +176,27 @@ FastPathRouterQuery(Query *query) return false; } - if (!(query->commandType == CMD_SELECT || query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE)) + /* + * We want to deal with only very simple queries. Some of the + * checks might be too restrictive, still we prefer this way. + */ + if (query->cteList != NIL || query->hasSubLinks || + query->setOperations != NULL || query->hasTargetSRFs || + query->hasModifyingCTE) { return false; } - /* - * We want to deal with only very simple select queries. Some of the - * checks might be too restrictive, still we prefer this way. - */ - if (query->cteList != NIL || query->returningList != NIL || - query->hasSubLinks || query->setOperations != NULL || - query->hasTargetSRFs || query->hasModifyingCTE) + if (CheckInsertSelectQuery(query)) { + /* we don't support INSERT..SELECT in the fast-path */ return false; } + else if (query->commandType == CMD_INSERT) + { + /* we don't need to do any further checks, all INSERTs are fast-path */ + return true; + } /* make sure that the only range table in FROM clause */ if (list_length(query->rtable) != 1) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 1b872b874..b216a42d6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -70,7 +70,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, selectPartitionColumnTableId); static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); -static bool CheckInsertSelectQuery(Query *query); /* @@ -129,7 +128,7 @@ InsertSelectIntoLocalTable(Query *query) * This function is inspired from getInsertSelectQuery() on * rewrite/rewriteManip.c. */ -static bool +bool CheckInsertSelectQuery(Query *query) { CmdType commandType = query->commandType; diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 23a031059..5dfac3259 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -24,6 +24,7 @@ extern bool InsertSelectIntoDistributedTable(Query *query); +extern bool CheckInsertSelectQuery(Query *query); extern bool InsertSelectIntoLocalTable(Query *query); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, diff --git a/src/test/regress/expected/ensure_no_intermediate_data_leak.out b/src/test/regress/expected/ensure_no_intermediate_data_leak.out index 2e4cb92b8..3267501db 100644 --- a/src/test/regress/expected/ensure_no_intermediate_data_leak.out +++ b/src/test/regress/expected/ensure_no_intermediate_data_leak.out @@ -1,20 +1,17 @@ --------------------------------------------------------------------- -- THIS TEST SHOULD IDEALLY BE EXECUTED AT THE END OF --- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE +-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE -- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR -- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND --- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR --- SO WE DISABLE THIS TEST ON WINDOWS +-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS --------------------------------------------------------------------- SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'; pg_ls_dir --------------------------------------------------------------------- (0 rows) -SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$); - run_command_on_workers +SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> ''; + nodename | nodeport | success | result --------------------------------------------------------------------- - (localhost,57637,t,"") - (localhost,57638,t,"") -(2 rows) +(0 rows) diff --git a/src/test/regress/expected/fast_path_router_modify.out b/src/test/regress/expected/fast_path_router_modify.out index 06fe66e5a..c83c9dca0 100644 --- a/src/test/regress/expected/fast_path_router_modify.out +++ b/src/test/regress/expected/fast_path_router_modify.out @@ -105,15 +105,44 @@ ERROR: modifying the partition value of rows is not allowed UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1; DEBUG: modifying the partition value of rows is not allowed ERROR: modifying the partition value of rows is not allowed --- returning is not supported via fast-path +-- returning is supported via fast-path +INSERT INTO modify_fast_path (key, value_1) VALUES (1,1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 1 DELETE FROM modify_fast_path WHERE key = 1 RETURNING *; +DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable DETAIL: distribution column value: 1 key | value_1 | value_2 --------------------------------------------------------------------- -(0 rows) + 1 | 1 | +(1 row) +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key; +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 2 + value_1 | key +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: 2 + ?column? | ?column? +--------------------------------------------------------------------- + 15 | 16 +(1 row) + +-- still, non-immutable functions are not supported +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; +DEBUG: non-IMMUTABLE functions are not allowed in the RETURNING clause +ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause -- modifying ctes are not supported via fast-path WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2; DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 2a3834f03..b2116e903 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -644,7 +644,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$ END; $$ LANGUAGE plpgsql; CALL only_local_execution_with_params(1); -LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES ($1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '29'::text CONTEXT: SQL statement "INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29'" PL/pgSQL function only_local_execution_with_params(integer) line 4 at SQL statement LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) @@ -905,6 +905,163 @@ LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_e (1 row) COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 6 | 11 | 21 | 7 | 1130 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 1 | 2928 | 21 | 2 | 292830 | 315 +(1 row) + + EXECUTE local_insert_prepare_param(5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 5 | 2928 | 22 | 6 | 292830 | 330 +(1 row) + + EXECUTE local_insert_prepare_param(6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15) + key | value | age | ?column? | ?column? | ?column? +--------------------------------------------------------------------- + 6 | 2928 | 21 | 7 | 292830 | 315 +(1 row) + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) +LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2) + count +--------------------------------------------------------------------- + 5 +(1 row) + +COMMIT; +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (11) +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(1,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(6,5); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,6); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + EXECUTE local_multi_row_insert_prepare_params(5,1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value) +ROLLBACK; -- failures of local execution should rollback both the -- local execution and remote executions -- fail on a local execution @@ -1110,6 +1267,102 @@ LOG: executing the command locally: SELECT key, ser, ts, collection_id, value F (2 rows) COMMIT; +TRUNCATE collections_list; +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; +SELECT setval('collections_list_key_seq', 4); + setval +--------------------------------------------------------------------- + 4 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('5'::bigint, '3940649673949187'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 5 | 3940649673949187 +(1 row) + +SELECT setval('collections_list_key_seq', 5); + setval +--------------------------------------------------------------------- + 5 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('6'::bigint, '3940649673949188'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 6 | 3940649673949188 +(1 row) + +SELECT setval('collections_list_key_seq', 499); + setval +--------------------------------------------------------------------- + 499 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('500'::bigint, '3940649673949189'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 500 | 3940649673949189 +(1 row) + +SELECT setval('collections_list_key_seq', 700); + setval +--------------------------------------------------------------------- + 700 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('701'::bigint, '3940649673949190'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 701 | 3940649673949190 +(1 row) + +SELECT setval('collections_list_key_seq', 708); + setval +--------------------------------------------------------------------- + 708 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('709'::bigint, '3940649673949191'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 709 | 3940649673949191 +(1 row) + +SELECT setval('collections_list_key_seq', 709); + setval +--------------------------------------------------------------------- + 709 +(1 row) + +EXECUTE serial_prepared_local; +LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('710'::bigint, '3940649673949192'::bigint, 0) RETURNING key, ser + key | ser +--------------------------------------------------------------------- + 710 | 3940649673949192 +(1 row) + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); + setval +--------------------------------------------------------------------- + 10 +(1 row) + +EXECUTE serial_prepared_local; + key | ser +--------------------------------------------------------------------- + 11 | 3940649673949193 +(1 row) + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index bcb132278..d88340208 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -809,6 +809,13 @@ SELECT count(*) FROM pg_merge_job_0042.task_000001; DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests RESET ROLE; \c - - - :master_port +SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"") + (localhost,57638,t,"") +(2 rows) + DROP SCHEMA full_access_user_schema CASCADE; NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table full_access_user_schema.t1 diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 444484a10..08308aba0 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -153,6 +153,7 @@ INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_ -- this errors out since there is no unique constraint on partition key INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:xxxxx -- create another table CREATE TABLE upsert_test_3 ( @@ -171,6 +172,7 @@ SELECT create_distributed_table('upsert_test_3', 'part_key', 'hash'); -- since there are no unique indexes, error-out INSERT INTO upsert_test_3 VALUES (1, 0) ON CONFLICT(part_key) DO UPDATE SET count = upsert_test_3.count + 1; ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +CONTEXT: while executing command on localhost:xxxxx -- create another table CREATE TABLE upsert_test_4 ( diff --git a/src/test/regress/expected/task_tracker_cleanup_job.out b/src/test/regress/expected/task_tracker_cleanup_job.out index 8d44a9b1d..d8156de13 100644 --- a/src/test/regress/expected/task_tracker_cleanup_job.out +++ b/src/test/regress/expected/task_tracker_cleanup_job.out @@ -101,3 +101,10 @@ SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107'); ERROR: could not stat file "base/pgsql_job_cache/job_401010/task_801107": No such file or directory SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010'); ERROR: could not stat file "base/pgsql_job_cache/job_401010": No such file or directory +-- Also clean up worker_cleanup_job_schema_cache job +SELECT task_tracker_cleanup_job(2); + task_tracker_cleanup_job +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/worker_remove_files.out b/src/test/regress/expected/worker_remove_files.out new file mode 100644 index 000000000..0d2a80734 --- /dev/null +++ b/src/test/regress/expected/worker_remove_files.out @@ -0,0 +1,8 @@ +-- Clear job directory used by previous tests +\set JobId 201010 +SELECT task_tracker_cleanup_job(:JobId); + task_tracker_cleanup_job +-------------------------- + +(1 row) + diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 8212e1bdd..c410c678e 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -30,3 +30,6 @@ test: failure_savepoints test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_connection_establishment + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_follower_schedule b/src/test/regress/multi_follower_schedule index 9c0b63099..3e6efee6d 100644 --- a/src/test/regress/multi_follower_schedule +++ b/src/test/regress/multi_follower_schedule @@ -3,3 +3,6 @@ test: multi_follower_select_statements test: multi_follower_dml test: multi_follower_configure_followers test: multi_follower_task_tracker + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 726050b64..38d000eb4 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -42,3 +42,6 @@ test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain test: multi_mx_reference_table + +# test that no tests leaked intermediate results. This should always be last +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index fe8045e77..c65cc7d7a 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -313,3 +313,8 @@ test: distributed_procedure # deparsing logic tests # --------- test: multi_deparse_function multi_deparse_procedure + +# --------- +# test that no tests leaked intermediate results. This should always be last +# --------- +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 0e0bfd6d4..6cefdd098 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -110,3 +110,7 @@ test: multi_drop_extension # ---------- test: multi_schema_support +# ---------- +# test that no tests leaked intermediate results. This should always be last +# ---------- +test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql index 614208f43..b265a46de 100644 --- a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql +++ b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql @@ -1,12 +1,11 @@ ------ -- THIS TEST SHOULD IDEALLY BE EXECUTED AT THE END OF --- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE +-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE -- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR -- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND --- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR --- SO WE DISABLE THIS TEST ON WINDOWS +-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS ------ SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'; -SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$); +SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> ''; diff --git a/src/test/regress/sql/fast_path_router_modify.sql b/src/test/regress/sql/fast_path_router_modify.sql index 98ddd476b..63ab5f3e7 100644 --- a/src/test/regress/sql/fast_path_router_modify.sql +++ b/src/test/regress/sql/fast_path_router_modify.sql @@ -53,8 +53,14 @@ UPDATE modify_fast_path SET key = 1::float WHERE key = 1; UPDATE modify_fast_path SET key = 2 WHERE key = 1; UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1; --- returning is not supported via fast-path +-- returning is supported via fast-path +INSERT INTO modify_fast_path (key, value_1) VALUES (1,1); DELETE FROM modify_fast_path WHERE key = 1 RETURNING *; +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key; +DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16; + +-- still, non-immutable functions are not supported +INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key; -- modifying ctes are not supported via fast-path WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index c17957172..9876046ed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -485,6 +485,65 @@ BEGIN; EXECUTE remote_prepare_param(1); COMMIT; +PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15; +BEGIN; + -- 6 local execution without params + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + EXECUTE local_insert_prepare_no_param; + + -- 6 local executions with params + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + EXECUTE local_insert_prepare_param(1); + EXECUTE local_insert_prepare_param(5); + EXECUTE local_insert_prepare_param(6); + + -- followed by a non-local execution + EXECUTE remote_prepare_param(2); +COMMIT; + +PREPARE local_multi_row_insert_prepare_no_param AS + INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value; + +PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS + INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; + +PREPARE local_multi_row_insert_prepare_params(int,int) AS + INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;; +INSERT INTO reference_table VALUES (11); +BEGIN; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + EXECUTE local_multi_row_insert_prepare_no_param; + + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + EXECUTE local_multi_row_insert_prepare_no_param_multi_shard; + + EXECUTE local_multi_row_insert_prepare_params(1,6); + EXECUTE local_multi_row_insert_prepare_params(1,5); + EXECUTE local_multi_row_insert_prepare_params(6,5); + EXECUTE local_multi_row_insert_prepare_params(5,1); + EXECUTE local_multi_row_insert_prepare_params(5,6); + EXECUTE local_multi_row_insert_prepare_params(5,1); + + -- one task is remote + EXECUTE local_multi_row_insert_prepare_params(5,11); +ROLLBACK; + + -- failures of local execution should rollback both the -- local execution and remote executions @@ -619,6 +678,32 @@ BEGIN; SELECT * FROM collections_list ORDER BY 1,2,3,4; COMMIT; + +TRUNCATE collections_list; + +-- make sure that even if local execution is used, the sequence values +-- are generated locally +ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE; + +PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser; + +SELECT setval('collections_list_key_seq', 4); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 5); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 499); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 700); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 708); +EXECUTE serial_prepared_local; +SELECT setval('collections_list_key_seq', 709); +EXECUTE serial_prepared_local; + +-- and, one remote test +SELECT setval('collections_list_key_seq', 10); +EXECUTE serial_prepared_local; + -- the final queries for the following CTEs are going to happen on the intermediate results only -- one of them will be executed remotely, and the other is locally -- Citus currently doesn't allow using task_assignment_policy for intermediate results diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 76ba3ee50..651e489e5 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -488,6 +488,7 @@ RESET ROLE; \c - - - :master_port +SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$); DROP SCHEMA full_access_user_schema CASCADE; DROP TABLE diff --git a/src/test/regress/sql/task_tracker_cleanup_job.sql b/src/test/regress/sql/task_tracker_cleanup_job.sql index 9a8b9af82..ffabf1eed 100644 --- a/src/test/regress/sql/task_tracker_cleanup_job.sql +++ b/src/test/regress/sql/task_tracker_cleanup_job.sql @@ -46,3 +46,6 @@ SELECT task_tracker_task_status(:JobId, :RunningTaskId); SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107'); SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010'); + +-- Also clean up worker_cleanup_job_schema_cache job +SELECT task_tracker_cleanup_job(2); diff --git a/src/test/regress/sql/worker_remove_files.sql b/src/test/regress/sql/worker_remove_files.sql new file mode 100644 index 000000000..9b3ce85ba --- /dev/null +++ b/src/test/regress/sql/worker_remove_files.sql @@ -0,0 +1,5 @@ +-- Clear job directory used by previous tests + +\set JobId 201010 + +SELECT task_tracker_cleanup_job(:JobId); diff --git a/src/test/regress/worker_schedule b/src/test/regress/worker_schedule index e3b3d7a51..031bdff90 100644 --- a/src/test/regress/worker_schedule +++ b/src/test/regress/worker_schedule @@ -19,6 +19,7 @@ test: worker_hash_partition worker_hash_partition_complex test: worker_merge_range_files worker_merge_hash_files test: worker_binary_data_partition worker_null_data_partition test: worker_check_invalid_arguments +test: worker_remove_files # ---------- # All task tracker tests use the following tables @@ -26,3 +27,8 @@ test: worker_check_invalid_arguments test: task_tracker_create_table test: task_tracker_assign_task task_tracker_partition_task test: task_tracker_cleanup_job + +# --------- +# test that no tests leaked intermediate results. This should always be last +# --------- +test: ensure_no_intermediate_data_leak