diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index f4be6263c..9f417166f 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -117,6 +117,14 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived + * from the distributed transaction ID. + */ + BeginOrContinueCoordinatedTransaction(); + nodeList = ActivePrimaryNodeList(); estate = CreateExecutorState(); resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, @@ -151,6 +159,14 @@ create_intermediate_result(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived + * from the distributed transaction ID. + */ + BeginOrContinueCoordinatedTransaction(); + estate = CreateExecutorState(); resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, estate, nodeList, @@ -167,7 +183,9 @@ create_intermediate_result(PG_FUNCTION_ARGS) /* * CreateRemoteFileDestReceiver creates a DestReceiver that streams results - * to a set of worker nodes. + * to a set of worker nodes. If the scope of the intermediate result is a + * distributed transaction, then it's up to the caller to ensure that a + * coordinated transaction is started prior to using the DestReceiver. */ DestReceiver * CreateRemoteFileDestReceiver(char *resultId, EState *executorState, @@ -231,19 +249,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); - /* - * Make sure that this transaction has a distributed transaction ID. - * - * Intermediate results will be stored in a directory that is derived from - * the distributed transaction ID across all workers and on the coordinator - * itself. Even if we only store results locally, we still want to assign - * a transaction ID in case we later store results on workers. - * - * When we start using broadcast_intermediate_result from workers, we - * need to make sure that we don't override the transaction ID here. - */ - BeginOrContinueCoordinatedTransaction(); - if (resultDest->writeLocalFile) { const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 72a5860a5..2ce2ac3ae 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -15,6 +15,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/recursive_planning.h" #include "distributed/subplan_execution.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "executor/executor.h" @@ -43,6 +44,14 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) return; } + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results of subplans will be stored in a directory that is + * derived from the distributed transaction ID. + */ + BeginOrContinueCoordinatedTransaction(); + nodeList = ActiveReadableNodeList(); foreach(subPlanCell, subPlanList) diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 2ba2565c2..7c20eeaba 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -297,6 +297,16 @@ ERROR: relation bidders is not distributed -- commands containing a CTE are supported WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) DELETE FROM limit_orders WHERE id < 0; +-- we have to be careful that modifying CTEs are part of the transaction and can thus roll back +WITH new_orders AS (INSERT INTO limit_orders VALUES (412, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders RETURNING id / 0; +ERROR: division by zero +CONTEXT: while executing command on localhost:57638 +SELECT * FROM limit_orders WHERE id = 412; + id | symbol | bidder_id | placed_at | kind | limit_price +----+--------+-----------+-----------+------+------------- +(0 rows) + INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index acde8543e..fd7f12f12 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -201,6 +201,11 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) DELETE FROM limit_orders WHERE id < 0; +-- we have to be careful that modifying CTEs are part of the transaction and can thus roll back +WITH new_orders AS (INSERT INTO limit_orders VALUES (412, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders RETURNING id / 0; +SELECT * FROM limit_orders WHERE id = 412; + INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE