From c238e6c8b0a414d4409e69930291d76e4dfa691f Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 8 Jul 2020 21:05:58 +0300 Subject: [PATCH] send schema creation/cleanup to coordinator in repartitions We were using ALL_WORKERS TargetWorkerSet while sending temporary schema creation and cleanup. We(well mostly I) thought that ALL_WORKERS would also include coordinator when it is added as a worker. It turns out that it was FILTERING OUT the coordinator even if it is added as a worker to the cluster. So to have some context here, in repartitions, for each jobId we create (at least we were supposed to) a schema in each worker node in the cluster. Then we partition each shard table into some intermediate files, which is called the PARTITION step. So after this partition step each node has some intermediate files having tuples in those nodes. Then we fetch the partition files to necessary worker nodes, which is called the FETCH step. Then from the files we create intermediate tables in the temporarily created schemas, which is called a MERGE step. Then after evaluating the result, we remove the temporary schemas(one for each job ID in each node) and files. If node 1 has file1, and node 2 has file2 after PARTITION step, it is enough to either move file1 from node1 to node2 or vice versa. So we prune one of them. In the MERGE step, if the schema for a given jobID doesn't exist, the node tries to use the `public` schema if it is a superuser, which is actually added for testing in the past. So when we were not sending schema creation comands for each job ID to the coordinator(because we were using ALL_WORKERS flag, and it doesn't include the coordinator), we would basically not have any schemas for repartitions in the coordinator. The PARTITION step would be executed on the coordinator (because the tasks are generated in the planner part) and it wouldn't give us any error because it doesn't have anything to do with the temporary schemas(that we didn't create). But later two things would happen: - If by chance the fetch is pruned on the coordinator side, we the other nodes would fetch the partitioned files from the coordinator and execute the query as expected, because it has all the information. - If the fetch tasks are not pruned in the coordinator, in the MERGE step, the coordinator would either error out saying that the necessary schema doesn't exist, or it would try to create the temporary tables under public schema ( if it is a superuser). But then if we had the same task ID with different jobID it would fail saying that the table already exists, which is an error we were getting. In the first case, the query would work okay, but it would still not do the cleanup, hence we would leave the partitioned files from the PARTITION step there. Hence ensure_no_intermediate_data_leak would fail. To make things more explicit and prevent such bugs in the future, ALL_WORKERS is named as ALL_NON_COORD_WORKERS. And a new flag to return all the active nodes is added as ALL_DATA_NODES. For repartition case, we don't use the only-reference table nodes but this version makes the code simpler and there shouldn't be any significant performance issue with that. (cherry picked from commit 6532506f4b92b1316eea0812b2bcedb818d3b25c) --- .../executor/repartition_join_execution.c | 6 ++--- .../transaction/worker_transaction.c | 10 +++++++- src/include/distributed/worker_transaction.h | 3 ++- .../expected/coordinator_shouldhaveshards.out | 24 +++++++++++++------ .../sql/coordinator_shouldhaveshards.sql | 15 ++++++------ 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 696c1ad9f..7809008de 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand, + SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,8 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES, - GenerateDeleteJobsCommand(jobIds), + SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand( + jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 0c0dad87d..186d04a97 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -198,7 +198,15 @@ SendOptionalCommandListToAllWorkers(List *commandList, const char *superuser) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); + List *workerNodeList = NIL; + if (targetWorkerSet == ALL_DATA_NODES) + { + workerNodeList = ActivePrimaryNodeList(lockMode); + } + else + { + workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); + } List *result = NIL; WorkerNode *workerNode = NULL; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index e3018a599..305da0a13 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -24,7 +24,8 @@ typedef enum TargetWorkerSet { NON_COORDINATOR_METADATA_NODES, - NON_COORDINATOR_NODES + NON_COORDINATOR_NODES, + ALL_DATA_NODES } TargetWorkerSet; diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 58ad5cb09..40f239547 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -198,13 +198,23 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) ROLLBACK; --- Commented out since it currently does not clean up task files on the --- coordinator --- See this issue for details: https://github.com/citusdata/citus/issues/3996 --- BEGIN; --- SET citus.enable_repartition_joins TO ON; --- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; --- ROLLBACK; +-- repartition queries should work fine +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + count +--------------------------------------------------------------------- + 100 +(1 row) + +BEGIN; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + count +--------------------------------------------------------------------- + 100 +(1 row) + +END; BEGIN; SET citus.enable_repartition_joins TO ON; -- trigger local execution diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 8816579d2..0707be471 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -87,13 +87,14 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); SELECT count(*) FROM dist_table; ROLLBACK; --- Commented out since it currently does not clean up task files on the --- coordinator --- See this issue for details: https://github.com/citusdata/citus/issues/3996 --- BEGIN; --- SET citus.enable_repartition_joins TO ON; --- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; --- ROLLBACK; +-- repartition queries should work fine +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + +BEGIN; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +END; BEGIN; SET citus.enable_repartition_joins TO ON;