diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 63a8dcccb..4fd4c932e 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -90,6 +90,7 @@ MultiRealTimeExecute(Job *job) workerNodeList = ActiveReadableNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); + if (IsTransactionBlock()) { BeginOrContinueCoordinatedTransaction(); @@ -318,7 +319,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, if (pollStatus == CLIENT_CONNECTION_READY) { taskExecution->dataFetchTaskIndex = -1; - taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP; + + if (InCoordinatedTransaction()) + { + taskStatusArray[currentIndex] = EXEC_BEGIN_START; + } + else + { + taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP; + } } else if (pollStatus == CLIENT_CONNECTION_BUSY) { @@ -397,6 +406,74 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, break; } + case EXEC_BEGIN_START: + { + int32 connectionId = connectionIdArray[currentIndex]; + MultiConnection *connection = MultiClientGetConnection(connectionId); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* + * If BEGIN was not yet sent on this connection, send it now. + * Otherwise, continue with the task. + */ + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + StartRemoteTransactionBegin(connection); + taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING; + break; + } + else + { + /* + * We skip data fetches when in a distributed transaction since + * they cannot be performed in a transactional way (e.g. would + * trigger deadlock detection). + */ + taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START; + break; + } + } + + case EXEC_BEGIN_RUNNING: + { + int32 connectionId = connectionIdArray[currentIndex]; + MultiConnection *connection = MultiClientGetConnection(connectionId); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* check if query results are in progress or unavailable */ + ResultStatus resultStatus = MultiClientResultStatus(connectionId); + if (resultStatus == CLIENT_RESULT_BUSY) + { + *executionStatus = TASK_STATUS_SOCKET_READ; + taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING; + break; + } + else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) + { + taskStatusArray[currentIndex] = EXEC_TASK_FAILED; + break; + } + + /* read the results from BEGIN and update the transaction state */ + FinishRemoteTransactionBegin(connection); + + if (transaction->transactionFailed) + { + taskStatusArray[currentIndex] = EXEC_TASK_FAILED; + break; + } + else + { + /* + * We skip data fetches when in a distributed transaction since + * they cannot be performed in a transactional way (e.g. would + * trigger deadlock detection). + */ + taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START; + break; + } + } + case EXEC_FETCH_TASK_LOOP: { List *dataFetchTaskList = task->dependedTaskList; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index cab5b35a1..c062bd84a 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -58,7 +58,11 @@ typedef enum EXEC_TASK_TRACKER_RETRY = 13, EXEC_TASK_TRACKER_FAILED = 14, EXEC_SOURCE_TASK_TRACKER_RETRY = 15, - EXEC_SOURCE_TASK_TRACKER_FAILED = 16 + EXEC_SOURCE_TASK_TRACKER_FAILED = 16, + + /* transactional operations */ + EXEC_BEGIN_START = 20, + EXEC_BEGIN_RUNNING = 21 } TaskExecStatus;