mirror of https://github.com/citusdata/citus.git
Rename CoordinatedTransactionUse2PC and make it public
parent
9acfc15c73
commit
512b4141a5
|
@ -36,6 +36,7 @@
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "distributed/transaction_management.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
@ -289,10 +290,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
/*
|
/*
|
||||||
* Abort if all placements failed, mark placements invalid if only some failed. By
|
* Abort if all placements failed, mark placements invalid if only some failed. By
|
||||||
* doing this UpdateShardStatistics never works on failed placements.
|
* doing this UpdateShardStatistics never works on failed placements.
|
||||||
*
|
|
||||||
* (Pass false for using2PC arbitrarily, the parameter is not used)
|
|
||||||
*/
|
*/
|
||||||
CheckForFailedPlacements(true, false);
|
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
|
||||||
|
|
||||||
/* update shard statistics and get new shard size */
|
/* update shard statistics and get new shard size */
|
||||||
newShardSize = UpdateShardStatistics(shardId);
|
newShardSize = UpdateShardStatistics(shardId);
|
||||||
|
|
|
@ -48,7 +48,7 @@ static bool subXactAbortAttempted = false;
|
||||||
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
|
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
|
||||||
* MultiShardCommitProtocol was set to 2PC.
|
* MultiShardCommitProtocol was set to 2PC.
|
||||||
*/
|
*/
|
||||||
static bool CurrentTransactionUse2PC = false;
|
bool CoordinatedTransactionUses2PC = false;
|
||||||
|
|
||||||
/* transaction management functions */
|
/* transaction management functions */
|
||||||
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
||||||
|
@ -113,7 +113,7 @@ CoordinatedTransactionUse2PC(void)
|
||||||
{
|
{
|
||||||
Assert(InCoordinatedTransaction());
|
Assert(InCoordinatedTransaction());
|
||||||
|
|
||||||
CurrentTransactionUse2PC = true;
|
CoordinatedTransactionUses2PC = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
CurrentTransactionUse2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -202,7 +202,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
CurrentTransactionUse2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
subXactAbortAttempted = false;
|
subXactAbortAttempted = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -247,9 +247,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* case that iteratively executed commands marked all placements
|
* case that iteratively executed commands marked all placements
|
||||||
* as invalid.
|
* as invalid.
|
||||||
*/
|
*/
|
||||||
CheckForFailedPlacements(true, CurrentTransactionUse2PC);
|
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
|
||||||
|
|
||||||
if (CurrentTransactionUse2PC)
|
if (CoordinatedTransactionUses2PC)
|
||||||
{
|
{
|
||||||
CoordinatedRemoteTransactionsPrepare();
|
CoordinatedRemoteTransactionsPrepare();
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
|
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
|
||||||
|
@ -270,7 +270,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* Check again whether shards/placement successfully
|
* Check again whether shards/placement successfully
|
||||||
* committed. This handles failure at COMMIT/PREPARE time.
|
* committed. This handles failure at COMMIT/PREPARE time.
|
||||||
*/
|
*/
|
||||||
CheckForFailedPlacements(false, CurrentTransactionUse2PC);
|
CheckForFailedPlacements(false, CoordinatedTransactionUses2PC);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,9 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
|
||||||
/* list of connections that are part of the current coordinated transaction */
|
/* list of connections that are part of the current coordinated transaction */
|
||||||
extern dlist_head InProgressTransactions;
|
extern dlist_head InProgressTransactions;
|
||||||
|
|
||||||
|
/* whether we've been asked to use 2PC (by calling CoordinatedTransactionUse2PC()) */
|
||||||
|
extern bool CoordinatedTransactionUses2PC;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Coordinated transaction management.
|
* Coordinated transaction management.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue