Don't start a coordinated transaction on workers.

Otherwise transaction hooks of Citus kick in and might cause unwanted errors.
pull/3376/head
Hadi Moshayedi 2020-01-13 16:52:15 -08:00
parent 42c3c03b85
commit 44a2aede16
8 changed files with 214 additions and 66 deletions

View File

@ -267,17 +267,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
scanState->tuplestorestate = scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TransactionProperties xactProperties = { uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
.errorOnAnyFailure = true, taskList,
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
.requires2PC = false
};
int64 rowsInserted = ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, taskList,
tupleDescriptor, tupleDescriptor,
scanState->tuplestorestate, scanState->tuplestorestate,
hasReturning, hasReturning);
MaxAdaptiveExecutorPoolSize,
&xactProperties);
executorState->es_processed = rowsInserted; executorState->es_processed = rowsInserted;
} }
@ -981,6 +975,7 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
queryString->data); queryString->data);
modifyTask->dependentTaskList = NIL;
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->relationShardList = list_make1(relationShard); modifyTask->relationShardList = list_make1(relationShard);
@ -1043,7 +1038,7 @@ IsRedistributablePlan(Plan *selectPlan, bool hasReturning)
/* /*
* Don't use redistribution if only one task. This is to keep the existing * Don't use redistribution if only one task. This is to keep the existing
* behaviour for CTEs that the last step is a read_intermediate_result() * behaviour for CTEs that the last step is a read_intermediate_result()
* call. It doesn't hurt much in other case too. * call. It doesn't hurt much in other cases too.
*/ */
if (list_length(distSelectTaskList) <= 1) if (list_length(distSelectTaskList) <= 1)
{ {

View File

@ -804,7 +804,7 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
char *remoteHost = text_to_cstring(remoteHostText); char *remoteHost = text_to_cstring(remoteHostText);
int remotePort = PG_GETARG_INT32(2); int remotePort = PG_GETARG_INT32(2);
int connectionFlags = 0; int connectionFlags = FORCE_NEW_CONNECTION;
int resultIndex = 0; int resultIndex = 0;
int64 totalBytesWritten = 0L; int64 totalBytesWritten = 0L;
@ -827,7 +827,7 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
* Intermediate results will be stored in a directory that is derived * Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID. * from the distributed transaction ID.
*/ */
UseCoordinatedTransaction(); EnsureDistributedTransactionId();
MultiConnection *connection = GetNodeConnection(connectionFlags, remoteHost, MultiConnection *connection = GetNodeConnection(connectionFlags, remoteHost,
remotePort); remotePort);
@ -838,7 +838,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
remoteHost, remotePort))); remoteHost, remotePort)));
} }
RemoteTransactionBeginIfNecessary(connection); StringInfo beginAndSetXactId = BeginAndSetDistributedTransactionIdCommand();
ExecuteCriticalRemoteCommand(connection, beginAndSetXactId->data);
for (resultIndex = 0; resultIndex < resultCount; resultIndex++) for (resultIndex = 0; resultIndex < resultCount; resultIndex++)
{ {
@ -847,7 +848,9 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId); totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId);
} }
UnclaimConnection(connection); ExecuteCriticalRemoteCommand(connection, "END");
CloseConnection(connection);
PG_RETURN_INT64(totalBytesWritten); PG_RETURN_INT64(totalBytesWritten);
} }

View File

@ -149,7 +149,7 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
* Intermediate results will be stored in a directory that is derived * Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID. * from the distributed transaction ID.
*/ */
UseCoordinatedTransaction(); EnsureDistributedTransactionId();
CreateIntermediateResultsDirectory(); CreateIntermediateResultsDirectory();

View File

@ -58,7 +58,6 @@ void
StartRemoteTransactionBegin(struct MultiConnection *connection) StartRemoteTransactionBegin(struct MultiConnection *connection)
{ {
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
ListCell *subIdCell = NULL; ListCell *subIdCell = NULL;
Assert(transaction->transactionState == REMOTE_TRANS_NOT_STARTED); Assert(transaction->transactionState == REMOTE_TRANS_NOT_STARTED);
@ -68,28 +67,8 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
transaction->transactionState = REMOTE_TRANS_STARTING; transaction->transactionState = REMOTE_TRANS_STARTING;
/* StringInfo beginAndSetDistributedTransactionId =
* Explicitly specify READ COMMITTED, the default on the remote BeginAndSetDistributedTransactionIdCommand();
* side might have been changed, and that would cause problematic
* behaviour.
*/
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
/*
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
* and send both in one step. The reason is purely performance, we don't want
* seperate roundtrips for these two statements.
*/
DistributedTransactionId *distributedTransactionId =
GetCurrentDistributedTransactionId();
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
appendStringInfo(beginAndSetDistributedTransactionId,
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
", '%s');",
distributedTransactionId->initiatorNodeIdentifier,
distributedTransactionId->transactionNumber,
timestamp);
/* append context for in-progress SAVEPOINTs for this transaction */ /* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts(); List *activeSubXacts = ActiveSubXactContexts();
@ -129,6 +108,42 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
} }
/*
* BeginAndSetDistributedTransactionIdCommand returns a command which starts
* a transaction and assigns the current distributed transaction id.
*/
StringInfo
BeginAndSetDistributedTransactionIdCommand(void)
{
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
/*
* Explicitly specify READ COMMITTED, the default on the remote
* side might have been changed, and that would cause problematic
* behaviour.
*/
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
/*
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
* and send both in one step. The reason is purely performance, we don't want
* seperate roundtrips for these two statements.
*/
DistributedTransactionId *distributedTransactionId =
GetCurrentDistributedTransactionId();
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
appendStringInfo(beginAndSetDistributedTransactionId,
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
", '%s');",
distributedTransactionId->initiatorNodeIdentifier,
distributedTransactionId->transactionNumber,
timestamp);
return beginAndSetDistributedTransactionId;
}
/* /*
* FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin
* initiated. It blocks if necessary (i.e. if PQisBusy() would return true). * initiated. It blocks if necessary (i.e. if PQisBusy() would return true).

View File

@ -117,8 +117,7 @@ static bool MaybeExecutingUDF(void);
void void
UseCoordinatedTransaction(void) UseCoordinatedTransaction(void)
{ {
if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED || if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
CurrentCoordinatedTransactionState == COORD_TRANS_STARTED_ON_WORKER)
{ {
return; return;
} }
@ -130,21 +129,23 @@ UseCoordinatedTransaction(void)
} }
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
AssignDistributedTransactionId();
}
/* /*
* This might be part of bigger distributed transaction originating from * EnsureDistributedTransactionId makes sure that the current transaction
* another node, in which case transaction id has already been assigned * has a distributed transaction id. It is either assigned by a previous
* by a assign_distributed_transaction_id() call. * call of assign_distributed_transaction_id(), or by starting a coordinated
* transaction.
*/ */
void
EnsureDistributedTransactionId(void)
{
DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId(); DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
if (transactionId->transactionNumber == 0) if (transactionId->transactionNumber == 0)
{ {
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED_ON_WORKER; UseCoordinatedTransaction();
AssignDistributedTransactionId();
}
else
{
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
} }
} }

View File

@ -35,13 +35,6 @@ typedef enum CoordinatedTransactionState
/* no coordinated transaction in progress, but connections established */ /* no coordinated transaction in progress, but connections established */
COORD_TRANS_IDLE, COORD_TRANS_IDLE,
/*
* Coordinated transaction was initiated by coordinator, but the worker also
* needs to start a coordinated transaction to be able to send commands to
* other workers.
*/
COORD_TRANS_STARTED_ON_WORKER,
/* coordinated transaction in progress */ /* coordinated transaction in progress */
COORD_TRANS_STARTED, COORD_TRANS_STARTED,
@ -117,6 +110,7 @@ extern void UseCoordinatedTransaction(void);
extern bool InCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void);
extern void CoordinatedTransactionUse2PC(void); extern void CoordinatedTransactionUse2PC(void);
extern bool IsMultiStatementTransaction(void); extern bool IsMultiStatementTransaction(void);
extern void EnsureDistributedTransactionId(void);
/* initialization function(s) */ /* initialization function(s) */
extern void InitializeTransactionManagement(void); extern void InitializeTransactionManagement(void);
@ -124,6 +118,7 @@ extern void InitializeTransactionManagement(void);
/* other functions */ /* other functions */
extern List * ActiveSubXacts(void); extern List * ActiveSubXacts(void);
extern List * ActiveSubXactContexts(void); extern List * ActiveSubXactContexts(void);
extern StringInfo BeginAndSetDistributedTransactionIdCommand(void);
#endif /* TRANSACTION_MANAGMENT_H */ #endif /* TRANSACTION_MANAGMENT_H */

View File

@ -269,8 +269,8 @@ SELECT * FROM target_table ORDER BY a;
(4 rows) (4 rows)
-- --
-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might -- worker queries can have more columns than necessary. ExpandWorkerTargetEntry()
-- add additional columns to the target list. -- might add additional columns to the target list.
-- --
TRUNCATE target_table; TRUNCATE target_table;
\set VERBOSITY TERSE \set VERBOSITY TERSE
@ -304,6 +304,107 @@ SELECT * FROM target_table ORDER BY a;
-1 | {-1} -1 | {-1}
(4 rows) (4 rows)
DROP TABLE source_table, target_table; --
-- repartitioned INSERT/SELECT followed by other DML in stame transaction
--
-- case 1. followed by DELETE
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3}
-3 | {}
-2 | {4,6}
-1 | {1,2,3}
(4 rows)
DELETE FROM target_table;
END;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
(0 rows)
-- case 2. followed by UPDATE
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3}
-3 | {}
-2 | {4,6}
-1 | {1,2,3}
(4 rows)
UPDATE target_table SET b=array_append(b, a);
END;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3,-4}
-3 | {-3}
-2 | {4,6,-2}
-1 | {1,2,3,-1}
(4 rows)
-- case 3. followed by multi-row INSERT
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3}
-3 | {}
-2 | {4,6}
-1 | {1,2,3}
(4 rows)
INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]);
END;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-7 | {999}
-6 | {11,12}
-5 | {10,11}
-4 | {3}
-3 | {}
-2 | {4,6}
-1 | {1,2,3}
(7 rows)
-- case 4. followed by distributed INSERT/SELECT
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3}
-3 | {}
-2 | {4,6}
-1 | {1,2,3}
(4 rows)
INSERT INTO target_table SELECT * FROM target_table;
END;
SELECT * FROM target_table ORDER BY a;
a | b
---------------------------------------------------------------------
-4 | {3}
-4 | {3}
-3 | {}
-3 | {}
-2 | {4,6}
-2 | {4,6}
-1 | {1,2,3}
-1 | {1,2,3}
(8 rows)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;

View File

@ -124,8 +124,8 @@ RESET client_min_messages;
SELECT * FROM target_table ORDER BY a; SELECT * FROM target_table ORDER BY a;
-- --
-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might -- worker queries can have more columns than necessary. ExpandWorkerTargetEntry()
-- add additional columns to the target list. -- might add additional columns to the target list.
-- --
TRUNCATE target_table; TRUNCATE target_table;
\set VERBOSITY TERSE \set VERBOSITY TERSE
@ -144,7 +144,45 @@ RESET client_min_messages;
SELECT * FROM target_table ORDER BY a; SELECT * FROM target_table ORDER BY a;
DROP TABLE source_table, target_table; --
-- repartitioned INSERT/SELECT followed by other DML in stame transaction
--
-- case 1. followed by DELETE
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
DELETE FROM target_table;
END;
SELECT * FROM target_table ORDER BY a;
-- case 2. followed by UPDATE
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
UPDATE target_table SET b=array_append(b, a);
END;
SELECT * FROM target_table ORDER BY a;
-- case 3. followed by multi-row INSERT
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]);
END;
SELECT * FROM target_table ORDER BY a;
-- case 4. followed by distributed INSERT/SELECT
TRUNCATE target_table;
BEGIN;
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
SELECT * FROM target_table ORDER BY a;
INSERT INTO target_table SELECT * FROM target_table;
END;
SELECT * FROM target_table ORDER BY a;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE; DROP SCHEMA insert_select_repartition CASCADE;