pull/1159/merge
Önder Kalacı 2017-01-25 08:49:48 +00:00 committed by GitHub
commit 7a8d06b225
20 changed files with 691 additions and 170 deletions


@ -9,7 +9,8 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 6.1-18 \
6.1-19
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -129,9 +130,12 @@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16.
cat $^ > $@
$(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql
cat $^ > $@
$(EXTENSION)--6.1-18.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.1-18.sql
cat $^ > $@
$(EXTENSION)--6.1-19.sql: $(EXTENSION)--6.1-18.sql $(EXTENSION)--6.1-18--6.1-19.sql
cat $^ > $@
NO_PGXS = 1
SHLIB_LINK = $(libpq)
include $(citus_top_builddir)/Makefile.global
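Each versioned snapshot script is the previous snapshot concatenated with the corresponding upgrade script (the cat $^ > $@ rules above), so an installation steps through the chain one version at a time; the regression tests later in this commit exercise exactly that path:

ALTER EXTENSION citus UPDATE TO '6.1-18';
ALTER EXTENSION citus UPDATE TO '6.1-19';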


@ -0,0 +1,22 @@
/* citus--6.1-17--6.1-18.sql */
SET search_path = 'pg_catalog';
DROP FUNCTION IF EXISTS master_add_node(text, integer);
CREATE FUNCTION master_add_node(nodename text,
nodeport integer,
activate_node boolean DEFAULT TRUE,
OUT nodeid integer,
OUT groupid integer,
OUT nodename text,
OUT nodeport integer,
OUT noderack text,
OUT hasmetadata boolean)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_add_node$$;
COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer, activate_node boolean)
IS 'add node to the cluster';
RESET search_path;
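For illustration only, a minimal sketch of how the new optional argument can be called from psql; the host name and port are placeholders, not part of this change:

-- add a node and immediately replicate reference tables to it (the default)
SELECT master_add_node('worker-1.example.com', 5432);
-- register the node but defer reference table replication to a later master_activate_node call
SELECT master_add_node('worker-1.example.com', 5432, activate_node := false);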


@ -0,0 +1,13 @@
/* citus--6.1-18--6.1-19.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION master_activate_node(nodename text,
nodeport integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_activate_node$$;
COMMENT ON FUNCTION master_activate_node(nodename text, nodeport integer)
IS 'activate the node by replicating reference tables to it';
RESET search_path;
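A minimal usage sketch, assuming the node was previously registered with activate_node set to false (placeholder host and port):

-- replicate any missing reference table placements to the already-registered node
SELECT master_activate_node('worker-1.example.com', 5432);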


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-17'
default_version = '6.1-19'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog


@ -133,6 +133,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
char *colocateWithTableName = NULL;
EnsureSchemaNode();
EnsureAllNodesActivated();
/* guard against a binary update without a function update */
if (PG_NARGS() >= 4)
@ -194,6 +195,8 @@ create_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
EnsureAllNodesActivated();
CreateReferenceTable(relationId);
PG_RETURN_VOID();


@ -33,6 +33,7 @@
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
@ -68,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
Oid distributedTableId = ResolveRelationId(tableNameText);
EnsureSchemaNode();
EnsureAllNodesActivated();
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);


@ -34,6 +34,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
@ -90,6 +91,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId);
EnsureAllNodesActivated();
/*
* We check whether the table is a foreign table or not. If it is, we set


@ -35,6 +35,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h"
#include "distributed/reference_table_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
@ -84,6 +85,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
List *createMetadataCommandList = NIL;
EnsureSchemaNode();
EnsureAllNodesActivated();
EnsureSuperUser();
PreventTransactionChain(true, "start_metadata_sync_to_node");


@ -51,6 +51,7 @@ int GroupSize = 1;
/* local function forward declarations */
static void ActivateNode(char *nodeName, int nodePort);
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove);
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists);
@ -70,17 +71,20 @@ PG_FUNCTION_INFO_V1(master_remove_node);
PG_FUNCTION_INFO_V1(master_disable_node);
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(master_activate_node);
/*
* master_add_node function adds a new node to the cluster and returns its data. It also
* replicates all reference tables to the new node.
* replicates all reference tables to the new node if activateNode is true.
*/
Datum
master_add_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
bool activateNode = PG_GETARG_BOOL(2);
char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0;
char *nodeRack = WORKER_DEFAULT_RACK;
@ -96,9 +100,9 @@ master_add_node(PG_FUNCTION_ARGS)
* reference tables to all nodes; however, it skips nodes that already have a healthy
* placement of a particular reference table.
*/
if (!nodeAlreadyExists)
{
ReplicateAllReferenceTablesToAllNodes();
}
if (!nodeAlreadyExists && activateNode)
{
ActivateNode(nodeNameString, nodePort);
}
PG_RETURN_CSTRING(returnData);
@ -262,6 +266,43 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
}
/*
* For the time being, master_activate_node is a wrapper around
* ReplicateAllReferenceTablesToNode().
*/
Datum
master_activate_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot replicate reference tables to a "
"non-existing node"))));
}
ActivateNode(nodeNameString, nodePort);
PG_RETURN_VOID();
}
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables.
*/
static void
ActivateNode(char *nodeName, int nodePort)
{
ReplicateAllReferenceTablesToNode(nodeName, nodePort);
}
/*
* FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL.


@ -36,7 +36,11 @@
/* local function forward declarations */
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static void EnsureReferenceTablesReplicatedToAllNodes(void);
static WorkerNode * FindUnderReplicatedWorkerNode(List *finalizedShardPlacementList);
static int CompareOids(const void *leftElement, const void *rightElement);
/* exports for SQL callable functions */
@ -114,22 +118,46 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
/*
* ReplicateAllReferenceTablesToAllNodes function finds all reference tables and
* replicates them to all worker nodes. It also modifies pg_dist_colocation table to
* update the replication factor column. This function skips a worker node if that node
* already has healthy placement of a particular reference table to prevent unnecessary
* data transfer.
* ReplicateAllReferenceTablesToAllNodes replicates all reference tables to all worker
* nodes. It also modifies pg_dist_colocation table to update the replication factor
* column. This function skips a worker node if that node already has healthy placement
* of a particular reference table to prevent unnecessary data transfer.
*/
void
ReplicateAllReferenceTablesToAllNodes()
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
Relation pgDistNode = NULL;
List *workerNodeList = NIL;
int workerCount = 0;
ListCell *workerNodeCell = NULL;
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
workerNodeList = WorkerNodeList();
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
ReplicateAllReferenceTablesToNode(workerNode->workerName, workerNode->workerPort);
}
heap_close(pgDistNode, NoLock);
}
/*
* ReplicateAllReferenceTablesToNode function finds all reference tables and
* replicates them to the given worker node. It also modifies pg_dist_colocation table to
* update the replication factor column when necessary. This function skips a worker node
* if that node already has healthy placement of a particular reference table to prevent
* unnecessary data transfer.
*/
void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
List *workerNodeList = WorkerNodeList();
uint32 workerCount = 0;
Oid firstReferenceTableId = InvalidOid;
uint32 referenceTableColocationId = INVALID_COLOCATION_ID;
@ -139,12 +167,6 @@ ReplicateAllReferenceTablesToAllNodes()
return;
}
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
workerNodeList = WorkerNodeList();
workerCount = list_length(workerNodeList);
/*
* We sort the reference table list to prevent deadlocks in concurrent
* ReplicateAllReferenceTablesToAllNodes calls.
@ -156,14 +178,10 @@ ReplicateAllReferenceTablesToAllNodes()
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
char *relationName = get_rel_name(referenceTableId);
LockShardDistributionMetadata(shardId, ExclusiveLock);
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers",
relationName)));
ReplicateShardToAllWorkers(shardInterval);
ReplicateShardToNode(shardInterval, nodeName, nodePort);
}
/*
@ -171,10 +189,10 @@ ReplicateAllReferenceTablesToAllNodes()
* colocation group of reference tables so that worker count will be equal to
* replication factor again.
*/
workerCount = list_length(workerNodeList);
firstReferenceTableId = linitial_oid(referenceTableList);
referenceTableColocationId = TableColocationId(firstReferenceTableId);
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
heap_close(pgDistNode, NoLock);
}
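A rough way to observe the effect of the replication factor update above, sketched under the assumption that the 6.1 catalog exposes pg_dist_colocation with shardcount and replicationfactor columns: once every worker has been activated, the reference table colocation group should report a replication factor equal to the worker count.

SELECT colocationid, shardcount, replicationfactor
FROM pg_catalog.pg_dist_colocation
ORDER BY colocationid;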
@ -228,25 +246,15 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
/*
* ReplicateShardToAllWorkers function replicates given shard to the given worker nodes
* in a separate transactions. While replicating, it only replicates the shard to the
* workers which does not have a healthy replica of the shard. This function also modifies
* metadata by inserting/updating related rows in pg_dist_shard_placement. However, this
* function does not obtain any lock on shard resource and shard metadata. It is caller's
* responsibility to take those locks.
* ReplicateShardToAllWorkers replicates the given shard to all worker nodes in
* separate transactions. While replicating, it only replicates the shard to workers
* that do not have a healthy replica of the shard. However, this function does not
* obtain any lock on shard resource or shard metadata; it is the caller's
* responsibility to take those locks.
*/
static void
ReplicateShardToAllWorkers(ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
bool missingOk = false;
ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk);
char *srcNodeName = sourceShardPlacement->nodeName;
uint32 srcNodePort = sourceShardPlacement->nodePort;
char *tableOwner = TableOwner(shardInterval->relationId);
List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort);
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
List *workerNodeList = WorkerNodeList();
@ -263,59 +271,84 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
bool missingWorkerOk = true;
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort,
missingWorkerOk);
ReplicateShardToNode(shardInterval, nodeName, nodePort);
/*
* Although this function is used for reference tables and reference table shard
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of
* a non-reference table to reference table, unhealty placements may exist. In
* this case, we repair the shard placement and update its state in
* pg_dist_shard_placement table.
*/
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{
uint64 placementId = 0;
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
ddlCommandList);
if (targetPlacement == NULL)
{
placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
nodeName, nodePort);
}
else
{
placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED);
}
/*
* Although ReplicateShardToAllWorkers is used only for reference tables,
* during the upgrade phase, the placements are created before the table is
* marked as a reference table. All metadata (including the placement
* metadata) will be copied to workers after all reference table changed
* are finished.
*/
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0,
nodeName, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
}
}
}
heap_close(pgDistNode, NoLock);
}
/*
* ReplicateShardToNode function replicates the given shard to the given worker node
* in a separate transaction. While replicating, it only replicates the shard if the
* node does not already have a healthy replica of it. This function also modifies
* metadata by inserting/updating related rows in pg_dist_shard_placement.
*/
static void
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
{
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
bool missingOk = false;
ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk);
char *srcNodeName = sourceShardPlacement->nodeName;
uint32 srcNodePort = sourceShardPlacement->nodePort;
char *tableOwner = TableOwner(shardInterval->relationId);
List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort);
bool missingWorkerOk = true;
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort,
missingWorkerOk);
/*
* Although this function is used for reference tables and reference table shard
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of
* a non-reference table to a reference table, unhealthy placements may exist. In
* this case, we repair the shard placement and update its state in
* pg_dist_shard_placement table.
*/
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{
uint64 placementId = 0;
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
get_rel_name(shardInterval->relationId), nodeName,
nodePort)));
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
ddlCommandList);
if (targetPlacement == NULL)
{
placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
nodeName, nodePort);
}
else
{
placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED);
}
/*
* Although ReplicateShardToAllWorkers is used only for reference tables,
* during the upgrade phase, the placements are created before the table is
* marked as a reference table. All metadata (including the placement
* metadata) will be copied to workers after all reference table changes
* are finished.
*/
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0,
nodeName, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
}
}
}
/*
* ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to
* reference table metadata. To do this, this function updates pg_dist_partition,
@ -417,6 +450,91 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
}
/*
* EnsureAllNodesActivated errors out if any node in the cluster is not activated.
* Currently it only checks whether all reference tables are replicated to all
* nodes; if not, the function errors out.
*/
void
EnsureAllNodesActivated()
{
EnsureReferenceTablesReplicatedToAllNodes();
}
/*
* EnsureReferenceTablesReplicatedToAllNodes errors out if there exists any
* under-replicated reference tables in the cluster.
*/
static void
EnsureReferenceTablesReplicatedToAllNodes()
{
List *referenceTableOidList = ReferenceTableOidList();
ListCell *referenceTableOidCell = NULL;
List *workerNodeList = WorkerNodeList();
int workerCount = list_length(workerNodeList);
foreach(referenceTableOidCell, referenceTableOidList)
{
Oid tableId = lfirst_oid(referenceTableOidCell);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0];
uint64 shardId = shardInterval->shardId;
List *finalizedShardPlacementList = FinalizedShardPlacementList(shardId);
if (list_length(finalizedShardPlacementList) != workerCount)
{
WorkerNode *missingWorkerNode =
FindUnderReplicatedWorkerNode(finalizedShardPlacementList);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("operation is not allowed because %s:%d is not "
"activated", missingWorkerNode->workerName,
missingWorkerNode->workerPort),
errdetail("At least one of the reference tables is "
"under-replicated: \"%s\"", get_rel_name(tableId)),
errhint("Use SELECT master_activate_node('%s', "
"%d) to replicate under-replicated reference tables "
"to the given node.",
missingWorkerNode->workerName,
missingWorkerNode->workerPort)));
}
}
}
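The check above has a rough SQL analogue, shown only as a sketch; it assumes reference tables are the pg_dist_partition rows with partmethod 'n' and that shardstate 1 marks a finalized placement:

-- count finalized placements per reference table and compare with the worker count
SELECT shard.logicalrelid::regclass AS reference_table,
       count(placement.placementid) AS finalized_placements,
       (SELECT count(*) FROM pg_dist_node) AS worker_count
FROM pg_dist_partition part
JOIN pg_dist_shard shard ON (shard.logicalrelid = part.logicalrelid)
LEFT JOIN pg_dist_shard_placement placement
       ON (placement.shardid = shard.shardid AND placement.shardstate = 1)
WHERE part.partmethod = 'n'
GROUP BY shard.logicalrelid;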
/*
* FindUnderReplicatedWorkerNode iterates over worker list and tries to find
* a worker node where finalizedShardPlacementList does not have a placement
* on it. If found, the workerNode is returned. Else, the function returns NULL.
*
* This function does an O(n^2) search and thus should be used with caution.
*/
static WorkerNode *
FindUnderReplicatedWorkerNode(List *finalizedShardPlacementList)
{
List *workerNodeList = WorkerNodeList();
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
ShardPlacement *placement = NULL;
bool missingOk = true;
placement = SearchShardPlacementInList(finalizedShardPlacementList,
workerNode->workerName,
workerNode->workerPort, missingOk);
if (placement == NULL)
{
return workerNode;
}
}
return NULL;
}
/*
* ReferenceTableOidList function scans pg_dist_partition to create a list of all
* reference tables. To create the list, it performs sequential scan. Since it is not


@ -14,8 +14,10 @@
extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToAllNodes(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName,
uint32 workerPort);
extern void EnsureAllNodesActivated(void);
extern List * ReferenceTableOidList(void);
#endif /* REFERENCE_TABLE_UTILS_H_ */


@ -75,6 +75,8 @@ ALTER EXTENSION citus UPDATE TO '6.1-14';
ALTER EXTENSION citus UPDATE TO '6.1-15';
ALTER EXTENSION citus UPDATE TO '6.1-16';
ALTER EXTENSION citus UPDATE TO '6.1-17';
ALTER EXTENSION citus UPDATE TO '6.1-18';
ALTER EXTENSION citus UPDATE TO '6.1-19';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,


@ -1297,7 +1297,8 @@ SELECT create_reference_table('mx_ref');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
@ -1306,7 +1307,8 @@ WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
@ -1314,7 +1316,7 @@ WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "mx_ref" to all workers
NOTICE: Replicating reference table "mx_ref" to the node localhost:57638
master_add_node
---------------------------------
(5,5,localhost,57638,default,f)
@ -1322,7 +1324,8 @@ NOTICE: Replicating reference table "mx_ref" to all workers
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
@ -1332,7 +1335,8 @@ WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637


@ -68,7 +68,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -98,7 +100,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -124,7 +128,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -152,7 +158,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -164,7 +172,7 @@ SELECT master_remove_node('localhost', :worker_2_port);
ERROR: could not find valid entry for node "localhost:57638"
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
---------------------------------------------
(1380001,1380001,localhost,57638,default,f)
@ -183,7 +191,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -213,7 +223,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -241,7 +253,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -270,7 +284,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -291,7 +307,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -321,7 +339,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -349,7 +369,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -378,7 +400,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -387,7 +411,7 @@ WHERE
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
---------------------------------------------
(1380002,1380002,localhost,57638,default,f)
@ -406,7 +430,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -435,7 +461,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -465,7 +493,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -500,7 +530,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -516,7 +548,7 @@ SELECT * FROM remove_node_reference_table;
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
---------------------------------------------
(1380003,1380003,localhost,57638,default,f)
@ -535,7 +567,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -564,7 +598,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638 1380000 | 1 | 0 | localhost | 57638
@ -595,7 +631,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -624,7 +662,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -641,7 +681,7 @@ Table "public.remove_node_reference_table"
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
---------------------------------------------
(1380004,1380004,localhost,57638,default,f)
@ -746,7 +786,7 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
@ -777,7 +817,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
@ -832,7 +874,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -842,8 +886,8 @@ WHERE
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "table1" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_add_node
---------------------------------------------
(1380006,1380006,localhost,57638,default,f)
@ -864,7 +908,7 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
@ -894,7 +938,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
@ -922,7 +968,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -950,7 +998,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+---------- ---------+------------+-------------+----------+----------
(0 rows) (0 rows)
@ -959,8 +1009,8 @@ WHERE
\c - - - :master_port
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
NOTICE: Replicating reference table "table1" to all workers
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_add_node
---------------------------------------------
(1380007,1380007,localhost,57638,default,f)


@ -71,7 +71,6 @@ SELECT create_reference_table('replicate_reference_table_unhealthy');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers
ERROR: could not find any healthy placement for shard 1370000
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
@ -123,7 +122,7 @@ WHERE colocationid IN
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers
NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638
master_add_node
---------------------------------------------
(1370002,1370002,localhost,57638,default,f)
@ -244,7 +243,7 @@ WHERE colocationid IN
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers
NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638
master_add_node
---------------------------------------------
(1370003,1370003,localhost,57638,default,f)
@ -306,7 +305,7 @@ WHERE colocationid IN
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers
NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638
master_add_node
---------------------------------------------
(1370004,1370004,localhost,57638,default,f)
@ -401,13 +400,14 @@ ORDER BY logicalrelid;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638
master_add_node
---------------------------------------------
(1370005,1370005,localhost,57638,default,f)
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -482,7 +482,7 @@ SELECT create_reference_table('replicate_reference_table_insert');
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers
NOTICE: Replicating reference table "replicate_reference_table_insert" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
@ -497,7 +497,7 @@ SELECT create_reference_table('replicate_reference_table_copy');
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers
NOTICE: Replicating reference table "replicate_reference_table_copy" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
@ -514,7 +514,7 @@ ALTER TABLE replicate_reference_table_ddl ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers
NOTICE: Replicating reference table "replicate_reference_table_ddl" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
@ -550,7 +550,7 @@ WHERE colocationid IN
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers
NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638
master_add_node
---------------------------------------------
(1370009,1370009,localhost,57638,default,f)
@ -612,7 +612,7 @@ WHERE colocationid IN
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to all workers
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_add_node
---------------------------------------------
(1370010,1370010,localhost,57638,default,f)
@ -643,6 +643,130 @@ WHERE colocationid IN
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- do some tests with manually disabling reference table replication
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE initially_not_replicated_reference_table (key int);
SELECT create_reference_table('initially_not_replicated_reference_table');
create_reference_table
------------------------
(1 row)
CREATE TABLE initially_not_replicated_reference_table_second (key int);
SELECT create_reference_table('initially_not_replicated_reference_table_second');
create_reference_table
------------------------
(1 row)
CREATE TABLE append_test_table(key int);
SELECT create_distributed_table('append_test_table', 'key', 'append');
create_distributed_table
--------------------------
(1 row)
SELECT master_add_node('localhost', :worker_2_port, activate_node := false);
master_add_node
---------------------------------------------
(1370011,1370011,localhost,57638,default,f)
(1 row)
-- we should see only two shard placements
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid IN
('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second'))
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370012 | 1 | 0 | localhost | 57637
1370013 | 1 | 0 | localhost | 57637
(2 rows)
-- now, see that certain operations are disallowed
CREATE TABLE disallow_test_table (key int);
SELECT create_reference_table('disallow_test_table');
ERROR: operation is not allowed because localhost:57638 is not activated
DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table"
HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node.
SELECT create_distributed_table('disallow_test_table', 'key');
ERROR: operation is not allowed because localhost:57638 is not activated
DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table"
HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node.
SELECT master_create_empty_shard('append_test_table');
ERROR: operation is not allowed because localhost:57638 is not activated
DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table"
HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node.
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
ERROR: operation is not allowed because localhost:57638 is not activated
DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table"
HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node.
COPY append_test_table FROM STDIN;
ERROR: operation is not allowed because localhost:57638 is not activated
DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table"
HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node.
SELECT master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638
NOTICE: Replicating reference table "initially_not_replicated_reference_table_second" to the node localhost:57638
master_activate_node
----------------------
(1 row)
-- we should see the four shard placements
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid IN
('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second'))
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370012 | 1 | 0 | localhost | 57637
1370012 | 1 | 0 | localhost | 57638
1370013 | 1 | 0 | localhost | 57637
1370013 | 1 | 0 | localhost | 57638
(4 rows)
-- this should have no effect
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------------------
(1370011,1370011,localhost,57638,default,f)
(1 row)
-- now, should be able to create tables
CREATE TABLE initially_not_replicated_reference_table_third (key int);
SELECT create_reference_table('initially_not_replicated_reference_table_third');
create_reference_table
------------------------
(1 row)
-- drop unnecessary tables
DROP TABLE initially_not_replicated_reference_table_third, initially_not_replicated_reference_table_second,
initially_not_replicated_reference_table, append_test_table;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
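The output above exercises the new two-step node addition: a worker registered with activate_node := false shows up in pg_dist_node, but no reference table placements are created on it, and commands that need fully replicated reference tables are rejected until the node is activated. A minimal sketch of the intended workflow, with a placeholder host name and port rather than the test's localhost values:
-- register the worker without replicating reference tables to it
SELECT master_add_node('worker-2.example.com', 5432, activate_node := false);
-- later, copy every reference table to the worker and mark it active
SELECT master_activate_node('worker-2.example.com', 5432);
-- calling master_add_node again for an already activated node has no further effect
SELECT master_add_node('worker-2.example.com', 5432);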
@ -95,6 +95,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638
ERROR: type "public.upgrade_test_composite_type" does not exist
CONTEXT: while executing command on localhost:57638
DROP TABLE upgrade_reference_table_composite;
@ -165,6 +166,7 @@ WHERE shardid IN
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -277,6 +279,7 @@ WHERE shardid IN
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -621,6 +624,7 @@ WHERE shardid IN
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -733,6 +737,7 @@ WHERE shardid IN
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -980,6 +985,7 @@ ORDER BY nodeport;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -75,6 +75,8 @@ ALTER EXTENSION citus UPDATE TO '6.1-14';
ALTER EXTENSION citus UPDATE TO '6.1-15';
ALTER EXTENSION citus UPDATE TO '6.1-16';
ALTER EXTENSION citus UPDATE TO '6.1-17';
ALTER EXTENSION citus UPDATE TO '6.1-18';
ALTER EXTENSION citus UPDATE TO '6.1-19';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
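The two new update steps above extend the upgrade path that the extension test walks one catalog version at a time. On an existing installation the intermediate scripts are applied in sequence by a single command, so upgrading to the version shipped with this change would, for example, look like:
-- apply all intermediate catalog update scripts up to 6.1-19
ALTER EXTENSION citus UPDATE TO '6.1-19';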
@ -569,24 +569,28 @@ SELECT create_reference_table('mx_ref');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodename, nodeport;
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
@ -46,7 +46,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -64,7 +66,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -78,7 +82,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -96,7 +102,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -116,7 +124,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -134,7 +144,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -150,7 +162,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -168,7 +182,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -182,7 +198,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -200,7 +218,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -216,7 +236,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -234,7 +256,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -251,7 +275,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -269,7 +295,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -286,7 +314,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -307,7 +337,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT * FROM remove_node_reference_table;
@ -327,7 +359,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -345,7 +379,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -362,7 +398,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -380,7 +418,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -449,7 +489,7 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY
shardid; shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -467,7 +507,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -499,7 +541,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -519,7 +563,7 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY
shardid; shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -537,7 +581,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
@ -551,7 +597,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
SELECT *
FROM pg_dist_colocation
@ -569,7 +617,9 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
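The ORDER BY shardid, nodename, nodeport clauses added throughout this file are there to keep the placement listings deterministic; without an explicit ordering, the row order returned from pg_dist_shard_placement is unspecified and the regression output could vary between runs. A sketch of the pattern (the column list here is illustrative; :worker_2_port is the psql variable set by the test harness):
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port
ORDER BY shardid, nodename, nodeport;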
@ -417,6 +417,76 @@ WHERE colocationid IN
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- do some tests with manually disabling reference table replication
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE initially_not_replicated_reference_table (key int);
SELECT create_reference_table('initially_not_replicated_reference_table');
CREATE TABLE initially_not_replicated_reference_table_second (key int);
SELECT create_reference_table('initially_not_replicated_reference_table_second');
CREATE TABLE append_test_table(key int);
SELECT create_distributed_table('append_test_table', 'key', 'append');
SELECT master_add_node('localhost', :worker_2_port, activate_node := false);
-- we should see only two shard placements
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid IN
('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second'))
ORDER BY 1,4,5;
-- now, see that certain operations are disallowed
CREATE TABLE disallow_test_table (key int);
SELECT create_reference_table('disallow_test_table');
SELECT create_distributed_table('disallow_test_table', 'key');
SELECT master_create_empty_shard('append_test_table');
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
COPY append_test_table FROM STDIN;
1
2
3
\.
SELECT master_activate_node('localhost', :worker_2_port);
-- we should see the four shard placements
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid IN
('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second'))
ORDER BY 1,4,5;
-- this should have no effect
SELECT master_add_node('localhost', :worker_2_port);
-- now, should be able to create tables
CREATE TABLE initially_not_replicated_reference_table_third (key int);
SELECT create_reference_table('initially_not_replicated_reference_table_third');
-- drop unnecessary tables
DROP TABLE initially_not_replicated_reference_table_third, initially_not_replicated_reference_table_second,
initially_not_replicated_reference_table, append_test_table;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);