mirror of https://github.com/citusdata/citus.git
Merge pull request #5734 from citusdata/remove_citus_backend
Drop support for CitusInitiatedBackendpull/5745/head
commit
dda47dae7d
|
@ -87,10 +87,11 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int nodeId = ExtractNodeIdFromGlobalPID(globalPID);
|
bool missingOk = false;
|
||||||
|
int nodeId = ExtractNodeIdFromGlobalPID(globalPID, missingOk);
|
||||||
int processId = ExtractProcessIdFromGlobalPID(globalPID);
|
int processId = ExtractProcessIdFromGlobalPID(globalPID);
|
||||||
|
|
||||||
WorkerNode *workerNode = FindNodeWithNodeId(nodeId);
|
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk);
|
||||||
|
|
||||||
StringInfo cancelQuery = makeStringInfo();
|
StringInfo cancelQuery = makeStringInfo();
|
||||||
|
|
||||||
|
|
|
@ -150,8 +150,6 @@ RegisterCitusCustomScanMethods(void)
|
||||||
static void
|
static void
|
||||||
CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
|
CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
{
|
{
|
||||||
MarkCitusInitiatedCoordinatorBackend();
|
|
||||||
|
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -1541,7 +1541,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
|
||||||
* If the node cannot be found this functions errors.
|
* If the node cannot be found this functions errors.
|
||||||
*/
|
*/
|
||||||
WorkerNode *
|
WorkerNode *
|
||||||
FindNodeWithNodeId(int nodeId)
|
FindNodeWithNodeId(int nodeId, bool missingOk)
|
||||||
{
|
{
|
||||||
List *workerList = ActiveReadableNodeList();
|
List *workerList = ActiveReadableNodeList();
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
@ -1555,7 +1555,10 @@ FindNodeWithNodeId(int nodeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* there isn't any node with nodeId in pg_dist_node */
|
/* there isn't any node with nodeId in pg_dist_node */
|
||||||
|
if (!missingOk)
|
||||||
|
{
|
||||||
elog(ERROR, "worker node with node id %d could not be found", nodeId);
|
elog(ERROR, "worker node with node id %d could not be found", nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,9 +152,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
|
||||||
MyBackendData->transactionId.timestamp = timestamp;
|
MyBackendData->transactionId.timestamp = timestamp;
|
||||||
MyBackendData->transactionId.transactionOriginator = false;
|
MyBackendData->transactionId.transactionOriginator = false;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier =
|
|
||||||
MyBackendData->transactionId.initiatorNodeIdentifier;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
@ -385,7 +382,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
&backendManagementShmemData->backends[backendIndex];
|
&backendManagementShmemData->backends[backendIndex];
|
||||||
|
|
||||||
/* to work on data after releasing g spinlock to protect against errors */
|
/* to work on data after releasing g spinlock to protect against errors */
|
||||||
int initiatorNodeIdentifier = -1;
|
|
||||||
uint64 transactionNumber = 0;
|
uint64 transactionNumber = 0;
|
||||||
|
|
||||||
SpinLockAcquire(¤tBackend->mutex);
|
SpinLockAcquire(¤tBackend->mutex);
|
||||||
|
@ -408,7 +404,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
|
|
||||||
Oid databaseId = currentBackend->databaseId;
|
Oid databaseId = currentBackend->databaseId;
|
||||||
int backendPid = ProcGlobal->allProcs[backendIndex].pid;
|
int backendPid = ProcGlobal->allProcs[backendIndex].pid;
|
||||||
initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We prefer to use worker_query instead of distributedCommandOriginator in
|
* We prefer to use worker_query instead of distributedCommandOriginator in
|
||||||
|
@ -423,9 +418,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
|
|
||||||
SpinLockRelease(¤tBackend->mutex);
|
SpinLockRelease(¤tBackend->mutex);
|
||||||
|
|
||||||
|
bool missingOk = true;
|
||||||
|
int nodeId = ExtractNodeIdFromGlobalPID(currentBackend->globalPID, missingOk);
|
||||||
|
|
||||||
values[0] = ObjectIdGetDatum(databaseId);
|
values[0] = ObjectIdGetDatum(databaseId);
|
||||||
values[1] = Int32GetDatum(backendPid);
|
values[1] = Int32GetDatum(backendPid);
|
||||||
values[2] = Int32GetDatum(initiatorNodeIdentifier);
|
values[2] = Int32GetDatum(nodeId);
|
||||||
values[3] = !distributedCommandOriginator;
|
values[3] = !distributedCommandOriginator;
|
||||||
values[4] = UInt64GetDatum(transactionNumber);
|
values[4] = UInt64GetDatum(transactionNumber);
|
||||||
values[5] = TimestampTzGetDatum(transactionIdTimestamp);
|
values[5] = TimestampTzGetDatum(transactionIdTimestamp);
|
||||||
|
@ -520,7 +518,6 @@ BackendManagementShmemInit(void)
|
||||||
{
|
{
|
||||||
BackendData *backendData =
|
BackendData *backendData =
|
||||||
&backendManagementShmemData->backends[backendIndex];
|
&backendManagementShmemData->backends[backendIndex];
|
||||||
backendData->citusBackend.initiatorNodeIdentifier = -1;
|
|
||||||
SpinLockInit(&backendData->mutex);
|
SpinLockInit(&backendData->mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -660,8 +657,6 @@ UnSetDistributedTransactionId(void)
|
||||||
MyBackendData->transactionId.transactionNumber = 0;
|
MyBackendData->transactionId.transactionNumber = 0;
|
||||||
MyBackendData->transactionId.timestamp = 0;
|
MyBackendData->transactionId.timestamp = 0;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -772,29 +767,6 @@ AssignDistributedTransactionId(void)
|
||||||
MyBackendData->transactionId.transactionNumber = nextTransactionNumber;
|
MyBackendData->transactionId.transactionNumber = nextTransactionNumber;
|
||||||
MyBackendData->transactionId.timestamp = currentTimestamp;
|
MyBackendData->transactionId.timestamp = currentTimestamp;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* MarkCitusInitiatedCoordinatorBackend sets that coordinator backend is
|
|
||||||
* initiated by Citus.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
MarkCitusInitiatedCoordinatorBackend(void)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* GetLocalGroupId may throw exception which can cause leaving spin lock
|
|
||||||
* unreleased. Calling GetLocalGroupId function before the lock to avoid this.
|
|
||||||
*/
|
|
||||||
int32 localGroupId = GetLocalGroupId();
|
|
||||||
|
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -926,11 +898,12 @@ ExtractGlobalPID(char *applicationName)
|
||||||
* gives us the node id.
|
* gives us the node id.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
ExtractNodeIdFromGlobalPID(uint64 globalPID)
|
ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk)
|
||||||
{
|
{
|
||||||
int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER);
|
int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER);
|
||||||
|
|
||||||
if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
|
if (!missingOk &&
|
||||||
|
nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("originator node of the query with the global pid "
|
ereport(ERROR, (errmsg("originator node of the query with the global pid "
|
||||||
"%lu is not in Citus' metadata", globalPID),
|
"%lu is not in Citus' metadata", globalPID),
|
||||||
|
|
|
@ -538,33 +538,35 @@ ReplaceInitiatorNodeIdentifier(int initiator_node_identifier,
|
||||||
* transaction. However, we cannot know which node has initiated
|
* transaction. However, we cannot know which node has initiated
|
||||||
* the worker query.
|
* the worker query.
|
||||||
*/
|
*/
|
||||||
if (initiator_node_identifier > 0)
|
if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA &&
|
||||||
{
|
IsCoordinator())
|
||||||
bool nodeExists = false;
|
|
||||||
|
|
||||||
initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
|
|
||||||
|
|
||||||
/* a query should run on an existing node */
|
|
||||||
Assert(nodeExists);
|
|
||||||
if (initiatorWorkerNode == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("no primary node found for group %d",
|
|
||||||
initiator_node_identifier)));
|
|
||||||
}
|
|
||||||
citusDistStat->master_query_host_name =
|
|
||||||
cstring_to_text(initiatorWorkerNode->workerName);
|
|
||||||
citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
|
|
||||||
}
|
|
||||||
else if (initiator_node_identifier == 0 && IsCoordinator())
|
|
||||||
{
|
{
|
||||||
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
|
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
|
||||||
citusDistStat->master_query_host_port = PostPortNumber;
|
citusDistStat->master_query_host_port = PostPortNumber;
|
||||||
}
|
}
|
||||||
else if (initiator_node_identifier == 0)
|
else if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
|
||||||
{
|
{
|
||||||
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
|
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
|
||||||
citusDistStat->master_query_host_port = 0;
|
citusDistStat->master_query_host_port = 0;
|
||||||
}
|
}
|
||||||
|
else if (initiator_node_identifier > 0)
|
||||||
|
{
|
||||||
|
/* a query should run on an existing node, but lets be defensive */
|
||||||
|
bool missingOk = true;
|
||||||
|
initiatorWorkerNode = FindNodeWithNodeId(initiator_node_identifier, missingOk);
|
||||||
|
|
||||||
|
if (initiatorWorkerNode)
|
||||||
|
{
|
||||||
|
citusDistStat->master_query_host_name =
|
||||||
|
cstring_to_text(initiatorWorkerNode->workerName);
|
||||||
|
citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
citusDistStat->master_query_host_name = NULL;
|
||||||
|
citusDistStat->master_query_host_port = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
citusDistStat->master_query_host_name = NULL;
|
citusDistStat->master_query_host_name = NULL;
|
||||||
|
|
|
@ -22,16 +22,6 @@
|
||||||
#include "storage/s_lock.h"
|
#include "storage/s_lock.h"
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusInitiatedBackend keeps some information about the backends that are
|
|
||||||
* initiated by Citus.
|
|
||||||
*/
|
|
||||||
typedef struct CitusInitiatedBackend
|
|
||||||
{
|
|
||||||
int initiatorNodeIdentifier;
|
|
||||||
} CitusInitiatedBackend;
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Each backend's active distributed transaction information is tracked via
|
* Each backend's active distributed transaction information is tracked via
|
||||||
* BackendData in shared memory.
|
* BackendData in shared memory.
|
||||||
|
@ -51,7 +41,6 @@ typedef struct BackendData
|
||||||
bool cancelledDueToDeadlock;
|
bool cancelledDueToDeadlock;
|
||||||
uint64 globalPID;
|
uint64 globalPID;
|
||||||
bool distributedCommandOriginator;
|
bool distributedCommandOriginator;
|
||||||
CitusInitiatedBackend citusBackend;
|
|
||||||
DistributedTransactionId transactionId;
|
DistributedTransactionId transactionId;
|
||||||
} BackendData;
|
} BackendData;
|
||||||
|
|
||||||
|
@ -64,13 +53,12 @@ extern void UnlockBackendSharedMemory(void);
|
||||||
extern void UnSetDistributedTransactionId(void);
|
extern void UnSetDistributedTransactionId(void);
|
||||||
extern void UnSetGlobalPID(void);
|
extern void UnSetGlobalPID(void);
|
||||||
extern void AssignDistributedTransactionId(void);
|
extern void AssignDistributedTransactionId(void);
|
||||||
extern void MarkCitusInitiatedCoordinatorBackend(void);
|
|
||||||
extern void AssignGlobalPID(void);
|
extern void AssignGlobalPID(void);
|
||||||
extern uint64 GetGlobalPID(void);
|
extern uint64 GetGlobalPID(void);
|
||||||
extern void OverrideBackendDataDistributedCommandOriginator(bool
|
extern void OverrideBackendDataDistributedCommandOriginator(bool
|
||||||
distributedCommandOriginator);
|
distributedCommandOriginator);
|
||||||
extern uint64 ExtractGlobalPID(char *applicationName);
|
extern uint64 ExtractGlobalPID(char *applicationName);
|
||||||
extern int ExtractNodeIdFromGlobalPID(uint64 globalPID);
|
extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk);
|
||||||
extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
|
extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
|
||||||
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
|
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
|
||||||
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
|
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
|
||||||
|
|
|
@ -86,7 +86,7 @@ extern List * ActiveReadableNodeList(void);
|
||||||
extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
|
||||||
extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
|
||||||
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
|
||||||
extern WorkerNode * FindNodeWithNodeId(int nodeId);
|
extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
|
||||||
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
|
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
|
||||||
extern void EnsureCoordinator(void);
|
extern void EnsureCoordinator(void);
|
||||||
extern void InsertCoordinatorIfClusterEmpty(void);
|
extern void InsertCoordinatorIfClusterEmpty(void);
|
||||||
|
|
|
@ -89,11 +89,11 @@ step s1-insert:
|
||||||
step s1-verify-current-xact-is-on-worker:
|
step s1-verify-current-xact-is-on-worker:
|
||||||
SELECT
|
SELECT
|
||||||
remote.nodeport,
|
remote.nodeport,
|
||||||
remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists
|
remote.result = row(xact.transaction_number)::text AS xact_exists
|
||||||
FROM
|
FROM
|
||||||
get_current_transaction_id() as xact,
|
get_current_transaction_id() as xact,
|
||||||
run_command_on_workers($$
|
run_command_on_workers($$
|
||||||
SELECT row(initiator_node_identifier, transaction_number)
|
SELECT row(transaction_number)
|
||||||
FROM get_all_active_transactions()
|
FROM get_all_active_transactions()
|
||||||
WHERE transaction_number != 0;
|
WHERE transaction_number != 0;
|
||||||
$$) as remote
|
$$) as remote
|
||||||
|
|
|
@ -33,7 +33,7 @@ blocked_statement |current_statement_in_blockin
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
|
||||||
UPDATE ref_table SET value_1 = 15;
|
UPDATE ref_table SET value_1 = 15;
|
||||||
|localhost |coordinator_host | 57638| 57636
|
|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
|
@ -116,7 +116,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|localhost |localhost | 57638| 57637
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -212,7 +212,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57638| 57637
|
UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -308,7 +308,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|localhost |localhost | 57638| 57637
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -404,7 +404,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|localhost |localhost | 57638| 57637
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -594,7 +594,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|localhost |localhost | 57638| 57637
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -878,7 +878,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |localhost |localhost | 57638| 57637
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -963,7 +963,7 @@ blocked_statement |current_s
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
|
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
|
||||||
|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638
|
|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s2-commit-worker:
|
step s2-commit-worker:
|
||||||
|
@ -1073,7 +1073,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57637| 57637
|
UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -1161,7 +1161,7 @@ step s3-select-distributed-waiting-queries:
|
||||||
|
|
||||||
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637
|
UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit-worker:
|
step s1-commit-worker:
|
||||||
|
@ -1225,7 +1225,7 @@ blocked_statement |current_statement_in_blockin
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
|
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
|
||||||
UPDATE ref_table SET value_1 = 15;
|
UPDATE ref_table SET value_1 = 15;
|
||||||
|localhost |coordinator_host | 57638| 57636
|
|coordinator_host |coordinator_host | 57636| 57636
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
|
|
|
@ -1,11 +1,19 @@
|
||||||
Parsed test spec with 3 sessions
|
Parsed test spec with 4 sessions
|
||||||
|
|
||||||
starting permutation: s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end
|
starting permutation: add-node s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
step add-node:
|
||||||
|
SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
|
@ -59,12 +67,20 @@ master_remove_node
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
|
||||||
starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end
|
starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
step add-node:
|
||||||
|
SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
|
@ -83,7 +99,7 @@ pg_sleep
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s2-view-dist:
|
step s2-view-dist:
|
||||||
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC;
|
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC;
|
||||||
|
|
||||||
query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
|
query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -92,7 +108,7 @@ query |query_hostname |query_hostport|distri
|
||||||
|coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression
|
|coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression
|
||||||
|
|
||||||
update ref_table set a = a + 1;
|
update ref_table set a = a + 1;
|
||||||
|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
|coordinator_host| 57636| | 0|idle in transaction|Client |ClientRead|postgres|regression
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
step s2-view-worker:
|
step s2-view-worker:
|
||||||
|
@ -102,13 +118,14 @@ step s2-view-worker:
|
||||||
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
|
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
|
||||||
query NOT ILIKE '%COMMIT%' AND
|
query NOT ILIKE '%COMMIT%' AND
|
||||||
query NOT ILIKE '%dump_local_%' AND
|
query NOT ILIKE '%dump_local_%' AND
|
||||||
query NOT ILIKE '%citus_internal_local_blocked_processes%'
|
query NOT ILIKE '%citus_internal_local_blocked_processes%' AND
|
||||||
|
query NOT ILIKE '%add_node%'
|
||||||
ORDER BY query, query_hostport DESC;
|
ORDER BY query, query_hostport DESC;
|
||||||
|
|
||||||
query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
|
query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638| | 0|idle in transaction|Client |ClientRead|postgres|regression
|
||||||
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637| | 0|idle in transaction|Client |ClientRead|postgres|regression
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
step s2-end:
|
step s2-end:
|
||||||
|
@ -123,12 +140,20 @@ master_remove_node
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
|
||||||
starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end
|
starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
step add-node:
|
||||||
|
SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
|
|
|
@ -49,11 +49,11 @@ step "s1-verify-current-xact-is-on-worker"
|
||||||
{
|
{
|
||||||
SELECT
|
SELECT
|
||||||
remote.nodeport,
|
remote.nodeport,
|
||||||
remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists
|
remote.result = row(xact.transaction_number)::text AS xact_exists
|
||||||
FROM
|
FROM
|
||||||
get_current_transaction_id() as xact,
|
get_current_transaction_id() as xact,
|
||||||
run_command_on_workers($$
|
run_command_on_workers($$
|
||||||
SELECT row(initiator_node_identifier, transaction_number)
|
SELECT row(transaction_number)
|
||||||
FROM get_all_active_transactions()
|
FROM get_all_active_transactions()
|
||||||
WHERE transaction_number != 0;
|
WHERE transaction_number != 0;
|
||||||
$$) as remote
|
$$) as remote
|
||||||
|
|
|
@ -29,7 +29,6 @@ setup
|
||||||
|
|
||||||
teardown
|
teardown
|
||||||
{
|
{
|
||||||
// drop all distributed tables
|
|
||||||
DROP TABLE IF EXISTS ref_table,
|
DROP TABLE IF EXISTS ref_table,
|
||||||
dist_table,
|
dist_table,
|
||||||
dist_partitioned_table,
|
dist_partitioned_table,
|
||||||
|
@ -39,7 +38,6 @@ teardown
|
||||||
new_ref_table;
|
new_ref_table;
|
||||||
|
|
||||||
|
|
||||||
// drop all distributed objects
|
|
||||||
DROP FUNCTION activate_node_snapshot();
|
DROP FUNCTION activate_node_snapshot();
|
||||||
DROP FUNCTION IF EXISTS squares(int);
|
DROP FUNCTION IF EXISTS squares(int);
|
||||||
DROP TYPE IF EXISTS my_type;
|
DROP TYPE IF EXISTS my_type;
|
||||||
|
|
|
@ -3,8 +3,6 @@ setup
|
||||||
SELECT citus_internal.replace_isolation_tester_func();
|
SELECT citus_internal.replace_isolation_tester_func();
|
||||||
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
|
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
|
||||||
|
|
||||||
SELECT master_add_node('localhost', 57636, groupid => 0);
|
|
||||||
|
|
||||||
CREATE TABLE ref_table(a int primary key);
|
CREATE TABLE ref_table(a int primary key);
|
||||||
SELECT create_reference_table('ref_table');
|
SELECT create_reference_table('ref_table');
|
||||||
INSERT INTO ref_table VALUES (1), (3), (5), (7);
|
INSERT INTO ref_table VALUES (1), (3), (5), (7);
|
||||||
|
@ -83,7 +81,7 @@ step "s2-lock-ref-table-placement-on-coordinator"
|
||||||
|
|
||||||
step "s2-view-dist"
|
step "s2-view-dist"
|
||||||
{
|
{
|
||||||
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC;
|
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s2-view-worker"
|
step "s2-view-worker"
|
||||||
|
@ -94,7 +92,8 @@ step "s2-view-worker"
|
||||||
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
|
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
|
||||||
query NOT ILIKE '%COMMIT%' AND
|
query NOT ILIKE '%COMMIT%' AND
|
||||||
query NOT ILIKE '%dump_local_%' AND
|
query NOT ILIKE '%dump_local_%' AND
|
||||||
query NOT ILIKE '%citus_internal_local_blocked_processes%'
|
query NOT ILIKE '%citus_internal_local_blocked_processes%' AND
|
||||||
|
query NOT ILIKE '%add_node%'
|
||||||
ORDER BY query, query_hostport DESC;
|
ORDER BY query, query_hostport DESC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,14 +122,25 @@ step "deadlock-checker-call"
|
||||||
SELECT check_distributed_deadlocks();
|
SELECT check_distributed_deadlocks();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// adding node in setup stage prevents getting a gpid with proper nodeid
|
||||||
|
session "add-node"
|
||||||
|
|
||||||
|
// we issue the checker not only when there are deadlocks to ensure that we never cancel
|
||||||
|
// backend inappropriately
|
||||||
|
step "add-node"
|
||||||
|
{
|
||||||
|
SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
|
||||||
|
}
|
||||||
|
|
||||||
// verify that locks on the placement of the reference table on the coordinator is
|
// verify that locks on the placement of the reference table on the coordinator is
|
||||||
// taken into account when looking for distributed deadlocks
|
// taken into account when looking for distributed deadlocks
|
||||||
permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end"
|
permutation "add-node" "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end"
|
||||||
|
|
||||||
// verify that *_dist_stat_activity() functions return the correct result when query
|
// verify that *_dist_stat_activity() functions return the correct result when query
|
||||||
// has a task on the coordinator.
|
// has a task on the coordinator.
|
||||||
permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end"
|
permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end"
|
||||||
|
|
||||||
// verify that get_*_active_transactions() functions return the correct result when
|
// verify that get_*_active_transactions() functions return the correct result when
|
||||||
// the query has a task on the coordinator.
|
// the query has a task on the coordinator.
|
||||||
permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"
|
permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"
|
||||||
|
|
Loading…
Reference in New Issue