From 3315ae614298f2e82f01a3eac0aeddf67ccf80a9 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 5 Jan 2017 11:09:30 +0300 Subject: [PATCH] Remove placement metadata of reference tables after master_remove_node With this change, we start to delete placement of reference tables at given worker node after master_remove_node UDF call. We remove placement metadata at master node but we do not drop actual shard from the worker node. There are two reasons for that decision, first, it is not critical to DROP the shards in the workers because Citus will ignore them as long as node is removed from cluster and if we add that node back to cluster we will DROP and recreate all reference tables. Second, if node is unreachable, it becomes complicated to cover failure cases and have a transaction support. --- src/backend/distributed/utils/node_metadata.c | 39 +- .../distributed/utils/reference_table_utils.c | 72 ++- .../distributed/reference_table_utils.h | 3 + .../multi_remove_node_reference_table.out | 611 ++++++++++++++++++ src/test/regress/multi_schedule | 2 + .../sql/multi_remove_node_reference_table.sql | 366 +++++++++++ 6 files changed, 1086 insertions(+), 7 deletions(-) create mode 100644 src/test/regress/expected/multi_remove_node_reference_table.out create mode 100644 src/test/regress/sql/multi_remove_node_reference_table.sql diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index ebcc52215..cde71d763 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/indexing.h" #include "commands/sequence.h" +#include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" @@ -107,6 +108,9 @@ master_add_node(PG_FUNCTION_ARGS) * the master node and all nodes with metadata. * The call to the master_remove_node should be done by the super user and the specified * node should not have any active placements. + * This function also deletes all reference table placements belong to the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. In the case + * of re-adding the node, master_add_node first drops and re-creates the reference tables. */ Datum master_remove_node(PG_FUNCTION_ARGS) @@ -126,6 +130,9 @@ master_remove_node(PG_FUNCTION_ARGS) * the master node and all nodes with metadata regardless of the node having an active * shard placement. * The call to the master_remove_node should be done by the super user. + * This function also deletes all reference table placements belong to the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. In the case + * of re-adding the node, master_add_node first drops and re-creates the reference tables. */ Datum master_disable_node(PG_FUNCTION_ARGS) @@ -345,6 +352,10 @@ ReadWorkerNodes() * The call to the master_remove_node should be done by the super user. If there are * active shard placements on the node; the function removes the node when forceRemove * flag is set, it errors out otherwise. + * This function also deletes all reference table placements belong to the given node from + * pg_dist_shard_placement, but it does not drop actual placement at the node. It also + * modifies replication factor of the colocation group of reference tables, so that + * replication factor will be equal to worker count. */ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) @@ -352,10 +363,34 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) char *nodeDeleteCommand = NULL; bool hasShardPlacements = false; WorkerNode *workerNode = NULL; + List *referenceTableList = NIL; EnsureSchemaNode(); EnsureSuperUser(); + workerNode = FindWorkerNode(nodeName, nodePort); + + DeleteNodeRow(nodeName, nodePort); + + DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); + + /* + * After deleting reference tables placements, we will update replication factor + * column for colocation group of reference tables so that replication factor will + * be equal to worker count. + */ + referenceTableList = ReferenceTableOidList(); + if (list_length(referenceTableList) != 0) + { + Oid firstReferenceTableId = linitial_oid(referenceTableList); + uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId); + + List *workerNodeList = WorkerNodeList(); + int workerCount = list_length(workerNodeList); + + UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + } + hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); if (hasShardPlacements) { @@ -375,10 +410,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) } } - workerNode = FindWorkerNode(nodeName, nodePort); - - DeleteNodeRow(nodeName, nodePort); - nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); /* make sure we don't have any lingering session lifespan connections */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 905d7b8d2..3b67919f6 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -24,6 +24,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "utils/fmgroids.h" @@ -35,7 +36,7 @@ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); -static List * ReferenceTableOidList(void); +static int CompareOids(const void *leftElement, const void *rightElement); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -129,6 +130,12 @@ ReplicateAllReferenceTablesToAllNodes() workerNodeList = WorkerNodeList(); workerCount = list_length(workerNodeList); + + /* + * We sort the reference table list to prevent deadlocks in concurrent + * ReplicateAllReferenceTablesToAllNodes calls. + */ + referenceTableList = SortList(referenceTableList, CompareOids); foreach(referenceTableCell, referenceTableList) { Oid referenceTableId = lfirst_oid(referenceTableCell); @@ -153,7 +160,6 @@ ReplicateAllReferenceTablesToAllNodes() firstReferenceTableId = linitial_oid(referenceTableList); referenceTableColocationId = TableColocationId(firstReferenceTableId); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); - heap_close(pgDistNode, NoLock); } @@ -318,6 +324,44 @@ CreateReferenceTableColocationId() } +/* + * DeleteAllReferenceTablePlacementsFromNode function iterates over list of reference + * tables and deletes all reference table placements from pg_dist_shard_placement table + * for given worker node. However, it does not modify replication factor of the colocation + * group of reference tables. It is caller's responsibility to do that if it is necessary. + */ +void +DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + + /* if there are no reference tables, we do not need to do anything */ + if (list_length(referenceTableList) == 0) + { + return; + } + + /* + * We sort the reference table list to prevent deadlocks in concurrent + * DeleteAllReferenceTablePlacementsFromNode calls. + */ + referenceTableList = SortList(referenceTableList, CompareOids); + foreach(referenceTableCell, referenceTableList) + { + Oid referenceTableId = lfirst_oid(referenceTableCell); + + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + + LockShardDistributionMetadata(shardId, ExclusiveLock); + + DeleteShardPlacementRow(shardId, workerName, workerPort); + } +} + + /* * ReferenceTableOidList function scans pg_dist_partition to create a list of all * reference tables. To create the list, it performs sequential scan. Since it is not @@ -325,7 +369,7 @@ CreateReferenceTableColocationId() * If this function becomes performance bottleneck, it is possible to modify this function * to perform index scan. */ -static List * +List * ReferenceTableOidList() { List *distTableOidList = DistTableOidList(); @@ -348,3 +392,25 @@ ReferenceTableOidList() return referenceTableList; } + + +/* CompareOids is a comparison function for sort shard oids */ +static int +CompareOids(const void *leftElement, const void *rightElement) +{ + Oid *leftId = (Oid *) leftElement; + Oid *rightId = (Oid *) rightElement; + + if (*leftId > *rightId) + { + return 1; + } + else if (*leftId < *rightId) + { + return -1; + } + else + { + return 0; + } +} diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cde2664ca..5a84c6e36 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -14,5 +14,8 @@ extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToAllNodes(void); +extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName, + uint32 workerPort); +extern List * ReferenceTableOidList(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out new file mode 100644 index 000000000..383b2850e --- /dev/null +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -0,0 +1,611 @@ +-- +-- MULTI_REMOVE_NODE_REFERENCE_TABLE +-- +-- Tests that check the metadata after master_remove_node. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; +-- create copy of pg_dist_shard_placement to reload after the test +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- remove non-existing node +SELECT master_remove_node('localhost', 55555); +ERROR: could not find valid entry for node "localhost:55555" +-- remove a node with no reference tables +-- verify node exist before removal +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify node is removed +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (15,15,localhost,57638,default,f) +(1 row) + +-- remove a node with reference table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- remove same node twice +SELECT master_remove_node('localhost', :worker_2_port); +ERROR: could not find valid entry for node "localhost:57638" +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (16,16,localhost,57638,default,f) +(1 row) + +-- remove node in a transaction and ROLLBACK +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +ROLLBACK; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +-- remove node in a transaction and COMMIT +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (17,17,localhost,57638,default,f) +(1 row) + +-- test inserting a value then removing a node in a transaction +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +INSERT INTO remove_node_reference_table VALUES(1); +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +--verify the data is inserted +SELECT * FROM remove_node_reference_table; + column1 +--------- + 1 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (18,18,localhost,57638,default,f) +(1 row) + +-- test executing DDL command then removing a node in a transaction +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +BEGIN; +ALTER TABLE remove_node_reference_table ADD column2 int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- verify table structure is changed +\d remove_node_reference_table +Table "public.remove_node_reference_table" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + column2 | integer | + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers + master_add_node +----------------------------------- + (19,19,localhost,57638,default,f) +(1 row) + +-- test DROP table after removing a node in a transaction +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +DROP TABLE remove_node_reference_table; +ERROR: DROP distributed table cannot run inside a transaction block +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +-- test removing a node while there is a reference table at another schema +CREATE SCHEMA remove_node_reference_table_schema; +CREATE TABLE remove_node_reference_table_schema.table1(column1 int); +SELECT create_reference_table('remove_node_reference_table_schema.table1'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (20,20,localhost,57638,default,f) +(1 row) + +-- test with master_disable_node +-- status before master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + +SELECT master_disable_node('localhost', :worker_2_port); + master_disable_node +--------------------- + +(1 row) + +-- status after master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 1 | 0 +(1 row) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (21,21,localhost,57638,default,f) +(1 row) + +-- DROP tables to clean workspace +DROP TABLE remove_node_reference_table; +DROP TABLE remove_node_reference_table_schema.table1; +DROP SCHEMA remove_node_reference_table_schema CASCADE; +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 640488d5a..8e2982a92 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -210,6 +210,8 @@ test: multi_foreign_key # ---------- # multi_upgrade_reference_table tests for upgrade_reference_table UDF # multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes +# multi_remove_node_reference_table tests metadata changes after master_remove_node # ---------- test: multi_upgrade_reference_table test: multi_replicate_reference_table +test: multi_remove_node_reference_table diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql new file mode 100644 index 000000000..a5a08314a --- /dev/null +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -0,0 +1,366 @@ +-- +-- MULTI_REMOVE_NODE_REFERENCE_TABLE +-- +-- Tests that check the metadata after master_remove_node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; + +-- create copy of pg_dist_shard_placement to reload after the test +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; + + +-- remove non-existing node +SELECT master_remove_node('localhost', 55555); + + +-- remove a node with no reference tables + +-- verify node exist before removal +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify node is removed +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- remove a node with reference table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +SELECT master_remove_node('localhost', :worker_2_port); + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + + +-- remove same node twice +SELECT master_remove_node('localhost', :worker_2_port); + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- remove node in a transaction and ROLLBACK + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); +ROLLBACK; + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + + +-- remove node in a transaction and COMMIT + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- test inserting a value then removing a node in a transaction + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +BEGIN; +INSERT INTO remove_node_reference_table VALUES(1); +SELECT master_remove_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +--verify the data is inserted +SELECT * FROM remove_node_reference_table; + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + + +-- test executing DDL command then removing a node in a transaction + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +BEGIN; +ALTER TABLE remove_node_reference_table ADD column2 int; +SELECT master_remove_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +-- verify table structure is changed +\d remove_node_reference_table + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + + +-- test DROP table after removing a node in a transaction +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); +DROP TABLE remove_node_reference_table; +ROLLBACK; + + +-- test removing a node while there is a reference table at another schema +CREATE SCHEMA remove_node_reference_table_schema; +CREATE TABLE remove_node_reference_table_schema.table1(column1 int); +SELECT create_reference_table('remove_node_reference_table_schema.table1'); + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + +SELECT master_remove_node('localhost', :worker_2_port); + +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + + +-- test with master_disable_node + +-- status before master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port +ORDER BY + shardid; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +SELECT master_disable_node('localhost', :worker_2_port); + +-- status after master_disable_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + + +-- DROP tables to clean workspace +DROP TABLE remove_node_reference_table; +DROP TABLE remove_node_reference_table_schema.table1; +DROP SCHEMA remove_node_reference_table_schema CASCADE; + + +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement;