From fa88046ce1ebd3e18be24d685e72210a54193390 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 9 Apr 2020 12:17:41 +0300 Subject: [PATCH] test that we don't leak intermediate schemas (#3737) * test that we don't leak intermediate schemas We have tests to make sure that we don't intermediate any intermediate files, tables etc but we don't test if we are leaking schemas. It makes sense to test this as well. * remove all repartition schemas in case of error This solution is not an ideal one but it seems to be doing the job. We should have a more generic solution for the cleanup but it seems that putting the cleanup in the abort handler is dangerous and it was crashing. --- .../executor/repartition_join_execution.c | 22 +++++++++++++++++ .../transaction/transaction_management.c | 1 + .../ensure_no_intermediate_data_leak.out | 24 +++++++++++++++++++ .../sql/ensure_no_intermediate_data_leak.sql | 15 ++++++++++++ 4 files changed, 62 insertions(+) diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 751b0bc87..6ccd9db4e 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -32,6 +32,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/directed_acyclic_graph_execution.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" @@ -49,6 +50,7 @@ static void TraverseJobTree(Job *curJob, List **jobs); static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); static char * GenerateJobCommands(List *jobIds, char *templateCommand); static char * GenerateDeleteJobsCommand(List *jobIds); +static void EnsureCompatibleLocalExecutionState(List *taskList); /* @@ -63,6 +65,8 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) List *allTasks = TaskAndExecutionList(topLevelTasks); + EnsureCompatibleLocalExecutionState(allTasks); + List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); @@ -71,6 +75,24 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) } +/* + * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have + * any visibility problems because of local execution. + */ +static void +EnsureCompatibleLocalExecutionState(List *taskList) +{ + /* + * We have TransactionAccessedLocalPlacement check here to avoid unnecessarily + * iterating the task list in AnyTaskAccessesLocalNode. + */ + if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + { + ErrorIfTransactionAccessedPlacementsLocally(); + } +} + + /* * CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used * later in each worker. Single transaction is used to create the schemas. diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 803851ff2..cc235ac32 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -28,6 +28,7 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/multi_executor.h" +#include "distributed/repartition_join_execution.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/subplan_execution.h" diff --git a/src/test/regress/expected/ensure_no_intermediate_data_leak.out b/src/test/regress/expected/ensure_no_intermediate_data_leak.out index 1b5aa7e22..e1e7e1be1 100644 --- a/src/test/regress/expected/ensure_no_intermediate_data_leak.out +++ b/src/test/regress/expected/ensure_no_intermediate_data_leak.out @@ -28,3 +28,27 @@ $$) WHERE result <> ''; --------------------------------------------------------------------- (0 rows) +-- ensure that we didn't leak any schemas in repartition joins +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_1_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql index 0f0b5bc18..06cc05bf4 100644 --- a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql +++ b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql @@ -24,3 +24,18 @@ SELECT * FROM run_command_on_workers($$ SELECT array_agg((xact_dirs.dir, result_files.result_file)) FROM xact_dirs LEFT OUTER JOIN result_files ON xact_dirs.dir = result_files.dir; $$) WHERE result <> ''; + +-- ensure that we didn't leak any schemas in repartition joins +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + +\c - - - :worker_1_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + +\c - - - :worker_2_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%';