Merge pull request #2151 from citusdata/fix_cte_xact

Ensure sigle-shard modifying CTEs are part of distributed transaction
pull/2154/head
Marco Slot 2018-05-08 14:04:40 +02:00 committed by GitHub
commit b86d6eb544
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 14 deletions

View File

@ -117,6 +117,14 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID.
*/
BeginOrContinueCoordinatedTransaction();
nodeList = ActivePrimaryNodeList(); nodeList = ActivePrimaryNodeList();
estate = CreateExecutorState(); estate = CreateExecutorState();
resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString,
@ -151,6 +159,14 @@ create_intermediate_result(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID.
*/
BeginOrContinueCoordinatedTransaction();
estate = CreateExecutorState(); estate = CreateExecutorState();
resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString,
estate, nodeList, estate, nodeList,
@ -167,7 +183,9 @@ create_intermediate_result(PG_FUNCTION_ARGS)
/* /*
* CreateRemoteFileDestReceiver creates a DestReceiver that streams results * CreateRemoteFileDestReceiver creates a DestReceiver that streams results
* to a set of worker nodes. * to a set of worker nodes. If the scope of the intermediate result is a
* distributed transaction, then it's up to the caller to ensure that a
* coordinated transaction is started prior to using the DestReceiver.
*/ */
DestReceiver * DestReceiver *
CreateRemoteFileDestReceiver(char *resultId, EState *executorState, CreateRemoteFileDestReceiver(char *resultId, EState *executorState,
@ -231,19 +249,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary); copyOutState->binary);
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived from
* the distributed transaction ID across all workers and on the coordinator
* itself. Even if we only store results locally, we still want to assign
* a transaction ID in case we later store results on workers.
*
* When we start using broadcast_intermediate_result from workers, we
* need to make sure that we don't override the transaction ID here.
*/
BeginOrContinueCoordinatedTransaction();
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);

View File

@ -15,6 +15,7 @@
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/recursive_planning.h" #include "distributed/recursive_planning.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -43,6 +44,14 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
return; return;
} }
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results of subplans will be stored in a directory that is
* derived from the distributed transaction ID.
*/
BeginOrContinueCoordinatedTransaction();
nodeList = ActiveReadableNodeList(); nodeList = ActiveReadableNodeList();
foreach(subPlanCell, subPlanList) foreach(subPlanCell, subPlanList)

View File

@ -297,6 +297,16 @@ ERROR: relation bidders is not distributed
-- commands containing a CTE are supported -- commands containing a CTE are supported
WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders WHERE id < 0; DELETE FROM limit_orders WHERE id < 0;
-- we have to be careful that modifying CTEs are part of the transaction and can thus roll back
WITH new_orders AS (INSERT INTO limit_orders VALUES (412, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders RETURNING id / 0;
ERROR: division by zero
CONTEXT: while executing command on localhost:57638
SELECT * FROM limit_orders WHERE id = 412;
id | symbol | bidder_id | placed_at | kind | limit_price
----+--------+-----------+-----------+------+-------------
(0 rows)
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE -- simple UPDATE
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; UPDATE limit_orders SET symbol = 'GM' WHERE id = 246;

View File

@ -201,6 +201,11 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders WHERE id < 0; DELETE FROM limit_orders WHERE id < 0;
-- we have to be careful that modifying CTEs are part of the transaction and can thus roll back
WITH new_orders AS (INSERT INTO limit_orders VALUES (412, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders RETURNING id / 0;
SELECT * FROM limit_orders WHERE id = 412;
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE -- simple UPDATE