mirror of https://github.com/citusdata/citus.git
explicitly return false if transaction connected to local node (#3715)
* explicitly return false if transaction connected to local node * not set TransactionConnectedToLocalGroup if we are writing to a file We use TransactionConnectedToLocalGroup to prevent local execution from happening as that might cause visibility problems. As files are visible to all transactions, we shouldn't set this variable if we are writing to a file.pull/3666/head
parent
225adbc7ac
commit
0430b568be
|
@ -246,7 +246,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
|
||||||
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
|
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
|
||||||
HTAB *connectionStateHash, bool stopOnFailure,
|
HTAB *connectionStateHash, bool stopOnFailure,
|
||||||
bool *found, bool shouldUseLocalCopy, CopyOutState
|
bool *found, bool shouldUseLocalCopy, CopyOutState
|
||||||
copyOutState);
|
copyOutState, bool isCopyToIntermediateFile);
|
||||||
static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement,
|
static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement,
|
||||||
bool stopOnFailure);
|
bool stopOnFailure);
|
||||||
static List * ConnectionStateList(HTAB *connectionStateHash);
|
static List * ConnectionStateList(HTAB *connectionStateHash);
|
||||||
|
@ -254,7 +254,8 @@ static void InitializeCopyShardState(CopyShardState *shardState,
|
||||||
HTAB *connectionStateHash,
|
HTAB *connectionStateHash,
|
||||||
uint64 shardId, bool stopOnFailure, bool
|
uint64 shardId, bool stopOnFailure, bool
|
||||||
canUseLocalCopy,
|
canUseLocalCopy,
|
||||||
CopyOutState copyOutState);
|
CopyOutState copyOutState,
|
||||||
|
bool isCopyToIntermediateFile);
|
||||||
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
|
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
|
||||||
CopyStmt *copyStatement,
|
CopyStmt *copyStatement,
|
||||||
CopyOutState copyOutState);
|
CopyOutState copyOutState);
|
||||||
|
@ -2292,12 +2293,14 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
|
||||||
/* connections hash is kept in memory context */
|
/* connections hash is kept in memory context */
|
||||||
MemoryContextSwitchTo(copyDest->memoryContext);
|
MemoryContextSwitchTo(copyDest->memoryContext);
|
||||||
|
|
||||||
|
bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
|
||||||
CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
|
CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
|
||||||
copyDest->connectionStateHash,
|
copyDest->connectionStateHash,
|
||||||
stopOnFailure,
|
stopOnFailure,
|
||||||
&cachedShardStateFound,
|
&cachedShardStateFound,
|
||||||
copyDest->shouldUseLocalCopy,
|
copyDest->shouldUseLocalCopy,
|
||||||
copyDest->copyOutState);
|
copyDest->copyOutState,
|
||||||
|
isIntermediateResult);
|
||||||
if (!cachedShardStateFound)
|
if (!cachedShardStateFound)
|
||||||
{
|
{
|
||||||
firstTupleInShard = true;
|
firstTupleInShard = true;
|
||||||
|
@ -3219,7 +3222,8 @@ ConnectionStateList(HTAB *connectionStateHash)
|
||||||
static CopyShardState *
|
static CopyShardState *
|
||||||
GetShardState(uint64 shardId, HTAB *shardStateHash,
|
GetShardState(uint64 shardId, HTAB *shardStateHash,
|
||||||
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
|
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
|
||||||
shouldUseLocalCopy, CopyOutState copyOutState)
|
shouldUseLocalCopy, CopyOutState copyOutState, bool
|
||||||
|
isCopyToIntermediateFile)
|
||||||
{
|
{
|
||||||
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
|
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
|
||||||
HASH_ENTER, found);
|
HASH_ENTER, found);
|
||||||
|
@ -3227,7 +3231,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
|
||||||
{
|
{
|
||||||
InitializeCopyShardState(shardState, connectionStateHash,
|
InitializeCopyShardState(shardState, connectionStateHash,
|
||||||
shardId, stopOnFailure, shouldUseLocalCopy,
|
shardId, stopOnFailure, shouldUseLocalCopy,
|
||||||
copyOutState);
|
copyOutState, isCopyToIntermediateFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
return shardState;
|
return shardState;
|
||||||
|
@ -3243,7 +3247,7 @@ static void
|
||||||
InitializeCopyShardState(CopyShardState *shardState,
|
InitializeCopyShardState(CopyShardState *shardState,
|
||||||
HTAB *connectionStateHash, uint64 shardId,
|
HTAB *connectionStateHash, uint64 shardId,
|
||||||
bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState
|
bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState
|
||||||
copyOutState)
|
copyOutState, bool isCopyToIntermediateFile)
|
||||||
{
|
{
|
||||||
ListCell *placementCell = NULL;
|
ListCell *placementCell = NULL;
|
||||||
int failedPlacementCount = 0;
|
int failedPlacementCount = 0;
|
||||||
|
@ -3281,6 +3285,19 @@ InitializeCopyShardState(CopyShardState *shardState,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (placement->groupId == GetLocalGroupId())
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* if we are copying into an intermediate file we won't use local copy.
|
||||||
|
* Files are visible to all transactions so we can still use local execution, therefore
|
||||||
|
* we shouldn't restrict only using connection in this case.
|
||||||
|
*/
|
||||||
|
if (!isCopyToIntermediateFile)
|
||||||
|
{
|
||||||
|
TransactionConnectedToLocalGroup = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MultiConnection *connection =
|
MultiConnection *connection =
|
||||||
CopyGetPlacementConnection(placement, stopOnFailure);
|
CopyGetPlacementConnection(placement, stopOnFailure);
|
||||||
if (connection == NULL)
|
if (connection == NULL)
|
||||||
|
@ -3289,6 +3306,7 @@ InitializeCopyShardState(CopyShardState *shardState,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CopyConnectionState *connectionState = GetConnectionState(connectionStateHash,
|
CopyConnectionState *connectionState = GetConnectionState(connectionStateHash,
|
||||||
connection);
|
connection);
|
||||||
|
|
||||||
|
@ -3439,11 +3457,6 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure)
|
||||||
ClaimConnectionExclusively(connection);
|
ClaimConnectionExclusively(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TransactionConnectedToLocalGroup && placement->groupId == GetLocalGroupId())
|
|
||||||
{
|
|
||||||
TransactionConnectedToLocalGroup = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -598,6 +598,15 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TransactionConnectedToLocalGroup)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* if the current transaction accessed the local node over a connection
|
||||||
|
* then we can't use local execution because of visibility problems.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (TransactionAccessedLocalPlacement)
|
||||||
{
|
{
|
||||||
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
|
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
|
||||||
|
@ -657,7 +666,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
* has happened because that'd break transaction visibility rules and
|
* has happened because that'd break transaction visibility rules and
|
||||||
* many other things.
|
* many other things.
|
||||||
*/
|
*/
|
||||||
return !TransactionConnectedToLocalGroup;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!singleTask)
|
if (!singleTask)
|
||||||
|
|
Loading…
Reference in New Issue