mirror of https://github.com/citusdata/citus.git
Send BEGIN in the real-time executor when in a transaction
parent
581c8c02cc
commit
d6dd0b3a81
|
@ -90,6 +90,7 @@ MultiRealTimeExecute(Job *job)
|
|||
|
||||
workerNodeList = ActiveReadableNodeList();
|
||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||
|
||||
if (IsTransactionBlock())
|
||||
{
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
@ -318,8 +319,16 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
if (pollStatus == CLIENT_CONNECTION_READY)
|
||||
{
|
||||
taskExecution->dataFetchTaskIndex = -1;
|
||||
|
||||
if (InCoordinatedTransaction())
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_START;
|
||||
}
|
||||
else
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP;
|
||||
}
|
||||
}
|
||||
else if (pollStatus == CLIENT_CONNECTION_BUSY)
|
||||
{
|
||||
/* immediately retry */
|
||||
|
@ -397,6 +406,74 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
break;
|
||||
}
|
||||
|
||||
case EXEC_BEGIN_START:
|
||||
{
|
||||
int32 connectionId = connectionIdArray[currentIndex];
|
||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||
|
||||
/*
|
||||
* If BEGIN was not yet sent on this connection, send it now.
|
||||
* Otherwise, continue with the task.
|
||||
*/
|
||||
if (transaction->transactionState == REMOTE_TRANS_INVALID)
|
||||
{
|
||||
StartRemoteTransactionBegin(connection);
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We skip data fetches when in a distributed transaction since
|
||||
* they cannot be performed in a transactional way (e.g. would
|
||||
* trigger deadlock detection).
|
||||
*/
|
||||
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case EXEC_BEGIN_RUNNING:
|
||||
{
|
||||
int32 connectionId = connectionIdArray[currentIndex];
|
||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||
|
||||
/* check if query results are in progress or unavailable */
|
||||
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
|
||||
if (resultStatus == CLIENT_RESULT_BUSY)
|
||||
{
|
||||
*executionStatus = TASK_STATUS_SOCKET_READ;
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING;
|
||||
break;
|
||||
}
|
||||
else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||
break;
|
||||
}
|
||||
|
||||
/* read the results from BEGIN and update the transaction state */
|
||||
FinishRemoteTransactionBegin(connection);
|
||||
|
||||
if (transaction->transactionFailed)
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We skip data fetches when in a distributed transaction since
|
||||
* they cannot be performed in a transactional way (e.g. would
|
||||
* trigger deadlock detection).
|
||||
*/
|
||||
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case EXEC_FETCH_TASK_LOOP:
|
||||
{
|
||||
List *dataFetchTaskList = task->dependedTaskList;
|
||||
|
|
|
@ -58,7 +58,11 @@ typedef enum
|
|||
EXEC_TASK_TRACKER_RETRY = 13,
|
||||
EXEC_TASK_TRACKER_FAILED = 14,
|
||||
EXEC_SOURCE_TASK_TRACKER_RETRY = 15,
|
||||
EXEC_SOURCE_TASK_TRACKER_FAILED = 16
|
||||
EXEC_SOURCE_TASK_TRACKER_FAILED = 16,
|
||||
|
||||
/* transactional operations */
|
||||
EXEC_BEGIN_START = 20,
|
||||
EXEC_BEGIN_RUNNING = 21
|
||||
} TaskExecStatus;
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue