Drop support for CitusInitiatedBackend

CitusInitiatedBackend was a premature implementation of the
GlobalPID infrastructure. We used it to track whether an individual
query was initiated by Citus or not.

Now that GlobalPID is in place, we no longer need
CitusInitiatedBackend; in fact, the information it tracks could even
be wrong.
Branch: pull/5734/head
Author: Onder Kalaci
Date:   2022-02-23 11:06:47 +01:00
Parent: 9b4db12651
Commit: df95d59e33

13 changed files with 119 additions and 121 deletions
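
For context: a global PID packs the originating node id and the backend's OS
process id into a single 64-bit value, so the initiator node can always be
re-derived on demand instead of being stored per backend. A minimal standalone
sketch of that arithmetic, assuming the decimal multiplier used by Citus'
GLOBAL_PID_NODE_ID_MULTIPLIER is 10,000,000,000 (the constant's exact value and
the MakeGlobalPID helper are assumptions for illustration, not part of this
commit):

    #include <stdint.h>
    #include <stdio.h>

    /* assumed to match GLOBAL_PID_NODE_ID_MULTIPLIER in backend_data.c */
    #define GLOBAL_PID_NODE_ID_MULTIPLIER 10000000000ULL

    /* hypothetical encoder: node id in the high decimal digits, pid in the low */
    static uint64_t
    MakeGlobalPID(int nodeId, int pid)
    {
        return (uint64_t) nodeId * GLOBAL_PID_NODE_ID_MULTIPLIER + (uint64_t) pid;
    }

    int
    main(void)
    {
        uint64_t globalPID = MakeGlobalPID(2, 12345);

        /* mirrors ExtractNodeIdFromGlobalPID / ExtractProcessIdFromGlobalPID */
        int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER);
        int pid = (int) (globalPID % GLOBAL_PID_NODE_ID_MULTIPLIER);

        printf("globalPID=%llu nodeId=%d pid=%d\n",
               (unsigned long long) globalPID, nodeId, pid);
        return 0;
    }

Because the node id is recoverable from the globalPID alone, the separately
stored citusBackend.initiatorNodeIdentifier became redundant (and, per the
message above, potentially wrong); the diffs below remove it.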

@@ -87,10 +87,11 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
 	}
 #endif
 
-	int nodeId = ExtractNodeIdFromGlobalPID(globalPID);
+	bool missingOk = false;
+	int nodeId = ExtractNodeIdFromGlobalPID(globalPID, missingOk);
 	int processId = ExtractProcessIdFromGlobalPID(globalPID);
 
-	WorkerNode *workerNode = FindNodeWithNodeId(nodeId);
+	WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk);
 
 	StringInfo cancelQuery = makeStringInfo();

@@ -150,8 +150,6 @@ RegisterCitusCustomScanMethods(void)
 static void
 CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
 {
-	MarkCitusInitiatedCoordinatorBackend();
-
 	CitusScanState *scanState = (CitusScanState *) node;
 
 	/*

@@ -1541,7 +1541,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
  * If the node cannot be found this functions errors.
  */
 WorkerNode *
-FindNodeWithNodeId(int nodeId)
+FindNodeWithNodeId(int nodeId, bool missingOk)
 {
 	List *workerList = ActiveReadableNodeList();
 	WorkerNode *workerNode = NULL;
@@ -1555,7 +1555,10 @@ FindNodeWithNodeId(int nodeId)
 	}
 
 	/* there isn't any node with nodeId in pg_dist_node */
-	elog(ERROR, "worker node with node id %d could not be found", nodeId);
+	if (!missingOk)
+	{
+		elog(ERROR, "worker node with node id %d could not be found", nodeId);
+	}
 
 	return NULL;
 }
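
The missingOk flag turns the lookup's hard error into a NULL return. A
hypothetical caller sketch (the variable names are illustrative, not from the
diff):

    /* tolerate a node id that is no longer in pg_dist_node */
    bool missingOk = true;
    WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk);
    if (workerNode == NULL)
    {
        /* node unknown: fall back gracefully instead of erroring out */
    }

Callers that cannot proceed without the node, such as CitusSignalBackend above,
pass missingOk = false and keep the erroring behavior.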

@@ -152,9 +152,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
 	MyBackendData->transactionId.timestamp = timestamp;
 	MyBackendData->transactionId.transactionOriginator = false;
 
-	MyBackendData->citusBackend.initiatorNodeIdentifier =
-		MyBackendData->transactionId.initiatorNodeIdentifier;
-
 	SpinLockRelease(&MyBackendData->mutex);
 
 	PG_RETURN_VOID();
@@ -385,7 +382,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
 			&backendManagementShmemData->backends[backendIndex];
 
 		/* to work on data after releasing the spinlock to protect against errors */
-		int initiatorNodeIdentifier = -1;
 		uint64 transactionNumber = 0;
 
 		SpinLockAcquire(&currentBackend->mutex);
@@ -408,7 +404,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
 		Oid databaseId = currentBackend->databaseId;
 		int backendPid = ProcGlobal->allProcs[backendIndex].pid;
-		initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
 
 		/*
 		 * We prefer to use worker_query instead of distributedCommandOriginator in
@@ -423,9 +418,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
 		SpinLockRelease(&currentBackend->mutex);
 
+		bool missingOk = true;
+		int nodeId = ExtractNodeIdFromGlobalPID(currentBackend->globalPID, missingOk);
+
 		values[0] = ObjectIdGetDatum(databaseId);
 		values[1] = Int32GetDatum(backendPid);
-		values[2] = Int32GetDatum(initiatorNodeIdentifier);
+		values[2] = Int32GetDatum(nodeId);
 		values[3] = !distributedCommandOriginator;
 		values[4] = UInt64GetDatum(transactionNumber);
 		values[5] = TimestampTzGetDatum(transactionIdTimestamp);
@@ -520,7 +518,6 @@ BackendManagementShmemInit(void)
 		{
 			BackendData *backendData =
 				&backendManagementShmemData->backends[backendIndex];
-			backendData->citusBackend.initiatorNodeIdentifier = -1;
 			SpinLockInit(&backendData->mutex);
 		}
 	}
@@ -660,8 +657,6 @@ UnSetDistributedTransactionId(void)
 		MyBackendData->transactionId.transactionNumber = 0;
 		MyBackendData->transactionId.timestamp = 0;
 
-		MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
-
 		SpinLockRelease(&MyBackendData->mutex);
 	}
 }
@@ -772,29 +767,6 @@ AssignDistributedTransactionId(void)
 	MyBackendData->transactionId.transactionNumber = nextTransactionNumber;
 	MyBackendData->transactionId.timestamp = currentTimestamp;
 
-	MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
-
-	SpinLockRelease(&MyBackendData->mutex);
-}
-
-
-/*
- * MarkCitusInitiatedCoordinatorBackend sets that coordinator backend is
- * initiated by Citus.
- */
-void
-MarkCitusInitiatedCoordinatorBackend(void)
-{
-	/*
-	 * GetLocalGroupId may throw exception which can cause leaving spin lock
-	 * unreleased. Calling GetLocalGroupId function before the lock to avoid this.
-	 */
-	int32 localGroupId = GetLocalGroupId();
-
-	SpinLockAcquire(&MyBackendData->mutex);
-
-	MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
-
 	SpinLockRelease(&MyBackendData->mutex);
 }
@@ -926,11 +898,12 @@ ExtractGlobalPID(char *applicationName)
  * gives us the node id.
  */
 int
-ExtractNodeIdFromGlobalPID(uint64 globalPID)
+ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk)
 {
 	int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER);
 
-	if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
+	if (!missingOk &&
+		nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
 	{
 		ereport(ERROR, (errmsg("originator node of the query with the global pid "
 							   "%lu is not in Citus' metadata", globalPID),

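Note the division of labor that falls out of this hunk: ExtractNodeIdFromGlobalPID
only validates when missingOk is false; with missingOk = true it simply returns
whatever node id the globalPID encodes, which may be the
GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA sentinel. A hedged sketch of the two
call styles (variable names are illustrative):

    /* strict: error if the originator is not in metadata (CitusSignalBackend) */
    bool strictMissingOk = false;
    int nodeId = ExtractNodeIdFromGlobalPID(globalPID, strictMissingOk);

    /* tolerant: stats paths accept the sentinel (StoreAllActiveTransactions) */
    bool lenientMissingOk = true;
    int shownNodeId = ExtractNodeIdFromGlobalPID(globalPID, lenientMissingOk);

This is why StoreAllActiveTransactions above passes missingOk = true: a stats row
should never abort the query that builds it.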
@@ -538,33 +538,35 @@ ReplaceInitiatorNodeIdentifier(int initiator_node_identifier,
 	 * transaction. However, we cannot know which node has initiated
 	 * the worker query.
 	 */
-	if (initiator_node_identifier > 0)
-	{
-		bool nodeExists = false;
-
-		initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
-
-		/* a query should run on an existing node */
-		Assert(nodeExists);
-		if (initiatorWorkerNode == NULL)
-		{
-			ereport(ERROR, (errmsg("no primary node found for group %d",
-								   initiator_node_identifier)));
-		}
-
-		citusDistStat->master_query_host_name =
-			cstring_to_text(initiatorWorkerNode->workerName);
-		citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
-	}
-	else if (initiator_node_identifier == 0 && IsCoordinator())
+	if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA &&
+		IsCoordinator())
 	{
 		citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
 		citusDistStat->master_query_host_port = PostPortNumber;
 	}
-	else if (initiator_node_identifier == 0)
+	else if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
 	{
 		citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
 		citusDistStat->master_query_host_port = 0;
 	}
+	else if (initiator_node_identifier > 0)
+	{
+		/* a query should run on an existing node, but lets be defensive */
+		bool missingOk = true;
+		initiatorWorkerNode = FindNodeWithNodeId(initiator_node_identifier, missingOk);
+
+		if (initiatorWorkerNode)
+		{
+			citusDistStat->master_query_host_name =
+				cstring_to_text(initiatorWorkerNode->workerName);
+			citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
+		}
+		else
+		{
+			citusDistStat->master_query_host_name = NULL;
+			citusDistStat->master_query_host_port = 0;
+		}
+	}
 	else
 	{
 		citusDistStat->master_query_host_name = NULL;

@@ -22,16 +22,6 @@
 #include "storage/s_lock.h"
 
-/*
- * CitusInitiatedBackend keeps some information about the backends that are
- * initiated by Citus.
- */
-typedef struct CitusInitiatedBackend
-{
-	int initiatorNodeIdentifier;
-} CitusInitiatedBackend;
-
-
 /*
  * Each backend's active distributed transaction information is tracked via
  * BackendData in shared memory.
@@ -51,7 +41,6 @@ typedef struct BackendData
 	bool cancelledDueToDeadlock;
 	uint64 globalPID;
 	bool distributedCommandOriginator;
-	CitusInitiatedBackend citusBackend;
 	DistributedTransactionId transactionId;
 } BackendData;
@@ -64,13 +53,12 @@ extern void UnlockBackendSharedMemory(void);
 extern void UnSetDistributedTransactionId(void);
 extern void UnSetGlobalPID(void);
 extern void AssignDistributedTransactionId(void);
-extern void MarkCitusInitiatedCoordinatorBackend(void);
 extern void AssignGlobalPID(void);
 extern uint64 GetGlobalPID(void);
 extern void OverrideBackendDataDistributedCommandOriginator(bool
 															distributedCommandOriginator);
 extern uint64 ExtractGlobalPID(char *applicationName);
-extern int ExtractNodeIdFromGlobalPID(uint64 globalPID);
+extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk);
 extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
 extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
 extern void CancelTransactionDueToDeadlock(PGPROC *proc);

@@ -86,7 +86,7 @@ extern List * ActiveReadableNodeList(void);
 extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
-extern WorkerNode * FindNodeWithNodeId(int nodeId);
+extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
 extern List * ReadDistNode(bool includeNodesFromOtherClusters);
 extern void EnsureCoordinator(void);
 extern void InsertCoordinatorIfClusterEmpty(void);

@@ -89,11 +89,11 @@ step s1-insert:
 step s1-verify-current-xact-is-on-worker:
     SELECT
         remote.nodeport,
-        remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists
+        remote.result = row(xact.transaction_number)::text AS xact_exists
     FROM
         get_current_transaction_id() as xact,
         run_command_on_workers($$
-            SELECT row(initiator_node_identifier, transaction_number)
+            SELECT row(transaction_number)
             FROM get_all_active_transactions()
             WHERE transaction_number != 0;
         $$) as remote

@@ -33,7 +33,7 @@ blocked_statement |current_statement_in_blockin
 ---------------------------------------------------------------------
 UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
     UPDATE ref_table SET value_1 = 15;
-|localhost |coordinator_host | 57638| 57636
+|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit:
@@ -116,7 +116,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|localhost |localhost | 57638| 57637
+UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -212,7 +212,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57638| 57637
+UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -308,7 +308,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|localhost |localhost | 57638| 57637
+UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -404,7 +404,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|localhost |localhost | 57638| 57637
+UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -594,7 +594,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|localhost |localhost | 57638| 57637
+UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -878,7 +878,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |localhost |localhost | 57638| 57637
+UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -963,7 +963,7 @@ blocked_statement |current_s
 ---------------------------------------------------------------------
 ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
-|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638
+|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s2-commit-worker:
@@ -1073,7 +1073,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57637| 57637
+UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -1161,7 +1161,7 @@ step s3-select-distributed-waiting-queries:
 blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
 ---------------------------------------------------------------------
-UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637
+UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit-worker:
@@ -1225,7 +1225,7 @@ blocked_statement |current_statement_in_blockin
 ---------------------------------------------------------------------
 UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
     UPDATE ref_table SET value_1 = 15;
-|localhost |coordinator_host | 57638| 57636
+|coordinator_host |coordinator_host | 57636| 57636
 (1 row)
 
 step s1-commit:

@ -1,11 +1,19 @@
@@ -1,11 +1,19 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
 
-starting permutation: s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end
+starting permutation: add-node s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end
 create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
+step add-node:
+  SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
+
+?column?
+---------------------------------------------------------------------
+       1
+(1 row)
+
 step s1-begin:
   BEGIN;
 
@@ -59,12 +67,20 @@ master_remove_node
 (1 row)
 
-starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end
+starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end
 create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
+step add-node:
+  SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
+
+?column?
+---------------------------------------------------------------------
+       1
+(1 row)
+
 step s1-begin:
   BEGIN;
 
@@ -83,7 +99,7 @@ pg_sleep
 (1 row)
 
 step s2-view-dist:
-  SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC;
+  SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC;
 
 query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
 ---------------------------------------------------------------------
@@ -92,7 +108,7 @@ query |query_hostname |query_hostport|distri
 |coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression
 update ref_table set a = a + 1;
-|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
+|coordinator_host| 57636| | 0|idle in transaction|Client |ClientRead|postgres|regression
 (2 rows)
 
 step s2-view-worker:
@@ -102,13 +118,14 @@ step s2-view-worker:
       WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
             query NOT ILIKE '%COMMIT%' AND
             query NOT ILIKE '%dump_local_%' AND
-            query NOT ILIKE '%citus_internal_local_blocked_processes%'
+            query NOT ILIKE '%citus_internal_local_blocked_processes%' AND
+            query NOT ILIKE '%add_node%'
       ORDER BY query, query_hostport DESC;
 
 query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
 ---------------------------------------------------------------------
-UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
-UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
+UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638| | 0|idle in transaction|Client |ClientRead|postgres|regression
+UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637| | 0|idle in transaction|Client |ClientRead|postgres|regression
 (2 rows)
 
 step s2-end:
@@ -123,12 +140,20 @@ master_remove_node
 (1 row)
 
-starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end
+starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end
 create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
+step add-node:
+  SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
+
+?column?
+---------------------------------------------------------------------
+       1
+(1 row)
+
 step s1-begin:
   BEGIN;

@@ -49,11 +49,11 @@ step "s1-verify-current-xact-is-on-worker"
 {
     SELECT
         remote.nodeport,
-        remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists
+        remote.result = row(xact.transaction_number)::text AS xact_exists
     FROM
         get_current_transaction_id() as xact,
         run_command_on_workers($$
-            SELECT row(initiator_node_identifier, transaction_number)
+            SELECT row(transaction_number)
             FROM get_all_active_transactions()
             WHERE transaction_number != 0;
         $$) as remote

@@ -29,7 +29,6 @@ setup
 teardown
 {
-    // drop all distributed tables
     DROP TABLE IF EXISTS ref_table,
         dist_table,
         dist_partitioned_table,
@@ -39,7 +38,6 @@ teardown
         new_ref_table;
 
-    // drop all distributed objects
     DROP FUNCTION activate_node_snapshot();
     DROP FUNCTION IF EXISTS squares(int);
     DROP TYPE IF EXISTS my_type;

@@ -3,8 +3,6 @@ setup
     SELECT citus_internal.replace_isolation_tester_func();
     SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
-    SELECT master_add_node('localhost', 57636, groupid => 0);
-
     CREATE TABLE ref_table(a int primary key);
     SELECT create_reference_table('ref_table');
     INSERT INTO ref_table VALUES (1), (3), (5), (7);
@@ -83,7 +81,7 @@ step "s2-lock-ref-table-placement-on-coordinator"
 step "s2-view-dist"
 {
-    SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC;
+    SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC;
 }
 
 step "s2-view-worker"
@@ -94,7 +92,8 @@ step "s2-view-worker"
     WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
           query NOT ILIKE '%COMMIT%' AND
           query NOT ILIKE '%dump_local_%' AND
-          query NOT ILIKE '%citus_internal_local_blocked_processes%'
+          query NOT ILIKE '%citus_internal_local_blocked_processes%' AND
+          query NOT ILIKE '%add_node%'
     ORDER BY query, query_hostport DESC;
 }
@@ -123,14 +122,25 @@ step "deadlock-checker-call"
     SELECT check_distributed_deadlocks();
 }
 
+// adding node in setup stage prevents getting a gpid with proper nodeid
+session "add-node"
+
+// we issue the checker not only when there are deadlocks to ensure that we never cancel
+// backend inappropriately
+step "add-node"
+{
+    SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0);
+}
+
 // verify that locks on the placement of the reference table on the coordinator is
 // taken into account when looking for distributed deadlocks
-permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end"
+permutation "add-node" "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end"
 
 // verify that *_dist_stat_activity() functions return the correct result when query
 // has a task on the coordinator.
-permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end"
+permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end"
 
 // verify that get_*_active_transactions() functions return the correct result when
 // the query has a task on the coordinator.
-permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"
+permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"