diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index f44e03a4d..74c9f39d5 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -31,6 +31,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard_placement.h" @@ -867,6 +868,63 @@ UpdateShardPlacementState(uint64 placementId, char shardState) } +/* + * UpdateColocationGroupReplicationFactor finds colocation group record for given + * colocationId and updates its replication factor to given replicationFactor value. + * Since we do not cache pg_dist_colocation table, we do not need to invalidate the + * cache after updating replication factor. + */ +void +UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + + Datum values[Natts_pg_dist_colocation]; + bool isnull[Natts_pg_dist_colocation]; + bool replace[Natts_pg_dist_colocation]; + + /* we first search for colocation group by its colocation id */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistColocation); + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationColocationidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("could not find valid entry for colocation group " + "%d", colocationId))); + } + + /* after we find colocation group, we update it with new values */ + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_colocation_replicationfactor - 1] = Int32GetDatum( + replicationFactor); + isnull[Anum_pg_dist_colocation_replicationfactor - 1] = false; + replace[Anum_pg_dist_colocation_replicationfactor - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistColocation, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, NoLock); +} + + /* * Check that the current user has `mode` permissions on relationId, error out * if not. Superusers always have such permissions. diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index bb1bd75f9..55b5329c1 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -58,6 +58,7 @@ static Oid distNodeRelationId = InvalidOid; static Oid distLocalGroupRelationId = InvalidOid; static Oid distColocationRelationId = InvalidOid; static Oid distColocationConfigurationIndexId = InvalidOid; +static Oid distColocationColocationidIndexId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -105,7 +106,6 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); -static List * DistTableOidList(void); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); @@ -762,6 +762,17 @@ DistColocationConfigurationIndexId(void) } +/* return oid of pg_dist_colocation_pkey index */ +Oid +DistColocationColocationidIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_pkey", + &distColocationColocationidIndexId); + + return distColocationColocationidIndexId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -1565,6 +1576,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distNodeRelationId = InvalidOid; distColocationRelationId = InvalidOid; distColocationConfigurationIndexId = InvalidOid; + distColocationColocationidIndexId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1583,7 +1595,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) * DistTableOidList iterates over the pg_dist_partition table and returns * a list that consists of the logicalrelids. */ -static List * +List * DistTableOidList(void) { SysScanDesc scanDescriptor = NULL; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 01911ed0b..ac8254955 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -30,6 +30,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/reference_table_utils.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -48,7 +49,7 @@ int GroupSize = 1; /* local function forward declarations */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata); + char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); @@ -67,7 +68,8 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); /* - * master_add_node function adds a new node to the cluster and returns its data. + * master_add_node function adds a new node to the cluster and returns its data. It also + * replicates all reference tables to the new node. */ Datum master_add_node(PG_FUNCTION_ARGS) @@ -78,9 +80,21 @@ master_add_node(PG_FUNCTION_ARGS) int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; + bool nodeAlreadyExists = false; Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata); + hasMetadata, &nodeAlreadyExists); + + /* + * After adding new node, if the node is not already exist, we replicate all existing + * reference tables to the new node. ReplicateAllReferenceTablesToAllNodes replicates + * reference tables to all nodes however, it skips nodes which already has healthy + * placement of particular reference table. + */ + if (!nodeAlreadyExists) + { + ReplicateAllReferenceTablesToAllNodes(); + } PG_RETURN_CSTRING(returnData); } @@ -137,13 +151,14 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; List *workerNodes = ParseWorkerNodeFileAndRename(); + bool nodeAlreadyExists = false; foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false); + workerNode->workerRack, false, &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -336,7 +351,7 @@ ReadWorkerNodes() */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata) + bool hasMetadata, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -349,6 +364,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, EnsureSchemaNode(); EnsureSuperUser(); + *nodeAlreadyExists = false; + /* acquire a lock so that no one can do this concurrently */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -362,6 +379,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* close the heap */ heap_close(pgDistNode, AccessExclusiveLock); + *nodeAlreadyExists = true; + PG_RETURN_DATUM(returnData); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5f80a1bfb..905d7b8d2 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -13,9 +13,12 @@ #include "miscadmin.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "access/genam.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" +#include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" @@ -23,14 +26,16 @@ #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/rel.h" /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); - +static List * ReferenceTableOidList(void); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -93,6 +98,66 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) } +/* + * ReplicateAllReferenceTablesToAllNodes function finds all reference tables and + * replicates them to all worker nodes. It also modifies pg_dist_colocation table to + * update the replication factor column. This function skips a worker node if that node + * already has healthy placement of a particular reference table to prevent unnecessary + * data transfer. + */ +void +ReplicateAllReferenceTablesToAllNodes() +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + + Relation pgDistNode = NULL; + List *workerNodeList = NIL; + int workerCount = 0; + + Oid firstReferenceTableId = InvalidOid; + uint32 referenceTableColocationId = INVALID_COLOCATION_ID; + + /* if there is no reference table, we do not need to do anything */ + if (list_length(referenceTableList) == 0) + { + return; + } + + /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + workerNodeList = WorkerNodeList(); + workerCount = list_length(workerNodeList); + + foreach(referenceTableCell, referenceTableList) + { + Oid referenceTableId = lfirst_oid(referenceTableCell); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + char *relationName = get_rel_name(referenceTableId); + + LockShardDistributionMetadata(shardId, ExclusiveLock); + + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers", + relationName))); + + ReplicateShardToAllWorkers(shardInterval); + } + + /* + * After replicating reference tables, we will update replication factor column for + * colocation group of reference tables so that worker count will be equal to + * replication factor again. + */ + firstReferenceTableId = linitial_oid(referenceTableList); + referenceTableColocationId = TableColocationId(firstReferenceTableId); + UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + + heap_close(pgDistNode, NoLock); +} + + /* * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to * all worker nodes. It assumes that caller of this function ensures that given broadcast @@ -176,6 +241,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort, missingWorkerOk); + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, @@ -250,3 +316,35 @@ CreateReferenceTableColocationId() return colocationId; } + + +/* + * ReferenceTableOidList function scans pg_dist_partition to create a list of all + * reference tables. To create the list, it performs sequential scan. Since it is not + * expected that this function will be called frequently, it is OK not to use index scan. + * If this function becomes performance bottleneck, it is possible to modify this function + * to perform index scan. + */ +static List * +ReferenceTableOidList() +{ + List *distTableOidList = DistTableOidList(); + ListCell *distTableOidCell = NULL; + + List *referenceTableList = NIL; + + foreach(distTableOidCell, distTableOidList) + { + DistTableCacheEntry *cacheEntry = NULL; + Oid relationId = lfirst_oid(distTableOidCell); + + cacheEntry = DistributedTableCacheEntry(relationId); + + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + { + referenceTableList = lappend_oid(referenceTableList, relationId); + } + } + + return referenceTableList; +} diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 4e3298c05..a6e6c9070 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -85,6 +85,8 @@ extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort); +extern void UpdateColocationGroupReplicationFactor(uint32 colocationId, + int replicationFactor); extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4b8753694..38fbb9cb1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -57,6 +57,7 @@ extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); +extern List * DistTableOidList(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); @@ -68,6 +69,7 @@ extern HTAB * GetWorkerNodeHash(void); /* relation oids */ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); +extern Oid DistColocationColocationidIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0ad98bbd9..cde2664ca 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -13,5 +13,6 @@ #define REFERENCE_TABLE_UTILS_H_ extern uint32 CreateReferenceTableColocationId(void); +extern void ReplicateAllReferenceTablesToAllNodes(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 724f5fa04..048fa084d 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -436,10 +436,10 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 + 4 | 2 | 2 | 23 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 (4 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -447,28 +447,28 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition ORDER BY logicalrelid; logicalrelid | colocationid ---------------+-------------- - table1_groupa | 1 - table2_groupa | 1 - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table3_groupd | 4 + table1_groupa | 4 + table2_groupa | 4 + table1_groupb | 5 + table2_groupb | 5 + table1_groupc | 6 + table2_groupc | 6 + table1_groupd | 7 + table2_groupd | 7 + table3_groupd | 7 (9 rows) -- check effects of dropping tables DROP TABLE table1_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 + 4 | 2 | 2 | 23 (1 row) -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) @@ -558,11 +558,11 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 9 | 3 | 2 | 23 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 + 8 | 2 | 2 | 23 + 12 | 3 | 2 | 23 (5 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -570,23 +570,23 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition ORDER BY colocationid, logicalrelid; logicalrelid | colocationid ----------------------------------+-------------- - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table3_groupd | 4 - table1_groupe | 5 - table2_groupe | 5 - table3_groupe | 5 - schema_collocation.table4_groupe | 5 - table4_groupe | 5 - table1_group_none_1 | 6 - table2_group_none_1 | 6 - table1_group_none_2 | 7 - table1_group_none_3 | 8 - table1_group_default | 9 + table1_groupb | 5 + table2_groupb | 5 + table1_groupc | 6 + table2_groupc | 6 + table1_groupd | 7 + table2_groupd | 7 + table3_groupd | 7 + table1_groupe | 8 + table2_groupe | 8 + table3_groupe | 8 + schema_collocation.table4_groupe | 8 + table4_groupe | 8 + table1_group_none_1 | 9 + table2_group_none_1 | 9 + table1_group_none_2 | 10 + table1_group_none_3 | 11 + table1_group_default | 12 (17 rows) -- check failing colocate_with options @@ -651,12 +651,12 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 9 | 3 | 2 | 23 - 10 | 1 | 2 | 0 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 + 8 | 2 | 2 | 23 + 12 | 3 | 2 | 23 + 13 | 1 | 2 | 0 (6 rows) -- cross check with internal colocation API diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index d8b4a123e..9157c0331 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1574,7 +1574,7 @@ ERROR: single-shard DML commands must not appear in transaction blocks which co ROLLBACK; -- clean up tables DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_table_ddl; + reference_table_test_fourth, reference_table_ddl, reference_table_composite; DROP SCHEMA reference_schema CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table reference_schema.reference_table_test_sixth diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out new file mode 100644 index 000000000..82ec296ee --- /dev/null +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -0,0 +1,598 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- test adding new node with no reference tables +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (4,4,localhost,57638,default,f) +(1 row) + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); + create_reference_table +------------------------ + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers +ERROR: could not find any healthy placement for shard 1370000 +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +DROP TABLE replicate_reference_table_unhealthy; +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +-- test add same node twice +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_valid; +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers + master_add_node +--------------------------------- + (7,7,localhost,57638,default,f) +(1 row) + +ROLLBACK; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +DROP TABLE replicate_reference_table_rollback; +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370003 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers + master_add_node +--------------------------------- + (8,8,localhost,57638,default,f) +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370003 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370003 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_commit; +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + create_reference_table +------------------------ + +(1 row) + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_two(column1 int); +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 1 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | h | 1370005 | s +(2 rows) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers + master_add_node +--------------------------------- + (9,9,localhost,57638,default,f) +(1 row) + +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +SELECT create_reference_table('replicate_reference_table_reference_two'); + create_reference_table +------------------------ + +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370004 | 1 | 0 | localhost | 57638 + 1370005 | 1 | 0 | localhost | 57638 + 1370006 | 1 | 0 | localhost | 57638 +(3 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 2 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | n | 1370004 | t + replicate_reference_table_reference_two | n | 1370004 | t +(3 rows) + +DROP TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_insert; +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_copy; +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_ddl; +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers + master_add_node +----------------------------------- + (13,13,localhost,57638,default,f) +(1 row) + +DROP TABLE replicate_reference_table_drop; +ERROR: DROP distributed table cannot run inside a transaction block +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +DROP TABLE replicate_reference_table_drop; +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT create_reference_table('replicate_reference_table_schema.table1'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370010 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (14,14,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370011 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370010 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index 3ced545fa..348e9137f 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -5,6 +5,7 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); SELECT upgrade_to_reference_table('upgrade_reference_table_local'); @@ -138,15 +139,16 @@ WHERE colocationid IN --------------+------------+-------------------+------------------------ (0 rows) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360009 | 1 | 8192 | localhost | 57637 | 379 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360009 | 1 | 8192 | localhost | 57637 (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_append'); @@ -164,7 +166,7 @@ WHERE logicalrelid = 'upgrade_reference_table_append'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -186,19 +188,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360009 | 1 | 8192 | localhost | 57637 | 379 - 1360009 | 1 | 0 | localhost | 57638 | 380 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360009 | 1 | 8192 | localhost | 57637 + 1360009 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at one worker @@ -218,7 +221,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -240,18 +243,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360010 | 1 | 0 | localhost | 57637 | 381 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360010 | 1 | 0 | localhost | 57637 (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); @@ -269,7 +273,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -291,19 +295,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360010 | 1 | 0 | localhost | 57637 | 381 - 1360010 | 1 | 0 | localhost | 57638 | 382 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360010 | 1 | 0 | localhost | 57637 + 1360010 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at both workers but one is unhealthy @@ -325,7 +330,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 34 | c + h | f | 1360003 | c (1 row) SELECT @@ -347,19 +352,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 34 | 1 | 2 | 23 + 1360003 | 1 | 2 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360011 | 1 | 0 | localhost | 57637 | 383 - 1360011 | 1 | 0 | localhost | 57638 | 384 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360011 | 1 | 0 | localhost | 57637 + 1360011 | 1 | 0 | localhost | 57638 (2 rows) SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); @@ -377,7 +383,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -399,19 +405,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360011 | 1 | 0 | localhost | 57637 | 383 - 1360011 | 1 | 0 | localhost | 57638 | 384 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360011 | 1 | 0 | localhost | 57637 + 1360011 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at both workers and both are healthy @@ -431,7 +438,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 35 | c + h | f | 1360004 | c (1 row) SELECT @@ -453,19 +460,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 35 | 1 | 2 | 23 + 1360004 | 1 | 2 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360012 | 1 | 0 | localhost | 57637 | 385 - 1360012 | 1 | 0 | localhost | 57638 | 386 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360012 | 1 | 0 | localhost | 57637 + 1360012 | 1 | 0 | localhost | 57638 (2 rows) SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); @@ -483,7 +491,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -505,19 +513,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360012 | 1 | 0 | localhost | 57637 | 385 - 1360012 | 1 | 0 | localhost | 57638 | 386 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360012 | 1 | 0 | localhost | 57637 + 1360012 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, do it in transaction and ROLLBACK @@ -538,7 +547,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -560,18 +569,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360013 | 1 | 0 | localhost | 57637 | 387 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360013 | 1 | 0 | localhost | 57637 (1 row) BEGIN; @@ -591,7 +601,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -613,18 +623,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360013 | 1 | 0 | localhost | 57637 | 387 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360013 | 1 | 0 | localhost | 57637 (1 row) -- test valid cases, do it in transaction and COMMIT @@ -645,7 +656,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -667,18 +678,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360014 | 1 | 0 | localhost | 57637 | 389 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360014 | 1 | 0 | localhost | 57637 (1 row) BEGIN; @@ -698,7 +710,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -720,19 +732,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360014 | 1 | 0 | localhost | 57637 | 389 - 1360014 | 1 | 0 | localhost | 57638 | 390 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360014 | 1 | 0 | localhost | 57637 + 1360014 | 1 | 0 | localhost | 57638 (2 rows) -- verify that shard is replicated to other worker diff --git a/src/test/regress/input/multi_outer_join_reference.source b/src/test/regress/input/multi_outer_join_reference.source index 40f1cf56d..be121413e 100644 --- a/src/test/regress/input/multi_outer_join_reference.source +++ b/src/test/regress/input/multi_outer_join_reference.source @@ -449,3 +449,9 @@ SELECT FROM multi_outer_join_right_reference FULL JOIN multi_outer_join_third_reference ON (t_custkey = r_custkey); + +-- DROP unused tables to clean up workspace +DROP TABLE multi_outer_join_left_hash; +DROP TABLE multi_outer_join_right_reference; +DROP TABLE multi_outer_join_third_reference; +DROP TABLE multi_outer_join_right_hash; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3fb257d68..2043cdae8 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -157,6 +157,11 @@ test: multi_router_planner # ---------- test: multi_large_shardid +# ---------- +# multi_drop_extension makes sure we can safely drop and recreate the extension +# ---------- +test: multi_drop_extension + # ---------- # multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers # multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers @@ -164,11 +169,6 @@ test: multi_large_shardid test: multi_metadata_sync test: multi_unsupported_worker_operations -# ---------- -# multi_drop_extension makes sure we can safely drop and recreate the extension -# ---------- -test: multi_drop_extension - # ---------- # multi_schema_support makes sure we can work with tables in schemas other than public with no problem # ---------- @@ -208,5 +208,7 @@ test: multi_foreign_key # ---------- # multi_upgrade_reference_table tests for upgrade_reference_table UDF +# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes # ---------- test: multi_upgrade_reference_table +test: multi_replicate_reference_table diff --git a/src/test/regress/output/multi_outer_join_reference.source b/src/test/regress/output/multi_outer_join_reference.source index a508aeb42..502d3cf2e 100644 --- a/src/test/regress/output/multi_outer_join_reference.source +++ b/src/test/regress/output/multi_outer_join_reference.source @@ -834,3 +834,8 @@ FROM 7 | (30 rows) +-- DROP unused tables to clean up workspace +DROP TABLE multi_outer_join_left_hash; +DROP TABLE multi_outer_join_right_reference; +DROP TABLE multi_outer_join_third_reference; +DROP TABLE multi_outer_join_right_hash; diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 336758f56..91867b426 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -219,11 +219,11 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition -- check effects of dropping tables DROP TABLE table1_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; -- create dropped colocation group again SET citus.shard_count = 2; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 32af6ce74..eaea4ab66 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -986,5 +986,5 @@ ROLLBACK; -- clean up tables DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_table_ddl; + reference_table_test_fourth, reference_table_ddl, reference_table_composite; DROP SCHEMA reference_schema CASCADE; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql new file mode 100644 index 000000000..6f793cac8 --- /dev/null +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -0,0 +1,387 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; + + +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + + +-- test adding new node with no reference tables + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +DROP TABLE replicate_reference_table_unhealthy; + + +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + + +-- test add same node twice + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +DROP TABLE replicate_reference_table_valid; + + +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +DROP TABLE replicate_reference_table_rollback; + + +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +DROP TABLE replicate_reference_table_commit; + + +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + +CREATE TABLE replicate_reference_table_reference_two(column1 int); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); +SELECT create_reference_table('replicate_reference_table_reference_two'); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + +DROP TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; + + +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_insert; + + +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +1 +2 +3 +4 +5 +\. +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_copy; + + +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_ddl; + + +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +DROP TABLE replicate_reference_table_drop; +ROLLBACK; + +DROP TABLE replicate_reference_table_drop; + +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT create_reference_table('replicate_reference_table_schema.table1'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; + + +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 9b30428d4..1ab19c3e2 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -94,6 +94,7 @@ SELECT master_apply_delete_command('DELETE FROM mx_table'); SELECT count(*) FROM mx_table; -- master_add_node + SELECT master_add_node('localhost', 5432); SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index acfec782e..0bec3ade7 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -6,6 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); @@ -91,7 +92,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -122,7 +124,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -155,7 +158,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -186,7 +190,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -221,7 +226,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -252,7 +258,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -285,7 +292,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -316,7 +324,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -350,7 +359,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -383,7 +393,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -417,7 +428,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -450,7 +462,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid