Remove placement metadata of reference tables after master_remove_node

With this change, we start to delete placement of reference tables at given worker node
after master_remove_node UDF call. We remove placement metadata at master node but we do
not drop actual shard from the worker node. There are two reasons for that decision,
first, it is not critical to DROP the shards in the workers because Citus will ignore them
as long as node is removed from cluster and if we add that node back to cluster we will
DROP and recreate all reference tables. Second, if node is unreachable, it becomes
complicated to cover failure cases and have a transaction support.
pull/1105/head
Burak Yucesoy 2017-01-05 11:09:30 +03:00
parent 0af5d553d1
commit 3315ae6142
6 changed files with 1086 additions and 7 deletions

View File

@ -23,6 +23,7 @@
#include "access/xact.h"
#include "catalog/indexing.h"
#include "commands/sequence.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
@ -107,6 +108,9 @@ master_add_node(PG_FUNCTION_ARGS)
* the master node and all nodes with metadata.
* The call to the master_remove_node should be done by the super user and the specified
* node should not have any active placements.
* This function also deletes all reference table placements belong to the given node from
* pg_dist_shard_placement, but it does not drop actual placement at the node. In the case
* of re-adding the node, master_add_node first drops and re-creates the reference tables.
*/
Datum
master_remove_node(PG_FUNCTION_ARGS)
@ -126,6 +130,9 @@ master_remove_node(PG_FUNCTION_ARGS)
* the master node and all nodes with metadata regardless of the node having an active
* shard placement.
* The call to the master_remove_node should be done by the super user.
* This function also deletes all reference table placements belong to the given node from
* pg_dist_shard_placement, but it does not drop actual placement at the node. In the case
* of re-adding the node, master_add_node first drops and re-creates the reference tables.
*/
Datum
master_disable_node(PG_FUNCTION_ARGS)
@ -345,6 +352,10 @@ ReadWorkerNodes()
* The call to the master_remove_node should be done by the super user. If there are
* active shard placements on the node; the function removes the node when forceRemove
* flag is set, it errors out otherwise.
* This function also deletes all reference table placements belong to the given node from
* pg_dist_shard_placement, but it does not drop actual placement at the node. It also
* modifies replication factor of the colocation group of reference tables, so that
* replication factor will be equal to worker count.
*/
static void
RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
@ -352,10 +363,34 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
char *nodeDeleteCommand = NULL;
bool hasShardPlacements = false;
WorkerNode *workerNode = NULL;
List *referenceTableList = NIL;
EnsureSchemaNode();
EnsureSuperUser();
workerNode = FindWorkerNode(nodeName, nodePort);
DeleteNodeRow(nodeName, nodePort);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
/*
* After deleting reference tables placements, we will update replication factor
* column for colocation group of reference tables so that replication factor will
* be equal to worker count.
*/
referenceTableList = ReferenceTableOidList();
if (list_length(referenceTableList) != 0)
{
Oid firstReferenceTableId = linitial_oid(referenceTableList);
uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId);
List *workerNodeList = WorkerNodeList();
int workerCount = list_length(workerNodeList);
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
}
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
if (hasShardPlacements)
{
@ -375,10 +410,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
}
}
workerNode = FindWorkerNode(nodeName, nodePort);
DeleteNodeRow(nodeName, nodePort);
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
/* make sure we don't have any lingering session lifespan connections */

View File

@ -24,6 +24,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "utils/fmgroids.h"
@ -35,7 +36,7 @@
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static List * ReferenceTableOidList(void);
static int CompareOids(const void *leftElement, const void *rightElement);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@ -129,6 +130,12 @@ ReplicateAllReferenceTablesToAllNodes()
workerNodeList = WorkerNodeList();
workerCount = list_length(workerNodeList);
/*
* We sort the reference table list to prevent deadlocks in concurrent
* ReplicateAllReferenceTablesToAllNodes calls.
*/
referenceTableList = SortList(referenceTableList, CompareOids);
foreach(referenceTableCell, referenceTableList)
{
Oid referenceTableId = lfirst_oid(referenceTableCell);
@ -153,7 +160,6 @@ ReplicateAllReferenceTablesToAllNodes()
firstReferenceTableId = linitial_oid(referenceTableList);
referenceTableColocationId = TableColocationId(firstReferenceTableId);
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
heap_close(pgDistNode, NoLock);
}
@ -318,6 +324,44 @@ CreateReferenceTableColocationId()
}
/*
* DeleteAllReferenceTablePlacementsFromNode function iterates over list of reference
* tables and deletes all reference table placements from pg_dist_shard_placement table
* for given worker node. However, it does not modify replication factor of the colocation
* group of reference tables. It is caller's responsibility to do that if it is necessary.
*/
void
DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
/* if there are no reference tables, we do not need to do anything */
if (list_length(referenceTableList) == 0)
{
return;
}
/*
* We sort the reference table list to prevent deadlocks in concurrent
* DeleteAllReferenceTablePlacementsFromNode calls.
*/
referenceTableList = SortList(referenceTableList, CompareOids);
foreach(referenceTableCell, referenceTableList)
{
Oid referenceTableId = lfirst_oid(referenceTableCell);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
LockShardDistributionMetadata(shardId, ExclusiveLock);
DeleteShardPlacementRow(shardId, workerName, workerPort);
}
}
/*
* ReferenceTableOidList function scans pg_dist_partition to create a list of all
* reference tables. To create the list, it performs sequential scan. Since it is not
@ -325,7 +369,7 @@ CreateReferenceTableColocationId()
* If this function becomes performance bottleneck, it is possible to modify this function
* to perform index scan.
*/
static List *
List *
ReferenceTableOidList()
{
List *distTableOidList = DistTableOidList();
@ -348,3 +392,25 @@ ReferenceTableOidList()
return referenceTableList;
}
/* CompareOids is a comparison function for sort shard oids */
static int
CompareOids(const void *leftElement, const void *rightElement)
{
Oid *leftId = (Oid *) leftElement;
Oid *rightId = (Oid *) rightElement;
if (*leftId > *rightId)
{
return 1;
}
else if (*leftId < *rightId)
{
return -1;
}
else
{
return 0;
}
}

View File

@ -14,5 +14,8 @@
extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToAllNodes(void);
extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName,
uint32 workerPort);
extern List * ReferenceTableOidList(void);
#endif /* REFERENCE_TABLE_UTILS_H_ */

View File

@ -0,0 +1,611 @@
--
-- MULTI_REMOVE_NODE_REFERENCE_TABLE
--
-- Tests that check the metadata after master_remove_node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000;
-- create copy of pg_dist_shard_placement to reload after the test
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- remove non-existing node
SELECT master_remove_node('localhost', 55555);
ERROR: could not find valid entry for node "localhost:55555"
-- remove a node with no reference tables
-- verify node exist before removal
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- verify node is removed
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(15,15,localhost,57638,default,f)
(1 row)
-- remove a node with reference table
CREATE TABLE remove_node_reference_table(column1 int);
SELECT create_reference_table('remove_node_reference_table');
create_reference_table
------------------------
(1 row)
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
-- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: could not find valid entry for node "localhost:57638"
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
master_add_node
-----------------------------------
(16,16,localhost,57638,default,f)
(1 row)
-- remove node in a transaction and ROLLBACK
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
ROLLBACK;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
-- remove node in a transaction and COMMIT
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
master_add_node
-----------------------------------
(17,17,localhost,57638,default,f)
(1 row)
-- test inserting a value then removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
BEGIN;
INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
--verify the data is inserted
SELECT * FROM remove_node_reference_table;
column1
---------
1
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
master_add_node
-----------------------------------
(18,18,localhost,57638,default,f)
(1 row)
-- test executing DDL command then removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
-- verify table structure is changed
\d remove_node_reference_table
Table "public.remove_node_reference_table"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
column2 | integer |
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
master_add_node
-----------------------------------
(19,19,localhost,57638,default,f)
(1 row)
-- test DROP table after removing a node in a transaction
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
DROP TABLE remove_node_reference_table;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
ROLLBACK;
-- test removing a node while there is a reference table at another schema
CREATE SCHEMA remove_node_reference_table_schema;
CREATE TABLE remove_node_reference_table_schema.table1(column1 int);
SELECT create_reference_table('remove_node_reference_table_schema.table1');
create_reference_table
------------------------
(1 row)
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "table1" to all workers
master_add_node
-----------------------------------
(20,20,localhost,57638,default,f)
(1 row)
-- test with master_disable_node
-- status before master_disable_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
SELECT master_disable_node('localhost', :worker_2_port);
master_disable_node
---------------------
(1 row)
-- status after master_disable_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "table1" to all workers
master_add_node
-----------------------------------
(21,21,localhost,57638,default,f)
(1 row)
-- DROP tables to clean workspace
DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;

View File

@ -210,6 +210,8 @@ test: multi_foreign_key
# ----------
# multi_upgrade_reference_table tests for upgrade_reference_table UDF
# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes
# multi_remove_node_reference_table tests metadata changes after master_remove_node
# ----------
test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_remove_node_reference_table

View File

@ -0,0 +1,366 @@
--
-- MULTI_REMOVE_NODE_REFERENCE_TABLE
--
-- Tests that check the metadata after master_remove_node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000;
-- create copy of pg_dist_shard_placement to reload after the test
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- remove non-existing node
SELECT master_remove_node('localhost', 55555);
-- remove a node with no reference tables
-- verify node exist before removal
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
-- verify node is removed
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- remove a node with reference table
CREATE TABLE remove_node_reference_table(column1 int);
SELECT create_reference_table('remove_node_reference_table');
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
SELECT master_remove_node('localhost', :worker_2_port);
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
-- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port);
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- remove node in a transaction and ROLLBACK
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
ROLLBACK;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
-- remove node in a transaction and COMMIT
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- test inserting a value then removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
BEGIN;
INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port);
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
--verify the data is inserted
SELECT * FROM remove_node_reference_table;
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- test executing DDL command then removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int;
SELECT master_remove_node('localhost', :worker_2_port);
COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
-- verify table structure is changed
\d remove_node_reference_table
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- test DROP table after removing a node in a transaction
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
DROP TABLE remove_node_reference_table;
ROLLBACK;
-- test removing a node while there is a reference table at another schema
CREATE SCHEMA remove_node_reference_table_schema;
CREATE TABLE remove_node_reference_table_schema.table1(column1 int);
SELECT create_reference_table('remove_node_reference_table_schema.table1');
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
SELECT master_remove_node('localhost', :worker_2_port);
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- test with master_disable_node
-- status before master_disable_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
SELECT master_disable_node('localhost', :worker_2_port);
-- status after master_disable_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- DROP tables to clean workspace
DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;