diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index f44e03a4d..74c9f39d5 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -31,6 +31,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard_placement.h" @@ -867,6 +868,63 @@ UpdateShardPlacementState(uint64 placementId, char shardState) } +/* + * UpdateColocationGroupReplicationFactor finds the colocation group record for the + * given colocationId and sets its replication factor to replicationFactor; it errors + * out if no such record exists. Since we do not cache the pg_dist_colocation table, + * there is no cache to invalidate after the update. 
+ */ +void +UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + + Datum values[Natts_pg_dist_colocation]; + bool isnull[Natts_pg_dist_colocation]; + bool replace[Natts_pg_dist_colocation]; + + /* we first search for colocation group by its colocation id */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistColocation); + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationColocationidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("could not find valid entry for colocation group " + "%u", colocationId))); + } + + /* after we find colocation group, we update it with new values */ + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_colocation_replicationfactor - 1] = Int32GetDatum( + replicationFactor); + isnull[Anum_pg_dist_colocation_replicationfactor - 1] = false; + replace[Anum_pg_dist_colocation_replicationfactor - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistColocation, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, NoLock); +} + + /* * Check that the current user has `mode` permissions on relationId, error out * if not. Superusers always have such permissions. 
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index bb1bd75f9..55b5329c1 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -58,6 +58,7 @@ static Oid distNodeRelationId = InvalidOid; static Oid distLocalGroupRelationId = InvalidOid; static Oid distColocationRelationId = InvalidOid; static Oid distColocationConfigurationIndexId = InvalidOid; +static Oid distColocationColocationidIndexId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -105,7 +106,6 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); -static List * DistTableOidList(void); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); @@ -762,6 +762,17 @@ DistColocationConfigurationIndexId(void) } +/* return oid of pg_dist_colocation_pkey, the index used for lookups by colocationid */ +Oid +DistColocationColocationidIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_pkey", + &distColocationColocationidIndexId); + + return distColocationColocationidIndexId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -1565,6 +1576,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distNodeRelationId = InvalidOid; distColocationRelationId = InvalidOid; distColocationConfigurationIndexId = InvalidOid; + distColocationColocationidIndexId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId 
= InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1583,7 +1595,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) * DistTableOidList iterates over the pg_dist_partition table and returns * a list that consists of the logicalrelids. */ -static List * +List * DistTableOidList(void) { SysScanDesc scanDescriptor = NULL; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 01911ed0b..ac8254955 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -30,6 +30,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/reference_table_utils.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -48,7 +49,7 @@ int GroupSize = 1; /* local function forward declarations */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata); + char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); @@ -67,7 +68,8 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); /* - * master_add_node function adds a new node to the cluster and returns its data. + * master_add_node function adds a new node to the cluster and returns its data. If the + * node did not already exist, it also replicates all reference tables to it. 
*/ Datum master_add_node(PG_FUNCTION_ARGS) @@ -78,9 +80,21 @@ master_add_node(PG_FUNCTION_ARGS) int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; + bool nodeAlreadyExists = false; Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata); + hasMetadata, &nodeAlreadyExists); + + /* + * If the node did not already exist, we replicate all existing reference tables to + * the newly added node. ReplicateAllReferenceTablesToAllNodes targets all nodes; + * however, it skips any node that already has a healthy placement of a particular + * reference table. + */ + if (!nodeAlreadyExists) + { + ReplicateAllReferenceTablesToAllNodes(); + } PG_RETURN_CSTRING(returnData); } @@ -137,13 +151,14 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; List *workerNodes = ParseWorkerNodeFileAndRename(); + bool nodeAlreadyExists = false; foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false); + workerNode->workerRack, false, &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -336,7 +351,7 @@ ReadWorkerNodes() */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata) + bool hasMetadata, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -349,6 +364,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, EnsureSchemaNode(); EnsureSuperUser(); + *nodeAlreadyExists = false; + /* acquire a lock so that no one can do this concurrently */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -362,6 +379,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* close the heap */ heap_close(pgDistNode, AccessExclusiveLock); + *nodeAlreadyExists = true; + 
PG_RETURN_DATUM(returnData); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5f80a1bfb..905d7b8d2 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -13,9 +13,12 @@ #include "miscadmin.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "access/genam.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" +#include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" @@ -23,14 +26,16 @@ #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/rel.h" /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); - +static List * ReferenceTableOidList(void); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -93,6 +98,66 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) } +/* + * ReplicateAllReferenceTablesToAllNodes finds all reference tables and replicates + * them to all worker nodes. It then sets the replication factor of the reference + * tables' colocation group in pg_dist_colocation to the current worker count. A worker + * node that already has a healthy placement of a particular reference table is skipped + * for that table to prevent unnecessary data transfer. 
+ */ +void +ReplicateAllReferenceTablesToAllNodes() +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + + Relation pgDistNode = NULL; + List *workerNodeList = NIL; + int workerCount = 0; + + Oid firstReferenceTableId = InvalidOid; + uint32 referenceTableColocationId = INVALID_COLOCATION_ID; + + /* if there is no reference table, we do not need to do anything */ + if (list_length(referenceTableList) == 0) + { + return; + } + + /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + workerNodeList = WorkerNodeList(); + workerCount = list_length(workerNodeList); + + foreach(referenceTableCell, referenceTableList) + { + Oid referenceTableId = lfirst_oid(referenceTableCell); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + char *relationName = get_rel_name(referenceTableId); + + LockShardDistributionMetadata(shardId, ExclusiveLock); + + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers", + relationName))); + + ReplicateShardToAllWorkers(shardInterval); + } + + /* + * After replicating the reference tables, we update the replication factor of their + * colocation group (looked up via the first reference table) so that it is equal to + * the worker count again. + */ + firstReferenceTableId = linitial_oid(referenceTableList); + referenceTableColocationId = TableColocationId(firstReferenceTableId); + UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + + heap_close(pgDistNode, NoLock); +} + + /* * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to * all worker nodes. 
It assumes that caller of this function ensures that given broadcast @@ -176,6 +241,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort, missingWorkerOk); + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, @@ -250,3 +316,35 @@ CreateReferenceTableColocationId() return colocationId; } + + +/* + * ReferenceTableOidList function returns the oids of all reference tables. It gets the + * list of all distributed tables from DistTableOidList, which iterates over + * pg_dist_partition, and keeps the tables whose partition method is DISTRIBUTE_BY_NONE. + * Since it is not expected that this function will be called frequently, it is OK not + * to use an index scan; this can be revisited if it becomes a performance bottleneck. + */ +static List * +ReferenceTableOidList() +{ + List *distTableOidList = DistTableOidList(); + ListCell *distTableOidCell = NULL; + + List *referenceTableList = NIL; + + foreach(distTableOidCell, distTableOidList) + { + DistTableCacheEntry *cacheEntry = NULL; + Oid relationId = lfirst_oid(distTableOidCell); + + cacheEntry = DistributedTableCacheEntry(relationId); + + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + { + referenceTableList = lappend_oid(referenceTableList, relationId); + } + } + + return referenceTableList; +} diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 4e3298c05..a6e6c9070 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -85,6 +85,8 @@ extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort); +extern void UpdateColocationGroupReplicationFactor(uint32 colocationId, + int 
replicationFactor); extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4b8753694..38fbb9cb1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -57,6 +57,7 @@ extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); +extern List * DistTableOidList(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); @@ -68,6 +69,7 @@ extern HTAB * GetWorkerNodeHash(void); /* relation oids */ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); +extern Oid DistColocationColocationidIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0ad98bbd9..cde2664ca 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -13,5 +13,6 @@ #define REFERENCE_TABLE_UTILS_H_ extern uint32 CreateReferenceTableColocationId(void); +extern void ReplicateAllReferenceTablesToAllNodes(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out new file mode 100644 index 000000000..82ec296ee --- /dev/null +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -0,0 +1,598 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. 
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- test adding new node with no reference tables +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (4,4,localhost,57638,default,f) +(1 row) + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); + create_reference_table +------------------------ + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 
1370000; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers +ERROR: could not find any healthy placement for shard 1370000 +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +DROP TABLE replicate_reference_table_unhealthy; +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = 
:worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +-- test add same node twice +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype 
+--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_valid; +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers + master_add_node +--------------------------------- + (7,7,localhost,57638,default,f) +(1 row) + +ROLLBACK; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | 
distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +DROP TABLE replicate_reference_table_rollback; +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370003 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers + master_add_node +--------------------------------- + (8,8,localhost,57638,default,f) +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370003 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype 
+--------------+------------+-------------------+------------------------ + 1370003 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_commit; +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + create_reference_table +------------------------ + +(1 row) + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_two(column1 int); +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 1 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + 
replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | h | 1370005 | s +(2 rows) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers + master_add_node +--------------------------------- + (9,9,localhost,57638,default,f) +(1 row) + +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +SELECT create_reference_table('replicate_reference_table_reference_two'); + create_reference_table +------------------------ + +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370004 | 1 | 0 | localhost | 57638 + 1370005 | 1 | 0 | localhost | 57638 + 1370006 | 1 | 0 | localhost | 57638 +(3 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 2 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | n | 1370004 | t + replicate_reference_table_reference_two | n | 1370004 | t +(3 rows) + +DROP 
TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_insert; +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_copy; +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT master_add_node('localhost', 
:worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_ddl; +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers + master_add_node +----------------------------------- + (13,13,localhost,57638,default,f) +(1 row) + +DROP TABLE replicate_reference_table_drop; +ERROR: DROP distributed table cannot run inside a transaction block +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +DROP TABLE replicate_reference_table_drop; +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT create_reference_table('replicate_reference_table_schema.table1'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype 
+--------------+------------+-------------------+------------------------ + 1370010 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (14,14,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370011 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370010 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 79a6dc0e1..2043cdae8 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -208,5 +208,7 @@ test: multi_foreign_key # ---------- # multi_upgrade_reference_table tests for upgrade_reference_table UDF +# multi_replicate_reference_table tests that reference tables are replicated to newly added nodes # ---------- test: multi_upgrade_reference_table +test: multi_replicate_reference_table diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql new file mode 100644 index 000000000..6f793cac8 --- /dev/null +++ 
b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -0,0 +1,387 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; + + +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + + +-- test adding new node with no reference tables + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + 
pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +DROP TABLE replicate_reference_table_unhealthy; + + +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + + +-- test add same node twice + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +DROP TABLE replicate_reference_table_valid; + + +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', 
:worker_2_port); + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +DROP TABLE replicate_reference_table_rollback; + + +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +DROP TABLE replicate_reference_table_commit; + 
+ +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + +CREATE TABLE replicate_reference_table_reference_two(column1 int); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); +SELECT create_reference_table('replicate_reference_table_reference_two'); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 
'replicate_reference_table_reference_two'); + +DROP TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; + + +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_insert; + + +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +1 +2 +3 +4 +5 +\. +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_copy; + + +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_ddl; + + +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +DROP TABLE replicate_reference_table_drop; +ROLLBACK; + +DROP TABLE replicate_reference_table_drop; + +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT 
create_reference_table('replicate_reference_table_schema.table1'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; + + +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement;