mirror of https://github.com/citusdata/citus.git
change ErrorIfTransactionAccessedPlacementsLocally logic & usages
parent
82f3157750
commit
38a59b4767
|
@ -936,10 +936,12 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
|||
ParamListInfo paramListInfo = NULL;
|
||||
|
||||
/*
|
||||
* The code-paths that rely on this function do not know how to execute
|
||||
* commands locally.
|
||||
* If current transaction accessed local placements and task list includes
|
||||
* tasks that should be executed locally (accessing any of the local placements),
|
||||
* then we should error out as it would cause inconsistencies across the
|
||||
* remote connection and local execution
|
||||
*/
|
||||
ErrorIfTransactionAccessedPlacementsLocally();
|
||||
ErrorIfRemoteTaskExecutionOnLocallyAccessedNode(taskList);
|
||||
|
||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||
{
|
||||
|
|
|
@ -592,6 +592,27 @@ ShouldExecuteTasksLocally(List *taskList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AnyTaskAccessesLocalNode returns true if a task within the task list accesses
|
||||
* to the local node.
|
||||
*/
|
||||
bool
|
||||
AnyTaskAccessesLocalNode(List *taskList)
|
||||
{
|
||||
Task *task = NULL;
|
||||
|
||||
foreach_ptr(task, taskList)
|
||||
{
|
||||
if (TaskAccessesLocalNode(task))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskAccessesLocalNode returns true if any placements of the task reside on
|
||||
* the node that we're executing the query.
|
||||
|
@ -614,6 +635,21 @@ TaskAccessesLocalNode(Task *task)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfRemoteTaskExecutionOnLocallyAccessedNode errors out if the current
|
||||
* transaction already accessed local placements in the current session(locally)
|
||||
* and any of the given remote tasks will access a local placement.
|
||||
*/
|
||||
void
|
||||
ErrorIfRemoteTaskExecutionOnLocallyAccessedNode(List *remoteTaskList)
|
||||
{
|
||||
if (AnyTaskAccessesLocalNode(remoteTaskList))
|
||||
{
|
||||
ErrorIfTransactionAccessedPlacementsLocally();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfTransactionAccessedPlacementsLocally errors out if a local query
|
||||
* on any shard has already been executed in the same transaction.
|
||||
|
|
|
@ -28,6 +28,7 @@ extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
|
|||
extern bool ShouldExecuteTasksLocally(List *taskList);
|
||||
extern bool AnyTaskAccessesLocalNode(List *taskList);
|
||||
extern bool TaskAccessesLocalNode(Task *task);
|
||||
extern void ErrorIfRemoteTaskExecutionOnLocallyAccessedNode(List *taskList);
|
||||
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
|
||||
extern void DisableLocalExecution(void);
|
||||
|
||||
|
|
Loading…
Reference in New Issue