Merge pull request #1529 from citusdata/deadlock_detection_main

Distributed Deadlock detection
pull/1551/head
Önder Kalacı 2017-08-12 14:02:27 +03:00 committed by GitHub
commit 45957e5688
30 changed files with 2088 additions and 138 deletions

View File

@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -165,6 +165,8 @@ $(EXTENSION)--7.0-12.sql: $(EXTENSION)--7.0-11.sql $(EXTENSION)--7.0-11--7.0-12.
cat $^ > $@
$(EXTENSION)--7.0-13.sql: $(EXTENSION)--7.0-12.sql $(EXTENSION)--7.0-12--7.0-13.sql
cat $^ > $@
$(EXTENSION)--7.0-14.sql: $(EXTENSION)--7.0-13.sql $(EXTENSION)--7.0-13--7.0-14.sql
cat $^ > $@
NO_PGXS = 1
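The concatenated 7.0-14 script is applied through the usual extension upgrade path; the regression test further below runs exactly this:

ALTER EXTENSION citus UPDATE TO '7.0-14';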

View File

@ -44,3 +44,4 @@ COMMENT ON FUNCTION citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
RESET search_path;

View File

@ -0,0 +1,13 @@
/* citus--7.0-13--7.0-14.sql */
SET search_path = 'pg_catalog';
CREATE OR REPLACE FUNCTION check_distributed_deadlocks()
RETURNS BOOL
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$check_distributed_deadlocks$$;
COMMENT ON FUNCTION check_distributed_deadlocks()
IS 'does a distributed deadlock check; if a deadlock is found, cancels one of the participating backends and returns true';
RESET search_path;
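A minimal usage sketch for the new function, run from the coordinator (the f/t outputs mirror the isolation test results further below):

-- manually trigger a distributed deadlock check
SELECT check_distributed_deadlocks();
 check_distributed_deadlocks
-----------------------------
 f
(1 row)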

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-13'
default_version = '7.0-14'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -70,6 +70,8 @@
/* controls use of locks to enforce safe commutativity */
bool AllModificationsCommutative = false;
/* we've deprecated this flag, keeping it here for some time so as not to break existing users */
bool EnableDeadlockPrevention = true;
/* functions needed during run phase */
@ -79,8 +81,7 @@ static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(Task *task, bool markCritical,
bool startedInTransaction);
static List * GetModifyConnections(Task *task, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@ -680,8 +681,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
bool startedInTransaction =
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
/*
* Modifications for reference tables are always done using 2PC. First
@ -711,9 +710,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* establish the connection, mark as critical (when modifying reference
* table) and start a transaction (when in a transaction).
*/
connectionList = GetModifyConnections(task,
taskRequiresTwoPhaseCommit,
startedInTransaction);
connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit);
/* prevent replicas of the same shard from diverging */
AcquireExecutorShardLock(task, operation);
@ -809,12 +806,10 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* modify commands on the placements in taskPlacementList. If necessary, remote
* transactions are started.
*
* If markCritical is true remote transactions are marked as critical. If
* noNewTransactions is true, this function errors out if there's no
* transaction in progress.
* If markCritical is true remote transactions are marked as critical.
*/
static List *
GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions)
GetModifyConnections(Task *task, bool markCritical)
{
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
@ -844,22 +839,17 @@ GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions)
NULL);
/*
* If already in a transaction, disallow expanding set of remote
* transactions. That prevents some forms of distributed deadlocks.
* If we're expanding the set of nodes that participate in the distributed
* transaction, conform to MultiShardCommitProtocol.
*/
if (noNewTransactions)
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC &&
InCoordinatedTransaction() &&
XactModificationLevel == XACT_MODIFICATION_DATA)
{
RemoteTransaction *transaction = &multiConnection->remoteTransaction;
if (EnableDeadlockPrevention &&
transaction->transactionState == REMOTE_TRANS_INVALID)
if (transaction->transactionState == REMOTE_TRANS_INVALID)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("no transaction participant matches %s:%d",
taskPlacement->nodeName, taskPlacement->nodePort),
errdetail("Transactions which modify distributed tables "
"may only target nodes affected by the "
"modification command which began the transaction.")));
CoordinatedTransactionUse2PC();
}
}
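A hedged sketch of the behavior change, using the labs/researchers tables from the regression tests below: expanding a transaction to a new worker node no longer errors out, and under the 2PC commit protocol the coordinated transaction is upgraded to two-phase commit instead:

BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');                 -- modifies one worker
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');  -- expands to a second worker; previously errored out
COMMIT;
-- with SET citus.multi_shard_commit_protocol TO '2pc', the expansion above
-- also switches the coordinated transaction to two-phase commit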

View File

@ -23,6 +23,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
@ -57,8 +58,12 @@ static char *CitusVersion = CITUS_VERSION;
void _PG_init(void);
static void multi_log_hook(ErrorData *edata);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static void WarningForEnableDeadlockPrevention(bool newval, void *extra);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
GucSource source);
static void NormalizeWorkerListPath(void);
@ -174,6 +179,9 @@ _PG_init(void)
set_rel_pathlist_hook = multi_relation_restriction_hook;
set_join_pathlist_hook = multi_join_restriction_hook;
/* register hook for error messages */
emit_log_hook = multi_log_hook;
InitializeMaintenanceDaemon();
/* organize that task tracker is started once server is up */
@ -194,6 +202,27 @@ _PG_init(void)
}
/*
* multi_log_hook intercepts postgres log messages. We use this to override
* postgres error messages when they're not specific enough for the user.
*/
static void
multi_log_hook(ErrorData *edata)
{
/*
* Show the user a meaningful error message when a backend is cancelled
* by the distributed deadlock detection.
*/
if (edata->elevel == ERROR && edata->sqlerrcode == ERRCODE_QUERY_CANCELED &&
MyBackendGotCancelledDueToDeadlock())
{
edata->sqlerrcode = ERRCODE_T_R_DEADLOCK_DETECTED;
edata->message = "canceling the transaction since it has "
"involved in a distributed deadlock";
}
}
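With the hook in place, a backend cancelled by the deadlock detector reports the rewritten message instead of a generic query cancellation; a sketch, with the UPDATE taken from the isolation tests below:

UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
ERROR:  canceling the transaction since it was involved in a distributed deadlock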
/*
* StartupCitusBackend initializes per-backend infrastructure, and is called
* the first time citus is used in a database.
@ -333,6 +362,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_distributed_deadlock_detection",
gettext_noop("Log distributed deadlock detection related processing in "
"the server log"),
NULL,
&LogDistributedDeadlockDetection,
false,
PGC_SIGHUP,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."),
@ -368,6 +408,19 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomRealVariable(
"citus.distributed_deadlock_detection_factor",
gettext_noop("Sets the time to wait before checking for distributed "
"deadlocks. Postgres' deadlock_timeout setting is "
"multiplied with the value. If the value is set to"
"1000, distributed deadlock detection is disabled."),
NULL,
&DistributedDeadlockDetectionTimeoutFactor,
2.0, -1.0, 1000.0,
PGC_SIGHUP,
0,
ErrorIfNotASuitableDeadlockFactor, NULL, NULL);
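A configuration sketch for this GUC (it is PGC_SIGHUP, so a reload suffices; the values are illustrative):

-- check for distributed deadlocks every 2 * deadlock_timeout:
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = 2.0;
-- or disable distributed deadlock detection entirely:
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = -1;
SELECT pg_reload_conf();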
DefineCustomBoolVariable(
"citus.enable_deadlock_prevention",
gettext_noop("Prevents transactions from expanding to multiple nodes"),
@ -379,7 +432,7 @@ RegisterCitusConfigVariables(void)
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
NULL, WarningForEnableDeadlockPrevention, NULL);
DefineCustomBoolVariable(
"citus.enable_ddl_propagation",
@ -737,6 +790,39 @@ RegisterCitusConfigVariables(void)
}
/*
* Inform the users about the deprecated flag.
*/
static void
WarningForEnableDeadlockPrevention(bool newval, void *extra)
{
ereport(WARNING, (errcode(ERRCODE_WARNING_DEPRECATED_FEATURE),
errmsg("citus.enable_deadlock_prevention is deprecated and it has "
"no effect. The flag will be removed in the next release.")));
}
/*
* We don't want to allow values less than or equal to 1.0. However, we define -1 as the
* value that disables distributed deadlock checking. Here we enforce this special constraint.
*/
static bool
ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, GucSource source)
{
if (*newval <= 1.0 && *newval != -1.0)
{
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"citus.distributed_deadlock_detection_factor cannot be less than 1. "
"To disable distributed deadlock detection set the value to -1.")));
return false;
}
return true;
}
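Given the check hook above, an out-of-range value is rejected at SET time; a sketch of the expected interaction:

SET citus.distributed_deadlock_detection_factor = 0.5;
WARNING:  citus.distributed_deadlock_detection_factor must be greater than 1. To disable distributed deadlock detection, set the value to -1.
ERROR:  invalid value for parameter "citus.distributed_deadlock_detection_factor": 0.5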
/*
* NormalizeWorkerListPath converts the path configured via
* citus.worker_list_file into an absolute path, falling back to the default

View File

@ -26,9 +26,6 @@
#include "utils/timestamp.h"
static char * WaitsForToString(List *waitsFor);
PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
@ -114,31 +111,3 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* WaitsForToString is only intended for testing and debugging. It gets a
* waitsForList and returns the list of transaction nodes' transactionNumber
* in a string.
*/
static char *
WaitsForToString(List *waitsFor)
{
StringInfo transactionIdStr = makeStringInfo();
ListCell *waitsForCell = NULL;
foreach(waitsForCell, waitsFor)
{
TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
if (transactionIdStr->len != 0)
{
appendStringInfoString(transactionIdStr, ",");
}
appendStringInfo(transactionIdStr, "%ld",
waitingNode->transactionId.transactionNumber);
}
return transactionIdStr->data;
}

View File

@ -19,6 +19,7 @@
#include "datatype/timestamp.h"
#include "distributed/backend_data.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/transaction_identifier.h"
#include "nodes/execnodes.h"
@ -111,6 +112,7 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
MyBackendData->transactionId.initiatorNodeIdentifier = PG_GETARG_INT32(0);
MyBackendData->transactionId.transactionNumber = PG_GETARG_INT64(1);
MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
MyBackendData->transactionId.transactionOriginator = false;
SpinLockRelease(&MyBackendData->mutex);
@ -412,6 +414,7 @@ InitializeBackendData(void)
MyBackendData->databaseId = MyDatabaseId;
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
MyBackendData->transactionId.transactionOriginator = false;
MyBackendData->transactionId.transactionNumber = 0;
MyBackendData->transactionId.timestamp = 0;
@ -435,6 +438,7 @@ UnSetDistributedTransactionId(void)
MyBackendData->databaseId = 0;
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
MyBackendData->transactionId.transactionOriginator = false;
MyBackendData->transactionId.transactionNumber = 0;
MyBackendData->transactionId.timestamp = 0;
@ -486,6 +490,8 @@ GetCurrentDistributedTransactionId(void)
currentDistributedTransactionId->initiatorNodeIdentifier =
MyBackendData->transactionId.initiatorNodeIdentifier;
currentDistributedTransactionId->transactionOriginator =
MyBackendData->transactionId.transactionOriginator;
currentDistributedTransactionId->transactionNumber =
MyBackendData->transactionId.transactionNumber;
currentDistributedTransactionId->timestamp =
@ -520,6 +526,7 @@ AssignDistributedTransactionId(void)
MyBackendData->databaseId = MyDatabaseId;
MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId;
MyBackendData->transactionId.transactionOriginator = true;
MyBackendData->transactionId.transactionNumber =
nextTransactionNumber;
MyBackendData->transactionId.timestamp = currentTimestamp;
@ -566,3 +573,70 @@ GetBackendDataForProc(PGPROC *proc, BackendData *result)
SpinLockRelease(&backendData->mutex);
}
/*
* CancelTransactionDueToDeadlock cancels the input proc and also marks the backend
* data with this information.
*/
void
CancelTransactionDueToDeadlock(PGPROC *proc)
{
BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno];
/* backend might not have used citus yet and thus not initialized backend data */
if (!backendData)
{
return;
}
SpinLockAcquire(&backendData->mutex);
/* send a SIGINT only if the process is still in a distributed transaction */
if (backendData->transactionId.transactionNumber != 0)
{
backendData->cancelledDueToDeadlock = true;
SpinLockRelease(&backendData->mutex);
if (kill(proc->pid, SIGINT) != 0)
{
ereport(WARNING,
(errmsg("attempted to cancel this backend (pid: %d) to resolve a "
"distributed deadlock but the backend could not "
"be cancelled", proc->pid)));
}
}
else
{
SpinLockRelease(&backendData->mutex);
}
}
/*
* MyBackendGotCancelledDueToDeadlock returns whether the current distributed
* transaction was cancelled due to a deadlock. If the backend is not in a
* distributed transaction, the function returns false.
*/
bool
MyBackendGotCancelledDueToDeadlock(void)
{
bool cancelledDueToDeadlock = false;
/* backend might not have used citus yet and thus not initialized backend data */
if (!MyBackendData)
{
return false;
}
SpinLockAcquire(&MyBackendData->mutex);
if (IsInDistributedTransaction(MyBackendData))
{
cancelledDueToDeadlock = MyBackendData->cancelledDueToDeadlock;
}
SpinLockRelease(&MyBackendData->mutex);
return cancelledDueToDeadlock;
}

View File

@ -10,7 +10,9 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "access/hash.h"
#include "distributed/backend_data.h"
@ -25,11 +27,355 @@
#include "utils/timestamp.h"
/* used only for finding the deadlock cycle path */
typedef struct QueuedTransactionNode
{
TransactionNode *transactionNode;
int currentStackDepth;
} QueuedTransactionNode;
/* GUC determining whether debug messages for deadlock detection are sent to LOG */
bool LogDistributedDeadlockDetection = false;
static bool CheckDeadlockForTransactionNode(TransactionNode *startingTransactionNode,
TransactionNode **transactionNodeStack,
List **deadlockPath);
static void PrependOutgoingNodesToQueue(TransactionNode *queuedTransactionNode,
int currentStackDepth,
List **toBeVisitedNodes);
static void BuildDeadlockPathList(QueuedTransactionNode *cycledTransactionNode,
TransactionNode **transactionNodeStack,
List **deadlockPath);
static void ResetVisitedFields(HTAB *adjacencyList);
static void AssociateDistributedTransactionWithBackendProc(TransactionNode *
transactionNode);
static TransactionNode * GetOrCreateTransactionNode(HTAB *adjacencyList,
DistributedTransactionId *
transactionId);
static uint32 DistributedTransactionIdHash(const void *key, Size keysize);
static int DistributedTransactionIdCompare(const void *a, const void *b, Size keysize);
static void LogCancellingBackend(TransactionNode *transactionNode);
static void LogTransactionNode(TransactionNode *transactionNode);
static void LogDistributedDeadlockDebugMessage(const char *errorMessage);
PG_FUNCTION_INFO_V1(check_distributed_deadlocks);
/*
* check_distributed_deadlocks is the external API for manually
* checking for distributed deadlocks. For the details, see
* CheckForDistributedDeadlocks().
*/
Datum
check_distributed_deadlocks(PG_FUNCTION_ARGS)
{
bool deadlockFound = CheckForDistributedDeadlocks();
return BoolGetDatum(deadlockFound);
}
/*
* CheckForDistributedDeadlocks is the entry point for detecting
* distributed deadlocks.
*
* In plain words, the function first builds a wait graph by
* adding the wait edges from the local node and then adding the
* remote wait edges to form a global wait graph. Later, the wait
* graph is converted into another graph representation (adjacency
* lists) for more efficient searches. Finally, a DFS is done on
* the adjacency lists. Finding a cycle in the graph unveils a
* distributed deadlock. Upon finding a deadlock, the youngest
* participant backend is cancelled.
*
* The complexity of the algorithm is O(N) for each distributed
* transaction that's checked for deadlocks. Note that there can be
* anywhere from 0 to MaxBackends such transactions.
*
* The function returns true if a deadlock is found. Otherwise, returns
* false.
*/
bool
CheckForDistributedDeadlocks(void)
{
WaitGraph *waitGraph = BuildGlobalWaitGraph();
HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph);
HASH_SEQ_STATUS status;
TransactionNode *transactionNode = NULL;
int edgeCount = waitGraph->edgeCount;
int localGroupId = GetLocalGroupId();
/*
* We iterate on transaction nodes and search for deadlocks where the
* starting node is the given transaction node.
*/
hash_seq_init(&status, adjacencyLists);
while ((transactionNode = (TransactionNode *) hash_seq_search(&status)) != 0)
{
bool deadlockFound = false;
List *deadlockPath = NIL;
TransactionNode *transactionNodeStack[edgeCount];
/* we're only interested in finding deadlocks originating from this node */
if (transactionNode->transactionId.initiatorNodeIdentifier != localGroupId)
{
continue;
}
ResetVisitedFields(adjacencyLists);
deadlockFound = CheckDeadlockForTransactionNode(transactionNode,
transactionNodeStack,
&deadlockPath);
if (deadlockFound)
{
TransactionNode *youngestTransaction = transactionNode;
ListCell *participantTransactionCell = NULL;
/* there should be at least two transactions to get into a deadlock */
Assert(list_length(deadlockPath) > 1);
LogDistributedDeadlockDebugMessage("Distributed deadlock found among the "
"following distributed transactions:");
/*
* We search for the youngest participant for two reasons:
* (i) predictable results and (ii) cancelling the youngest transaction
* (e.g., if a DDL that has been running for 1 hour deadlocks with a
* SELECT that has been running for 10 msec, we prefer to cancel the SELECT).
*
* We only consider transactions initiated by this node when picking
* the youngest one.
*/
foreach(participantTransactionCell, deadlockPath)
{
TransactionNode *currentNode =
(TransactionNode *) lfirst(participantTransactionCell);
TimestampTz youngestTimestamp =
youngestTransaction->transactionId.timestamp;
TimestampTz currentTimestamp = currentNode->transactionId.timestamp;
AssociateDistributedTransactionWithBackendProc(currentNode);
LogTransactionNode(currentNode);
if (currentNode->transactionId.initiatorNodeIdentifier ==
GetLocalGroupId() &&
timestamptz_cmp_internal(currentTimestamp, youngestTimestamp) == 1)
{
youngestTransaction = currentNode;
}
}
/* we should find the backend */
Assert(youngestTransaction->initiatorProc != NULL);
CancelTransactionDueToDeadlock(youngestTransaction->initiatorProc);
LogCancellingBackend(youngestTransaction);
hash_seq_term(&status);
return true;
}
}
return false;
}
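The isolation tests further below exercise this end to end; as a compact sketch, the simplest cycle the DFS finds involves two coordinator sessions (session steps shown as comments):

-- session 1: BEGIN; UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
-- session 2: BEGIN; UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
-- session 1: UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;  -- blocks on session 2
-- session 2: UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;  -- blocks on session 1, closing the cycle
SELECT check_distributed_deadlocks();  -- finds the cycle, cancels the younger transaction, returns t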
/*
* CheckDeadlockForTransactionNode does a DFS starting with the given
* transaction node and checks for a cycle (i.e., the node can be reached again
* while traversing the graph).
*
* Finding a cycle indicates a distributed deadlock and the function returns
* true in that case. Also, the deadlockPath is filled with the transaction
* nodes that form the cycle.
*/
static bool
CheckDeadlockForTransactionNode(TransactionNode *startingTransactionNode,
TransactionNode **transactionNodeStack,
List **deadlockPath)
{
List *toBeVisitedNodes = NIL;
int currentStackDepth = 0;
/*
* We keep transactionNodeStack to keep track of the deadlock path. At this
* point the depth is 0, so set the stack's first element to the starting
* node.
*/
transactionNodeStack[currentStackDepth] = startingTransactionNode;
PrependOutgoingNodesToQueue(startingTransactionNode, currentStackDepth,
&toBeVisitedNodes);
/* traverse the graph and search for the deadlocks */
while (toBeVisitedNodes != NIL)
{
QueuedTransactionNode *queuedTransactionNode =
(QueuedTransactionNode *) linitial(toBeVisitedNodes);
TransactionNode *currentTransactionNode = queuedTransactionNode->transactionNode;
toBeVisitedNodes = list_delete_first(toBeVisitedNodes);
/* cycle found, let the caller know about the cycle */
if (currentTransactionNode == startingTransactionNode)
{
BuildDeadlockPathList(queuedTransactionNode, transactionNodeStack,
deadlockPath);
return true;
}
/* don't need to revisit the node again */
if (currentTransactionNode->transactionVisited)
{
continue;
}
currentTransactionNode->transactionVisited = true;
/* set the stack's corresponding element with the current node */
currentStackDepth = queuedTransactionNode->currentStackDepth;
transactionNodeStack[currentStackDepth] = currentTransactionNode;
PrependOutgoingNodesToQueue(currentTransactionNode, currentStackDepth,
&toBeVisitedNodes);
}
return false;
}
/*
* PrependOutgoingNodesToQueue prepends the nodes that the given transaction
* node waits for to toBeVisitedNodes.
*/
static void
PrependOutgoingNodesToQueue(TransactionNode *transactionNode, int currentStackDepth,
List **toBeVisitedNodes)
{
ListCell *currentWaitForCell = NULL;
/* as we traverse outgoing edges, increment the depth */
currentStackDepth++;
/* prepend to the list to continue depth-first search */
foreach(currentWaitForCell, transactionNode->waitsFor)
{
TransactionNode *waitForTransaction =
(TransactionNode *) lfirst(currentWaitForCell);
QueuedTransactionNode *queuedNode = palloc0(sizeof(QueuedTransactionNode));
queuedNode->transactionNode = waitForTransaction;
queuedNode->currentStackDepth = currentStackDepth;
*toBeVisitedNodes = lcons(queuedNode, *toBeVisitedNodes);
}
}
/*
* BuildDeadlockPathList fills deadlockPath with a list of transactions involved
* in a distributed deadlock (i.e. a cycle in the graph).
*/
static void
BuildDeadlockPathList(QueuedTransactionNode *cycledTransactionNode,
TransactionNode **transactionNodeStack,
List **deadlockPath)
{
int deadlockStackDepth = cycledTransactionNode->currentStackDepth;
int stackIndex = 0;
*deadlockPath = NIL;
for (stackIndex = 0; stackIndex < deadlockStackDepth; stackIndex++)
{
*deadlockPath = lappend(*deadlockPath, transactionNodeStack[stackIndex]);
}
}
/*
* ResetVisitedFields goes over all the elements of the input adjacency list
* and sets transactionVisited to false.
*/
static void
ResetVisitedFields(HTAB *adjacencyList)
{
HASH_SEQ_STATUS status;
TransactionNode *resetNode = NULL;
/* reset all visited fields */
hash_seq_init(&status, adjacencyList);
while ((resetNode = (TransactionNode *) hash_seq_search(&status)) != 0)
{
resetNode->transactionVisited = false;
}
}
/*
* AssociateDistributedTransactionWithBackendProc gets a transaction node
* and searches for the corresponding backend. Once found, the transaction
* node's initiatorProc is set to it.
*
* The function goes over all the backends and looks for the backend with
* the same transaction number as the given transaction node.
*/
static void
AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
{
int backendIndex = 0;
for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
{
PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex];
BackendData currentBackendData;
DistributedTransactionId *currentTransactionId = NULL;
/* we're not interested in processes that are not active */
if (currentProc->pid <= 0)
{
continue;
}
GetBackendDataForProc(currentProc, &currentBackendData);
/* we're only interested in distributed transactions */
if (!IsInDistributedTransaction(&currentBackendData))
{
continue;
}
currentTransactionId = &currentBackendData.transactionId;
if (currentTransactionId->transactionNumber !=
transactionNode->transactionId.transactionNumber)
{
continue;
}
/* we're only interested in transactions started on this node */
if (!currentTransactionId->transactionOriginator)
{
continue;
}
/* at this point we should only have transactions initiated by this node */
Assert(currentTransactionId->initiatorNodeIdentifier == GetLocalGroupId());
transactionNode->initiatorProc = currentProc;
break;
}
}
/*
@ -79,15 +425,18 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
WaitEdge *edge = &waitGraph->edges[edgeIndex];
TransactionNode *waitingTransaction = NULL;
TransactionNode *blockingTransaction = NULL;
bool transactionOriginator = false;
DistributedTransactionId waitingId = {
edge->waitingNodeId,
transactionOriginator,
edge->waitingTransactionNum,
edge->waitingTransactionStamp
};
DistributedTransactionId blockingId = {
edge->blockingNodeId,
transactionOriginator,
edge->blockingTransactionNum,
edge->blockingTransactionStamp
};
@ -121,6 +470,7 @@ GetOrCreateTransactionNode(HTAB *adjacencyList, DistributedTransactionId *transa
if (!found)
{
transactionNode->waitsFor = NIL;
transactionNode->initiatorProc = NULL;
}
return transactionNode;
@ -191,3 +541,116 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
return 0;
}
}
/*
* LogCancellingBackend should only be called when a distributed transaction's
* backend is cancelled due to a distributed deadlock. It logs the cancelled
* transaction and the corresponding pid.
*/
static void
LogCancellingBackend(TransactionNode *transactionNode)
{
StringInfo logMessage = NULL;
if (!LogDistributedDeadlockDetection)
{
return;
}
logMessage = makeStringInfo();
appendStringInfo(logMessage, "Cancelling the following backend "
"to resolve distributed deadlock "
"(transaction numner = %ld, pid = %d)",
transactionNode->transactionId.transactionNumber,
transactionNode->initiatorProc->pid);
LogDistributedDeadlockDebugMessage(logMessage->data);
}
/*
* LogTransactionNode converts the transaction node to a human readable form
* and sends it to the logs via LogDistributedDeadlockDebugMessage().
*/
static void
LogTransactionNode(TransactionNode *transactionNode)
{
StringInfo logMessage = NULL;
DistributedTransactionId *transactionId = NULL;
if (!LogDistributedDeadlockDetection)
{
return;
}
logMessage = makeStringInfo();
transactionId = &(transactionNode->transactionId);
appendStringInfo(logMessage, "[DistributedTransactionId: (%d, %ld, %s)] = ",
transactionId->initiatorNodeIdentifier,
transactionId->transactionNumber,
timestamptz_to_str(transactionId->timestamp));
appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]",
WaitsForToString(transactionNode->waitsFor));
/* log the backend query if the proc is associated with the transaction */
if (transactionNode->initiatorProc != NULL)
{
const char *backendQuery =
pgstat_get_backend_current_activity(transactionNode->initiatorProc->pid,
false);
appendStringInfo(logMessage, "[Backend Query: %s]", backendQuery);
}
LogDistributedDeadlockDebugMessage(logMessage->data);
}
/*
* LogDistributedDeadlockDebugMessage checks the LogDistributedDeadlockDetection flag.
* If it is true, the input message is sent to the logs at LOG level. Also, the current
* timestamp is prepended to the message.
*/
static void
LogDistributedDeadlockDebugMessage(const char *errorMessage)
{
if (!LogDistributedDeadlockDetection)
{
return;
}
ereport(LOG, (errmsg("[%s] %s", timestamptz_to_str(GetCurrentTimestamp()),
errorMessage)));
}
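None of these messages appear unless the GUC_NO_SHOW_ALL logging flag is on; a sketch, with an illustrative log line:

ALTER SYSTEM SET citus.log_distributed_deadlock_detection TO on;
SELECT pg_reload_conf();
-- the server log then contains entries along these lines (values illustrative):
-- LOG:  [2017-08-12 14:02:27+03] Cancelling the following backend to resolve distributed deadlock (transaction number = 42, pid = 9876)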
/*
* WaitsForToString is only intended for testing and debugging. It gets a
* waitsForList and returns the list of transaction nodes' transactionNumber
* in a string.
*/
char *
WaitsForToString(List *waitsFor)
{
StringInfo transactionIdStr = makeStringInfo();
ListCell *waitsForCell = NULL;
foreach(waitsForCell, waitsFor)
{
TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
if (transactionIdStr->len != 0)
{
appendStringInfoString(transactionIdStr, ",");
}
appendStringInfo(transactionIdStr, "%ld",
waitingNode->transactionId.transactionNumber);
}
return transactionIdStr->data;
}

View File

@ -56,10 +56,8 @@ static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc,
static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
PROCStack *remaining);
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
static bool IsProcessWaitingForLock(PGPROC *proc);
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
static bool IsConflictingLockMask(int holdMask, int conflictMask);
static bool IsInDistributedTransaction(BackendData *backendData);
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
@ -710,7 +708,7 @@ AllocWaitEdge(WaitGraph *waitGraph)
/*
* IsProcessWaitingForLock returns whether a given process is waiting for a lock.
*/
static bool
bool
IsProcessWaitingForLock(PGPROC *proc)
{
return proc->waitStatus == STATUS_WAITING;
@ -750,7 +748,7 @@ IsConflictingLockMask(int holdMask, int conflictMask)
* IsInDistributedTransaction returns whether the given backend is in a
* distributed transaction.
*/
static bool
bool
IsInDistributedTransaction(BackendData *backendData)
{
return backendData->transactionId.transactionNumber != 0;

View File

@ -22,6 +22,7 @@
#include "access/xact.h"
#include "libpq/pqsignal.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "postmaster/bgworker.h"
@ -72,6 +73,8 @@ typedef struct MaintenanceDaemonDBData
Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;
/* config variable for distributed deadlock detection timeout */
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
@ -248,7 +251,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
{
int rc;
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
int timeout = 10000; /* wake up at least every so often */
double timeout = 10000.0; /* use this if the deadlock detection is disabled */
bool foundDeadlock = false;
CHECK_FOR_INTERRUPTS();
@ -258,13 +262,40 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* tasks should do their own time math about whether to re-run checks.
*/
/* the config value -1 disables the distributed deadlock detection */
if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
{
StartTransactionCommand();
foundDeadlock = CheckForDistributedDeadlocks();
CommitTransactionCommand();
/*
* If we find any deadlocks, run the distributed deadlock detection
* more often since it is quite possible that there are other
* deadlocks that need to be resolved.
*
* Thus, we use 1/20 of the calculated value. With the default
* values (i.e., deadlock_timeout 1 second,
* citus.distributed_deadlock_detection_factor 2), we'd be able to cancel
* ~10 distributed deadlocks per second.
*/
timeout =
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
if (foundDeadlock)
{
timeout = timeout / 20.0;
}
}
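A quick worked check of the math in the comment above, assuming default settings:

SHOW deadlock_timeout;                             -- 1s, i.e. 1000 ms
SHOW citus.distributed_deadlock_detection_factor;  -- 2
-- timeout = 2.0 * 1000 ms = 2000 ms between deadlock checks;
-- after a deadlock is found: 2000 ms / 20 = 100 ms, i.e. up to ~10 checks per second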
/*
* Wait until timeout, or until somebody wakes us up.
* Wait until timeout, or until somebody wakes us up. Also cast the timeout to
* long; we calculated it as a double so as not to lose precision.
*/
#if (PG_VERSION_NUM >= 100000)
rc = WaitLatch(MyLatch, latchFlags, timeout, PG_WAIT_EXTENSION);
rc = WaitLatch(MyLatch, latchFlags, (long) timeout, PG_WAIT_EXTENSION);
#else
rc = WaitLatch(MyLatch, latchFlags, timeout);
rc = WaitLatch(MyLatch, latchFlags, (long) timeout);
#endif
/* emergency bailout if postmaster has died */

View File

@ -29,6 +29,7 @@ typedef struct BackendData
{
Oid databaseId;
slock_t mutex;
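/* set to true by the deadlock detector when it cancels this backend */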
bool cancelledDueToDeadlock;
DistributedTransactionId transactionId;
} BackendData;
@ -40,5 +41,7 @@ extern void UnlockBackendSharedMemory(void);
extern void UnSetDistributedTransactionId(void);
extern void AssignDistributedTransactionId(void);
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(void);
#endif /* BACKEND_DATA_H */

View File

@ -26,10 +26,21 @@ typedef struct TransactionNode
/* list of TransactionNode that this distributed transaction is waiting for */
List *waitsFor;
/* backend that is on the initiator node */
PGPROC *initiatorProc;
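/* set to true when the deadlock detector's DFS has visited this node */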
bool transactionVisited;
} TransactionNode;
HTAB * BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph);
/* GUC determining whether debug messages for deadlock detection are sent to LOG */
extern bool LogDistributedDeadlockDetection;
extern bool CheckForDistributedDeadlocks(void);
extern HTAB * BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph);
extern char * WaitsForToString(List *waitsFor);
#endif /* DISTRIBUTED_DEADLOCK_DETECTION_H */

View File

@ -55,6 +55,8 @@ typedef struct WaitGraph
extern WaitGraph * BuildGlobalWaitGraph(void);
extern bool IsProcessWaitingForLock(PGPROC *proc);
extern bool IsInDistributedTransaction(BackendData *backendData);
#endif /* LOCK_GRAPH_H */

View File

@ -12,6 +12,9 @@
#ifndef MAINTENANCED_H
#define MAINTENANCED_H
/* config variable for the distributed deadlock detection timeout factor */
extern double DistributedDeadlockDetectionTimeoutFactor;
extern void InitializeMaintenanceDaemon(void);
extern void InitializeMaintenanceDaemonBackend(void);

View File

@ -21,6 +21,10 @@
*
* - initiatorNodeIdentifier: A unique identifier of the node that initiated
* the distributed transaction
* - transactionOriginator: Set to true only for the backend that initiated the
* distributed transaction. This is mainly useful for MX, where worker queries
* that belong to the same distributed transaction may run on the node that
* initiated it, and we need to distinguish the originating backend
* - transactionNumber: A locally unique identifier assigned for the distributed
* transaction on the node that initiated the distributed transaction
* - timestamp: The current timestamp of distributed transaction initiation
@ -29,6 +33,7 @@
typedef struct DistributedTransactionId
{
int initiatorNodeIdentifier;
bool transactionOriginator;
uint64 transactionNumber;
TimestampTz timestamp;
} DistributedTransactionId;
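A hedged sketch of how an MX worker backend gets these fields populated, via the assign_distributed_transaction_id UDF shown in backend_data.c above (argument values illustrative):

SELECT assign_distributed_transaction_id(0, 100, now());
-- per the diff above, this sets transactionOriginator to false, marking the
-- backend as a participant rather than the originator of the transaction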

View File

@ -0,0 +1,876 @@
Parsed test spec with 7 sessions
starting permutation: s1-begin s2-begin s1-update-1 s2-update-2 s2-update-1 deadlock-checker-call s1-update-2 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s2-update-1:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-1: <... completed>
step s1-update-2: <... completed>
error in steps deadlock-checker-call s2-update-1 s1-update-2: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s1-update-1-rep-2 s2-update-2-rep-2 s2-update-1-rep-2 deadlock-checker-call s1-update-2-rep-2 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-1-rep-2:
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 1;
step s2-update-2-rep-2:
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 2;
step s2-update-1-rep-2:
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s1-update-2-rep-2:
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-1-rep-2: <... completed>
step s1-update-2-rep-2: <... completed>
error in steps deadlock-checker-call s2-update-1-rep-2 s1-update-2-rep-2: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s1-set-2pc s2-set-2pc s1-update-1 s2-update-2 s2-update-1 deadlock-checker-call s1-update-2 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-set-2pc:
set citus.multi_shard_commit_protocol TO '2pc';
step s2-set-2pc:
set citus.multi_shard_commit_protocol TO '2pc';
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s2-update-1:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-1: <... completed>
step s1-update-2: <... completed>
error in steps deadlock-checker-call s2-update-1 s1-update-2: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s1-update-1 s2-update-2 s1-update-2 deadlock-checker-call s2-upsert-select-all deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-upsert-select-all:
INSERT INTO deadlock_detection_test SELECT * FROM deadlock_detection_test ON CONFLICT(user_id) DO UPDATE SET some_val = deadlock_detection_test.some_val + 5 RETURNING *;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s1-update-2: <... completed>
step s2-upsert-select-all: <... completed>
error in steps deadlock-checker-call s1-update-2 s2-upsert-select-all: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s1-update-1 s2-update-2 s1-update-2 deadlock-checker-call s2-ddl deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-ddl:
ALTER TABLE deadlock_detection_test ADD COLUMN test_col INT;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s1-update-2: <... completed>
step s2-ddl: <... completed>
error in steps deadlock-checker-call s1-update-2 s2-ddl: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s1-insert-dist-10 s2-insert-local-10 s2-insert-dist-10 s1-insert-local-10 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-insert-dist-10:
INSERT INTO deadlock_detection_test VALUES (10, 10);
step s2-insert-local-10:
INSERT INTO local_deadlock_table VALUES (10, 10);
step s2-insert-dist-10:
INSERT INTO deadlock_detection_test VALUES (10, 10);
<waiting ...>
step s1-insert-local-10:
INSERT INTO local_deadlock_table VALUES (10, 10);
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-insert-dist-10: <... completed>
step s1-insert-local-10: <... completed>
error in steps deadlock-checker-call s2-insert-dist-10 s1-insert-local-10: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s2-insert-ref-10 s1-insert-ref-11 s2-insert-ref-11 s1-insert-ref-10 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-insert-ref-10:
INSERT INTO deadlock_detection_reference VALUES (10, 10);
step s1-insert-ref-11:
INSERT INTO deadlock_detection_reference VALUES (11, 11);
step s2-insert-ref-11:
INSERT INTO deadlock_detection_reference VALUES (11, 11);
<waiting ...>
step s1-insert-ref-10:
INSERT INTO deadlock_detection_reference VALUES (10, 10);
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-insert-ref-11: <... completed>
step s1-insert-ref-10: <... completed>
error in steps deadlock-checker-call s2-insert-ref-11 s1-insert-ref-10: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s2-insert-ref-10 s1-update-1 deadlock-checker-call s2-update-1 s1-insert-ref-10 deadlock-checker-call s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-insert-ref-10:
INSERT INTO deadlock_detection_reference VALUES (10, 10);
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-update-1:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;
<waiting ...>
step s1-insert-ref-10:
INSERT INTO deadlock_detection_reference VALUES (10, 10);
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-1: <... completed>
step s1-insert-ref-10: <... completed>
error in steps deadlock-checker-call s2-update-1 s1-insert-ref-10: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s1-update-1 s2-update-2 s3-update-3 deadlock-checker-call s1-update-2 s2-update-3 s3-update-1 deadlock-checker-call s3-finish s2-finish s1-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
<waiting ...>
step s3-update-1:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-3: <... completed>
step s3-update-1: <... completed>
error in steps deadlock-checker-call s2-update-3 s3-update-1: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
step s1-update-2: <... completed>
step s1-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s2-update-1 s1-update-1 s2-update-2 s3-update-3 s3-update-2 deadlock-checker-call s2-update-3 deadlock-checker-call s3-finish s2-finish s1-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s2-update-1:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
<waiting ...>
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s3-update-2: <... completed>
step s2-update-3: <... completed>
error in steps deadlock-checker-call s3-update-2 s2-update-3: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
step s1-update-1: <... completed>
step s1-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s1-update-1 s2-update-2 s3-update-3 s3-update-2 deadlock-checker-call s4-update-4 s2-update-3 deadlock-checker-call s3-finish s2-finish s1-finish s4-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s4-update-4:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s3-update-2: <... completed>
step s2-update-3: <... completed>
error in steps deadlock-checker-call s3-update-2 s2-update-3: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
step s1-finish:
COMMIT;
step s4-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s4-update-1 s1-update-1 deadlock-checker-call s2-update-2 s3-update-3 s2-update-3 s3-update-2 deadlock-checker-call s3-finish s2-finish s4-finish s1-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s4-update-1:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 1;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
<waiting ...>
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-3: <... completed>
step s3-update-2: <... completed>
error in steps deadlock-checker-call s2-update-3 s3-update-2: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
step s4-finish:
COMMIT;
step s1-update-1: <... completed>
step s1-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s1-update-1 s4-update-4 s2-update-2 s3-update-3 s3-update-2 s4-update-1 s1-update-4 deadlock-checker-call s1-finish s4-finish s2-update-3 deadlock-checker-call s2-finish s3-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s4-update-4:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
<waiting ...>
step s4-update-1:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 1;
<waiting ...>
step s1-update-4:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 4;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s4-update-1: <... completed>
step s1-update-4: <... completed>
error in steps deadlock-checker-call s4-update-1 s1-update-4: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s4-finish:
COMMIT;
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s3-update-2: <... completed>
step s2-update-3: <... completed>
error in steps deadlock-checker-call s3-update-2 s2-update-3: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s2-finish:
COMMIT;
step s3-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s5-begin s6-begin s1-update-1 s5-update-5 s3-update-2 s2-update-3 s4-update-4 s3-update-4 deadlock-checker-call s6-update-6 s4-update-6 s1-update-5 s5-update-1 deadlock-checker-call s1-finish s5-finish s6-finish s4-finish s3-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s5-begin:
BEGIN;
step s6-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s5-update-5:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 5;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
step s4-update-4:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
step s3-update-4:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 4;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s6-update-6:
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 6;
step s4-update-6:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 6;
<waiting ...>
step s1-update-5:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 5;
<waiting ...>
step s5-update-1:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s1-update-5: <... completed>
step s5-update-1: <... completed>
error in steps deadlock-checker-call s1-update-5 s5-update-1: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-finish:
COMMIT;
step s5-finish:
COMMIT;
step s6-finish:
COMMIT;
step s4-update-6: <... completed>
step s4-finish:
COMMIT;
step s3-update-4: <... completed>
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s5-begin s6-begin s6-update-6 s5-update-5 s5-update-6 s4-update-4 s1-update-4 s4-update-5 deadlock-checker-call s2-update-3 s3-update-2 s2-update-2 s3-update-3 deadlock-checker-call s6-finish s5-finish s4-finish s1-finish s3-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s5-begin:
BEGIN;
step s6-begin:
BEGIN;
step s6-update-6:
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 6;
step s5-update-5:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 5;
step s5-update-6:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 6;
<waiting ...>
step s4-update-4:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
step s1-update-4:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 4;
<waiting ...>
step s4-update-5:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 5;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s2-update-3:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
<waiting ...>
step s3-update-3:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s2-update-2: <... completed>
step s3-update-3: <... completed>
error in steps deadlock-checker-call s2-update-2 s3-update-3: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s6-finish:
COMMIT;
step s5-update-6: <... completed>
step s5-finish:
COMMIT;
step s4-update-5: <... completed>
step s4-finish:
COMMIT;
step s1-update-4: <... completed>
step s1-finish:
COMMIT;
step s3-finish:
COMMIT;
step s2-finish:
COMMIT;
starting permutation: s1-begin s2-begin s3-begin s4-begin s5-begin s6-begin s5-update-5 s3-update-2 s2-update-2 s4-update-4 s3-update-4 s4-update-5 s1-update-4 deadlock-checker-call s6-update-6 s5-update-6 s6-update-5 deadlock-checker-call s5-finish s6-finish s4-finish s3-finish s1-finish s2-finish
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s5-begin:
BEGIN;
step s6-begin:
BEGIN;
step s5-update-5:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 5;
step s3-update-2:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
step s2-update-2:
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
<waiting ...>
step s4-update-4:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
step s3-update-4:
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 4;
<waiting ...>
step s4-update-5:
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 5;
<waiting ...>
step s1-update-4:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 4;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
f
step s6-update-6:
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 6;
step s5-update-6:
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 6;
<waiting ...>
step s6-update-5:
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 5;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s5-update-6: <... completed>
step s6-update-5: <... completed>
error in steps deadlock-checker-call s5-update-6 s6-update-5: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s5-finish:
COMMIT;
step s4-update-5: <... completed>
step s6-finish:
COMMIT;
step s4-finish:
COMMIT;
step s3-update-4: <... completed>
step s3-finish:
COMMIT;
step s2-update-2: <... completed>
step s1-update-4: <... completed>
step s1-finish:
COMMIT;
step s2-finish:
COMMIT;

View File

@ -123,6 +123,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-10';
ALTER EXTENSION citus UPDATE TO '7.0-11';
ALTER EXTENSION citus UPDATE TO '7.0-12';
ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
-- show running version
SHOW citus.version;
citus.version

View File

@ -145,18 +145,18 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
8 | 5 | Douglas Engelbart | 5 | Los Alamos
(1 row)
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- unless we disable deadlock prevention
-- we should be able to expand the transaction participants
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists.
CONTEXT: while executing command on localhost:57638
ABORT;
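-- (the duplicate key error above is expected: the previous transaction now succeeds in
--  inserting researcher (9, 6, 'Leslie Lamport') and commits, so repeating the INSERT
--  violates the unique index on (lab_id, name))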
-- SELECTs may occur after a modification: First check that selecting
-- from the modified node works.
@ -165,7 +165,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
count
-------
0
1
(1 row)
ABORT;
@ -181,7 +181,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
count
-------
0
1
(1 row)
ABORT;
@ -204,9 +204,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::
(2 rows)
SELECT * FROM labs WHERE id = 6;
id | name
----+------
(0 rows)
id | name
----+-----------
6 | Bell Labs
(1 row)
-- COPY can happen after single row INSERT
BEGIN;
@ -272,9 +273,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
id | lab_id | name
----+--------+----------------
9 | 6 | Leslie Lamport
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
@ -299,9 +301,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
id | lab_id | name
----+--------+----------------
9 | 6 | Leslie Lamport
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
@ -317,9 +320,10 @@ COMMIT;
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
9 | 6 | Leslie Lamport
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
(3 rows)
-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
@ -376,9 +380,10 @@ ERROR: could not commit transaction on any active node
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
9 | 6 | Leslie Lamport
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
(3 rows)
-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
@ -429,7 +434,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs');
master_modify_multiple_shards
-------------------------------
7
8
(1 row)
ALTER TABLE labs ADD COLUMN score float;
@ -909,16 +914,13 @@ SELECT create_distributed_table('hash_modifying_xacts', 'key');
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
ERROR: no transaction participant matches localhost:57638
COMMIT;
-- it is allowed when turning off deadlock prevention
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
ABORT;
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO hash_modifying_xacts VALUES (2, 2);
ABORT;

View File

@ -352,6 +352,8 @@ Custom Scan (Citus Router)
-> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx
Index Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-- refresh planner statistics so the EXPLAIN outputs below stay consistent
VACUUM ANALYZE lineitem_mx;
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5;
@ -360,10 +362,8 @@ Custom Scan (Citus Router)
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Bitmap Heap Scan on lineitem_mx_1220055 lineitem_mx
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_mx_pkey_1220055
Index Cond: (l_orderkey = 5)
-> Index Scan using lineitem_mx_pkey_1220055 on lineitem_mx_1220055 lineitem_mx
Index Cond: (l_orderkey = 5)
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5$$);
t
@ -391,68 +391,68 @@ Aggregate
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220052 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220053 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220053 on lineitem_mx_1220053 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220054 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220054 on lineitem_mx_1220054 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220055 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220055 on lineitem_mx_1220055 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220056 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220056 on lineitem_mx_1220056 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220057 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220057 on lineitem_mx_1220057 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220058 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220058 on lineitem_mx_1220058 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220059 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220059 on lineitem_mx_1220059 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220060 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220060 on lineitem_mx_1220060 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220061 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220061 on lineitem_mx_1220061 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220062 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220062 on lineitem_mx_1220062 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220063 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220063 on lineitem_mx_1220063 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220064 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220064 on lineitem_mx_1220064 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
@ -461,13 +461,13 @@ Aggregate
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220066 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220066 on lineitem_mx_1220066 lineitem_mx
Index Cond: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220067 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220067 on lineitem_mx_1220067 lineitem_mx
Index Cond: (l_orderkey > 9030)
SELECT true AS valid FROM explain_xml($$
SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030$$);
t
@ -486,8 +486,8 @@ Aggregate
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_mx_1220052 lineitem_mx
Filter: (l_orderkey > 9030)
-> Index Only Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx
Index Cond: (l_orderkey > 9030)
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)

View File

@ -136,12 +136,10 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
8 | 5 | Douglas Engelbart | 5 | Los Alamos
(1 row)
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- have the same test on the other worker node
\c - - - :worker_2_port
@ -159,12 +157,10 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
8 | 5 | Douglas Engelbart | 5 | Los Alamos
(4 rows)
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- switch back to the worker node
\c - - - :worker_1_port
@ -175,7 +171,7 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
count
-------
0
2
(1 row)
ABORT;

View File

@ -798,6 +798,7 @@ SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et
(10 rows)
RESET citus.subquery_pushdown;
VACUUM ANALYZE users_table;
-- explain tests
EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER BY 1;
QUERY PLAN

View File

@ -16,6 +16,7 @@ test: isolation_distributed_transaction_id isolation_progress_monitoring
test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges
test: isolation_replace_wait_function
test: isolation_distributed_deadlock_detection
# creating a restore point briefly blocks all
# writes, so run this test serially.

View File

@ -253,6 +253,17 @@ if ($followercluster)
push(@pgOptions, '-c', "wal_level=replica");
}
# disable automatic distributed deadlock detection during the isolation testing
# to make sure that we always get consistent test outputs. If we don't detect
# the deadlocks manually (i.e., by calling a UDF), sessions that do not participate
# in the deadlock may interleave with the deadlock detection, which results in
# inconsistent test outputs.
if ($isolationtester)
{
push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on");
push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1");
}
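# (the factor is assumed to scale PostgreSQL's deadlock_timeout: automatic checks run
# every citus.distributed_deadlock_detection_factor * deadlock_timeout, so a value of
# -1 turns the background checks off and leaves check_distributed_deadlocks() as the
# only way to trigger detection)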
# Add externally added options last, so they overwrite the default ones above
for my $option (@userPgOptions)
{

View File

@ -0,0 +1,408 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
CREATE TABLE deadlock_detection_reference (user_id int UNIQUE, some_val int);
SELECT create_reference_table('deadlock_detection_reference');
CREATE TABLE deadlock_detection_test (user_id int UNIQUE, some_val int);
INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i;
SELECT create_distributed_table('deadlock_detection_test', 'user_id');
CREATE TABLE local_deadlock_table (user_id int UNIQUE, some_val int);
CREATE TABLE deadlock_detection_test_rep_2 (user_id int UNIQUE, some_val int);
SET citus.shard_replication_factor = 2;
SELECT create_distributed_table('deadlock_detection_test_rep_2', 'user_id');
INSERT INTO deadlock_detection_test_rep_2 VALUES (1,1);
INSERT INTO deadlock_detection_test_rep_2 VALUES (2,2);
}
teardown
{
DROP TABLE deadlock_detection_test;
DROP TABLE local_deadlock_table;
DROP TABLE deadlock_detection_test_rep_2;
DROP TABLE deadlock_detection_reference;
SELECT citus.restore_isolation_tester_func();
SET citus.shard_replication_factor = 1;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-update-1"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
}
step "s1-update-2"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
}
step "s1-update-3"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 3;
}
step "s1-update-4"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 4;
}
step "s1-update-5"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 5;
}
step "s1-insert-dist-10"
{
INSERT INTO deadlock_detection_test VALUES (10, 10);
}
step "s1-insert-local-10"
{
INSERT INTO local_deadlock_table VALUES (10, 10);
}
step "s1-set-2pc"
{
SET citus.multi_shard_commit_protocol TO '2pc';
}
step "s1-update-1-rep-2"
{
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 1;
}
step "s1-update-2-rep-2"
{
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 2;
}
step "s1-insert-ref-10"
{
INSERT INTO deadlock_detection_reference VALUES (10, 10);
}
step "s1-insert-ref-11"
{
INSERT INTO deadlock_detection_reference VALUES (11, 11);
}
step "s1-finish"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-update-1"
{
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1;
}
step "s2-update-2"
{
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2;
}
step "s2-update-3"
{
UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 3;
}
step "s2-upsert-select-all"
{
INSERT INTO deadlock_detection_test SELECT * FROM deadlock_detection_test ON CONFLICT(user_id) DO UPDATE SET some_val = deadlock_detection_test.some_val + 5 RETURNING *;
}
step "s2-ddl"
{
ALTER TABLE deadlock_detection_test ADD COLUMN test_col INT;
}
step "s2-insert-dist-10"
{
INSERT INTO deadlock_detection_test VALUES (10, 10);
}
step "s2-insert-local-10"
{
INSERT INTO local_deadlock_table VALUES (10, 10);
}
step "s2-set-2pc"
{
SET citus.multi_shard_commit_protocol TO '2pc';
}
step "s2-update-1-rep-2"
{
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 1;
}
step "s2-update-2-rep-2"
{
UPDATE deadlock_detection_test_rep_2 SET some_val = 1 WHERE user_id = 2;
}
step "s2-insert-ref-10"
{
INSERT INTO deadlock_detection_reference VALUES (10, 10);
}
step "s2-insert-ref-11"
{
INSERT INTO deadlock_detection_reference VALUES (11, 11);
}
step "s2-finish"
{
COMMIT;
}
session "s3"
step "s3-begin"
{
BEGIN;
}
step "s3-update-1"
{
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 1;
}
step "s3-update-2"
{
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 2;
}
step "s3-update-3"
{
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 3;
}
step "s3-update-4"
{
UPDATE deadlock_detection_test SET some_val = 3 WHERE user_id = 4;
}
step "s3-finish"
{
COMMIT;
}
session "s4"
step "s4-begin"
{
BEGIN;
}
step "s4-update-1"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 1;
}
step "s4-update-2"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 2;
}
step "s4-update-3"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 3;
}
step "s4-update-4"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 4;
}
step "s4-update-5"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 5;
}
step "s4-update-6"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 6;
}
step "s4-update-7"
{
UPDATE deadlock_detection_test SET some_val = 4 WHERE user_id = 7;
}
step "s4-finish"
{
COMMIT;
}
session "s5"
step "s5-begin"
{
BEGIN;
}
step "s5-update-1"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 1;
}
step "s5-update-2"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 2;
}
step "s5-update-3"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 3;
}
step "s5-update-4"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 4;
}
step "s5-update-5"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 5;
}
step "s5-update-6"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 6;
}
step "s5-update-7"
{
UPDATE deadlock_detection_test SET some_val = 5 WHERE user_id = 7;
}
step "s5-finish"
{
COMMIT;
}
session "s6"
step "s6-begin"
{
BEGIN;
}
step "s6-update-1"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 1;
}
step "s6-update-2"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 2;
}
step "s6-update-3"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 3;
}
step "s6-update-4"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 4;
}
step "s6-update-5"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 5;
}
step "s6-update-6"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 6;
}
step "s6-update-7"
{
UPDATE deadlock_detection_test SET some_val = 6 WHERE user_id = 7;
}
step "s6-finish"
{
COMMIT;
}
# we disable the daemon during the regression tests in order to get consistent results;
# thus we issue the deadlock detection manually
session "deadlock-checker"
# we also issue the checker when there is no deadlock to ensure that we never cancel
# a backend inappropriately
step "deadlock-checker-call"
{
SELECT check_distributed_deadlocks();
}
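# when a permutation misbehaves, the wait graph that the checker walks can be inspected
# directly; a minimal debugging sketch, assuming the dump_global_wait_edges() UDF
# exercised by the isolation_dump_global_wait_edges test above is available:
#
#   SELECT * FROM dump_global_wait_edges();
#
# each returned row is an edge from a waiting distributed transaction to the
# transaction that blocks it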
# simplest case, loop with two nodes
permutation "s1-begin" "s2-begin" "s1-update-1" "s2-update-2" "s2-update-1" "deadlock-checker-call" "s1-update-2" "deadlock-checker-call" "s1-finish" "s2-finish"
# simplest case with replication factor 2
permutation "s1-begin" "s2-begin" "s1-update-1-rep-2" "s2-update-2-rep-2" "s2-update-1-rep-2" "deadlock-checker-call" "s1-update-2-rep-2" "deadlock-checker-call" "s1-finish" "s2-finish"
# simplest case with 2pc enabled
permutation "s1-begin" "s2-begin" "s1-set-2pc" "s2-set-2pc" "s1-update-1" "s2-update-2" "s2-update-1" "deadlock-checker-call" "s1-update-2" "deadlock-checker-call" "s1-finish" "s2-finish"
# simplest case where a multi-shard query is cancelled
permutation "s1-begin" "s2-begin" "s1-update-1" "s2-update-2" "s1-update-2" "deadlock-checker-call" "s2-upsert-select-all" "deadlock-checker-call" "s1-finish" "s2-finish"
# simplest case where a DDL command is cancelled
permutation "s1-begin" "s2-begin" "s1-update-1" "s2-update-2" "s1-update-2" "deadlock-checker-call" "s2-ddl" "deadlock-checker-call" "s1-finish" "s2-finish"
# deadlock with local table
permutation "s1-begin" "s2-begin" "s1-insert-dist-10" "s2-insert-local-10" "s2-insert-dist-10" "s1-insert-local-10" "deadlock-checker-call" "s1-finish" "s2-finish"
# deadlock with reference tables only
permutation "s1-begin" "s2-begin" "s2-insert-ref-10" "s1-insert-ref-11" "s2-insert-ref-11" "s1-insert-ref-10" "deadlock-checker-call" "s1-finish" "s2-finish"
# deadlock with reference + distributed tables
permutation "s1-begin" "s2-begin" "s2-insert-ref-10" "s1-update-1" "deadlock-checker-call" "s2-update-1" "s1-insert-ref-10" "deadlock-checker-call" "s1-finish" "s2-finish"
# slightly more complex case, loop with three nodes
permutation "s1-begin" "s2-begin" "s3-begin" "s1-update-1" "s2-update-2" "s3-update-3" "deadlock-checker-call" "s1-update-2" "s2-update-3" "s3-update-1" "deadlock-checker-call" "s3-finish" "s2-finish" "s1-finish"
# similar to the above (i.e., 3 nodes), but the cycle starts from the second node
permutation "s1-begin" "s2-begin" "s3-begin" "s2-update-1" "s1-update-1" "s2-update-2" "s3-update-3" "s3-update-2" "deadlock-checker-call" "s2-update-3" "deadlock-checker-call" "s3-finish" "s2-finish" "s1-finish"
# disconnected wait graph
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s1-update-1" "s2-update-2" "s3-update-3" "s3-update-2" "deadlock-checker-call" "s4-update-4" "s2-update-3" "deadlock-checker-call" "s3-finish" "s2-finish" "s1-finish" "s4-finish"
# still a disconnected graph, but each component contains dependencies, one of which is a distributed deadlock
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s4-update-1" "s1-update-1" "deadlock-checker-call" "s2-update-2" "s3-update-3" "s2-update-3" "s3-update-2" "deadlock-checker-call" "s3-finish" "s2-finish" "s4-finish" "s1-finish"
# multiple deadlocks on a disconnected graph
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s1-update-1" "s4-update-4" "s2-update-2" "s3-update-3" "s3-update-2" "s4-update-1" "s1-update-4" "deadlock-checker-call" "s1-finish" "s4-finish" "s2-update-3" "deadlock-checker-call" "s2-finish" "s3-finish"
# a larger graph where the first node is in the distributed deadlock
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s5-begin" "s6-begin" "s1-update-1" "s5-update-5" "s3-update-2" "s2-update-3" "s4-update-4" "s3-update-4" "deadlock-checker-call" "s6-update-6" "s4-update-6" "s1-update-5" "s5-update-1" "deadlock-checker-call" "s1-finish" "s5-finish" "s6-finish" "s4-finish" "s3-finish" "s2-finish"
# a larger graph where the deadlock starts from a middle node
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s5-begin" "s6-begin" "s6-update-6" "s5-update-5" "s5-update-6" "s4-update-4" "s1-update-4" "s4-update-5" "deadlock-checker-call" "s2-update-3" "s3-update-2" "s2-update-2" "s3-update-3" "deadlock-checker-call" "s6-finish" "s5-finish" "s4-finish" "s1-finish" "s3-finish" "s2-finish"
# a larger graph where the deadlock starts from the last node
permutation "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s5-begin" "s6-begin" "s5-update-5" "s3-update-2" "s2-update-2" "s4-update-4" "s3-update-4" "s4-update-5" "s1-update-4" "deadlock-checker-call" "s6-update-6" "s5-update-6" "s6-update-5" "deadlock-checker-call" "s5-finish" "s6-finish" "s4-finish" "s3-finish" "s1-finish" "s2-finish"

View File

@ -123,6 +123,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-10';
ALTER EXTENSION citus UPDATE TO '7.0-11';
ALTER EXTENSION citus UPDATE TO '7.0-12';
ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
-- show running version
SHOW citus.version;

View File

@ -113,15 +113,14 @@ COMMIT;
SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
COMMIT;
-- unless we disable deadlock prevention
-- we should be able to expand the transaction participants
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ABORT;
@ -703,13 +702,11 @@ COMMIT;
-- it is allowed when turning off deadlock prevention
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
ABORT;
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO hash_modifying_xacts VALUES (2, 2);
ABORT;

View File

@ -127,6 +127,9 @@ EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_mx
WHERE l_orderkey = 1 AND l_partkey = 0;
-- refresh planner statistics so the EXPLAIN outputs below stay consistent
VACUUM ANALYZE lineitem_mx;
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5;

View File

@ -118,7 +118,7 @@ COMMIT;
SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
@ -134,7 +134,7 @@ COMMIT;
SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
-- but not the other way around (would require expanding xact participants)...
-- and the other way around is also allowed
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');

View File

@ -375,6 +375,8 @@ SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et
RESET citus.subquery_pushdown;
VACUUM ANALYZE users_table;
-- explain tests
EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER BY 1;