Replicate reference tables when new node is added

With this change, we start to replicate all reference tables to the new node when a new node
is added to the cluster with the master_add_node command. We also update the replication
factor of the reference tables' colocation group.
pull/1091/head
Burak Yucesoy 2017-01-05 12:29:32 +03:00
parent 1d18950860
commit 9c9f479e4b
10 changed files with 1187 additions and 8 deletions
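As a rough usage sketch of the new behavior (the worker host, port, and table name below are placeholders, not part of this change):

-- create a reference table while the new worker is not yet in the cluster
CREATE TABLE reference_table_example(value int);
SELECT create_reference_table('reference_table_example');

-- adding the worker now also copies the reference table's shard to it
SELECT master_add_node('localhost', 5433);

-- its placement shows up on the new worker, and the colocation group's
-- replicationfactor in pg_dist_colocation is raised to the new worker count
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
WHERE nodeport = 5433;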

View File

@@ -31,6 +31,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
@@ -867,6 +868,63 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
}
/*
 * UpdateColocationGroupReplicationFactor finds the colocation group record for the
 * given colocationId and updates its replication factor to the given replicationFactor
 * value. Since we do not cache the pg_dist_colocation table, we do not need to
 * invalidate the cache after updating the replication factor.
 */
void
UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor)
{
    Relation pgDistColocation = NULL;
    SysScanDesc scanDescriptor = NULL;
    ScanKeyData scanKey[1];
    int scanKeyCount = 1;
    bool indexOK = true;
    HeapTuple heapTuple = NULL;
    TupleDesc tupleDescriptor = NULL;
    Datum values[Natts_pg_dist_colocation];
    bool isnull[Natts_pg_dist_colocation];
    bool replace[Natts_pg_dist_colocation];

    /* we first search for the colocation group by its colocation id */
    pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
    tupleDescriptor = RelationGetDescr(pgDistColocation);

    ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, BTEqualStrategyNumber,
                F_OIDEQ, ObjectIdGetDatum(colocationId));

    scanDescriptor = systable_beginscan(pgDistColocation,
                                        DistColocationColocationidIndexId(), indexOK,
                                        NULL, scanKeyCount, scanKey);

    heapTuple = systable_getnext(scanDescriptor);
    if (!HeapTupleIsValid(heapTuple))
    {
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
                        errmsg("could not find valid entry for colocation group "
                               "%d", colocationId)));
    }

    /* after we find the colocation group, we update it with the new values */
    memset(replace, 0, sizeof(replace));

    values[Anum_pg_dist_colocation_replicationfactor - 1] = Int32GetDatum(
        replicationFactor);
    isnull[Anum_pg_dist_colocation_replicationfactor - 1] = false;
    replace[Anum_pg_dist_colocation_replicationfactor - 1] = true;

    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
    simple_heap_update(pgDistColocation, &heapTuple->t_self, heapTuple);

    CatalogUpdateIndexes(pgDistColocation, heapTuple);

    systable_endscan(scanDescriptor);
    heap_close(pgDistColocation, NoLock);
}
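Conceptually, the function above amounts to the following catalog change, shown only as a hedged SQL sketch with placeholder values (the C function, not a manual UPDATE, is what actually runs):

-- rough equivalent of UpdateColocationGroupReplicationFactor(1370001, 2)
UPDATE pg_dist_colocation SET replicationfactor = 2 WHERE colocationid = 1370001;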
/*
 * Check that the current user has `mode` permissions on relationId, error out
 * if not. Superusers always have such permissions.

View File

@@ -58,6 +58,7 @@ static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distColocationRelationId = InvalidOid;
static Oid distColocationConfigurationIndexId = InvalidOid;
static Oid distColocationColocationidIndexId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
@@ -105,7 +106,6 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId);
@@ -762,6 +762,17 @@ DistColocationConfigurationIndexId(void)
}


/* return oid of pg_dist_colocation_pkey index */
Oid
DistColocationColocationidIndexId(void)
{
    CachedRelationLookup("pg_dist_colocation_pkey",
                         &distColocationColocationidIndexId);

    return distColocationColocationidIndexId;
}
/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
@@ -1565,6 +1576,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
    distNodeRelationId = InvalidOid;
    distColocationRelationId = InvalidOid;
    distColocationConfigurationIndexId = InvalidOid;
    distColocationColocationidIndexId = InvalidOid;
    distPartitionRelationId = InvalidOid;
    distPartitionLogicalRelidIndexId = InvalidOid;
    distPartitionColocationidIndexId = InvalidOid;
@@ -1583,7 +1595,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
/*
 * DistTableOidList iterates over the pg_dist_partition table and returns
 * a list that consists of the logicalrelids.
 */
List *
DistTableOidList(void)
{
    SysScanDesc scanDescriptor = NULL;

View File

@@ -30,6 +30,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h"
#include "distributed/reference_table_utils.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
@@ -48,7 +49,7 @@ int GroupSize = 1;

/* local function forward declarations */
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
                             char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
@@ -67,7 +68,8 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);

/*
 * master_add_node function adds a new node to the cluster and returns its data. It also
 * replicates all reference tables to the new node.
 */
Datum
master_add_node(PG_FUNCTION_ARGS)
@@ -78,9 +80,21 @@ master_add_node(PG_FUNCTION_ARGS)
    int32 groupId = 0;
    char *nodeRack = WORKER_DEFAULT_RACK;
    bool hasMetadata = false;
    bool nodeAlreadyExists = false;
    Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
                                       hasMetadata, &nodeAlreadyExists);

    /*
     * After adding the new node, if the node did not already exist, we replicate all
     * existing reference tables to the new node. ReplicateAllReferenceTablesToAllNodes
     * replicates reference tables to all nodes; however, it skips nodes that already
     * have a healthy placement of a particular reference table.
     */
    if (!nodeAlreadyExists)
    {
        ReplicateAllReferenceTablesToAllNodes();
    }

    PG_RETURN_CSTRING(returnData);
}
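Because AddNodeMetadata now reports whether the node was already present, repeating master_add_node for the same node only replicates reference tables the first time; a small sketch with placeholder host and port:

SELECT master_add_node('localhost', 5433); -- first call: replicates reference tables to the new node
SELECT master_add_node('localhost', 5433); -- node already exists: returns its metadata, skips replication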
@@ -137,13 +151,14 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
    ListCell *workerNodeCell = NULL;
    List *workerNodes = ParseWorkerNodeFileAndRename();
    bool nodeAlreadyExists = false;

    foreach(workerNodeCell, workerNodes)
    {
        WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);

        AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
                        workerNode->workerRack, false, &nodeAlreadyExists);
    }

    PG_RETURN_BOOL(true);
@@ -336,7 +351,7 @@ ReadWorkerNodes()
 */
static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
                bool hasMetadata, bool *nodeAlreadyExists)
{
    Relation pgDistNode = NULL;
    int nextNodeIdInt = 0;
@@ -349,6 +364,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
    EnsureSchemaNode();
    EnsureSuperUser();

    *nodeAlreadyExists = false;

    /* acquire a lock so that no one can do this concurrently */
    pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
@@ -362,6 +379,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
        /* close the heap */
        heap_close(pgDistNode, AccessExclusiveLock);

        *nodeAlreadyExists = true;

        PG_RETURN_DATUM(returnData);
    }

View File

@@ -13,9 +13,12 @@
#include "miscadmin.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "distributed/colocation_utils.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
@@ -23,14 +26,16 @@
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"


/* local function forward declarations */
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static List * ReferenceTableOidList(void);


/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@@ -93,6 +98,66 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
}
/*
 * ReplicateAllReferenceTablesToAllNodes finds all reference tables and replicates them
 * to all worker nodes. It also modifies the pg_dist_colocation table to update the
 * replication factor column. To prevent unnecessary data transfer, this function skips
 * a worker node if that node already has a healthy placement of a particular reference
 * table.
 */
void
ReplicateAllReferenceTablesToAllNodes()
{
    List *referenceTableList = ReferenceTableOidList();
    ListCell *referenceTableCell = NULL;
    Relation pgDistNode = NULL;
    List *workerNodeList = NIL;
    int workerCount = 0;
    Oid firstReferenceTableId = InvalidOid;
    uint32 referenceTableColocationId = INVALID_COLOCATION_ID;

    /* if there is no reference table, we do not need to do anything */
    if (list_length(referenceTableList) == 0)
    {
        return;
    }

    /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
    pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
    workerNodeList = WorkerNodeList();
    workerCount = list_length(workerNodeList);

    foreach(referenceTableCell, referenceTableList)
    {
        Oid referenceTableId = lfirst_oid(referenceTableCell);
        List *shardIntervalList = LoadShardIntervalList(referenceTableId);
        ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
        uint64 shardId = shardInterval->shardId;
        char *relationName = get_rel_name(referenceTableId);

        LockShardDistributionMetadata(shardId, ExclusiveLock);

        ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers",
                                relationName)));

        ReplicateShardToAllWorkers(shardInterval);
    }

    /*
     * After replicating the reference tables, we update the replication factor column
     * for the colocation group of the reference tables so that the worker count equals
     * the replication factor again.
     */
    firstReferenceTableId = linitial_oid(referenceTableList);
    referenceTableColocationId = TableColocationId(firstReferenceTableId);
    UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);

    heap_close(pgDistNode, NoLock);
}
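For manual inspection, the reference table placements already present on a worker can be listed with a query along these lines (the nodeport value is a placeholder); placements missing from this list, or with shardstate other than 1 (finalized), are the ones the loop above re-copies:

SELECT shardid, shardstate
FROM pg_dist_shard_placement
WHERE nodeport = 5433
  AND shardid IN (SELECT shardid
                  FROM pg_dist_shard
                  WHERE logicalrelid IN (SELECT logicalrelid
                                         FROM pg_dist_partition
                                         WHERE partmethod = 'n'));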
/*
 * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to
 * all worker nodes. It assumes that caller of this function ensures that given broadcast
@@ -176,6 +241,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
        ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
                                                                     nodeName, nodePort,
                                                                     missingWorkerOk);

        if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
        {
            SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
@@ -250,3 +316,35 @@ CreateReferenceTableColocationId()
    return colocationId;
}
/*
 * ReferenceTableOidList scans pg_dist_partition to create a list of all reference
 * tables. To create the list, it performs a sequential scan. Since this function is
 * not expected to be called frequently, it is OK not to use an index scan. If this
 * function becomes a performance bottleneck, it is possible to modify it to perform
 * an index scan.
 */
static List *
ReferenceTableOidList()
{
    List *distTableOidList = DistTableOidList();
    ListCell *distTableOidCell = NULL;
    List *referenceTableList = NIL;

    foreach(distTableOidCell, distTableOidList)
    {
        DistTableCacheEntry *cacheEntry = NULL;
        Oid relationId = lfirst_oid(distTableOidCell);

        cacheEntry = DistributedTableCacheEntry(relationId);

        if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
        {
            referenceTableList = lappend_oid(referenceTableList, relationId);
        }
    }

    return referenceTableList;
}
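ReferenceTableOidList boils down to filtering pg_dist_partition on the partition method; reference tables are stored with partmethod 'n' (DISTRIBUTE_BY_NONE), so an equivalent catalog query is roughly:

SELECT logicalrelid FROM pg_dist_partition WHERE partmethod = 'n';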

View File

@@ -85,6 +85,8 @@ extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
                                      workerPort);
extern void UpdateColocationGroupReplicationFactor(uint32 colocationId,
                                                   int replicationFactor);
extern void CreateTruncateTrigger(Oid relationId);

/* Remaining metadata utility functions */

View File

@@ -57,6 +57,7 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
@@ -68,6 +69,7 @@ extern HTAB * GetWorkerNodeHash(void);

/* relation oids */
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistColocationColocationidIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);

View File

@@ -13,5 +13,6 @@
#define REFERENCE_TABLE_UTILS_H_

extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToAllNodes(void);

#endif /* REFERENCE_TABLE_UTILS_H_ */

View File

@@ -0,0 +1,598 @@
--
-- MULTI_REPLICATE_REFERENCE_TABLE
--
-- Tests that check replicating reference tables to newly added nodes.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
-- remove a node for testing purposes
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- test adding new node with no reference tables
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(4,4,localhost,57638,default,f)
(1 row)
-- verify node is added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
-- test adding new node with a reference table which does not have any healthy placement
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
CREATE TABLE replicate_reference_table_unhealthy(column1 int);
SELECT create_reference_table('replicate_reference_table_unhealthy');
create_reference_table
------------------------
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers
ERROR: could not find any healthy placement for shard 1370000
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
DROP TABLE replicate_reference_table_unhealthy;
-- test replicating a reference table when a new node added
CREATE TABLE replicate_reference_table_valid(column1 int);
SELECT create_reference_table('replicate_reference_table_valid');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 1 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers
master_add_node
---------------------------------
(6,6,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
-- test add same node twice
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(6,6,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_valid;
-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_rollback(column1 int);
SELECT create_reference_table('replicate_reference_table_rollback');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers
master_add_node
---------------------------------
(7,7,localhost,57638,default,f)
(1 row)
ROLLBACK;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 1 | 0
(1 row)
DROP TABLE replicate_reference_table_rollback;
-- test replicating a reference table when a new node added in TRANSACTION + COMMIT
CREATE TABLE replicate_reference_table_commit(column1 int);
SELECT create_reference_table('replicate_reference_table_commit');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370003 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers
master_add_node
---------------------------------
(8,8,localhost,57638,default,f)
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370003 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370003 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_commit;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_reference_one(column1 int);
SELECT create_reference_table('replicate_reference_table_reference_one');
create_reference_table
------------------------
(1 row)
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 1 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | h | 1370005 | s
(2 rows)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers
master_add_node
---------------------------------
(9,9,localhost,57638,default,f)
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
upgrade_to_reference_table
----------------------------
(1 row)
SELECT create_reference_table('replicate_reference_table_reference_two');
create_reference_table
------------------------
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370006 | 1 | 0 | localhost | 57638
(3 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 2 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | n | 1370004 | t
replicate_reference_table_reference_two | n | 1370004 | t
(3 rows)
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_insert(column1 int);
SELECT create_reference_table('replicate_reference_table_insert');
create_reference_table
------------------------
(1 row)
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
-- test COPY then adding a new node in a transaction
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');
create_reference_table
------------------------
(1 row)
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);
SELECT create_reference_table('replicate_reference_table_ddl');
create_reference_table
------------------------
(1 row)
BEGIN;
ALTER TABLE replicate_reference_table_ddl ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
-- test DROP table after adding new node in a transaction
CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop');
create_reference_table
------------------------
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers
master_add_node
-----------------------------------
(13,13,localhost,57638,default,f)
(1 row)
DROP TABLE replicate_reference_table_drop;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
ROLLBACK;
DROP TABLE replicate_reference_table_drop;
-- test adding a node while there is a reference table at another schema
CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370010 | 1 | 1 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to all workers
master_add_node
-----------------------------------
(14,14,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370011 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370010 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;

View File

@@ -208,5 +208,7 @@ test: multi_foreign_key
# ----------
# multi_upgrade_reference_table tests for upgrade_reference_table UDF
# multi_replicate_reference_table tests replicating reference tables to newly added nodes
# ----------
test: multi_upgrade_reference_table
test: multi_replicate_reference_table

View File

@@ -0,0 +1,387 @@
--
-- MULTI_REPLICATE_REFERENCE_TABLE
--
-- Tests that check replicating reference tables to newly added nodes.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
-- remove a node for testing purposes
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
-- test adding new node with no reference tables
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT master_add_node('localhost', :worker_2_port);
-- verify node is added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
-- test adding new node with a reference table which does not have any healthy placement
SELECT master_remove_node('localhost', :worker_2_port);
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
CREATE TABLE replicate_reference_table_unhealthy(column1 int);
SELECT create_reference_table('replicate_reference_table_unhealthy');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT master_add_node('localhost', :worker_2_port);
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
DROP TABLE replicate_reference_table_unhealthy;
-- test replicating a reference table when a new node added
CREATE TABLE replicate_reference_table_valid(column1 int);
SELECT create_reference_table('replicate_reference_table_valid');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
-- test add same node twice
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
DROP TABLE replicate_reference_table_valid;
-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_rollback(column1 int);
SELECT create_reference_table('replicate_reference_table_rollback');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
DROP TABLE replicate_reference_table_rollback;
-- test replicating a reference table when a new node added in TRANSACTION + COMMIT
CREATE TABLE replicate_reference_table_commit(column1 int);
SELECT create_reference_table('replicate_reference_table_commit');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
DROP TABLE replicate_reference_table_commit;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_reference_one(column1 int);
SELECT create_reference_table('replicate_reference_table_reference_one');
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
SELECT create_reference_table('replicate_reference_table_reference_two');
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_insert(column1 int);
SELECT create_reference_table('replicate_reference_table_insert');
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
-- test COPY then adding a new node in a transaction
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
1
2
3
4
5
\.
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);
SELECT create_reference_table('replicate_reference_table_ddl');
BEGIN;
ALTER TABLE replicate_reference_table_ddl ADD column2 int;
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
-- test DROP table after adding new node in a transaction
CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop');
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
DROP TABLE replicate_reference_table_drop;
ROLLBACK;
DROP TABLE replicate_reference_table_drop;
-- test adding a node while there is a reference table at another schema
CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;