mirror of https://github.com/citusdata/citus.git
Lookup hostname before execution (#4976)
We lookup the hostname just before the execution so that even if there are cached entries in the prepared statement cache we use the updated entries.pull/4987/head^2
parent
eaa7d2bada
commit
ff2a125a5b
|
@ -142,6 +142,7 @@
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/shared_connection_stats.h"
|
#include "distributed/shared_connection_stats.h"
|
||||||
#include "distributed/distributed_execution_locks.h"
|
#include "distributed/distributed_execution_locks.h"
|
||||||
|
#include "distributed/intermediate_result_pruning.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/local_executor.h"
|
#include "distributed/local_executor.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
|
@ -684,6 +685,9 @@ static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
|
||||||
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
|
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
|
||||||
static void SetAttributeInputMetadata(DistributedExecution *execution,
|
static void SetAttributeInputMetadata(DistributedExecution *execution,
|
||||||
ShardCommandExecution *shardCommandExecution);
|
ShardCommandExecution *shardCommandExecution);
|
||||||
|
static void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
|
||||||
|
int *nodePort);
|
||||||
|
static bool IsDummyPlacement(ShardPlacement *taskPlacement);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
|
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
|
||||||
|
@ -1751,8 +1755,10 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
||||||
foreach_ptr(taskPlacement, task->taskPlacementList)
|
foreach_ptr(taskPlacement, task->taskPlacementList)
|
||||||
{
|
{
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
char *nodeName = taskPlacement->nodeName;
|
char *nodeName = NULL;
|
||||||
int nodePort = taskPlacement->nodePort;
|
int nodePort = 0;
|
||||||
|
LookupTaskPlacementHostAndPort(taskPlacement, &nodeName, &nodePort);
|
||||||
|
|
||||||
WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName,
|
WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName,
|
||||||
nodePort);
|
nodePort);
|
||||||
|
|
||||||
|
@ -1900,6 +1906,48 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LookupTaskPlacementHostAndPort sets the nodename and nodeport for the given task placement
|
||||||
|
* with a lookup.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
|
||||||
|
int *nodePort)
|
||||||
|
{
|
||||||
|
if (IsDummyPlacement(taskPlacement))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we create a dummy placement for the local node, it is possible
|
||||||
|
* that the entry doesn't exist in pg_dist_node, hence a lookup will fail.
|
||||||
|
* In that case we want to use the dummy placements values.
|
||||||
|
*/
|
||||||
|
*nodeName = taskPlacement->nodeName;
|
||||||
|
*nodePort = taskPlacement->nodePort;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We want to lookup the node information again since it is possible that
|
||||||
|
* there were changes in pg_dist_node and we will get those invalidations
|
||||||
|
* in LookupNodeForGroup.
|
||||||
|
*/
|
||||||
|
WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId);
|
||||||
|
*nodeName = workerNode->workerName;
|
||||||
|
*nodePort = workerNode->workerPort;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsDummyPlacement returns true if the given placement is a dummy placement.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsDummyPlacement(ShardPlacement *taskPlacement)
|
||||||
|
{
|
||||||
|
return taskPlacement->nodeId == LOCAL_NODE_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkerPoolCompare is based on WorkerNodeCompare function. The function
|
* WorkerPoolCompare is based on WorkerNodeCompare function. The function
|
||||||
* compares two worker nodes by their host name and port number.
|
* compares two worker nodes by their host name and port number.
|
||||||
|
|
|
@ -28,11 +28,11 @@ step detector-dump-wait-edges:
|
||||||
|
|
||||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||||
|
|
||||||
395 394 f
|
400 399 f
|
||||||
transactionnumberwaitingtransactionnumbers
|
transactionnumberwaitingtransactionnumbers
|
||||||
|
|
||||||
394
|
399
|
||||||
395 394
|
400 399
|
||||||
step s1-abort:
|
step s1-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
|
@ -75,14 +75,14 @@ step detector-dump-wait-edges:
|
||||||
|
|
||||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||||
|
|
||||||
399 398 f
|
404 403 f
|
||||||
400 398 f
|
405 403 f
|
||||||
400 399 t
|
405 404 t
|
||||||
transactionnumberwaitingtransactionnumbers
|
transactionnumberwaitingtransactionnumbers
|
||||||
|
|
||||||
398
|
403
|
||||||
399 398
|
404 403
|
||||||
400 398,399
|
405 403,404
|
||||||
step s1-abort:
|
step s1-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ nodeid nodename nodeport
|
||||||
22 localhost 57637
|
22 localhost 57637
|
||||||
23 localhost 57638
|
23 localhost 57638
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s1-update-node-1:
|
step s1-update-node-1:
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -24,7 +24,7 @@ step s2-update-node-2:
|
||||||
58638);
|
58638);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s2-update-node-2: <... completed>
|
step s2-update-node-2: <... completed>
|
||||||
?column?
|
?column?
|
||||||
|
@ -48,7 +48,7 @@ nodeid nodename nodeport
|
||||||
24 localhost 57637
|
24 localhost 57637
|
||||||
25 localhost 57638
|
25 localhost 57638
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s1-update-node-1:
|
step s1-update-node-1:
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -60,7 +60,7 @@ step s1-update-node-1:
|
||||||
|
|
||||||
1
|
1
|
||||||
step s2-begin:
|
step s2-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s2-update-node-1:
|
step s2-update-node-1:
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -69,14 +69,14 @@ step s2-update-node-1:
|
||||||
58637);
|
58637);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s2-update-node-1: <... completed>
|
step s2-update-node-1: <... completed>
|
||||||
?column?
|
?column?
|
||||||
|
|
||||||
1
|
1
|
||||||
step s2-abort:
|
step s2-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
step s1-show-nodes:
|
step s1-show-nodes:
|
||||||
SELECT nodeid, nodename, nodeport, isactive
|
SELECT nodeid, nodename, nodeport, isactive
|
||||||
|
@ -96,7 +96,7 @@ nodeid nodename nodeport
|
||||||
26 localhost 57637
|
26 localhost 57637
|
||||||
27 localhost 57638
|
27 localhost 57638
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s1-update-node-1:
|
step s1-update-node-1:
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -111,7 +111,7 @@ step s2-start-metadata-sync-node-2:
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
SELECT start_metadata_sync_to_node('localhost', 57638);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s2-start-metadata-sync-node-2: <... completed>
|
step s2-start-metadata-sync-node-2: <... completed>
|
||||||
start_metadata_sync_to_node
|
start_metadata_sync_to_node
|
||||||
|
@ -133,3 +133,82 @@ master_run_on_worker
|
||||||
(localhost,57638,t,"[{""f1"": 26, ""f2"": 26, ""f3"": ""localhost"", ""f4"": 58637}, {""f1"": 27, ""f2"": 27, ""f3"": ""localhost"", ""f4"": 57638}]")
|
(localhost,57638,t,"[{""f1"": 26, ""f2"": 26, ""f3"": ""localhost"", ""f4"": 58637}, {""f1"": 27, ""f2"": 27, ""f3"": ""localhost"", ""f4"": 57638}]")
|
||||||
nodeid nodename nodeport
|
nodeid nodename nodeport
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-create-table s1-begin s1-update-node-nonexistent s1-prepare-transaction s2-cache-prepared-statement s1-commit-prepared s2-execute-prepared s1-update-node-existent s2-drop-table
|
||||||
|
nodeid nodename nodeport
|
||||||
|
|
||||||
|
28 localhost 57637
|
||||||
|
29 localhost 57638
|
||||||
|
step s2-create-table:
|
||||||
|
CREATE TABLE test (a int);
|
||||||
|
SELECT create_distributed_table('test','a');
|
||||||
|
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-update-node-nonexistent:
|
||||||
|
SELECT 1 FROM master_update_node(
|
||||||
|
(select nodeid from pg_dist_node where nodeport = 57637),
|
||||||
|
'non-existent',
|
||||||
|
57637);
|
||||||
|
|
||||||
|
?column?
|
||||||
|
|
||||||
|
1
|
||||||
|
step s1-prepare-transaction:
|
||||||
|
PREPARE transaction 'label';
|
||||||
|
|
||||||
|
step s2-cache-prepared-statement:
|
||||||
|
PREPARE foo AS SELECT COUNT(*) FROM test WHERE a = 3;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
step s1-commit-prepared:
|
||||||
|
COMMIT prepared 'label';
|
||||||
|
|
||||||
|
s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: Name or service not known
|
||||||
|
step s2-execute-prepared:
|
||||||
|
EXECUTE foo;
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
step s1-update-node-existent:
|
||||||
|
SELECT 1 FROM master_update_node(
|
||||||
|
(select nodeid from pg_dist_node where nodeport = 57637),
|
||||||
|
'localhost',
|
||||||
|
57637);
|
||||||
|
|
||||||
|
?column?
|
||||||
|
|
||||||
|
1
|
||||||
|
step s2-drop-table:
|
||||||
|
DROP TABLE test;
|
||||||
|
|
||||||
|
nodeid nodename nodeport
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s1-update-node-1:
|
step s1-update-node-1:
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -17,7 +17,7 @@ step s1-update-node-1:
|
||||||
|
|
||||||
1
|
1
|
||||||
step s2-begin:
|
step s2-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s2-insert:
|
step s2-insert:
|
||||||
INSERT INTO update_node(id, f1)
|
INSERT INTO update_node(id, f1)
|
||||||
|
@ -25,12 +25,12 @@ step s2-insert:
|
||||||
FROM generate_series(1, 10) as t(id);
|
FROM generate_series(1, 10) as t(id);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s2-insert: <... completed>
|
step s2-insert: <... completed>
|
||||||
error in steps s1-commit s2-insert: ERROR: relation "public.update_node_102008" does not exist
|
error in steps s1-commit s2-insert: ERROR: relation "public.update_node_102012" does not exist
|
||||||
step s2-abort:
|
step s2-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
nodeid nodename nodeport
|
nodeid nodename nodeport
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s2-begin:
|
step s2-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
step s2-insert:
|
step s2-insert:
|
||||||
INSERT INTO update_node(id, f1)
|
INSERT INTO update_node(id, f1)
|
||||||
|
@ -54,7 +54,7 @@ step s1-update-node-1:
|
||||||
57638);
|
57638);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s1-update-node-1: <... completed>
|
step s1-update-node-1: <... completed>
|
||||||
?column?
|
?column?
|
||||||
|
|
|
@ -19,6 +19,14 @@ step "s1-begin"
|
||||||
BEGIN;
|
BEGIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-prepare-transaction" {
|
||||||
|
PREPARE transaction 'label';
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-commit-prepared" {
|
||||||
|
COMMIT prepared 'label';
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-update-node-1"
|
step "s1-update-node-1"
|
||||||
{
|
{
|
||||||
SELECT 1 FROM master_update_node(
|
SELECT 1 FROM master_update_node(
|
||||||
|
@ -27,6 +35,20 @@ step "s1-update-node-1"
|
||||||
58637);
|
58637);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-update-node-nonexistent" {
|
||||||
|
SELECT 1 FROM master_update_node(
|
||||||
|
(select nodeid from pg_dist_node where nodeport = 57637),
|
||||||
|
'non-existent',
|
||||||
|
57637);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-update-node-existent" {
|
||||||
|
SELECT 1 FROM master_update_node(
|
||||||
|
(select nodeid from pg_dist_node where nodeport = 57637),
|
||||||
|
'localhost',
|
||||||
|
57637);
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-commit"
|
step "s1-commit"
|
||||||
{
|
{
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -62,6 +84,25 @@ step "s2-update-node-2"
|
||||||
58638);
|
58638);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s2-create-table" {
|
||||||
|
CREATE TABLE test (a int);
|
||||||
|
SELECT create_distributed_table('test','a');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-cache-prepared-statement" {
|
||||||
|
PREPARE foo AS SELECT COUNT(*) FROM test WHERE a = 3;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
EXECUTE foo;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-execute-prepared" {
|
||||||
|
EXECUTE foo;
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-verify-metadata"
|
step "s2-verify-metadata"
|
||||||
{
|
{
|
||||||
SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
|
SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
|
||||||
|
@ -76,6 +117,10 @@ step "s2-start-metadata-sync-node-2"
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
SELECT start_metadata_sync_to_node('localhost', 57638);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s2-drop-table" {
|
||||||
|
DROP TABLE test;
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-abort"
|
step "s2-abort"
|
||||||
{
|
{
|
||||||
ABORT;
|
ABORT;
|
||||||
|
@ -91,3 +136,8 @@ permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-update-node-1" "s1-comm
|
||||||
// cannot run start_metadata_sync_to_node in a transaction, so we're not
|
// cannot run start_metadata_sync_to_node in a transaction, so we're not
|
||||||
// testing the reverse order here.
|
// testing the reverse order here.
|
||||||
permutation "s1-begin" "s1-update-node-1" "s2-start-metadata-sync-node-2" "s1-commit" "s2-verify-metadata"
|
permutation "s1-begin" "s1-update-node-1" "s2-start-metadata-sync-node-2" "s1-commit" "s2-verify-metadata"
|
||||||
|
|
||||||
|
// make sure we have entries in prepared statement cache
|
||||||
|
// then make sure that after we update pg_dist_node, the changes are visible to
|
||||||
|
// the prepared statement
|
||||||
|
permutation "s2-create-table" "s1-begin" "s1-update-node-nonexistent" "s1-prepare-transaction" "s2-cache-prepared-statement" "s1-commit-prepared" "s2-execute-prepared" "s1-update-node-existent" "s2-drop-table"
|
||||||
|
|
Loading…
Reference in New Issue