Propagate metadata changes for deleted reference table placements on master_remove_node call

pull/1103/head
Eren Basak 2017-01-16 14:59:52 +03:00
parent be78769ae4
commit 56ca590daa
4 changed files with 506 additions and 2 deletions

View File

@ -364,12 +364,18 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
bool hasShardPlacements = false;
WorkerNode *workerNode = NULL;
List *referenceTableList = NIL;
uint32 deletedNodeId = INVALID_PLACEMENT_ID;
EnsureSchemaNode();
EnsureSuperUser();
workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode != NULL)
{
deletedNodeId = workerNode->nodeId;
}
DeleteNodeRow(nodeName, nodePort);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
@ -410,7 +416,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
}
}
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
/* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
@ -728,6 +734,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(heapScan);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",

View File

@ -382,10 +382,17 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
uint64 placementId = INVALID_PLACEMENT_ID;
StringInfo deletePlacementCommand = makeStringInfo();
LockShardDistributionMetadata(shardId, ExclusiveLock);
DeleteShardPlacementRow(shardId, workerName, workerPort);
placementId = DeleteShardPlacementRow(shardId, workerName, workerPort);
appendStringInfo(deletePlacementCommand,
"DELETE FROM pg_dist_shard_placement WHERE placementid=%lu",
placementId);
SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data);
}
}

View File

@ -10,6 +10,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000;
-- create copy of pg_dist_shard_placement to reload after the test
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- remove non-existing node
SELECT master_remove_node('localhost', 55555);
ERROR: could not find valid entry for node "localhost:55555"
@ -78,6 +85,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
@ -112,6 +140,25 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: could not find valid entry for node "localhost:57638"
@ -153,6 +200,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
@ -190,6 +258,26 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
-- remove node in a transaction and COMMIT
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
@ -220,6 +308,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
@ -256,6 +365,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -294,6 +423,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN;
INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port);
@ -338,6 +488,32 @@ SELECT * FROM remove_node_reference_table;
1
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT * FROM remove_node_reference_table;
column1
---------
1
(1 row)
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -376,6 +552,26 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
@ -415,6 +611,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- verify table structure is changed
\d remove_node_reference_table
Table "public.remove_node_reference_table"
@ -485,6 +701,28 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
@ -519,6 +757,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -561,6 +819,28 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
SELECT master_disable_node('localhost', :worker_2_port);
master_disable_node
---------------------
@ -595,6 +875,25 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -608,6 +907,12 @@ NOTICE: Replicating reference table "table1" to all workers
DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;

View File

@ -14,6 +14,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000;
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- remove non-existing node
SELECT master_remove_node('localhost', 55555);
@ -52,6 +54,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
@ -72,6 +87,18 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port);
@ -97,6 +124,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
@ -119,6 +159,18 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- remove node in a transaction and COMMIT
@ -138,6 +190,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
@ -159,6 +224,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
@ -182,6 +260,19 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN;
INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port);
@ -207,6 +298,21 @@ WHERE colocationid IN
--verify the data is inserted
SELECT * FROM remove_node_reference_table;
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT * FROM remove_node_reference_table;
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
@ -230,6 +336,19 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int;
SELECT master_remove_node('localhost', :worker_2_port);
@ -251,6 +370,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- verify table structure is changed
\d remove_node_reference_table
@ -289,6 +421,19 @@ WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
@ -309,6 +454,19 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
@ -334,6 +492,19 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_disable_node('localhost', :worker_2_port);
-- status after master_disable_node
@ -353,6 +524,19 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
@ -362,6 +546,7 @@ DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);