From df95d59e33196ce11ad8c2a217e98e55956b5a0b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 23 Feb 2022 11:06:47 +0100 Subject: [PATCH] Drop support for CitusInitiatedBackend CitusInitiatedBackend was a premature implementation of the whole GlobalPID infrastructure. We used it to track whether any individual query is triggered by Citus or not. Now that GlobalPID is in place, we no longer need CitusInitiatedBackend; in fact, it could even be wrong. --- .../commands/citus_global_signal.c | 5 ++- .../distributed/executor/citus_custom_scan.c | 2 - .../distributed/metadata/node_metadata.c | 7 ++- .../distributed/transaction/backend_data.c | 41 +++-------------- .../transaction/citus_dist_stat_activity.c | 40 +++++++++-------- src/include/distributed/backend_data.h | 14 +----- src/include/distributed/worker_manager.h | 2 +- .../isolation_distributed_transaction_id.out | 32 ++++++------- ...lation_get_distributed_wait_queries_mx.out | 22 ++++----- ...licate_reference_tables_to_coordinator.out | 45 ++++++++++++++----- .../isolation_distributed_transaction_id.spec | 4 +- .../spec/isolation_metadata_sync_vs_all.spec | 2 - ...icate_reference_tables_to_coordinator.spec | 24 +++++++--- 13 files changed, 119 insertions(+), 121 deletions(-) diff --git a/src/backend/distributed/commands/citus_global_signal.c b/src/backend/distributed/commands/citus_global_signal.c index fc7618159..64bb67f0d 100644 --- a/src/backend/distributed/commands/citus_global_signal.c +++ b/src/backend/distributed/commands/citus_global_signal.c @@ -87,10 +87,11 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig) } #endif - int nodeId = ExtractNodeIdFromGlobalPID(globalPID); + bool missingOk = false; + int nodeId = ExtractNodeIdFromGlobalPID(globalPID, missingOk); int processId = ExtractProcessIdFromGlobalPID(globalPID); - WorkerNode *workerNode = FindNodeWithNodeId(nodeId); + WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk); StringInfo cancelQuery =
makeStringInfo(); diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 1c9800663..92bfbd715 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -150,8 +150,6 @@ RegisterCitusCustomScanMethods(void) static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags) { - MarkCitusInitiatedCoordinatorBackend(); - CitusScanState *scanState = (CitusScanState *) node; /* diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 967d6bef3..498497e64 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1541,7 +1541,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) * If the node cannot be found this functions errors. */ WorkerNode * -FindNodeWithNodeId(int nodeId) +FindNodeWithNodeId(int nodeId, bool missingOk) { List *workerList = ActiveReadableNodeList(); WorkerNode *workerNode = NULL; @@ -1555,7 +1555,10 @@ FindNodeWithNodeId(int nodeId) } /* there isn't any node with nodeId in pg_dist_node */ - elog(ERROR, "worker node with node id %d could not be found", nodeId); + if (!missingOk) + { + elog(ERROR, "worker node with node id %d could not be found", nodeId); + } return NULL; } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 5ca517199..12ff8d78b 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -152,9 +152,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) MyBackendData->transactionId.timestamp = timestamp; MyBackendData->transactionId.transactionOriginator = false; - MyBackendData->citusBackend.initiatorNodeIdentifier = - MyBackendData->transactionId.initiatorNodeIdentifier; - SpinLockRelease(&MyBackendData->mutex); 
PG_RETURN_VOID(); @@ -385,7 +382,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto &backendManagementShmemData->backends[backendIndex]; /* to work on data after releasing g spinlock to protect against errors */ - int initiatorNodeIdentifier = -1; uint64 transactionNumber = 0; SpinLockAcquire(&currentBackend->mutex); @@ -408,7 +404,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto Oid databaseId = currentBackend->databaseId; int backendPid = ProcGlobal->allProcs[backendIndex].pid; - initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier; /* * We prefer to use worker_query instead of distributedCommandOriginator in @@ -423,9 +418,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto SpinLockRelease(&currentBackend->mutex); + bool missingOk = true; + int nodeId = ExtractNodeIdFromGlobalPID(currentBackend->globalPID, missingOk); + values[0] = ObjectIdGetDatum(databaseId); values[1] = Int32GetDatum(backendPid); - values[2] = Int32GetDatum(initiatorNodeIdentifier); + values[2] = Int32GetDatum(nodeId); values[3] = !distributedCommandOriginator; values[4] = UInt64GetDatum(transactionNumber); values[5] = TimestampTzGetDatum(transactionIdTimestamp); @@ -520,7 +518,6 @@ BackendManagementShmemInit(void) { BackendData *backendData = &backendManagementShmemData->backends[backendIndex]; - backendData->citusBackend.initiatorNodeIdentifier = -1; SpinLockInit(&backendData->mutex); } } @@ -660,8 +657,6 @@ UnSetDistributedTransactionId(void) MyBackendData->transactionId.transactionNumber = 0; MyBackendData->transactionId.timestamp = 0; - MyBackendData->citusBackend.initiatorNodeIdentifier = -1; - SpinLockRelease(&MyBackendData->mutex); } } @@ -772,29 +767,6 @@ AssignDistributedTransactionId(void) MyBackendData->transactionId.transactionNumber = nextTransactionNumber; MyBackendData->transactionId.timestamp = currentTimestamp; -
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - - SpinLockRelease(&MyBackendData->mutex); -} - - -/* - * MarkCitusInitiatedCoordinatorBackend sets that coordinator backend is - * initiated by Citus. - */ -void -MarkCitusInitiatedCoordinatorBackend(void) -{ - /* - * GetLocalGroupId may throw exception which can cause leaving spin lock - * unreleased. Calling GetLocalGroupId function before the lock to avoid this. - */ - int32 localGroupId = GetLocalGroupId(); - - SpinLockAcquire(&MyBackendData->mutex); - - MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - SpinLockRelease(&MyBackendData->mutex); } @@ -926,11 +898,12 @@ ExtractGlobalPID(char *applicationName) * gives us the node id. */ int -ExtractNodeIdFromGlobalPID(uint64 globalPID) +ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk) { int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER); - if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) + if (!missingOk && + nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) { ereport(ERROR, (errmsg("originator node of the query with the global pid " "%lu is not in Citus' metadata", globalPID), diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index ec4f6e8a3..0ee3925fb 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -538,33 +538,35 @@ ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, * transaction. However, we cannot know which node has initiated * the worker query. 
*/ - if (initiator_node_identifier > 0) - { - bool nodeExists = false; - - initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); - - /* a query should run on an existing node */ - Assert(nodeExists); - if (initiatorWorkerNode == NULL) - { - ereport(ERROR, (errmsg("no primary node found for group %d", - initiator_node_identifier))); - } - citusDistStat->master_query_host_name = - cstring_to_text(initiatorWorkerNode->workerName); - citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; - } - else if (initiator_node_identifier == 0 && IsCoordinator()) + if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA && + IsCoordinator()) { citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = PostPortNumber; } - else if (initiator_node_identifier == 0) + else if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) { citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = 0; } + else if (initiator_node_identifier > 0) + { + /* a query should run on an existing node, but lets be defensive */ + bool missingOk = true; + initiatorWorkerNode = FindNodeWithNodeId(initiator_node_identifier, missingOk); + + if (initiatorWorkerNode) + { + citusDistStat->master_query_host_name = + cstring_to_text(initiatorWorkerNode->workerName); + citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; + } + else + { + citusDistStat->master_query_host_name = NULL; + citusDistStat->master_query_host_port = 0; + } + } else { citusDistStat->master_query_host_name = NULL; diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index b463b89f5..c34d94670 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -22,16 +22,6 @@ #include "storage/s_lock.h" -/* - * CitusInitiatedBackend keeps some 
information about the backends that are - * initiated by Citus. - */ -typedef struct CitusInitiatedBackend -{ - int initiatorNodeIdentifier; -} CitusInitiatedBackend; - - /* * Each backend's active distributed transaction information is tracked via * BackendData in shared memory. @@ -51,7 +41,6 @@ typedef struct BackendData bool cancelledDueToDeadlock; uint64 globalPID; bool distributedCommandOriginator; - CitusInitiatedBackend citusBackend; DistributedTransactionId transactionId; } BackendData; @@ -64,13 +53,12 @@ extern void UnlockBackendSharedMemory(void); extern void UnSetDistributedTransactionId(void); extern void UnSetGlobalPID(void); extern void AssignDistributedTransactionId(void); -extern void MarkCitusInitiatedCoordinatorBackend(void); extern void AssignGlobalPID(void); extern uint64 GetGlobalPID(void); extern void OverrideBackendDataDistributedCommandOriginator(bool distributedCommandOriginator); extern uint64 ExtractGlobalPID(char *applicationName); -extern int ExtractNodeIdFromGlobalPID(uint64 globalPID); +extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk); extern int ExtractProcessIdFromGlobalPID(uint64 globalPID); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index db8adaedb..27de1d464 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -86,7 +86,7 @@ extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); -extern WorkerNode * FindNodeWithNodeId(int nodeId); +extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk); extern List * ReadDistNode(bool 
includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 3d44f0069..be52248f8 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -13,7 +13,7 @@ assign_distributed_transaction_id (1 row) step s1-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -32,7 +32,7 @@ assign_distributed_transaction_id (1 row) step s2-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -51,7 +51,7 @@ assign_distributed_transaction_id (1 row) step s3-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifier|transaction_number|transaction_stamp --------------------------------------------------------------------- @@ -70,10 +70,10 @@ step s3-commit: starting permutation: s1-create-table s1-begin s1-insert s1-verify-current-xact-is-on-worker 
s1-drop-table s1-commit step s1-create-table: - -- some tests also use distributed table - CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); - SET citus.shard_count TO 4; - SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); + -- some tests also use distributed table + CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); + SET citus.shard_count TO 4; + SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); create_distributed_table --------------------------------------------------------------------- @@ -84,16 +84,16 @@ step s1-begin: BEGIN; step s1-insert: - INSERT INTO distributed_transaction_id_table VALUES (1, 1); + INSERT INTO distributed_transaction_id_table VALUES (1, 1); step s1-verify-current-xact-is-on-worker: - SELECT - remote.nodeport, - remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists - FROM - get_current_transaction_id() as xact, - run_command_on_workers($$ - SELECT row(initiator_node_identifier, transaction_number) + SELECT + remote.nodeport, + remote.result = row(xact.transaction_number)::text AS xact_exists + FROM + get_current_transaction_id() as xact, + run_command_on_workers($$ + SELECT row(transaction_number) FROM get_all_active_transactions() WHERE transaction_number != 0; $$) as remote @@ -106,7 +106,7 @@ nodeport|xact_exists (2 rows) step s1-drop-table: - DROP TABLE distributed_transaction_id_table; + DROP TABLE distributed_transaction_id_table; step s1-commit: COMMIT; diff --git a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out index cae2222ed..109c61186 100644 --- a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out +++ b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out @@ -33,7 +33,7 @@ blocked_statement |current_statement_in_blockin 
--------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|localhost |coordinator_host | 57638| 57636 +|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: @@ -116,7 +116,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -212,7 +212,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57638| 57637 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -308,7 +308,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -404,7 +404,7 @@ step s3-select-distributed-waiting-queries: 
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -594,7 +594,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -878,7 +878,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |localhost |localhost | 57638| 57637 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -963,7 +963,7 @@ blocked_statement |current_s --------------------------------------------------------------------- ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); -|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638 +|INSERT INTO ref_table 
VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s2-commit-worker: @@ -1073,7 +1073,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |localhost |localhost | 57637| 57637 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -1161,7 +1161,7 @@ step s3-select-distributed-waiting-queries: blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637 +UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit-worker: @@ -1225,7 +1225,7 @@ blocked_statement |current_statement_in_blockin --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|localhost |coordinator_host | 57638| 57636 +|coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index 2d49f8586..dd7ddefad 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -1,11 +1,19 @@ -Parsed test spec with 3 sessions +Parsed test spec with 4 sessions -starting 
permutation: s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end +starting permutation: add-node s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; @@ -59,12 +67,20 @@ master_remove_node (1 row) -starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end +starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; @@ -83,7 +99,7 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC; + SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- @@ -92,7 +108,7 @@ query |query_hostname |query_hostport|distri |coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression update ref_table set a = a + 1; - |coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression + |coordinator_host| 57636| | 0|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-view-worker: @@ -102,13 +118,14 @@ step s2-view-worker: WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' + query NOT ILIKE '%citus_internal_local_blocked_processes%' AND + query NOT ILIKE '%add_node%' ORDER BY query, query_hostport DESC; -query 
|query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638| | 0|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637| | 0|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-end: @@ -123,12 +140,20 @@ master_remove_node (1 row) -starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end +starting permutation: add-node s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end create_distributed_table --------------------------------------------------------------------- (1 row) +step add-node: + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); + +?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + step s1-begin: BEGIN; diff --git a/src/test/regress/spec/isolation_distributed_transaction_id.spec b/src/test/regress/spec/isolation_distributed_transaction_id.spec index f928918ed..0b626f2c8 100644 --- a/src/test/regress/spec/isolation_distributed_transaction_id.spec +++ b/src/test/regress/spec/isolation_distributed_transaction_id.spec @@ -49,11 +49,11 @@ step "s1-verify-current-xact-is-on-worker" { SELECT remote.nodeport, - remote.result = row(xact.initiator_node_identifier, xact.transaction_number)::text AS xact_exists + remote.result = row(xact.transaction_number)::text AS xact_exists FROM get_current_transaction_id() as xact, run_command_on_workers($$ - SELECT row(initiator_node_identifier, transaction_number) + SELECT row(transaction_number) FROM get_all_active_transactions() WHERE transaction_number != 0; $$) as remote diff --git a/src/test/regress/spec/isolation_metadata_sync_vs_all.spec b/src/test/regress/spec/isolation_metadata_sync_vs_all.spec index d11565e2f..80cfe2e33 100644 --- a/src/test/regress/spec/isolation_metadata_sync_vs_all.spec +++ b/src/test/regress/spec/isolation_metadata_sync_vs_all.spec @@ -29,7 +29,6 @@ setup teardown { - // drop all distributed tables DROP TABLE IF EXISTS ref_table, dist_table, dist_partitioned_table, @@ -39,7 +38,6 @@ teardown new_ref_table; - // drop all distributed objects DROP FUNCTION activate_node_snapshot(); DROP FUNCTION IF EXISTS squares(int); DROP TYPE IF EXISTS my_type; diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index fa2079ba5..c4d6c8fc1 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -3,8 +3,6 @@ setup SELECT citus_internal.replace_isolation_tester_func(); SELECT 
citus_internal.refresh_isolation_tester_prepared_statement(); - SELECT master_add_node('localhost', 57636, groupid => 0); - CREATE TABLE ref_table(a int primary key); SELECT create_reference_table('ref_table'); INSERT INTO ref_table VALUES (1), (3), (5), (7); @@ -83,7 +81,7 @@ step "s2-lock-ref-table-placement-on-coordinator" step "s2-view-dist" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' ORDER BY query DESC; + SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; } step "s2-view-worker" @@ -94,7 +92,8 @@ step "s2-view-worker" WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' + query NOT ILIKE '%citus_internal_local_blocked_processes%' AND + query NOT ILIKE '%add_node%' ORDER BY query, query_hostport DESC; } @@ -123,14 +122,25 @@ step "deadlock-checker-call" SELECT check_distributed_deadlocks(); } + +// adding node in setup stage prevents getting a gpid with proper nodeid +session "add-node" + +// register the coordinator (group 0) in the metadata; every permutation runs this +// step first instead of doing it in setup +step "add-node" +{ + SELECT 1 FROM master_add_node('localhost', 57636, groupid => 0); +} + // verify that locks on the placement of the reference table on the coordinator is //
taken into account when looking for distributed deadlocks -permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end" // verify that *_dist_stat_activity() functions return the correct result when query // has a task on the coordinator. -permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end" // verify that get_*_active_transactions() functions return the correct result when // the query has a task on the coordinator. -permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end" +permutation "add-node" "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"