From 56ca590daa5e3329cb390615048f6b48566fde72 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 16 Jan 2017 14:59:52 +0300 Subject: [PATCH] Propagate metadata changes for deleted reference table placements on master_remove_node call --- src/backend/distributed/utils/node_metadata.c | 9 +- .../distributed/utils/reference_table_utils.c | 9 +- .../multi_remove_node_reference_table.out | 305 ++++++++++++++++++ .../sql/multi_remove_node_reference_table.sql | 185 +++++++++++ 4 files changed, 506 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index cde71d763..4ea766ec8 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -364,12 +364,18 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) bool hasShardPlacements = false; WorkerNode *workerNode = NULL; List *referenceTableList = NIL; + uint32 deletedNodeId = INVALID_PLACEMENT_ID; EnsureSchemaNode(); EnsureSuperUser(); workerNode = FindWorkerNode(nodeName, nodePort); + if (workerNode != NULL) + { + deletedNodeId = workerNode->nodeId; + } + DeleteNodeRow(nodeName, nodePort); DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); @@ -410,7 +416,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) } } - nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); + nodeDeleteCommand = NodeDeleteCommand(deletedNodeId); /* make sure we don't have any lingering session lifespan connections */ CloseNodeConnectionsAfterTransaction(nodeName, nodePort); @@ -728,6 +734,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort) NULL, scanKeyCount, scanKey); heapTuple = systable_getnext(heapScan); + if (!HeapTupleIsValid(heapTuple)) { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index a368bccb2..d963386f4 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -382,10 +382,17 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; + uint64 placementId = INVALID_PLACEMENT_ID; + StringInfo deletePlacementCommand = makeStringInfo(); LockShardDistributionMetadata(shardId, ExclusiveLock); - DeleteShardPlacementRow(shardId, workerName, workerPort); + placementId = DeleteShardPlacementRow(shardId, workerName, workerPort); + + appendStringInfo(deletePlacementCommand, + "DELETE FROM pg_dist_shard_placement WHERE placementid=%lu", + placementId); + SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data); } } diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index e10a80f06..17a81a35b 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -10,6 +10,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; -- create copy of pg_dist_shard_placement to reload after the test CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + -- remove non-existing node SELECT master_remove_node('localhost', 55555); ERROR: could not find valid entry for node "localhost:55555" @@ -78,6 +85,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); master_remove_node -------------------- @@ -112,6 +140,25 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- remove same node twice SELECT master_remove_node('localhost', :worker_2_port); ERROR: could not find valid entry for node "localhost:57638" @@ -153,6 +200,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node @@ -190,6 +258,26 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port -- remove node in a transaction and COMMIT -- status before master_remove_node SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; @@ -220,6 +308,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node @@ -256,6 +365,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -294,6 +423,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port + BEGIN; INSERT INTO remove_node_reference_table VALUES(1); SELECT master_remove_node('localhost', :worker_2_port); @@ -338,6 +488,32 @@ SELECT * FROM remove_node_reference_table; 1 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +SELECT * FROM remove_node_reference_table; + column1 +--------- + 1 +(1 row) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -376,6 +552,26 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; ALTER TABLE remove_node_reference_table ADD column2 int; NOTICE: using one-phase commit for distributed DDL commands @@ -415,6 +611,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- verify table structure is changed \d remove_node_reference_table Table "public.remove_node_reference_table" @@ -485,6 +701,28 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); master_remove_node -------------------- @@ -519,6 +757,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -561,6 +819,28 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + + +\c - - - :master_port + SELECT master_disable_node('localhost', :worker_2_port); master_disable_node --------------------- @@ -595,6 +875,25 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -608,6 +907,12 @@ NOTICE: Replicating reference table "table1" to all workers DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table_schema.table1; DROP SCHEMA remove_node_reference_table_schema CASCADE; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 5ab58e978..37d9f8aa3 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -14,6 +14,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- remove non-existing node SELECT master_remove_node('localhost', 55555); @@ -52,6 +54,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); @@ -72,6 +87,18 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- remove same node twice SELECT master_remove_node('localhost', :worker_2_port); @@ -97,6 +124,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); @@ -119,6 +159,18 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- remove node in a transaction and COMMIT @@ -138,6 +190,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); @@ -159,6 +224,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -182,6 +260,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + BEGIN; INSERT INTO remove_node_reference_table VALUES(1); SELECT master_remove_node('localhost', :worker_2_port); @@ -207,6 +298,21 @@ WHERE colocationid IN --verify the data is inserted SELECT * FROM remove_node_reference_table; +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * FROM remove_node_reference_table; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -230,6 +336,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + BEGIN; ALTER TABLE remove_node_reference_table ADD column2 int; SELECT master_remove_node('localhost', :worker_2_port); @@ -251,6 +370,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- verify table structure is changed \d remove_node_reference_table @@ -289,6 +421,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); @@ -309,6 +454,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -334,6 +492,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + SELECT master_disable_node('localhost', :worker_2_port); -- status after master_disable_node @@ -353,6 +524,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -362,6 +546,7 @@ DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table_schema.table1; DROP SCHEMA remove_node_reference_table_schema CASCADE; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);