send schema creation/cleanup to coordinator in repartitions

We were using the ALL_WORKERS TargetWorkerSet while sending the temporary
schema creation and cleanup commands. We (well, mostly I) thought that
ALL_WORKERS would also include the coordinator when it is added as a
worker. It turns out that it was FILTERING OUT the coordinator even when
it is added as a worker to the cluster.
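
For illustration, here is a tiny standalone toy (made-up names, not Citus
code) of that filtering behaviour: the node registered in the coordinator's
group gets skipped even though it is part of the cluster.

#include <stdbool.h>
#include <stdio.h>

/* Toy model only: in the Citus metadata, group 0 conventionally holds the
 * coordinator, so a "workers only" filter drops it even when it has been
 * added as a node that can hold data. */
typedef struct ToyNode
{
	const char *name;
	int groupId;
} ToyNode;

static bool
IncludedInOldAllWorkers(const ToyNode *node)
{
	/* the old ALL_WORKERS behaviour: skip the coordinator's group */
	return node->groupId != 0;
}

int
main(void)
{
	ToyNode nodes[] = { { "coordinator", 0 }, { "worker-1", 1 }, { "worker-2", 2 } };

	for (int i = 0; i < 3; i++)
	{
		printf("%-11s -> %s\n", nodes[i].name,
			   IncludedInOldAllWorkers(&nodes[i]) ? "receives command" : "silently skipped");
	}

	return 0;
}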

For some context: in repartitions, for each jobId we create (or at least
we are supposed to create) a schema on each worker node in the cluster.
The job then runs in three steps:

- PARTITION: each shard of the table is partitioned into intermediate
files, so after this step each node holds intermediate files containing
its share of the tuples.
- FETCH: the partition files are fetched to the worker nodes that need
them.
- MERGE: intermediate tables are created from those files inside the
temporarily created schemas.

After the result is evaluated, we remove the temporary schemas (one per
job ID on each node) and the intermediate files.
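
Roughly, in code, the schema lifecycle wraps those steps like this. This is
a sketch stitched together from the two call sites changed in the diff
below; RunRepartitionJob and the control flow around it are paraphrased for
illustration, not actual Citus source.

static void
RunRepartitionJob(Job *topLevelJob)
{
	/* before MERGE can run: one temporary schema per job ID on every node */
	List *jobIds = ExtractJobsInJobTree(topLevelJob);
	char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
	SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand,
								   CitusExtensionOwnerName());

	/* ... PARTITION, FETCH and MERGE tasks execute here ... */

	/* after the result is evaluated: drop the per-job schemas and files */
	SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES,
										   GenerateDeleteJobsCommand(jobIds),
										   CitusExtensionOwnerName());
}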

If node 1 has file1 and node 2 has file2 after the PARTITION step, it is
enough to either move file1 from node 1 to node 2 or vice versa, so we
prune one of the two fetch tasks.
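
One way to picture that pruning rule (toy model only, not the real
task-pruning code):

#include <stdbool.h>

/* Toy model: a FETCH task is redundant (prunable) when the file it would
 * transfer already sits on the node chosen to run the MERGE, so for the
 * file1/file2 example above exactly one of the two transfers survives. */
static bool
FetchTaskIsPrunable(int fileNodeId, int mergeNodeId)
{
	return fileNodeId == mergeNodeId;
}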

In the MERGE step, if the schema for a given jobID doesn't exist, the
node falls back to the `public` schema, but only if the current user is
a superuser; that fallback was originally added for testing purposes.
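
In sketch form (JobSchemaExists() and jobSchemaName are hypothetical
stand-ins rather than the exact Citus symbols; superuser() is the regular
PostgreSQL check):

if (!JobSchemaExists(jobSchemaName))
{
	if (superuser())
	{
		/* legacy testing path: build the merge tables under "public" */
		jobSchemaName = "public";
	}
	else
	{
		ereport(ERROR, (errmsg("job schema \"%s\" does not exist",
							   jobSchemaName)));
	}
}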

So because we were not sending the schema creation commands for each job
ID to the coordinator (we were using the ALL_WORKERS flag, which does not
include the coordinator), we basically had no schemas for repartitions on
the coordinator. The PARTITION step would still be executed on the
coordinator (because the tasks are generated in the planner) and it
wouldn't raise any error, since it doesn't touch the temporary schemas
(which we never created). But later one of two things would happen:

- If, by chance, the fetch is pruned on the coordinator side, the other
nodes would fetch the partitioned files from the coordinator and execute
the query as expected, because they have all the information they need.
- If the fetch tasks are not pruned on the coordinator, then in the MERGE
step the coordinator would either error out saying that the necessary
schema doesn't exist, or (if running as a superuser) try to create the
temporary tables under the public schema. But then, if we had the same
task ID with a different job ID, it would fail saying that the table
already exists, which is exactly the error we were seeing (see the sketch
right below).
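
To make that collision concrete, with hypothetical schema/table name
formats (the exact formats live in the worker-side merge code; this is
illustration only):

#include <stdio.h>

/* Hypothetical names for illustration: with one schema per job ID the two
 * merge tables stay distinct, while the "public" fallback maps both to the
 * same name and the second CREATE fails. */
int
main(void)
{
	unsigned long jobIds[] = { 42, 43 };
	unsigned int taskId = 7;

	for (int i = 0; i < 2; i++)
	{
		printf("with job schema: job_schema_%lu.task_%u\n", jobIds[i], taskId);
		printf("public fallback: public.task_%u\n", taskId); /* same name twice */
	}

	return 0;
}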

In the first case the query would work fine, but the cleanup would still
be skipped on the coordinator, so the partitioned files from the PARTITION
step would be left behind and ensure_no_intermediate_data_leak would fail.

To make things more explicit and prevent such bugs in the future,
ALL_WORKERS is renamed to ALL_NON_COORD_WORKERS, and a new flag,
ALL_DATA_NODES, is added to return all active nodes. In the repartition
case we don't strictly need the nodes that only hold reference tables,
but including them keeps the code simpler and shouldn't cause any
significant performance issue.
pull/4005/head
Sait Talha Nisanci 2020-07-08 21:05:58 +03:00
parent 76ddb85545
commit db1b78148c
5 changed files with 39 additions and 19 deletions

View File

@@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
 {
 	List *jobIds = ExtractJobsInJobTree(topLeveLJob);
 	char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
-	SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand,
+	SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand,
 								   CitusExtensionOwnerName());
 	return jobIds;
 }
@@ -191,8 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
 void
 DoRepartitionCleanup(List *jobIds)
 {
-	SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES,
-										   GenerateDeleteJobsCommand(jobIds),
+	SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand(
+											   jobIds),
 										   CitusExtensionOwnerName());
 }

View File

@@ -130,7 +130,15 @@ SendCommandToWorkersWithMetadata(const char *command)
 List *
 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 {
-	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
+	List *workerNodeList = NIL;
+	if (targetWorkerSet == ALL_DATA_NODES)
+	{
+		workerNodeList = ActivePrimaryNodeList(lockMode);
+	}
+	else
+	{
+		workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
+	}
 	List *result = NIL;
 	WorkerNode *workerNode = NULL;

View File

@@ -24,7 +24,8 @@
 typedef enum TargetWorkerSet
 {
 	NON_COORDINATOR_METADATA_NODES,
-	NON_COORDINATOR_NODES
+	NON_COORDINATOR_NODES,
+	ALL_DATA_NODES
 } TargetWorkerSet;

View File

@@ -198,13 +198,23 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
 (1 row)
 
 ROLLBACK;
--- Commented out since it currently does not clean up task files on the
--- coordinator
--- See this issue for details: https://github.com/citusdata/citus/issues/3996
-- BEGIN;
-- SET citus.enable_repartition_joins TO ON;
-- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
-- ROLLBACK;
+-- repartition queries should work fine
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+ count
+---------------------------------------------------------------------
+   100
+(1 row)
+
+BEGIN;
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+ count
+---------------------------------------------------------------------
+   100
+(1 row)
+
+END;
 BEGIN;
 SET citus.enable_repartition_joins TO ON;
 -- trigger local execution

View File

@@ -87,13 +87,14 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
 SELECT count(*) FROM dist_table;
 ROLLBACK;
 
--- Commented out since it currently does not clean up task files on the
--- coordinator
--- See this issue for details: https://github.com/citusdata/citus/issues/3996
-- BEGIN;
-- SET citus.enable_repartition_joins TO ON;
-- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
-- ROLLBACK;
+-- repartition queries should work fine
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+
+BEGIN;
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+END;
 
 BEGIN;
 SET citus.enable_repartition_joins TO ON;