diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 751b0bc87..6ccd9db4e 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -32,6 +32,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/directed_acyclic_graph_execution.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" @@ -49,6 +50,7 @@ static void TraverseJobTree(Job *curJob, List **jobs); static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); static char * GenerateJobCommands(List *jobIds, char *templateCommand); static char * GenerateDeleteJobsCommand(List *jobIds); +static void EnsureCompatibleLocalExecutionState(List *taskList); /* @@ -63,6 +65,8 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) List *allTasks = TaskAndExecutionList(topLevelTasks); + EnsureCompatibleLocalExecutionState(allTasks); + List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); @@ -71,6 +75,24 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) } +/* + * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have + * any visibility problems because of local execution. + */ +static void +EnsureCompatibleLocalExecutionState(List *taskList) +{ + /* + * We have TransactionAccessedLocalPlacement check here to avoid unnecessarily + * iterating the task list in AnyTaskAccessesLocalNode. + */ + if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + { + ErrorIfTransactionAccessedPlacementsLocally(); + } +} + + /* * CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used * later in each worker. Single transaction is used to create the schemas. diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 803851ff2..cc235ac32 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -28,6 +28,7 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/multi_executor.h" +#include "distributed/repartition_join_execution.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/subplan_execution.h" diff --git a/src/test/regress/expected/ensure_no_intermediate_data_leak.out b/src/test/regress/expected/ensure_no_intermediate_data_leak.out index 1b5aa7e22..e1e7e1be1 100644 --- a/src/test/regress/expected/ensure_no_intermediate_data_leak.out +++ b/src/test/regress/expected/ensure_no_intermediate_data_leak.out @@ -28,3 +28,27 @@ $$) WHERE result <> ''; --------------------------------------------------------------------- (0 rows) +-- ensure that we didn't leak any schemas in repartition joins +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_1_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + nspname +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql index 0f0b5bc18..06cc05bf4 100644 --- a/src/test/regress/sql/ensure_no_intermediate_data_leak.sql +++ b/src/test/regress/sql/ensure_no_intermediate_data_leak.sql @@ -24,3 +24,18 @@ SELECT * FROM run_command_on_workers($$ SELECT array_agg((xact_dirs.dir, result_files.result_file)) FROM xact_dirs LEFT OUTER JOIN result_files ON xact_dirs.dir = result_files.dir; $$) WHERE result <> ''; + +-- ensure that we didn't leak any schemas in repartition joins +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + +\c - - - :worker_1_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%'; + +\c - - - :worker_2_port +SELECT nspname +FROM pg_catalog.pg_namespace +WHERE nspname like 'pg_merge_job%';