From ff2a125a5bb605b69cae833124b49ce4df0c490b Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 18 May 2021 16:46:31 +0300 Subject: [PATCH] Lookup hostname before execution (#4976) We lookup the hostname just before the execution so that even if there are cached entries in the prepared statement cache we use the updated entries. --- .../distributed/executor/adaptive_executor.c | 52 ++++++++- .../isolation_dump_global_wait_edges.out | 24 ++--- .../expected/isolation_update_node.out | 101 ++++++++++++++++-- .../isolation_update_node_lock_writes.out | 18 ++-- .../regress/spec/isolation_update_node.spec | 50 +++++++++ 5 files changed, 211 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 2d52ad224..ca54b1b5d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -142,6 +142,7 @@ #include "distributed/deparse_shard_query.h" #include "distributed/shared_connection_stats.h" #include "distributed/distributed_execution_locks.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/multi_client_executor.h" @@ -684,6 +685,9 @@ static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc); static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey); static void SetAttributeInputMetadata(DistributedExecution *execution, ShardCommandExecution *shardCommandExecution); +static void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, + int *nodePort); +static bool IsDummyPlacement(ShardPlacement *taskPlacement); /* * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor @@ -1751,8 +1755,10 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) foreach_ptr(taskPlacement, task->taskPlacementList) { int connectionFlags = 0; - char *nodeName = taskPlacement->nodeName; - int nodePort = taskPlacement->nodePort; + char *nodeName = NULL; + int nodePort = 0; + LookupTaskPlacementHostAndPort(taskPlacement, &nodeName, &nodePort); + WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName, nodePort); @@ -1900,6 +1906,48 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) } +/* + * LookupTaskPlacementHostAndPort sets the nodename and nodeport for the given task placement + * with a lookup. + */ +static void +LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, + int *nodePort) +{ + if (IsDummyPlacement(taskPlacement)) + { + /* + * If we create a dummy placement for the local node, it is possible + * that the entry doesn't exist in pg_dist_node, hence a lookup will fail. + * In that case we want to use the dummy placements values. + */ + *nodeName = taskPlacement->nodeName; + *nodePort = taskPlacement->nodePort; + } + else + { + /* + * We want to lookup the node information again since it is possible that + * there were changes in pg_dist_node and we will get those invalidations + * in LookupNodeForGroup. + */ + WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId); + *nodeName = workerNode->workerName; + *nodePort = workerNode->workerPort; + } +} + + +/* + * IsDummyPlacement returns true if the given placement is a dummy placement. + */ +static bool +IsDummyPlacement(ShardPlacement *taskPlacement) +{ + return taskPlacement->nodeId == LOCAL_NODE_ID; +} + + /* * WorkerPoolCompare is based on WorkerNodeCompare function. The function * compares two worker nodes by their host name and port number. diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index b0343b982..4880b879a 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -13,7 +13,7 @@ step s1-update: step s2-update: UPDATE distributed_table SET y = 2 WHERE x = 1; -step detector-dump-wait-edges: +step detector-dump-wait-edges: SELECT waiting_transaction_num, blocking_transaction_num, @@ -28,11 +28,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -395 394 f +400 399 f transactionnumberwaitingtransactionnumbers -394 -395 394 +399 +400 399 step s1-abort: ABORT; @@ -57,10 +57,10 @@ step s1-update: step s2-update: UPDATE distributed_table SET y = 2 WHERE x = 1; -step s3-update: +step s3-update: UPDATE distributed_table SET y = 3 WHERE x = 1; -step detector-dump-wait-edges: +step detector-dump-wait-edges: SELECT waiting_transaction_num, blocking_transaction_num, @@ -75,14 +75,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -399 398 f -400 398 f -400 399 t +404 403 f +405 403 f +405 404 t transactionnumberwaitingtransactionnumbers -398 -399 398 -400 398,399 +403 +404 403 +405 403,404 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_update_node.out b/src/test/regress/expected/isolation_update_node.out index 7b22761dc..23602b509 100644 --- a/src/test/regress/expected/isolation_update_node.out +++ b/src/test/regress/expected/isolation_update_node.out @@ -6,7 +6,7 @@ nodeid nodename nodeport 22 localhost 57637 23 localhost 57638 step s1-begin: - BEGIN; + BEGIN; step s1-update-node-1: SELECT 1 FROM master_update_node( @@ -23,8 +23,8 @@ step s2-update-node-2: 'localhost', 58638); -step s1-commit: - COMMIT; +step s1-commit: + COMMIT; step s2-update-node-2: <... completed> ?column? @@ -48,7 +48,7 @@ nodeid nodename nodeport 24 localhost 57637 25 localhost 57638 step s1-begin: - BEGIN; + BEGIN; step s1-update-node-1: SELECT 1 FROM master_update_node( @@ -60,7 +60,7 @@ step s1-update-node-1: 1 step s2-begin: - BEGIN; + BEGIN; step s2-update-node-1: SELECT 1 FROM master_update_node( @@ -68,15 +68,15 @@ step s2-update-node-1: 'localhost', 58637); -step s1-commit: - COMMIT; +step s1-commit: + COMMIT; step s2-update-node-1: <... completed> ?column? 1 step s2-abort: - ABORT; + ABORT; step s1-show-nodes: SELECT nodeid, nodename, nodeport, isactive @@ -96,7 +96,7 @@ nodeid nodename nodeport 26 localhost 57637 27 localhost 57638 step s1-begin: - BEGIN; + BEGIN; step s1-update-node-1: SELECT 1 FROM master_update_node( @@ -110,8 +110,8 @@ step s1-update-node-1: step s2-start-metadata-sync-node-2: SELECT start_metadata_sync_to_node('localhost', 57638); -step s1-commit: - COMMIT; +step s1-commit: + COMMIT; step s2-start-metadata-sync-node-2: <... completed> start_metadata_sync_to_node @@ -133,3 +133,82 @@ master_run_on_worker (localhost,57638,t,"[{""f1"": 26, ""f2"": 26, ""f3"": ""localhost"", ""f4"": 58637}, {""f1"": 27, ""f2"": 27, ""f3"": ""localhost"", ""f4"": 57638}]") nodeid nodename nodeport + +starting permutation: s2-create-table s1-begin s1-update-node-nonexistent s1-prepare-transaction s2-cache-prepared-statement s1-commit-prepared s2-execute-prepared s1-update-node-existent s2-drop-table +nodeid nodename nodeport + +28 localhost 57637 +29 localhost 57638 +step s2-create-table: + CREATE TABLE test (a int); + SELECT create_distributed_table('test','a'); + +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-update-node-nonexistent: + SELECT 1 FROM master_update_node( + (select nodeid from pg_dist_node where nodeport = 57637), + 'non-existent', + 57637); + +?column? + +1 +step s1-prepare-transaction: + PREPARE transaction 'label'; + +step s2-cache-prepared-statement: + PREPARE foo AS SELECT COUNT(*) FROM test WHERE a = 3; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + +count + +0 +count + +0 +count + +0 +count + +0 +count + +0 +count + +0 +step s1-commit-prepared: + COMMIT prepared 'label'; + +s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: Name or service not known +step s2-execute-prepared: + EXECUTE foo; + +count + +0 +step s1-update-node-existent: + SELECT 1 FROM master_update_node( + (select nodeid from pg_dist_node where nodeport = 57637), + 'localhost', + 57637); + +?column? + +1 +step s2-drop-table: + DROP TABLE test; + +nodeid nodename nodeport + diff --git a/src/test/regress/expected/isolation_update_node_lock_writes.out b/src/test/regress/expected/isolation_update_node_lock_writes.out index dcaa5b991..b60044b15 100644 --- a/src/test/regress/expected/isolation_update_node_lock_writes.out +++ b/src/test/regress/expected/isolation_update_node_lock_writes.out @@ -5,7 +5,7 @@ create_distributed_table step s1-begin: - BEGIN; + BEGIN; step s1-update-node-1: SELECT 1 FROM master_update_node( @@ -17,20 +17,20 @@ step s1-update-node-1: 1 step s2-begin: - BEGIN; + BEGIN; step s2-insert: INSERT INTO update_node(id, f1) SELECT id, md5(id::text) FROM generate_series(1, 10) as t(id); -step s1-commit: - COMMIT; +step s1-commit: + COMMIT; step s2-insert: <... completed> -error in steps s1-commit s2-insert: ERROR: relation "public.update_node_102008" does not exist +error in steps s1-commit s2-insert: ERROR: relation "public.update_node_102012" does not exist step s2-abort: - ABORT; + ABORT; nodeid nodename nodeport @@ -40,7 +40,7 @@ create_distributed_table step s2-begin: - BEGIN; + BEGIN; step s2-insert: INSERT INTO update_node(id, f1) @@ -53,8 +53,8 @@ step s1-update-node-1: 'localhost', 57638); -step s2-commit: - COMMIT; +step s2-commit: + COMMIT; step s1-update-node-1: <... completed> ?column? diff --git a/src/test/regress/spec/isolation_update_node.spec b/src/test/regress/spec/isolation_update_node.spec index feb21ffaf..09aabacce 100644 --- a/src/test/regress/spec/isolation_update_node.spec +++ b/src/test/regress/spec/isolation_update_node.spec @@ -19,6 +19,14 @@ step "s1-begin" BEGIN; } +step "s1-prepare-transaction" { + PREPARE transaction 'label'; +} + +step "s1-commit-prepared" { + COMMIT prepared 'label'; +} + step "s1-update-node-1" { SELECT 1 FROM master_update_node( @@ -27,6 +35,20 @@ step "s1-update-node-1" 58637); } +step "s1-update-node-nonexistent" { + SELECT 1 FROM master_update_node( + (select nodeid from pg_dist_node where nodeport = 57637), + 'non-existent', + 57637); +} + +step "s1-update-node-existent" { + SELECT 1 FROM master_update_node( + (select nodeid from pg_dist_node where nodeport = 57637), + 'localhost', + 57637); +} + step "s1-commit" { COMMIT; @@ -62,6 +84,25 @@ step "s2-update-node-2" 58638); } +step "s2-create-table" { + CREATE TABLE test (a int); + SELECT create_distributed_table('test','a'); +} + +step "s2-cache-prepared-statement" { + PREPARE foo AS SELECT COUNT(*) FROM test WHERE a = 3; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; + EXECUTE foo; +} + +step "s2-execute-prepared" { + EXECUTE foo; +} + step "s2-verify-metadata" { SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid; @@ -76,6 +117,10 @@ step "s2-start-metadata-sync-node-2" SELECT start_metadata_sync_to_node('localhost', 57638); } +step "s2-drop-table" { + DROP TABLE test; +} + step "s2-abort" { ABORT; @@ -91,3 +136,8 @@ permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-update-node-1" "s1-comm // cannot run start_metadata_sync_to_node in a transaction, so we're not // testing the reverse order here. permutation "s1-begin" "s1-update-node-1" "s2-start-metadata-sync-node-2" "s1-commit" "s2-verify-metadata" + +// make sure we have entries in prepared statement cache +// then make sure that after we update pg_dist_node, the changes are visible to +// the prepared statement +permutation "s2-create-table" "s1-begin" "s1-update-node-nonexistent" "s1-prepare-transaction" "s2-cache-prepared-statement" "s1-commit-prepared" "s2-execute-prepared" "s1-update-node-existent" "s2-drop-table"