diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index ebcc52215..cde71d763 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/indexing.h" #include "commands/sequence.h" +#include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" @@ -107,6 +108,9 @@ master_add_node(PG_FUNCTION_ARGS) * the master node and all nodes with metadata. * The call to the master_remove_node should be done by the super user and the specified * node should not have any active placements. + * This function also deletes all reference table placements on the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. In the case + * of re-adding the node, master_add_node first drops and re-creates the reference tables. */ Datum master_remove_node(PG_FUNCTION_ARGS) @@ -126,6 +130,9 @@ master_remove_node(PG_FUNCTION_ARGS) * the master node and all nodes with metadata regardless of the node having an active * shard placement. * The call to the master_remove_node should be done by the super user. + * This function also deletes all reference table placements on the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. In the case + * of re-adding the node, master_add_node first drops and re-creates the reference tables. */ Datum master_disable_node(PG_FUNCTION_ARGS) @@ -345,6 +352,10 @@ ReadWorkerNodes() * The call to the master_remove_node should be done by the super user. If there are * active shard placements on the node; the function removes the node when forceRemove * flag is set, it errors out otherwise. 
+ * This function also deletes all reference table placements on the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. It also + * modifies the replication factor of the colocation group of reference tables, so that + * it equals the worker count. */ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) @@ -352,10 +363,34 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) char *nodeDeleteCommand = NULL; bool hasShardPlacements = false; WorkerNode *workerNode = NULL; + List *referenceTableList = NIL; EnsureSchemaNode(); EnsureSuperUser(); + workerNode = FindWorkerNode(nodeName, nodePort); + + DeleteNodeRow(nodeName, nodePort); + + DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); + + /* + * After deleting reference table placements, we will update replication factor + * column for colocation group of reference tables so that replication factor will + * be equal to worker count. 
+ */ + referenceTableList = ReferenceTableOidList(); + if (list_length(referenceTableList) != 0) + { + Oid firstReferenceTableId = linitial_oid(referenceTableList); + uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId); + + List *workerNodeList = WorkerNodeList(); + int workerCount = list_length(workerNodeList); + + UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + } + hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); if (hasShardPlacements) { @@ -375,10 +410,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) } } - workerNode = FindWorkerNode(nodeName, nodePort); - - DeleteNodeRow(nodeName, nodePort); - nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); /* make sure we don't have any lingering session lifespan connections */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 905d7b8d2..3b67919f6 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -24,6 +24,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "utils/fmgroids.h" @@ -35,7 +36,7 @@ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); -static List * ReferenceTableOidList(void); +static int CompareOids(const void *leftElement, const void *rightElement); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -129,6 +130,12 @@ ReplicateAllReferenceTablesToAllNodes() workerNodeList = WorkerNodeList(); workerCount = 
list_length(workerNodeList); + + /* + * We sort the reference table list to prevent deadlocks in concurrent + * ReplicateAllReferenceTablesToAllNodes calls. + */ + referenceTableList = SortList(referenceTableList, CompareOids); foreach(referenceTableCell, referenceTableList) { Oid referenceTableId = lfirst_oid(referenceTableCell); @@ -153,7 +160,6 @@ ReplicateAllReferenceTablesToAllNodes() firstReferenceTableId = linitial_oid(referenceTableList); referenceTableColocationId = TableColocationId(firstReferenceTableId); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); - heap_close(pgDistNode, NoLock); } @@ -318,6 +324,44 @@ CreateReferenceTableColocationId() } +/* + * DeleteAllReferenceTablePlacementsFromNode iterates over the list of reference tables + * and deletes each table's placement on the given worker node from + * pg_dist_shard_placement. It does not modify the replication factor of the reference + * tables' colocation group; it is the caller's responsibility to do that if necessary. + */ +void +DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + + /* if there are no reference tables, we do not need to do anything */ + if (list_length(referenceTableList) == 0) + { + return; + } + + /* + * We sort the reference table list to prevent deadlocks in concurrent + * DeleteAllReferenceTablePlacementsFromNode calls, which lock shards in list order. 
+ */ + referenceTableList = SortList(referenceTableList, CompareOids); + foreach(referenceTableCell, referenceTableList) + { + Oid referenceTableId = lfirst_oid(referenceTableCell); + + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + + LockShardDistributionMetadata(shardId, ExclusiveLock); + + DeleteShardPlacementRow(shardId, workerName, workerPort); + } +} + + /* * ReferenceTableOidList function scans pg_dist_partition to create a list of all * reference tables. To create the list, it performs sequential scan. Since it is not @@ -325,7 +369,7 @@ CreateReferenceTableColocationId() * If this function becomes performance bottleneck, it is possible to modify this function * to perform index scan. */ -static List * +List * ReferenceTableOidList() { List *distTableOidList = DistTableOidList(); @@ -348,3 +392,25 @@ ReferenceTableOidList() return referenceTableList; } + + +/* CompareOids is a comparison function for sorting Oids in ascending order */ +static int +CompareOids(const void *leftElement, const void *rightElement) +{ + Oid *leftId = (Oid *) leftElement; + Oid *rightId = (Oid *) rightElement; + + if (*leftId > *rightId) + { + return 1; + } + else if (*leftId < *rightId) + { + return -1; + } + else + { + return 0; + } +} diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cde2664ca..5a84c6e36 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -14,5 +14,8 @@ extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToAllNodes(void); +extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName, + uint32 workerPort); +extern List * ReferenceTableOidList(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git 
a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out new file mode 100644 index 000000000..383b2850e --- /dev/null +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -0,0 +1,611 @@ +-- +-- MULTI_REMOVE_NODE_REFERENCE_TABLE +-- +-- Tests that check the metadata after master_remove_node. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; +-- create copy of pg_dist_shard_placement to reload after the test +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- remove non-existing node +SELECT master_remove_node('localhost', 55555); +ERROR: could not find valid entry for node "localhost:55555" +-- remove a node with no reference tables +-- verify node exist before removal +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify node is removed +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (15,15,localhost,57638,default,f) +(1 row) + +-- remove a node with reference table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, 
nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- remove same node twice +SELECT master_remove_node('localhost', :worker_2_port); +ERROR: could not find valid entry for node "localhost:57638" +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (16,16,localhost,57638,default,f) +(1 row) + +-- remove node in a transaction and ROLLBACK +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node 
WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +ROLLBACK; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +-- remove node in a transaction and COMMIT +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | 
nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (17,17,localhost,57638,default,f) +(1 row) + +-- test inserting a value then removing a node in a transaction +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | 
nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +INSERT INTO remove_node_reference_table VALUES(1); +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +--verify the data is inserted +SELECT * FROM remove_node_reference_table; + column1 +--------- + 1 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (18,18,localhost,57638,default,f) +(1 row) + +-- test executing DDL command then removing a node in a transaction +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) 
+ +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +ALTER TABLE remove_node_reference_table ADD column2 int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- verify table structure is changed +\d remove_node_reference_table +Table "public.remove_node_reference_table" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + column2 | integer | + +-- re-add the 
node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (19,19,localhost,57638,default,f) +(1 row) + +-- test DROP table after removing a node in a transaction +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +DROP TABLE remove_node_reference_table; +ERROR: DROP distributed table cannot run inside a transaction block +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +-- test removing a node while there is a reference table at another schema +CREATE SCHEMA remove_node_reference_table_schema; +CREATE TABLE remove_node_reference_table_schema.table1(column1 int); +SELECT create_reference_table('remove_node_reference_table_schema.table1'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + 
master_remove_node +-------------------- + +(1 row) + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (20,20,localhost,57638,default,f) +(1 row) + +-- test with master_disable_node +-- status before master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ 
+ 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_disable_node('localhost', :worker_2_port); + master_disable_node +--------------------- + +(1 row) + +-- status after master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (21,21,localhost,57638,default,f) +(1 row) + +-- DROP tables to clean workspace +DROP TABLE remove_node_reference_table; +DROP TABLE remove_node_reference_table_schema.table1; +DROP SCHEMA remove_node_reference_table_schema CASCADE; +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 640488d5a..8e2982a92 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -210,6 +210,8 @@ test: multi_foreign_key # ---------- # multi_upgrade_reference_table tests for upgrade_reference_table UDF # multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes +# 
multi_remove_node_reference_table tests metadata changes after master_remove_node # ---------- test: multi_upgrade_reference_table test: multi_replicate_reference_table +test: multi_remove_node_reference_table diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql new file mode 100644 index 000000000..a5a08314a --- /dev/null +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -0,0 +1,366 @@ +-- +-- MULTI_REMOVE_NODE_REFERENCE_TABLE +-- +-- Tests that check the metadata after master_remove_node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; + +-- create copy of pg_dist_shard_placement to reload after the test +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; + + +-- remove non-existing node +SELECT master_remove_node('localhost', 55555); + + +-- remove a node with no reference tables + +-- verify node exist before removal +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify node is removed +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- remove a node with reference table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT 
colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +SELECT master_remove_node('localhost', :worker_2_port); + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + + +-- remove same node twice +SELECT master_remove_node('localhost', :worker_2_port); + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- remove node in a transaction and ROLLBACK + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); +ROLLBACK; + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + + +-- remove node in a transaction and COMMIT + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + 
+-- Every scenario in this part of the script follows the same pattern:
+--   1. show how many pg_dist_node rows exist for the node on :worker_2_port,
+--      the pg_dist_shard_placement rows on that port, and the pg_dist_colocation
+--      row of the reference table's colocation group ("status before");
+--   2. remove (or disable) the node;
+--   3. run the same three queries again ("status after");
+--   4. re-add the node so the next scenario starts with it present.
+-- NOTE(review): the comment on RemoveNodeFromCluster states that removing a node
+-- deletes its reference table placements and sets the replication factor of the
+-- reference tables' colocation group to the worker count; the queries below are
+-- presumably what exposes that in the expected output -- confirm against the .out file.
+
+-- colocation group of the reference table before the removal that follows
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- remove the node inside an explicit transaction block
+BEGIN;
+SELECT master_remove_node('localhost', :worker_2_port);
+COMMIT;
+
+-- status after master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- re-add the node for next tests
+SELECT master_add_node('localhost', :worker_2_port);
+
+-- test inserting a value then removing a node in a transaction
+
+-- status before master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- the INSERT runs before the removal, in the same transaction
+BEGIN;
+INSERT INTO remove_node_reference_table VALUES(1);
+SELECT master_remove_node('localhost', :worker_2_port);
+COMMIT;
+
+-- status after master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- verify the data is inserted
+SELECT * FROM remove_node_reference_table;
+
+-- re-add the node for next tests
+SELECT master_add_node('localhost', :worker_2_port);
+
+
+-- test executing DDL command then removing a node in a transaction
+
+-- status before master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- the ALTER TABLE runs before the removal, in the same transaction
+BEGIN;
+ALTER TABLE remove_node_reference_table ADD column2 int;
+SELECT master_remove_node('localhost', :worker_2_port);
+COMMIT;
+
+-- status after master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- verify table structure is changed
+\d remove_node_reference_table
+
+-- re-add the node for next tests
+SELECT master_add_node('localhost', :worker_2_port);
+
+
+-- test DROP table after removing a node in a transaction
+-- NOTE(review): the transaction is rolled back and no re-add follows; presumably
+-- both the node and the table are expected to still exist afterwards, since the
+-- next scenarios query and remove the node again -- confirm.
+BEGIN;
+SELECT master_remove_node('localhost', :worker_2_port);
+DROP TABLE remove_node_reference_table;
+ROLLBACK;
+
+
+-- test removing a node while there is a reference table in another schema
+CREATE SCHEMA remove_node_reference_table_schema;
+CREATE TABLE remove_node_reference_table_schema.table1(column1 int);
+SELECT create_reference_table('remove_node_reference_table_schema.table1');
+
+-- status before master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+-- ordered by shardid: a second reference table was just created, so more than
+-- one placement row may be listed for this port
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port
+ORDER BY
+    shardid;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
+
+-- this removal runs outside an explicit transaction block
+SELECT master_remove_node('localhost', :worker_2_port);
+
+-- status after master_remove_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
+
+-- re-add the node for next tests
+SELECT master_add_node('localhost', :worker_2_port);
+
+
+-- test with master_disable_node
+
+-- status before master_disable_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+-- ordered by shardid: table1 from the previous scenario has not been dropped yet,
+-- so more than one placement row may be listed for this port
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port
+ORDER BY
+    shardid;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+SELECT master_disable_node('localhost', :worker_2_port);
+
+-- status after master_disable_node
+SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
+
+SELECT
+    shardid, shardstate, shardlength, nodename, nodeport
+FROM
+    pg_dist_shard_placement
+WHERE
+    nodeport = :worker_2_port;
+
+SELECT *
+FROM pg_dist_colocation
+WHERE colocationid IN
+    (SELECT colocationid
+     FROM pg_dist_partition
+     WHERE logicalrelid = 'remove_node_reference_table'::regclass);
+
+-- re-add the node for next tests
+SELECT master_add_node('localhost', :worker_2_port);
+
+
+-- DROP tables to clean workspace
+DROP TABLE remove_node_reference_table;
+DROP TABLE remove_node_reference_table_schema.table1;
+DROP SCHEMA remove_node_reference_table_schema CASCADE;
+
+
+-- reload pg_dist_shard_placement table
+-- NOTE(review): tmp_shard_placement is not created in this part of the script;
+-- presumably it holds the placements saved before these tests started -- confirm.
+INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
+DROP TABLE tmp_shard_placement;