test that we don't leak intermediate schemas (#3737)

* test that we don't leak intermediate schemas

We have tests to make sure that we don't intermediate any intermediate
files, tables etc but we don't test if we are leaking schemas. It makes
sense to test this as well.

* remove all repartition schemas in case of error

This solution is not an ideal one but it seems to be doing the job.
We should have a more generic solution for the cleanup but it seems that
putting the cleanup in the abort handler is dangerous and it was
crashing.
pull/3730/head
SaitTalhaNisanci 2020-04-09 12:17:41 +03:00 committed by GitHub
parent 362d72853c
commit fa88046ce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 0 deletions

View File

@ -32,6 +32,7 @@
#include "distributed/adaptive_executor.h"
#include "distributed/directed_acyclic_graph_execution.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
@ -49,6 +50,7 @@ static void TraverseJobTree(Job *curJob, List **jobs);
static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner);
static char * GenerateJobCommands(List *jobIds, char *templateCommand);
static char * GenerateDeleteJobsCommand(List *jobIds);
static void EnsureCompatibleLocalExecutionState(List *taskList);
/*
@ -63,6 +65,8 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
List *allTasks = TaskAndExecutionList(topLevelTasks);
EnsureCompatibleLocalExecutionState(allTasks);
List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob);
ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds);
@ -71,6 +75,24 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
}
/*
* EnsureCompatibleLocalExecutionState makes sure that the tasks won't have
* any visibility problems because of local execution.
*/
static void
EnsureCompatibleLocalExecutionState(List *taskList)
{
/*
* We have TransactionAccessedLocalPlacement check here to avoid unnecessarily
* iterating the task list in AnyTaskAccessesLocalNode.
*/
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList))
{
ErrorIfTransactionAccessedPlacementsLocally();
}
}
/*
* CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used
* later in each worker. Single transaction is used to create the schemas.

View File

@ -28,6 +28,7 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/subplan_execution.h"

View File

@ -28,3 +28,27 @@ $$) WHERE result <> '';
---------------------------------------------------------------------
(0 rows)
-- ensure that we didn't leak any schemas in repartition joins
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';
nspname
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_1_port
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';
nspname
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_port
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';
nspname
---------------------------------------------------------------------
(0 rows)

View File

@ -24,3 +24,18 @@ SELECT * FROM run_command_on_workers($$
SELECT array_agg((xact_dirs.dir, result_files.result_file)) FROM xact_dirs LEFT OUTER JOIN result_files ON xact_dirs.dir = result_files.dir;
$$) WHERE result <> '';
-- ensure that we didn't leak any schemas in repartition joins
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';
\c - - - :worker_1_port
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';
\c - - - :worker_2_port
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname like 'pg_merge_job%';