Merge pull request #1091 from citusdata/replicate_reference_table_on_add_node

Replicate reference tables when new node is added
pull/1938/head
Burak Yücesoy 2017-01-05 13:37:45 +02:00 committed by GitHub
commit e63f525453
19 changed files with 1367 additions and 150 deletions

View File

@ -31,6 +31,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
@ -867,6 +868,63 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
}
/*
* UpdateColocationGroupReplicationFactor finds colocation group record for given
* colocationId and updates its replication factor to given replicationFactor value.
* Since we do not cache pg_dist_colocation table, we do not need to invalidate the
* cache after updating replication factor.
*/
void
UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor)
{
Relation pgDistColocation = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
Datum values[Natts_pg_dist_colocation];
bool isnull[Natts_pg_dist_colocation];
bool replace[Natts_pg_dist_colocation];
/* we first search for colocation group by its colocation id */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(colocationId));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationColocationidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("could not find valid entry for colocation group "
"%d", colocationId)));
}
/* after we find colocation group, we update it with new values */
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_colocation_replicationfactor - 1] = Int32GetDatum(
replicationFactor);
isnull[Anum_pg_dist_colocation_replicationfactor - 1] = false;
replace[Anum_pg_dist_colocation_replicationfactor - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
simple_heap_update(pgDistColocation, &heapTuple->t_self, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, NoLock);
}
/*
* Check that the current user has `mode` permissions on relationId, error out
* if not. Superusers always have such permissions.

View File

@ -58,6 +58,7 @@ static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distColocationRelationId = InvalidOid;
static Oid distColocationConfigurationIndexId = InvalidOid;
static Oid distColocationColocationidIndexId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
@ -105,7 +106,6 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static List * DistTableOidList(void);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId);
@ -762,6 +762,17 @@ DistColocationConfigurationIndexId(void)
}
/* return oid of pg_dist_colocation_pkey index */
Oid
DistColocationColocationidIndexId(void)
{
CachedRelationLookup("pg_dist_colocation_pkey",
&distColocationColocationidIndexId);
return distColocationColocationidIndexId;
}
/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
@ -1565,6 +1576,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distNodeRelationId = InvalidOid;
distColocationRelationId = InvalidOid;
distColocationConfigurationIndexId = InvalidOid;
distColocationColocationidIndexId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;
@ -1583,7 +1595,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
* DistTableOidList iterates over the pg_dist_partition table and returns
* a list that consists of the logicalrelids.
*/
static List *
List *
DistTableOidList(void)
{
SysScanDesc scanDescriptor = NULL;

View File

@ -30,6 +30,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h"
#include "distributed/reference_table_utils.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
@ -48,7 +49,7 @@ int GroupSize = 1;
/* local function forward declarations */
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack, bool hasMetadata);
char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
@ -67,7 +68,8 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
/*
* master_add_node function adds a new node to the cluster and returns its data.
* master_add_node function adds a new node to the cluster and returns its data. It also
* replicates all reference tables to the new node.
*/
Datum
master_add_node(PG_FUNCTION_ARGS)
@ -78,9 +80,21 @@ master_add_node(PG_FUNCTION_ARGS)
int32 groupId = 0;
char *nodeRack = WORKER_DEFAULT_RACK;
bool hasMetadata = false;
bool nodeAlreadyExists = false;
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata);
hasMetadata, &nodeAlreadyExists);
/*
* After adding new node, if the node is not already exist, we replicate all existing
* reference tables to the new node. ReplicateAllReferenceTablesToAllNodes replicates
* reference tables to all nodes however, it skips nodes which already has healthy
* placement of particular reference table.
*/
if (!nodeAlreadyExists)
{
ReplicateAllReferenceTablesToAllNodes();
}
PG_RETURN_CSTRING(returnData);
}
@ -137,13 +151,14 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
ListCell *workerNodeCell = NULL;
List *workerNodes = ParseWorkerNodeFileAndRename();
bool nodeAlreadyExists = false;
foreach(workerNodeCell, workerNodes)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack, false);
workerNode->workerRack, false, &nodeAlreadyExists);
}
PG_RETURN_BOOL(true);
@ -336,7 +351,7 @@ ReadWorkerNodes()
*/
static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
bool hasMetadata)
bool hasMetadata, bool *nodeAlreadyExists)
{
Relation pgDistNode = NULL;
int nextNodeIdInt = 0;
@ -349,6 +364,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
EnsureSchemaNode();
EnsureSuperUser();
*nodeAlreadyExists = false;
/* acquire a lock so that no one can do this concurrently */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
@ -362,6 +379,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
/* close the heap */
heap_close(pgDistNode, AccessExclusiveLock);
*nodeAlreadyExists = true;
PG_RETURN_DATUM(returnData);
}

View File

@ -13,9 +13,12 @@
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "distributed/colocation_utils.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
@ -23,14 +26,16 @@
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
/* local function forward declarations */
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static List * ReferenceTableOidList(void);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@ -93,6 +98,66 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
}
/*
* ReplicateAllReferenceTablesToAllNodes function finds all reference tables and
* replicates them to all worker nodes. It also modifies pg_dist_colocation table to
* update the replication factor column. This function skips a worker node if that node
* already has healthy placement of a particular reference table to prevent unnecessary
* data transfer.
*/
void
ReplicateAllReferenceTablesToAllNodes()
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
Relation pgDistNode = NULL;
List *workerNodeList = NIL;
int workerCount = 0;
Oid firstReferenceTableId = InvalidOid;
uint32 referenceTableColocationId = INVALID_COLOCATION_ID;
/* if there is no reference table, we do not need to do anything */
if (list_length(referenceTableList) == 0)
{
return;
}
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
workerNodeList = WorkerNodeList();
workerCount = list_length(workerNodeList);
foreach(referenceTableCell, referenceTableList)
{
Oid referenceTableId = lfirst_oid(referenceTableCell);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
char *relationName = get_rel_name(referenceTableId);
LockShardDistributionMetadata(shardId, ExclusiveLock);
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers",
relationName)));
ReplicateShardToAllWorkers(shardInterval);
}
/*
* After replicating reference tables, we will update replication factor column for
* colocation group of reference tables so that worker count will be equal to
* replication factor again.
*/
firstReferenceTableId = linitial_oid(referenceTableList);
referenceTableColocationId = TableColocationId(firstReferenceTableId);
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
heap_close(pgDistNode, NoLock);
}
/*
* ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to
* all worker nodes. It assumes that caller of this function ensures that given broadcast
@ -176,6 +241,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort,
missingWorkerOk);
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
@ -250,3 +316,35 @@ CreateReferenceTableColocationId()
return colocationId;
}
/*
* ReferenceTableOidList function scans pg_dist_partition to create a list of all
* reference tables. To create the list, it performs sequential scan. Since it is not
* expected that this function will be called frequently, it is OK not to use index scan.
* If this function becomes performance bottleneck, it is possible to modify this function
* to perform index scan.
*/
static List *
ReferenceTableOidList()
{
List *distTableOidList = DistTableOidList();
ListCell *distTableOidCell = NULL;
List *referenceTableList = NIL;
foreach(distTableOidCell, distTableOidList)
{
DistTableCacheEntry *cacheEntry = NULL;
Oid relationId = lfirst_oid(distTableOidCell);
cacheEntry = DistributedTableCacheEntry(relationId);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
referenceTableList = lappend_oid(referenceTableList, relationId);
}
}
return referenceTableList;
}

View File

@ -85,6 +85,8 @@ extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
workerPort);
extern void UpdateColocationGroupReplicationFactor(uint32 colocationId,
int replicationFactor);
extern void CreateTruncateTrigger(Oid relationId);
/* Remaining metadata utility functions */

View File

@ -57,6 +57,7 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
@ -68,6 +69,7 @@ extern HTAB * GetWorkerNodeHash(void);
/* relation oids */
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistColocationColocationidIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);

View File

@ -13,5 +13,6 @@
#define REFERENCE_TABLE_UTILS_H_
extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToAllNodes(void);
#endif /* REFERENCE_TABLE_UTILS_H_ */

View File

@ -436,10 +436,10 @@ SELECT * FROM pg_dist_colocation
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
4 | 2 | 2 | 23
5 | 2 | 1 | 23
6 | 2 | 2 | 25
7 | 4 | 2 | 23
(4 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
@ -447,28 +447,28 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
ORDER BY logicalrelid;
logicalrelid | colocationid
---------------+--------------
table1_groupa | 1
table2_groupa | 1
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
table1_groupa | 4
table2_groupa | 4
table1_groupb | 5
table2_groupb | 5
table1_groupc | 6
table2_groupc | 6
table1_groupd | 7
table2_groupd | 7
table3_groupd | 7
(9 rows)
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
4 | 2 | 2 | 23
(1 row)
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
@ -558,11 +558,11 @@ SELECT * FROM pg_dist_colocation
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
9 | 3 | 2 | 23
5 | 2 | 1 | 23
6 | 2 | 2 | 25
7 | 4 | 2 | 23
8 | 2 | 2 | 23
12 | 3 | 2 | 23
(5 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
@ -570,23 +570,23 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
----------------------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
table1_groupe | 5
table2_groupe | 5
table3_groupe | 5
schema_collocation.table4_groupe | 5
table4_groupe | 5
table1_group_none_1 | 6
table2_group_none_1 | 6
table1_group_none_2 | 7
table1_group_none_3 | 8
table1_group_default | 9
table1_groupb | 5
table2_groupb | 5
table1_groupc | 6
table2_groupc | 6
table1_groupd | 7
table2_groupd | 7
table3_groupd | 7
table1_groupe | 8
table2_groupe | 8
table3_groupe | 8
schema_collocation.table4_groupe | 8
table4_groupe | 8
table1_group_none_1 | 9
table2_group_none_1 | 9
table1_group_none_2 | 10
table1_group_none_3 | 11
table1_group_default | 12
(17 rows)
-- check failing colocate_with options
@ -651,12 +651,12 @@ SELECT * FROM pg_dist_colocation
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
9 | 3 | 2 | 23
10 | 1 | 2 | 0
5 | 2 | 1 | 23
6 | 2 | 2 | 25
7 | 4 | 2 | 23
8 | 2 | 2 | 23
12 | 3 | 2 | 23
13 | 1 | 2 | 0
(6 rows)
-- cross check with internal colocation API

View File

@ -1574,7 +1574,7 @@ ERROR: single-shard DML commands must not appear in transaction blocks which co
ROLLBACK;
-- clean up tables
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_table_ddl;
reference_table_test_fourth, reference_table_ddl, reference_table_composite;
DROP SCHEMA reference_schema CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table reference_schema.reference_table_test_sixth

View File

@ -0,0 +1,598 @@
--
-- MULTI_REPLICATE_REFERENCE_TABLE
--
-- Tests that check the metadata returned by the master node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
-- remove a node for testing purposes
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- test adding new node with no reference tables
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(4,4,localhost,57638,default,f)
(1 row)
-- verify node is added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
-- test adding new node with a reference table which does not have any healthy placement
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
CREATE TABLE replicate_reference_table_unhealthy(column1 int);
SELECT create_reference_table('replicate_reference_table_unhealthy');
create_reference_table
------------------------
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers
ERROR: could not find any healthy placement for shard 1370000
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
DROP TABLE replicate_reference_table_unhealthy;
-- test replicating a reference table when a new node added
CREATE TABLE replicate_reference_table_valid(column1 int);
SELECT create_reference_table('replicate_reference_table_valid');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 1 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers
master_add_node
---------------------------------
(6,6,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
-- test add same node twice
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(6,6,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_valid;
-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_rollback(column1 int);
SELECT create_reference_table('replicate_reference_table_rollback');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers
master_add_node
---------------------------------
(7,7,localhost,57638,default,f)
(1 row)
ROLLBACK;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 1 | 0
(1 row)
DROP TABLE replicate_reference_table_rollback;
-- test replicating a reference table when a new node added in TRANSACTION + COMMIT
CREATE TABLE replicate_reference_table_commit(column1 int);
SELECT create_reference_table('replicate_reference_table_commit');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370003 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers
master_add_node
---------------------------------
(8,8,localhost,57638,default,f)
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370003 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370003 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_commit;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_reference_one(column1 int);
SELECT create_reference_table('replicate_reference_table_reference_one');
create_reference_table
------------------------
(1 row)
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 1 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | h | 1370005 | s
(2 rows)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers
master_add_node
---------------------------------
(9,9,localhost,57638,default,f)
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
upgrade_to_reference_table
----------------------------
(1 row)
SELECT create_reference_table('replicate_reference_table_reference_two');
create_reference_table
------------------------
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370006 | 1 | 0 | localhost | 57638
(3 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 2 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | n | 1370004 | t
replicate_reference_table_reference_two | n | 1370004 | t
(3 rows)
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_insert(column1 int);
SELECT create_reference_table('replicate_reference_table_insert');
create_reference_table
------------------------
(1 row)
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
-- test COPY then adding a new node in a transaction
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');
create_reference_table
------------------------
(1 row)
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);
SELECT create_reference_table('replicate_reference_table_ddl');
create_reference_table
------------------------
(1 row)
BEGIN;
ALTER TABLE replicate_reference_table_ddl ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
-- test DROP table after adding new node in a transaction
CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop');
create_reference_table
------------------------
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers
master_add_node
-----------------------------------
(13,13,localhost,57638,default,f)
(1 row)
DROP TABLE replicate_reference_table_drop;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
ROLLBACK;
DROP TABLE replicate_reference_table_drop;
-- test adding a node while there is a reference table at another schema
CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370010 | 1 | 1 | 0
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to all workers
master_add_node
-----------------------------------
(14,14,localhost,57638,default,f)
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370011 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370010 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;

View File

@ -5,6 +5,7 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
@ -138,15 +139,16 @@ WHERE colocationid IN
--------------+------------+-------------------+------------------------
(0 rows)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360009 | 1 | 8192 | localhost | 57637 | 379
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360009 | 1 | 8192 | localhost | 57637
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
@ -164,7 +166,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 33 | t
n | t | 1360002 | t
(1 row)
SELECT
@ -186,19 +188,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
33 | 1 | 2 | 0
1360002 | 1 | 2 | 0
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360009 | 1 | 8192 | localhost | 57637 | 379
1360009 | 1 | 0 | localhost | 57638 | 380
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360009 | 1 | 8192 | localhost | 57637
1360009 | 1 | 0 | localhost | 57638
(2 rows)
-- test valid cases, shard exists at one worker
@ -218,7 +221,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 32 | s
h | f | 1360001 | s
(1 row)
SELECT
@ -240,18 +243,19 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
32 | 1 | 1 | 23
1360001 | 1 | 1 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360010 | 1 | 0 | localhost | 57637 | 381
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360010 | 1 | 0 | localhost | 57637
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
@ -269,7 +273,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 33 | t
n | t | 1360002 | t
(1 row)
SELECT
@ -291,19 +295,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
33 | 1 | 2 | 0
1360002 | 1 | 2 | 0
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360010 | 1 | 0 | localhost | 57637 | 381
1360010 | 1 | 0 | localhost | 57638 | 382
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360010 | 1 | 0 | localhost | 57637
1360010 | 1 | 0 | localhost | 57638
(2 rows)
-- test valid cases, shard exists at both workers but one is unhealthy
@ -325,7 +330,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 34 | c
h | f | 1360003 | c
(1 row)
SELECT
@ -347,19 +352,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
34 | 1 | 2 | 23
1360003 | 1 | 2 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360011 | 1 | 0 | localhost | 57637 | 383
1360011 | 1 | 0 | localhost | 57638 | 384
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360011 | 1 | 0 | localhost | 57637
1360011 | 1 | 0 | localhost | 57638
(2 rows)
SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy');
@ -377,7 +383,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 33 | t
n | t | 1360002 | t
(1 row)
SELECT
@ -399,19 +405,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
33 | 1 | 2 | 0
1360002 | 1 | 2 | 0
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360011 | 1 | 0 | localhost | 57637 | 383
1360011 | 1 | 0 | localhost | 57638 | 384
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360011 | 1 | 0 | localhost | 57637
1360011 | 1 | 0 | localhost | 57638
(2 rows)
-- test valid cases, shard exists at both workers and both are healthy
@ -431,7 +438,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 35 | c
h | f | 1360004 | c
(1 row)
SELECT
@ -453,19 +460,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
35 | 1 | 2 | 23
1360004 | 1 | 2 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360012 | 1 | 0 | localhost | 57637 | 385
1360012 | 1 | 0 | localhost | 57638 | 386
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360012 | 1 | 0 | localhost | 57637
1360012 | 1 | 0 | localhost | 57638
(2 rows)
SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy');
@ -483,7 +491,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 33 | t
n | t | 1360002 | t
(1 row)
SELECT
@ -505,19 +513,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
33 | 1 | 2 | 0
1360002 | 1 | 2 | 0
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360012 | 1 | 0 | localhost | 57637 | 385
1360012 | 1 | 0 | localhost | 57638 | 386
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360012 | 1 | 0 | localhost | 57637
1360012 | 1 | 0 | localhost | 57638
(2 rows)
-- test valid cases, do it in transaction and ROLLBACK
@ -538,7 +547,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 32 | s
h | f | 1360001 | s
(1 row)
SELECT
@ -560,18 +569,19 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
32 | 1 | 1 | 23
1360001 | 1 | 1 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360013 | 1 | 0 | localhost | 57637 | 387
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360013 | 1 | 0 | localhost | 57637
(1 row)
BEGIN;
@ -591,7 +601,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 32 | s
h | f | 1360001 | s
(1 row)
SELECT
@ -613,18 +623,19 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
32 | 1 | 1 | 23
1360001 | 1 | 1 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360013 | 1 | 0 | localhost | 57637 | 387
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360013 | 1 | 0 | localhost | 57637
(1 row)
-- test valid cases, do it in transaction and COMMIT
@ -645,7 +656,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 32 | s
h | f | 1360001 | s
(1 row)
SELECT
@ -667,18 +678,19 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
32 | 1 | 1 | 23
1360001 | 1 | 1 | 23
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360014 | 1 | 0 | localhost | 57637 | 389
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360014 | 1 | 0 | localhost | 57637
(1 row)
BEGIN;
@ -698,7 +710,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 33 | t
n | t | 1360002 | t
(1 row)
SELECT
@ -720,19 +732,20 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
33 | 1 | 2 | 0
1360002 | 1 | 2 | 0
(1 row)
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
1360014 | 1 | 0 | localhost | 57637 | 389
1360014 | 1 | 0 | localhost | 57638 | 390
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360014 | 1 | 0 | localhost | 57637
1360014 | 1 | 0 | localhost | 57638
(2 rows)
-- verify that shard is replicated to other worker

View File

@ -449,3 +449,9 @@ SELECT
FROM
multi_outer_join_right_reference FULL JOIN
multi_outer_join_third_reference ON (t_custkey = r_custkey);
-- DROP unused tables to clean up workspace
DROP TABLE multi_outer_join_left_hash;
DROP TABLE multi_outer_join_right_reference;
DROP TABLE multi_outer_join_third_reference;
DROP TABLE multi_outer_join_right_hash;

View File

@ -157,6 +157,11 @@ test: multi_router_planner
# ----------
test: multi_large_shardid
# ----------
# multi_drop_extension makes sure we can safely drop and recreate the extension
# ----------
test: multi_drop_extension
# ----------
# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
@ -164,11 +169,6 @@ test: multi_large_shardid
test: multi_metadata_sync
test: multi_unsupported_worker_operations
# ----------
# multi_drop_extension makes sure we can safely drop and recreate the extension
# ----------
test: multi_drop_extension
# ----------
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem
# ----------
@ -208,5 +208,7 @@ test: multi_foreign_key
# ----------
# multi_upgrade_reference_table tests for upgrade_reference_table UDF
# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes
# ----------
test: multi_upgrade_reference_table
test: multi_replicate_reference_table

View File

@ -834,3 +834,8 @@ FROM
7 |
(30 rows)
-- DROP unused tables to clean up workspace
DROP TABLE multi_outer_join_left_hash;
DROP TABLE multi_outer_join_right_reference;
DROP TABLE multi_outer_join_third_reference;
DROP TABLE multi_outer_join_right_hash;

View File

@ -219,11 +219,11 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
-- create dropped colocation group again
SET citus.shard_count = 2;

View File

@ -986,5 +986,5 @@ ROLLBACK;
-- clean up tables
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_table_ddl;
reference_table_test_fourth, reference_table_ddl, reference_table_composite;
DROP SCHEMA reference_schema CASCADE;

View File

@ -0,0 +1,387 @@
--
-- MULTI_REPLICATE_REFERENCE_TABLE
--
-- Tests that check the metadata returned by the master node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
-- remove a node for testing purposes
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
-- test adding new node with no reference tables
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT master_add_node('localhost', :worker_2_port);
-- verify node is added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
-- test adding new node with a reference table which does not have any healthy placement
SELECT master_remove_node('localhost', :worker_2_port);
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
CREATE TABLE replicate_reference_table_unhealthy(column1 int);
SELECT create_reference_table('replicate_reference_table_unhealthy');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT master_add_node('localhost', :worker_2_port);
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
DROP TABLE replicate_reference_table_unhealthy;
-- test replicating a reference table when a new node added
CREATE TABLE replicate_reference_table_valid(column1 int);
SELECT create_reference_table('replicate_reference_table_valid');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
-- test add same node twice
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
DROP TABLE replicate_reference_table_valid;
-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_rollback(column1 int);
SELECT create_reference_table('replicate_reference_table_rollback');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
DROP TABLE replicate_reference_table_rollback;
-- test replicating a reference table when a new node added in TRANSACTION + COMMIT
CREATE TABLE replicate_reference_table_commit(column1 int);
SELECT create_reference_table('replicate_reference_table_commit');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
DROP TABLE replicate_reference_table_commit;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_reference_one(column1 int);
SELECT create_reference_table('replicate_reference_table_reference_one');
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
SELECT create_reference_table('replicate_reference_table_reference_two');
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_insert(column1 int);
SELECT create_reference_table('replicate_reference_table_insert');
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
-- test COPY then adding a new node in a transaction
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
1
2
3
4
5
\.
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);
SELECT create_reference_table('replicate_reference_table_ddl');
BEGIN;
ALTER TABLE replicate_reference_table_ddl ADD column2 int;
SELECT master_add_node('localhost', :worker_2_port);
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
-- test DROP table after adding new node in a transaction
CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop');
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
DROP TABLE replicate_reference_table_drop;
ROLLBACK;
DROP TABLE replicate_reference_table_drop;
-- test adding a node while there is a reference table at another schema
CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
SELECT master_add_node('localhost', :worker_2_port);
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;

View File

@ -94,6 +94,7 @@ SELECT master_apply_delete_command('DELETE FROM mx_table');
SELECT count(*) FROM mx_table;
-- master_add_node
SELECT master_add_node('localhost', 5432);
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;

View File

@ -6,6 +6,7 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
@ -91,7 +92,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -122,7 +124,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -155,7 +158,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -186,7 +190,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -221,7 +226,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -252,7 +258,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -285,7 +292,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -316,7 +324,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -350,7 +359,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -383,7 +393,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -417,7 +428,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
@ -450,7 +462,8 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT *
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid