From 44a2aede16ee762263d626b3cb374f7a0e6223d5 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 13 Jan 2020 16:52:15 -0800 Subject: [PATCH] Don't start a coordinated transaction on workers. Otherwise transaction hooks of Citus kick in and might cause unwanted errors. --- .../executor/insert_select_executor.c | 19 ++-- .../executor/intermediate_results.c | 11 +- .../partitioned_intermediate_results.c | 2 +- .../transaction/remote_transaction.c | 61 ++++++---- .../transaction/transaction_management.c | 27 ++--- .../distributed/transaction_management.h | 9 +- .../expected/insert_select_repartition.out | 107 +++++++++++++++++- .../regress/sql/insert_select_repartition.sql | 44 ++++++- 8 files changed, 214 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 84ddca0d0..97b31c9cf 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -267,17 +267,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - TransactionProperties xactProperties = { - .errorOnAnyFailure = true, - .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, - .requires2PC = false - }; - int64 rowsInserted = ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, taskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning, - MaxAdaptiveExecutorPoolSize, - &xactProperties); + uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, + taskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning); executorState->es_processed = rowsInserted; } @@ -981,6 +975,7 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery, Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); + modifyTask->dependentTaskList = NIL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->relationShardList = list_make1(relationShard); @@ -1043,7 +1038,7 @@ IsRedistributablePlan(Plan *selectPlan, bool hasReturning) /* * Don't use redistribution if only one task. This is to keep the existing * behaviour for CTEs that the last step is a read_intermediate_result() - * call. It doesn't hurt much in other case too. + * call. It doesn't hurt much in other cases too. */ if (list_length(distSelectTaskList) <= 1) { diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 4b7197363..c1b06e30b 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -804,7 +804,7 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) char *remoteHost = text_to_cstring(remoteHostText); int remotePort = PG_GETARG_INT32(2); - int connectionFlags = 0; + int connectionFlags = FORCE_NEW_CONNECTION; int resultIndex = 0; int64 totalBytesWritten = 0L; @@ -827,7 +827,7 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) * Intermediate results will be stored in a directory that is derived * from the distributed transaction ID. */ - UseCoordinatedTransaction(); + EnsureDistributedTransactionId(); MultiConnection *connection = GetNodeConnection(connectionFlags, remoteHost, remotePort); @@ -838,7 +838,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) remoteHost, remotePort))); } - RemoteTransactionBeginIfNecessary(connection); + StringInfo beginAndSetXactId = BeginAndSetDistributedTransactionIdCommand(); + ExecuteCriticalRemoteCommand(connection, beginAndSetXactId->data); for (resultIndex = 0; resultIndex < resultCount; resultIndex++) { @@ -847,7 +848,9 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId); } - UnclaimConnection(connection); + ExecuteCriticalRemoteCommand(connection, "END"); + + CloseConnection(connection); PG_RETURN_INT64(totalBytesWritten); } diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index a111a68fb..d1083f8fa 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -149,7 +149,7 @@ worker_partition_query_result(PG_FUNCTION_ARGS) * Intermediate results will be stored in a directory that is derived * from the distributed transaction ID. */ - UseCoordinatedTransaction(); + EnsureDistributedTransactionId(); CreateIntermediateResultsDirectory(); diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 8ca78791d..34e1cb15f 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -58,7 +58,6 @@ void StartRemoteTransactionBegin(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - StringInfo beginAndSetDistributedTransactionId = makeStringInfo(); ListCell *subIdCell = NULL; Assert(transaction->transactionState == REMOTE_TRANS_NOT_STARTED); @@ -68,28 +67,8 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) transaction->transactionState = REMOTE_TRANS_STARTING; - /* - * Explicitly specify READ COMMITTED, the default on the remote - * side might have been changed, and that would cause problematic - * behaviour. - */ - appendStringInfoString(beginAndSetDistributedTransactionId, - "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); - - /* - * Append BEGIN and assign_distributed_transaction_id() statements into a single command - * and send both in one step. The reason is purely performance, we don't want - * seperate roundtrips for these two statements. - */ - DistributedTransactionId *distributedTransactionId = - GetCurrentDistributedTransactionId(); - const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp); - appendStringInfo(beginAndSetDistributedTransactionId, - "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT - ", '%s');", - distributedTransactionId->initiatorNodeIdentifier, - distributedTransactionId->transactionNumber, - timestamp); + StringInfo beginAndSetDistributedTransactionId = + BeginAndSetDistributedTransactionIdCommand(); /* append context for in-progress SAVEPOINTs for this transaction */ List *activeSubXacts = ActiveSubXactContexts(); @@ -129,6 +108,42 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) } +/* + * BeginAndSetDistributedTransactionIdCommand returns a command which starts + * a transaction and assigns the current distributed transaction id. + */ +StringInfo +BeginAndSetDistributedTransactionIdCommand(void) +{ + StringInfo beginAndSetDistributedTransactionId = makeStringInfo(); + + /* + * Explicitly specify READ COMMITTED, the default on the remote + * side might have been changed, and that would cause problematic + * behaviour. + */ + appendStringInfoString(beginAndSetDistributedTransactionId, + "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); + + /* + * Append BEGIN and assign_distributed_transaction_id() statements into a single command + * and send both in one step. The reason is purely performance, we don't want + * seperate roundtrips for these two statements. + */ + DistributedTransactionId *distributedTransactionId = + GetCurrentDistributedTransactionId(); + const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp); + appendStringInfo(beginAndSetDistributedTransactionId, + "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT + ", '%s');", + distributedTransactionId->initiatorNodeIdentifier, + distributedTransactionId->transactionNumber, + timestamp); + + return beginAndSetDistributedTransactionId; +} + + /* * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin * initiated. It blocks if necessary (i.e. if PQisBusy() would return true). diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e86e5ac55..b94057da6 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -117,8 +117,7 @@ static bool MaybeExecutingUDF(void); void UseCoordinatedTransaction(void) { - if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED || - CurrentCoordinatedTransactionState == COORD_TRANS_STARTED_ON_WORKER) + if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) { return; } @@ -130,21 +129,23 @@ UseCoordinatedTransaction(void) } CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + AssignDistributedTransactionId(); +} - /* - * This might be part of bigger distributed transaction originating from - * another node, in which case transaction id has already been assigned - * by a assign_distributed_transaction_id() call. - */ + +/* + * EnsureDistributedTransactionId makes sure that the current transaction + * has a distributed transaction id. It is either assigned by a previous + * call of assign_distributed_transaction_id(), or by starting a coordinated + * transaction. + */ +void +EnsureDistributedTransactionId(void) +{ DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId(); if (transactionId->transactionNumber == 0) { - CurrentCoordinatedTransactionState = COORD_TRANS_STARTED_ON_WORKER; - AssignDistributedTransactionId(); - } - else - { - CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + UseCoordinatedTransaction(); } } diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index f1e9ece90..43c1e4d02 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -35,13 +35,6 @@ typedef enum CoordinatedTransactionState /* no coordinated transaction in progress, but connections established */ COORD_TRANS_IDLE, - /* - * Coordinated transaction was initiated by coordinator, but the worker also - * needs to start a coordinated transaction to be able to send commands to - * other workers. - */ - COORD_TRANS_STARTED_ON_WORKER, - /* coordinated transaction in progress */ COORD_TRANS_STARTED, @@ -117,6 +110,7 @@ extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); extern void CoordinatedTransactionUse2PC(void); extern bool IsMultiStatementTransaction(void); +extern void EnsureDistributedTransactionId(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); @@ -124,6 +118,7 @@ extern void InitializeTransactionManagement(void); /* other functions */ extern List * ActiveSubXacts(void); extern List * ActiveSubXactContexts(void); +extern StringInfo BeginAndSetDistributedTransactionIdCommand(void); #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index af45576b5..68d1f9271 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -269,8 +269,8 @@ SELECT * FROM target_table ORDER BY a; (4 rows) -- --- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might --- add additional columns to the target list. +-- worker queries can have more columns than necessary. ExpandWorkerTargetEntry() +-- might add additional columns to the target list. -- TRUNCATE target_table; \set VERBOSITY TERSE @@ -304,6 +304,107 @@ SELECT * FROM target_table ORDER BY a; -1 | {-1} (4 rows) -DROP TABLE source_table, target_table; +-- +-- repartitioned INSERT/SELECT followed by other DML in stame transaction +-- +-- case 1. followed by DELETE +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +DELETE FROM target_table; +END; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- +(0 rows) + +-- case 2. followed by UPDATE +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +UPDATE target_table SET b=array_append(b, a); +END; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3,-4} + -3 | {-3} + -2 | {4,6,-2} + -1 | {1,2,3,-1} +(4 rows) + +-- case 3. followed by multi-row INSERT +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); +END; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -7 | {999} + -6 | {11,12} + -5 | {10,11} + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(7 rows) + +-- case 4. followed by distributed INSERT/SELECT +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +INSERT INTO target_table SELECT * FROM target_table; +END; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -4 | {3} + -3 | {} + -3 | {} + -2 | {4,6} + -2 | {4,6} + -1 | {1,2,3} + -1 | {1,2,3} +(8 rows) + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index ef60350a9..db6cee7dd 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -124,8 +124,8 @@ RESET client_min_messages; SELECT * FROM target_table ORDER BY a; -- --- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might --- add additional columns to the target list. +-- worker queries can have more columns than necessary. ExpandWorkerTargetEntry() +-- might add additional columns to the target list. -- TRUNCATE target_table; \set VERBOSITY TERSE @@ -144,7 +144,45 @@ RESET client_min_messages; SELECT * FROM target_table ORDER BY a; -DROP TABLE source_table, target_table; +-- +-- repartitioned INSERT/SELECT followed by other DML in stame transaction +-- + +-- case 1. followed by DELETE +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; +DELETE FROM target_table; +END; +SELECT * FROM target_table ORDER BY a; + +-- case 2. followed by UPDATE +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; +UPDATE target_table SET b=array_append(b, a); +END; +SELECT * FROM target_table ORDER BY a; + +-- case 3. followed by multi-row INSERT +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; +INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); +END; +SELECT * FROM target_table ORDER BY a; + +-- case 4. followed by distributed INSERT/SELECT +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +SELECT * FROM target_table ORDER BY a; +INSERT INTO target_table SELECT * FROM target_table; +END; +SELECT * FROM target_table ORDER BY a; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;