diff --git a/src/backend/distributed/commands/citus_global_signal.c b/src/backend/distributed/commands/citus_global_signal.c index fc7618159..64bb67f0d 100644 --- a/src/backend/distributed/commands/citus_global_signal.c +++ b/src/backend/distributed/commands/citus_global_signal.c @@ -87,10 +87,11 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig) } #endif - int nodeId = ExtractNodeIdFromGlobalPID(globalPID); + bool missingOk = false; + int nodeId = ExtractNodeIdFromGlobalPID(globalPID, missingOk); int processId = ExtractProcessIdFromGlobalPID(globalPID); - WorkerNode *workerNode = FindNodeWithNodeId(nodeId); + WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk); StringInfo cancelQuery = makeStringInfo(); diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 1c9800663..92bfbd715 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -150,8 +150,6 @@ RegisterCitusCustomScanMethods(void) static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags) { - MarkCitusInitiatedCoordinatorBackend(); - CitusScanState *scanState = (CitusScanState *) node; /* diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 967d6bef3..498497e64 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1541,7 +1541,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) * If the node cannot be found this functions errors. */ WorkerNode * -FindNodeWithNodeId(int nodeId) +FindNodeWithNodeId(int nodeId, bool missingOk) { List *workerList = ActiveReadableNodeList(); WorkerNode *workerNode = NULL; @@ -1555,7 +1555,10 @@ FindNodeWithNodeId(int nodeId) } /* there isn't any node with nodeId in pg_dist_node */ - elog(ERROR, "worker node with node id %d could not be found", nodeId); + if (!missingOk) + { + elog(ERROR, "worker node with node id %d could not be found", nodeId); + } return NULL; } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 5ca517199..12ff8d78b 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -152,9 +152,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) MyBackendData->transactionId.timestamp = timestamp; MyBackendData->transactionId.transactionOriginator = false; - MyBackendData->citusBackend.initiatorNodeIdentifier = - MyBackendData->transactionId.initiatorNodeIdentifier; - SpinLockRelease(&MyBackendData->mutex); PG_RETURN_VOID(); @@ -385,7 +382,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto &backendManagementShmemData->backends[backendIndex]; /* to work on data after releasing g spinlock to protect against errors */ - int initiatorNodeIdentifier = -1; uint64 transactionNumber = 0; SpinLockAcquire(¤tBackend->mutex); @@ -408,7 +404,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto Oid databaseId = currentBackend->databaseId; int backendPid = ProcGlobal->allProcs[backendIndex].pid; - initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier; /* * We prefer to use worker_query instead of distributedCommandOriginator in @@ -423,9 +418,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto SpinLockRelease(¤tBackend->mutex); + bool missingOk = true; + int nodeId = ExtractNodeIdFromGlobalPID(currentBackend->globalPID, missingOk); + values[0] = ObjectIdGetDatum(databaseId); values[1] = Int32GetDatum(backendPid); - values[2] = Int32GetDatum(initiatorNodeIdentifier); + values[2] = Int32GetDatum(nodeId); values[3] = !distributedCommandOriginator; values[4] = UInt64GetDatum(transactionNumber); values[5] = TimestampTzGetDatum(transactionIdTimestamp); @@ -520,7 +518,6 @@ BackendManagementShmemInit(void) { BackendData *backendData = &backendManagementShmemData->backends[backendIndex]; - backendData->citusBackend.initiatorNodeIdentifier = -1; SpinLockInit(&backendData->mutex); } } @@ -660,8 +657,6 @@ UnSetDistributedTransactionId(void) MyBackendData->transactionId.transactionNumber = 0; MyBackendData->transactionId.timestamp = 0; - MyBackendData->citusBackend.initiatorNodeIdentifier = -1; - SpinLockRelease(&MyBackendData->mutex); } } @@ -772,29 +767,6 @@ AssignDistributedTransactionId(void) MyBackendData->transactionId.transactionNumber = nextTransactionNumber; MyBackendData->transactionId.timestamp = currentTimestamp; - MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - - SpinLockRelease(&MyBackendData->mutex); -} - - -/* - * MarkCitusInitiatedCoordinatorBackend sets that coordinator backend is - * initiated by Citus. - */ -void -MarkCitusInitiatedCoordinatorBackend(void) -{ - /* - * GetLocalGroupId may throw exception which can cause leaving spin lock - * unreleased. Calling GetLocalGroupId function before the lock to avoid this. - */ - int32 localGroupId = GetLocalGroupId(); - - SpinLockAcquire(&MyBackendData->mutex); - - MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - SpinLockRelease(&MyBackendData->mutex); } @@ -926,11 +898,12 @@ ExtractGlobalPID(char *applicationName) * gives us the node id. */ int -ExtractNodeIdFromGlobalPID(uint64 globalPID) +ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk) { int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER); - if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) + if (!missingOk && + nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) { ereport(ERROR, (errmsg("originator node of the query with the global pid " "%lu is not in Citus' metadata", globalPID), diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index ec4f6e8a3..0ee3925fb 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -538,33 +538,35 @@ ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, * transaction. However, we cannot know which node has initiated * the worker query. */ - if (initiator_node_identifier > 0) - { - bool nodeExists = false; - - initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); - - /* a query should run on an existing node */ - Assert(nodeExists); - if (initiatorWorkerNode == NULL) - { - ereport(ERROR, (errmsg("no primary node found for group %d", - initiator_node_identifier))); - } - citusDistStat->master_query_host_name = - cstring_to_text(initiatorWorkerNode->workerName); - citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; - } - else if (initiator_node_identifier == 0 && IsCoordinator()) + if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA && + IsCoordinator()) { citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = PostPortNumber; } - else if (initiator_node_identifier == 0) + else if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) { citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = 0; } + else if (initiator_node_identifier > 0) + { + /* a query should run on an existing node, but lets be defensive */ + bool missingOk = true; + initiatorWorkerNode = FindNodeWithNodeId(initiator_node_identifier, missingOk); + + if (initiatorWorkerNode) + { + citusDistStat->master_query_host_name = + cstring_to_text(initiatorWorkerNode->workerName); + citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; + } + else + { + citusDistStat->master_query_host_name = NULL; + citusDistStat->master_query_host_port = 0; + } + } else { citusDistStat->master_query_host_name = NULL; diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index b463b89f5..c34d94670 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -22,16 +22,6 @@ #include "storage/s_lock.h" -/* - * CitusInitiatedBackend keeps some information about the backends that are - * initiated by Citus. - */ -typedef struct CitusInitiatedBackend -{ - int initiatorNodeIdentifier; -} CitusInitiatedBackend; - - /* * Each backend's active distributed transaction information is tracked via * BackendData in shared memory. @@ -51,7 +41,6 @@ typedef struct BackendData bool cancelledDueToDeadlock; uint64 globalPID; bool distributedCommandOriginator; - CitusInitiatedBackend citusBackend; DistributedTransactionId transactionId; } BackendData; @@ -64,13 +53,12 @@ extern void UnlockBackendSharedMemory(void); extern void UnSetDistributedTransactionId(void); extern void UnSetGlobalPID(void); extern void AssignDistributedTransactionId(void); -extern void MarkCitusInitiatedCoordinatorBackend(void); extern void AssignGlobalPID(void); extern uint64 GetGlobalPID(void); extern void OverrideBackendDataDistributedCommandOriginator(bool distributedCommandOriginator); extern uint64 ExtractGlobalPID(char *applicationName); -extern int ExtractNodeIdFromGlobalPID(uint64 globalPID); +extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk); extern int ExtractProcessIdFromGlobalPID(uint64 globalPID); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index db8adaedb..27de1d464 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -86,7 +86,7 @@ extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); -extern WorkerNode * FindNodeWithNodeId(int nodeId); +extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk); extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 3d44f0069..be52248f8 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -13,7 +13,7 @@ assign_distributed_transaction_id (1 row) step s1-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -32,7 +32,7 @@ assign_distributed_transaction_id (1 row) step s2-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -51,7 +51,7 @@ assign_distributed_transaction_id (1 row) step s3-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -70,10 +70,10 @@ step s3-commit: starting permutation: s1-create-table s1-begin s1-insert s1-verify-current-xact-is-on-worker s1-drop-table s1-commit step s1-create-table: - -- some tests also use distributed table - CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); - SET citus.shard_count TO 4; - SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); + -- some tests also use distributed table + CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); + SET citus.shard_count TO 4; + SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); create_distributed_table --------------------------------------------------------------------- @@ -84,16 +84,16 @@ step s1-begin: BEGIN; step s1-insert: - INSERT INTO distributed_transaction_id_table VALUES (1, 1); + INSERT INTO distributed_transaction_id_table VALUES (1, 1); step s1-verify-current-xact-is-on-worker: - SELECT - remote.nodeport, - remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists - FROM - get_current_transaction_id() as xact, - run_command_on_workers($$ - SELECT row(initiator_node_identifier, transaction_number) + SELECT + remote.nodeport, + remote.result = row(xact.transaction_number)::text AS xact_exists + FROM + get_current_transaction_id() as xact, + run_command_on_workers($$ + SELECT row(transaction_number) FROM get_all_active_transactions() WHERE transaction_number != 0; $$) as remote @@ -106,7 +106,7 @@ nodeport|xact_exists (2 rows) step s1-drop-table: - DROP TABLE distributed_transaction_id_table; + DROP TABLE distributed_transaction_id_table; step s1-commit: COMMIT; diff --git a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out index cae2222ed..109c61186 100644 --- a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out +++ b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out @@ -33,7 +33,7 @@ blocked_statement |current_statement_in_blockin --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|localhost |coordinator_host | 57638| 57636 +|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: @@ -116,7 +116,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -212,7 +212,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57638| 57637 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -308,7 +308,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -404,7 +404,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -594,7 +594,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -878,7 +878,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -963,7 +963,7 @@ blocked_statement |current_s --------------------------------------------------------------------- ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); -|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638 +|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s2-commit-worker: @@ -1073,7 +1073,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57637| 57637 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -1161,7 +1161,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -1225,7 +1225,7 @@ blocked_statement |current_statement_in_blockin --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|localhost |coordinator_host | 57638| 57636 +|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index 2d49f8586..dd7ddefad 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -1,11 +1,19 @@ -Parsed test spec with 3 sessions +Parsed test spec with 4 sessions -starting permutation: s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end +starting permutation: add-node s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; @@ -59,12 +67,20 @@ master_remove_node (1 row) -starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end +starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; @@ -83,7 +99,7 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC; + SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- @@ -92,7 +108,7 @@ query |query_hostname |query_hostport|distri |coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression update ref_table set a = a + 1; - |coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression + |coordinator_host| 57636| | 0|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-view-worker: @@ -102,13 +118,14 @@ step s2-view-worker: WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' + query NOT ILIKE '%citus_internal_local_blocked_processes%' AND + query NOT ILIKE '%add_node%' ORDER BY query, query_hostport DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638| | 0|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637| | 0|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-end: @@ -123,12 +140,20 @@ master_remove_node (1 row) -starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end +starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; diff --git a/src/test/regress/spec/isolation_distributed_transaction_id.spec b/src/test/regress/spec/isolation_distributed_transaction_id.spec index f928918ed..0b626f2c8 100644 --- a/src/test/regress/spec/isolation_distributed_transaction_id.spec +++ b/src/test/regress/spec/isolation_distributed_transaction_id.spec @@ -49,11 +49,11 @@ step "s1-verify-current-xact-is-on-worker" { SELECT remote.nodeport, - remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists + remote.result = row(xact.transaction_number)::text AS xact_exists FROM get_current_transaction_id() as xact, run_command_on_workers($$ - SELECT row(initiator_node_identifier, transaction_number) + SELECT row(transaction_number) FROM get_all_active_transactions() WHERE transaction_number != 0; $$) as remote diff --git a/src/test/regress/spec/isolation_metadata_sync_vs_all.spec b/src/test/regress/spec/isolation_metadata_sync_vs_all.spec index d11565e2f..80cfe2e33 100644 --- a/src/test/regress/spec/isolation_metadata_sync_vs_all.spec +++ b/src/test/regress/spec/isolation_metadata_sync_vs_all.spec @@ -29,7 +29,6 @@ setup teardown { - // drop all distributed tables DROP TABLE IF EXISTS ref_table, dist_table, dist_partitioned_table, @@ -39,7 +38,6 @@ teardown new_ref_table; - // drop all distributed objects DROP FUNCTION activate_node_snapshot(); DROP FUNCTION IF EXISTS squares(int); DROP TYPE IF EXISTS my_type; diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index fa2079ba5..c4d6c8fc1 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -3,8 +3,6 @@ setup SELECT citus_internal.replace_isolation_tester_func(); SELECT citus_internal.refresh_isolation_tester_prepared_statement(); - SELECT master_add_node('localhost', 57636, groupid => 0); - CREATE TABLE ref_table(a int primary key); SELECT create_reference_table('ref_table'); INSERT INTO ref_table VALUES (1), (3), (5), (7); @@ -83,7 +81,7 @@ step "s2-lock-ref-table-placement-on-coordinator" step "s2-view-dist" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC; + SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; } step "s2-view-worker" @@ -94,7 +92,8 @@ step "s2-view-worker" WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' + query NOT ILIKE '%citus_internal_local_blocked_processes%' AND + query NOT ILIKE '%add_node%' ORDER BY query, query_hostport DESC; } @@ -123,14 +122,25 @@ step "deadlock-checker-call" SELECT check_distributed_deadlocks(); } + +// adding node in setup stage prevents getting a gpid with proper nodeid +session "add-node" + +// we issue the checker not only when there are deadlocks to ensure that we never cancel +// backend inappropriately +step "add-node" +{ + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); +} + // verify that locks on the placement of the reference table on the coordinator is // taken into account when looking for distributed deadlocks -permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end" // verify that *_dist_stat_activity() functions return the correct result when query // has a task on the coordinator. -permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end" // verify that get_*_active_transactions() functions return the correct result when // the query has a task on the coordinator. -permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"