mirror of https://github.com/citusdata/citus.git
Add shouldhaveshards to pg_dist_node (#2960)
This is an improvement over #2512. It adds the boolean shouldhaveshards column to pg_dist_node. When it is false, create_distributed_table for new colocation groups will not create shards on that node. Reference tables will still be created on nodes where it is false.
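As a quick usage sketch (the node name, port, and table names below are illustrative, not part of this commit):

    -- stop placing shards of new colocation groups on this node
    SELECT master_set_node_property('worker-1', 5432, 'shouldhaveshards', false);
    -- new distributed tables now place shards only on the remaining eligible nodes
    SELECT create_distributed_table('events', 'event_id');
    -- reference tables still get a placement on every node, including worker-1
    SELECT create_reference_table('countries');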
parent 5001c44990
commit 78e495e030
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '9.0-1'
+default_version = '9.1-1'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
@@ -177,7 +177,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
     LockRelationOid(DistNodeRelationId(), RowShareLock);
 
     /* load and sort the worker node list for deterministic placement */
-    workerNodeList = ActivePrimaryNodeList(NoLock);
+    workerNodeList = ActivePrimaryShouldHaveShardsNodeList(NoLock);
     workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
 
     /*
@@ -17,3 +17,4 @@ NOT_SUPPORTED_IN_COMMUNITY(rebalance_table_shards);
 NOT_SUPPORTED_IN_COMMUNITY(replicate_table_shards);
 NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_table_shards_plan);
 NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_progress);
+NOT_SUPPORTED_IN_COMMUNITY(master_drain_node);
@@ -15,9 +15,10 @@
 #include "miscadmin.h"
 
 #include "commands/dbcommands.h"
-#include "distributed/worker_manager.h"
+#include "distributed/hash_helpers.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_client_executor.h"
+#include "distributed/worker_manager.h"
 #include "libpq/hba.h"
 #include "common/ip.h"
 #include "libpq/libpq-be.h"
@@ -319,19 +320,20 @@ ActiveReadableNodeCount(void)
 
 
 /*
- * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash
+ * FilterActiveNodeListFunc returns a list of all active nodes that checkFunction
+ * returns true for.
  * lockMode specifies which lock to use on pg_dist_node, this is necessary when
  * the caller wouldn't want nodes to be added concurrent to their use of this list
  */
-List *
-ActivePrimaryNodeList(LOCKMODE lockMode)
+static List *
+FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
 {
     List *workerNodeList = NIL;
     WorkerNode *workerNode = NULL;
     HTAB *workerNodeHash = NULL;
     HASH_SEQ_STATUS status;
 
-    EnsureModificationsCanRun();
+    Assert(checkFunction != NULL);
 
     if (lockMode != NoLock)
     {
@@ -343,7 +345,7 @@ ActivePrimaryNodeList(LOCKMODE lockMode)
 
     while ((workerNode = hash_seq_search(&status)) != NULL)
     {
-        if (workerNode->isActive && WorkerNodeIsPrimary(workerNode))
+        if (workerNode->isActive && checkFunction(workerNode))
         {
             WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
             memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
@@ -355,39 +357,38 @@ ActivePrimaryNodeList(LOCKMODE lockMode)
 }
 
 
+/*
+ * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash
+ * lockMode specifies which lock to use on pg_dist_node, this is necessary when
+ * the caller wouldn't want nodes to be added concurrent to their use of this list
+ */
+List *
+ActivePrimaryNodeList(LOCKMODE lockMode)
+{
+    EnsureModificationsCanRun();
+    return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimary);
+}
+
+
+/*
+ * ActivePrimaryShouldHaveShardsNodeList returns a list of all active, primary
+ * worker nodes that can store new data, i.e. shouldhaveshards is 'true'
+ */
+List *
+ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode)
+{
+    EnsureModificationsCanRun();
+    return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimaryShouldHaveShardsNode);
+}
+
+
 /*
  * ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from.
  */
 List *
 ActiveReadableNodeList(void)
 {
-    List *workerNodeList = NIL;
-    WorkerNode *workerNode = NULL;
-    HTAB *workerNodeHash = GetWorkerNodeHash();
-    HASH_SEQ_STATUS status;
-
-    hash_seq_init(&status, workerNodeHash);
-
-    while ((workerNode = hash_seq_search(&status)) != NULL)
-    {
-        WorkerNode *workerNodeCopy;
-
-        if (!workerNode->isActive)
-        {
-            continue;
-        }
-
-        if (!WorkerNodeIsReadable(workerNode))
-        {
-            continue;
-        }
-
-        workerNodeCopy = palloc0(sizeof(WorkerNode));
-        memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
-        workerNodeList = lappend(workerNodeList, workerNodeCopy);
-    }
-
-    return workerNodeList;
+    return FilterActiveNodeListFunc(NoLock, WorkerNodeIsReadable);
 }
 
 
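In catalog terms, the node list built by ActivePrimaryShouldHaveShardsNodeList corresponds roughly to the query below; this is only a sketch, since the C code reads the cached worker node hash rather than running SQL:

    SELECT * FROM pg_dist_node
    WHERE isactive AND noderole = 'primary' AND shouldhaveshards;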
@@ -899,6 +899,24 @@ NodeStateUpdateCommand(uint32 nodeId, bool isActive)
 }
 
 
+/*
+ * ShouldHaveShardsUpdateCommand generates a command that can be executed to
+ * update the shouldhaveshards column of a node in pg_dist_node table.
+ */
+char *
+ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards)
+{
+    StringInfo nodeStateUpdateCommand = makeStringInfo();
+    char *shouldHaveShardsString = shouldHaveShards ? "TRUE" : "FALSE";
+
+    appendStringInfo(nodeStateUpdateCommand,
+                     "UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = %s "
+                     "WHERE nodeid = %u", shouldHaveShardsString, nodeId);
+
+    return nodeStateUpdateCommand->data;
+}
+
+
 /*
  * ColocationIdUpdateCommand creates the SQL command to change the colocationId
  * of the table with the given name to the given colocationId in pg_dist_partition
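For illustration, ShouldHaveShardsUpdateCommand(3, false) would generate the statement below, which is then sent to metadata-holding workers (nodeid 3 is a made-up example):

    UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = FALSE WHERE nodeid = 3;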
@@ -0,0 +1,6 @@
+ALTER TABLE pg_catalog.pg_dist_node ADD shouldhaveshards bool NOT NULL DEFAULT true;
+COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS
+'indicates whether the node is eligible to contain data from distributed tables';
+
+#include "udfs/master_set_node_property/9.1-1.sql"
+#include "udfs/master_drain_node/9.1-1.sql"
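After upgrading, the new column can be inspected directly; a minimal check against an existing cluster:

    SELECT nodename, nodeport, shouldhaveshards FROM pg_dist_node ORDER BY nodeid;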
@@ -0,0 +1,14 @@
+CREATE FUNCTION pg_catalog.master_drain_node(
+    nodename text,
+    nodeport integer,
+    threshold float4 default 0,
+    max_shard_moves int default 1000000,
+    excluded_shard_list bigint[] default '{}',
+    shard_transfer_mode citus.shard_transfer_mode default 'auto')
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$master_drain_node$$;
+COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode)
+  IS 'mark a node to be drained of data and actually drain it as well';
+
+REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) FROM PUBLIC;
@@ -0,0 +1,14 @@
+CREATE FUNCTION pg_catalog.master_drain_node(
+    nodename text,
+    nodeport integer,
+    threshold float4 default 0,
+    max_shard_moves int default 1000000,
+    excluded_shard_list bigint[] default '{}',
+    shard_transfer_mode citus.shard_transfer_mode default 'auto')
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$master_drain_node$$;
+COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode)
+  IS 'mark a node to be drained of data and actually drain it as well';
+
+REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) FROM PUBLIC;
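A hedged usage sketch for the new UDF (it is stubbed out as NOT_SUPPORTED_IN_COMMUNITY above, so this only works in the enterprise build; the host and port are examples):

    -- mark the node as drained (shouldhaveshards = false) and move its shards away
    SELECT master_drain_node('worker-1', 5432);
    -- named notation also works, matching the defaults declared above
    SELECT master_drain_node('worker-1', 5432, shard_transfer_mode => 'block_writes');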
@@ -0,0 +1,21 @@
+CREATE FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', 'master_set_node_property';
+COMMENT ON FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  IS 'set a property of a node in pg_dist_node';
+
+REVOKE ALL ON FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  FROM PUBLIC;
@@ -0,0 +1,21 @@
+CREATE FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', 'master_set_node_property';
+COMMENT ON FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  IS 'set a property of a node in pg_dist_node';
+
+REVOKE ALL ON FUNCTION pg_catalog.master_set_node_property(
+    nodename text,
+    nodeport integer,
+    property text,
+    value boolean)
+  FROM PUBLIC;
@@ -32,3 +32,18 @@ hash_delete_all(HTAB *htab)
         Assert(found);
     }
 }
+
+
+/*
+ * foreach_htab_cleanup cleans up the hash iteration state after the iteration
+ * is done. This is only needed when break statements are present in the
+ * foreach block.
+ */
+void
+foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status)
+{
+    if ((var) != NULL)
+    {
+        hash_seq_term(status);
+    }
+}
@@ -2385,14 +2385,14 @@ CurrentUserName(void)
 
 
 /*
- * LookupNodeRoleValueId returns the Oid of the "pg_catalog.noderole" type, or InvalidOid
- * if it does not exist.
+ * LookupTypeOid returns the Oid of the "pg_catalog.{typeNameString}" type, or
+ * InvalidOid if it does not exist.
  */
 static Oid
-LookupNodeRoleTypeOid()
+LookupTypeOid(char *typeNameString)
 {
     Value *schemaName = makeString("pg_catalog");
-    Value *typeName = makeString("noderole");
+    Value *typeName = makeString(typeNameString);
     List *qualifiedName = list_make2(schemaName, typeName);
     TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);
 
@@ -2417,21 +2417,21 @@ LookupNodeRoleTypeOid()
 
 
 /*
- * LookupNodeRoleValueId returns the Oid of the value in "pg_catalog.noderole" which
- * matches the provided name, or InvalidOid if the noderole enum doesn't exist yet.
+ * LookupStringEnumValueId returns the Oid of the value in "pg_catalog.{enumName}"
+ * which matches the provided valueName, or InvalidOid if the enum doesn't exist yet.
  */
 static Oid
-LookupNodeRoleValueId(char *valueName)
+LookupStringEnumValueId(char *enumName, char *valueName)
 {
-    Oid nodeRoleTypId = LookupNodeRoleTypeOid();
+    Oid enumTypeId = LookupTypeOid(enumName);
 
-    if (nodeRoleTypId == InvalidOid)
+    if (enumTypeId == InvalidOid)
     {
         return InvalidOid;
     }
     else
     {
-        Oid valueId = LookupEnumValueId(nodeRoleTypId, valueName);
+        Oid valueId = LookupEnumValueId(enumTypeId, valueName);
         return valueId;
     }
 }
@@ -2458,7 +2458,7 @@ PrimaryNodeRoleId(void)
 {
     if (!MetadataCache.primaryNodeRoleId)
     {
-        MetadataCache.primaryNodeRoleId = LookupNodeRoleValueId("primary");
+        MetadataCache.primaryNodeRoleId = LookupStringEnumValueId("noderole", "primary");
     }
 
     return MetadataCache.primaryNodeRoleId;
@@ -2471,7 +2471,8 @@ SecondaryNodeRoleId(void)
 {
     if (!MetadataCache.secondaryNodeRoleId)
     {
-        MetadataCache.secondaryNodeRoleId = LookupNodeRoleValueId("secondary");
+        MetadataCache.secondaryNodeRoleId = LookupStringEnumValueId("noderole",
+                                                                    "secondary");
     }
 
     return MetadataCache.secondaryNodeRoleId;
@@ -2484,7 +2485,8 @@ UnavailableNodeRoleId(void)
 {
     if (!MetadataCache.unavailableNodeRoleId)
     {
-        MetadataCache.unavailableNodeRoleId = LookupNodeRoleValueId("unavailable");
+        MetadataCache.unavailableNodeRoleId = LookupStringEnumValueId("noderole",
+                                                                      "unavailable");
     }
 
     return MetadataCache.unavailableNodeRoleId;
@@ -3037,6 +3039,7 @@ InitializeWorkerNodeCache(void)
         workerNode->metadataSynced = currentNode->metadataSynced;
         workerNode->isActive = currentNode->isActive;
         workerNode->nodeRole = currentNode->nodeRole;
+        workerNode->shouldHaveShards = currentNode->shouldHaveShards;
         strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
 
         newWorkerNodeArray[workerNodeIndex++] = workerNode;
@@ -64,6 +64,7 @@ typedef struct NodeMetadata
     bool metadataSynced;
     bool isActive;
     Oid nodeRole;
+    bool shouldHaveShards;
     char *nodeCluster;
 } NodeMetadata;
 
@@ -72,22 +73,26 @@ static int ActivateNode(char *nodeName, int nodePort);
 static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
 static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
                            *nodeMetadata, bool *nodeAlreadyExists);
-static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
-static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
+static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
+static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
 static int32 GetNextGroupId(void);
 static int GetNextNodeId(void);
 static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
                           *nodeMetadata);
 static void DeleteNodeRow(char *nodename, int32 nodeport);
+static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
 static List * ParseWorkerNodeFileAndRename(void);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
+static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
 static bool UnsetMetadataSyncedForAll(void);
+static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(master_add_node);
 PG_FUNCTION_INFO_V1(master_add_inactive_node);
 PG_FUNCTION_INFO_V1(master_add_secondary_node);
+PG_FUNCTION_INFO_V1(master_set_node_property);
 PG_FUNCTION_INFO_V1(master_remove_node);
 PG_FUNCTION_INFO_V1(master_disable_node);
 PG_FUNCTION_INFO_V1(master_activate_node);
@@ -105,6 +110,7 @@ DefaultNodeMetadata()
 {
     NodeMetadata nodeMetadata = {
         .nodeRack = WORKER_DEFAULT_RACK,
+        .shouldHaveShards = true,
     };
     return nodeMetadata;
 }
|
@ -237,13 +243,12 @@ master_add_secondary_node(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
master_remove_node(PG_FUNCTION_ARGS)
|
master_remove_node(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
text *nodeNameText = PG_GETARG_TEXT_P(0);
|
||||||
int32 nodePort = PG_GETARG_INT32(1);
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
char *nodeNameString = text_to_cstring(nodeName);
|
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
RemoveNodeFromCluster(nodeNameString, nodePort);
|
RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@@ -264,41 +269,32 @@ master_remove_node(PG_FUNCTION_ARGS)
 Datum
 master_disable_node(PG_FUNCTION_ARGS)
 {
-    const bool onlyConsiderActivePlacements = true;
     text *nodeNameText = PG_GETARG_TEXT_P(0);
     int32 nodePort = PG_GETARG_INT32(1);
 
     char *nodeName = text_to_cstring(nodeNameText);
+    WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
     bool isActive = false;
+    bool onlyConsiderActivePlacements = false;
-    WorkerNode *workerNode = NULL;
-
-    CheckCitusVersion(ERROR);
-
-    EnsureCoordinator();
-
-    /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
-    LockRelationOid(DistNodeRelationId(), ExclusiveLock);
-
-    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
-    if (workerNode == NULL)
-    {
-        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
-    }
 
     if (WorkerNodeIsPrimary(workerNode))
     {
+        /*
+         * Delete reference table placements so they are not taken into account
+         * for the check if there are placements after this
+         */
         DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
-    }
 
-    if (WorkerNodeIsPrimary(workerNode) &&
-        NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements))
-    {
-        ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries "
+        if (NodeGroupHasShardPlacements(workerNode->groupId,
+                                        onlyConsiderActivePlacements))
+        {
+            ereport(NOTICE, (errmsg(
+                                 "Node %s:%d has active shard placements. Some queries "
                                  "may fail after this operation. Use "
                                  "SELECT master_activate_node('%s', %d) to activate this "
                                  "node back.",
-                                nodeName, nodePort, nodeName, nodePort)));
+                                 workerNode->workerName, nodePort, workerNode->workerName,
+                                 nodePort)));
+        }
     }
 
     SetNodeState(nodeName, nodePort, isActive);
@@ -313,6 +309,95 @@ master_disable_node(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * master_set_node_property sets a property of the node
+ */
+Datum
+master_set_node_property(PG_FUNCTION_ARGS)
+{
+    text *nodeNameText = PG_GETARG_TEXT_P(0);
+    int32 nodePort = PG_GETARG_INT32(1);
+    text *propertyText = PG_GETARG_TEXT_P(2);
+    bool value = PG_GETARG_BOOL(3);
+
+    WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
+                                                  nodePort);
+
+    if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0)
+    {
+        SetShouldHaveShards(workerNode, value);
+    }
+    else
+    {
+        ereport(ERROR, (errmsg(
+                            "only the 'shouldhaveshards' property can be set using this function")));
+    }
+
+    PG_RETURN_VOID();
+}
+
+
+/*
+ * SetUpDistributedTableDependencies sets up the following on a node if it's
+ * a primary node that currently stores data:
+ * - All dependencies (e.g., types, schemas)
+ * - Reference tables, because they are needed to handle queries efficiently.
+ * - Distributed functions
+ */
+static void
+SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
+{
+    if (WorkerNodeIsPrimary(newWorkerNode))
+    {
+        EnsureNoModificationsHaveBeenDone();
+        ReplicateAllDependenciesToNode(newWorkerNode->workerName,
+                                       newWorkerNode->workerPort);
+        ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
+                                          newWorkerNode->workerPort);
+
+        /*
+         * Let the maintenance daemon do the hard work of syncing the metadata.
+         * We prefer this because otherwise node activation might fail within
+         * transaction blocks.
+         */
+        if (ClusterHasDistributedFunctionWithDistArgument())
+        {
+            MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
+                                true);
+            TriggerMetadataSync(MyDatabaseId);
+        }
+    }
+}
+
+
+/*
+ * ModifiableWorkerNode gets the requested WorkerNode and also gets locks
+ * required for modifying it. This fails if the node does not exist.
+ */
+static WorkerNode *
+ModifiableWorkerNode(const char *nodeName, int32 nodePort)
+{
+    WorkerNode *workerNode = NULL;
+
+    CheckCitusVersion(ERROR);
+
+    EnsureCoordinator();
+
+    /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
+    LockRelationOid(DistNodeRelationId(), ExclusiveLock);
+
+    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
+    if (workerNode == NULL)
+    {
+        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
+    }
+
+    return workerNode;
+}
+
+
 /*
  * master_activate_node UDF activates the given node. It sets the node's isactive
  * value to active and replicates all reference tables to that node.
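Illustrative behavior of the property check above (the property name in this session sketch is deliberately bogus):

    SELECT master_set_node_property('localhost', 57637, 'sometypo', false);
    ERROR:  only the 'shouldhaveshards' property can be set using this function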
@@ -320,19 +405,14 @@ master_disable_node(PG_FUNCTION_ARGS)
 Datum
 master_activate_node(PG_FUNCTION_ARGS)
 {
-    text *nodeName = PG_GETARG_TEXT_P(0);
+    text *nodeNameText = PG_GETARG_TEXT_P(0);
     int32 nodePort = PG_GETARG_INT32(1);
 
-    char *nodeNameString = text_to_cstring(nodeName);
-    int nodeId = 0;
-
-    CheckCitusVersion(ERROR);
-
-    EnsureCoordinator();
-
-    nodeId = ActivateNode(nodeNameString, nodePort);
-
-    PG_RETURN_INT32(nodeId);
+    WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
+                                                  nodePort);
+    ActivateNode(workerNode->workerName, workerNode->workerPort);
+
+    PG_RETURN_INT32(workerNode->nodeId);
 }
 
 
@@ -391,6 +471,22 @@ WorkerNodeIsSecondary(WorkerNode *worker)
 }
 
 
+/*
+ * WorkerNodeIsPrimaryShouldHaveShardsNode returns whether the argument represents a
+ * primary node that is eligible for new data.
+ */
+bool
+WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker)
+{
+    if (!WorkerNodeIsPrimary(worker))
+    {
+        return false;
+    }
+
+    return worker->shouldHaveShards;
+}
+
+
 /*
  * WorkerNodeIsReadable returns whether we're allowed to send SELECT queries to this
  * node.
@@ -461,34 +557,16 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
 static int
 ActivateNode(char *nodeName, int nodePort)
 {
-    WorkerNode *workerNode = NULL;
+    WorkerNode *newWorkerNode = NULL;
     bool isActive = true;
 
     /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
     LockRelationOid(DistNodeRelationId(), ExclusiveLock);
 
-    SetNodeState(nodeName, nodePort, isActive);
-
-    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
-
-    if (WorkerNodeIsPrimary(workerNode))
-    {
-        EnsureNoModificationsHaveBeenDone();
-        ReplicateAllDependenciesToNode(nodeName, nodePort);
-        ReplicateAllReferenceTablesToNode(nodeName, nodePort);
-
-        /*
-         * Let the maintanince deamon do the hard work of syncing the metadata. We prefer
-         * this because otherwise node activation might fail withing transaction blocks.
-         */
-        if (ClusterHasDistributedFunctionWithDistArgument())
-        {
-            MarkNodeHasMetadata(nodeName, nodePort, true);
-            TriggerMetadataSync(MyDatabaseId);
-        }
-    }
-
-    return workerNode->nodeId;
+    newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
+
+    SetUpDistributedTableDependencies(newWorkerNode);
+    return newWorkerNode->nodeId;
 }
 
 
@@ -845,7 +923,7 @@ FindWorkerNode(char *nodeName, int32 nodePort)
  * clusters do not exist.
  */
 WorkerNode *
-FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort)
+FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
 {
     WorkerNode *workerNode = NULL;
 
@@ -924,40 +1002,28 @@ ReadWorkerNodes(bool includeNodesFromOtherClusters)
 static void
 RemoveNodeFromCluster(char *nodeName, int32 nodePort)
 {
-    const bool onlyConsiderActivePlacements = false;
     char *nodeDeleteCommand = NULL;
-    WorkerNode *workerNode = NULL;
-    uint32 deletedNodeId = INVALID_PLACEMENT_ID;
-
-    EnsureCoordinator();
-
-    /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
-    LockRelationOid(DistNodeRelationId(), ExclusiveLock);
-
-    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
-    if (workerNode == NULL)
-    {
-        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
-    }
-
-    if (workerNode != NULL)
-    {
-        deletedNodeId = workerNode->nodeId;
-    }
+    WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
 
     if (WorkerNodeIsPrimary(workerNode))
     {
-        DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
-    }
+        bool onlyConsiderActivePlacements = false;
 
-    if (WorkerNodeIsPrimary(workerNode) &&
-        NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements))
-    {
-        ereport(ERROR, (errmsg("you cannot remove the primary node of a node group "
-                               "which has shard placements")));
-    }
+        /*
+         * Delete reference table placements so they are not taken into account
+         * for the check if there are placements after this
+         */
+        DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
+
+        if (NodeGroupHasShardPlacements(workerNode->groupId,
+                                        onlyConsiderActivePlacements))
+        {
+            ereport(ERROR, (errmsg("you cannot remove the primary node of a node group "
+                                   "which has shard placements")));
+        }
+    }
 
-    DeleteNodeRow(nodeName, nodePort);
+    DeleteNodeRow(workerNode->workerName, nodePort);
 
     if (WorkerNodeIsPrimary(workerNode))
     {
@@ -965,10 +1031,10 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
                             ActivePrimaryNodeCount());
     }
 
-    nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
+    nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
 
     /* make sure we don't have any lingering session lifespan connections */
-    CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
+    CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
 
     SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
 }
@@ -1088,38 +1154,62 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
         SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
     }
 
-    return nextNodeIdInt;
+    return workerNode->nodeId;
 }
 
 
 /*
- * SetNodeState function sets the isactive column of the specified worker in
- * pg_dist_node to isActive.
+ * SetWorkerColumn function sets the column with the specified index
+ * (see pg_dist_node.h) on the worker in pg_dist_node.
+ * It returns the new worker node after the modification.
  */
-static void
-SetNodeState(char *nodeName, int32 nodePort, bool isActive)
+static WorkerNode *
+SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
 {
     Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
     TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
-    HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
+    HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
+    WorkerNode *newWorkerNode = NULL;
 
     Datum values[Natts_pg_dist_node];
     bool isnull[Natts_pg_dist_node];
     bool replace[Natts_pg_dist_node];
+    char *metadataSyncCommand = NULL;
 
-    char *nodeStateUpdateCommand = NULL;
-    WorkerNode *workerNode = NULL;
+    switch (columnIndex)
+    {
+        case Anum_pg_dist_node_isactive:
+        {
+            metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
+                                                         DatumGetBool(value));
+            break;
+        }
+
+        case Anum_pg_dist_node_shouldhaveshards:
+        {
+            metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
+                                                                DatumGetBool(value));
+            break;
+        }
+
+        default:
+        {
+            ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
+                                   workerNode->workerName, workerNode->workerPort)));
+        }
+    }
 
     if (heapTuple == NULL)
     {
         ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
-                               nodeName, nodePort)));
+                               workerNode->workerName, workerNode->workerPort)));
     }
 
     memset(replace, 0, sizeof(replace));
-    values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
-    isnull[Anum_pg_dist_node_isactive - 1] = false;
-    replace[Anum_pg_dist_node_isactive - 1] = true;
+    values[columnIndex - 1] = value;
+    isnull[columnIndex - 1] = false;
+    replace[columnIndex - 1] = true;
 
     heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
 
@@ -1128,13 +1218,40 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
     CitusInvalidateRelcacheByRelid(DistNodeRelationId());
     CommandCounterIncrement();
 
-    workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
+    newWorkerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
 
     heap_close(pgDistNode, NoLock);
 
-    /* we also update isactive column at worker nodes */
-    nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive);
-    SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand);
+    /* we also update the column at worker nodes */
+    SendCommandToWorkers(WORKERS_WITH_METADATA, metadataSyncCommand);
+    return newWorkerNode;
+}
+
+
+/*
+ * SetShouldHaveShards function sets the shouldhaveshards column of the
+ * specified worker in pg_dist_node.
+ * It returns the new worker node after the modification.
+ */
+static WorkerNode *
+SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
+{
+    return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards,
+                           BoolGetDatum(shouldHaveShards));
+}
+
+
+/*
+ * SetNodeState function sets the isactive column of the specified worker in
+ * pg_dist_node to isActive.
+ * It returns the new worker node after the modification.
+ */
+static WorkerNode *
+SetNodeState(char *nodeName, int nodePort, bool isActive)
+{
+    WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
+    return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive,
+                           BoolGetDatum(isActive));
 }
 
 
@@ -1145,7 +1262,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
  * This function may return worker nodes from other clusters.
  */
 static HeapTuple
-GetNodeTuple(char *nodeName, int32 nodePort)
+GetNodeTuple(const char *nodeName, int32 nodePort)
 {
     Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
     const int scanKeyCount = 2;
@@ -1296,6 +1413,8 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
     values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
     values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
     values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
+    values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
+        nodeMetadata->shouldHaveShards);
 
     pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
 
@@ -1562,6 +1681,9 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
         DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
     workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
     workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
+    workerNode->shouldHaveShards = DatumGetBool(
+        datumArray[Anum_pg_dist_node_shouldhaveshards - 1]);
 
     /*
      * nodecluster column can be missing. In the case of extension creation/upgrade,
@@ -9,6 +9,8 @@
 #ifndef HASH_HELPERS_H
 #define HASH_HELPERS_H
 
+#include "postgres.h"
+
 #include "utils/hsearch.h"
 
 /* pg12 includes this exact implementation of hash_combine */
@@ -33,4 +35,17 @@ hash_combine(uint32 a, uint32 b)
 
 extern void hash_delete_all(HTAB *htab);
 
+/*
+ * foreach_htab -
+ *    a convenience macro which loops through a HTAB
+ */
+
+#define foreach_htab(var, status, htab) \
+    hash_seq_init((status), (htab)); \
+    for ((var) = hash_seq_search(status); \
+         (var) != NULL; \
+         (var) = hash_seq_search(status))
+
+extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status);
+
 #endif
@@ -43,6 +43,7 @@ extern List * ShardListInsertCommand(List *shardIntervalList);
 extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
 extern char * NodeDeleteCommand(uint32 nodeId);
 extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
+extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards);
 extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
 extern char * CreateSchemaDDLCommand(Oid schemaId);
 extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
@@ -20,7 +20,7 @@
  * in particular their OUT parameters) must be changed whenever the definition of
  * pg_dist_node changes.
  */
-#define Natts_pg_dist_node 10
+#define Natts_pg_dist_node 11
 #define Anum_pg_dist_node_nodeid 1
 #define Anum_pg_dist_node_groupid 2
 #define Anum_pg_dist_node_nodename 3
@@ -31,6 +31,7 @@
 #define Anum_pg_dist_node_noderole 8
 #define Anum_pg_dist_node_nodecluster 9
 #define Anum_pg_dist_node_metadatasynced 10
+#define Anum_pg_dist_node_shouldhaveshards 11
 
 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
 #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
@@ -12,6 +12,10 @@
 #ifndef REFERENCE_TABLE_UTILS_H_
 #define REFERENCE_TABLE_UTILS_H_
 
+#include "postgres.h"
+
+#include "listutils.h"
+
 extern uint32 CreateReferenceTableColocationId(void);
 extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
 extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
@@ -16,6 +16,7 @@
 
 #include "postgres.h"
 
+#include "storage/lmgr.h"
 #include "storage/lockdefs.h"
 #include "nodes/pg_list.h"
 
@@ -38,6 +39,8 @@
 /*
  * In memory representation of pg_dist_node table elements. The elements are hold in
  * WorkerNodeHash table.
+ * IMPORTANT: The order of the fields in this definition should match the
+ * column order of pg_dist_node
  */
 typedef struct WorkerNode
 {
@@ -51,6 +54,7 @@ typedef struct WorkerNode
     Oid nodeRole;                   /* the node's role in its group */
     char nodeCluster[NAMEDATALEN];  /* the cluster the node is a part of */
     bool metadataSynced;            /* node has the most recent metadata */
+    bool shouldHaveShards;          /* if the node should have distributed table shards on it or not */
 } WorkerNode;
 
 
@@ -68,17 +72,19 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
 extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
 extern uint32 ActivePrimaryNodeCount(void);
 extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
+extern List * ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode);
 extern uint32 ActiveReadableNodeCount(void);
 extern List * ActiveReadableNodeList(void);
 extern WorkerNode * GetWorkerNodeByNodeId(int nodeId);
 extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
-extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
+extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
 extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);
 extern void EnsureCoordinator(void);
 extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
 extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
 extern bool WorkerNodeIsPrimary(WorkerNode *worker);
 extern bool WorkerNodeIsSecondary(WorkerNode *worker);
+extern bool WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker);
 extern bool WorkerNodeIsReadable(WorkerNode *worker);
 extern uint32 CountPrimariesWithMetadata(void);
 extern WorkerNode * GetFirstPrimaryWorkerNode(void);
@@ -29,11 +29,11 @@ step detector-dump-wait-edges:
 
 waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
 
-276            275            f
+290            289            f
 transactionnumberwaitingtransactionnumbers
 
-275
-276            275
+289
+290            289
 step s1-abort:
     ABORT;
 
@@ -77,14 +77,14 @@ step detector-dump-wait-edges:
 
 waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
 
-280            279            f
-281            279            f
-281            280            t
+294            293            f
+295            293            f
+295            294            t
 transactionnumberwaitingtransactionnumbers
 
-279
-280            279
-281            279,280
+293
+294            293
+295            293,294
 step s1-abort:
     ABORT;
 
@@ -0,0 +1,168 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-add-second-node s1-begin s2-begin s2-create-distributed-table s1-noshards s2-commit s1-commit s2-shardcounts
+?column?
+
+1
+step s1-add-second-node:
+    SELECT 1 FROM master_add_node('localhost', 57638);
+
+?column?
+
+1
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s2-create-distributed-table:
+    CREATE TABLE t1 (a int);
+    -- session needs to have replication factor set to 1, can't do in setup
+    SET citus.shard_replication_factor TO 1;
+    SELECT create_distributed_table('t1', 'a');
+
+create_distributed_table
+
+
+step s1-noshards:
+    SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false);
+ <waiting ...>
+step s2-commit:
+    COMMIT;
+
+step s1-noshards: <... completed>
+master_set_node_property
+
+
+step s1-commit:
+    COMMIT;
+
+step s2-shardcounts:
+    SELECT nodeport, count(*)
+    FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+    WHERE logicalrelid = 't1'::regclass GROUP BY nodeport ORDER BY nodeport;
+
+nodeport       count
+
+57637          2
+57638          2
+master_remove_node
+
+
+
+starting permutation: s1-add-second-node s1-begin s2-begin s1-noshards s2-create-distributed-table s1-commit s2-commit s2-shardcounts
+?column?
+
+1
+step s1-add-second-node:
+    SELECT 1 FROM master_add_node('localhost', 57638);
+
+?column?
+
+1
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s1-noshards:
+    SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false);
+
+master_set_node_property
+
+
+step s2-create-distributed-table:
+    CREATE TABLE t1 (a int);
+    -- session needs to have replication factor set to 1, can't do in setup
+    SET citus.shard_replication_factor TO 1;
+    SELECT create_distributed_table('t1', 'a');
+ <waiting ...>
+step s1-commit:
+    COMMIT;
+
+step s2-create-distributed-table: <... completed>
+create_distributed_table
+
+
+step s2-commit:
+    COMMIT;
+
+step s2-shardcounts:
+    SELECT nodeport, count(*)
+    FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+    WHERE logicalrelid = 't1'::regclass GROUP BY nodeport ORDER BY nodeport;
+
+nodeport       count
+
+57638          4
+master_remove_node
+
+
+
+starting permutation: s1-begin s2-begin s1-noshards s2-update-node s1-commit s2-commit
+?column?
+
+1
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s1-noshards:
+    SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false);
+
+master_set_node_property
+
+
+step s2-update-node:
+    select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638)
+ <waiting ...>
+step s1-commit:
+    COMMIT;
+
+step s2-update-node: <... completed>
+master_update_node
+
+
+step s2-commit:
+    COMMIT;
+
+master_remove_node
+
+
+
+starting permutation: s1-begin s2-begin s2-update-node s1-noshards s2-commit s1-commit
+?column?
+
+1
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s2-update-node:
+    select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638)
+
+master_update_node
+
+
+step s1-noshards:
+    SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false);
+ <waiting ...>
+step s2-commit:
+    COMMIT;
+
+step s1-noshards: <... completed>
+error in steps s2-commit s1-noshards: ERROR:  node at "localhost:57637" does not exist
+step s1-commit:
+    COMMIT;
+
+master_remove_node
+
@@ -1,4 +1,5 @@
 SET citus.next_shard_id TO 1220000;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
 -- Tests functions related to cluster membership
 -- before starting the test, lets try to create reference table and see a
 -- meaningful error
@@ -386,10 +387,10 @@ SELECT
 (1 row)
 
 SELECT * FROM pg_dist_node ORDER BY nodeid;
- nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
- 11     | 9       | localhost | 57637    | default  | f           | t        | primary  | default     | f
- 12     | 10      | localhost | 57638    | default  | f           | t        | primary  | default     | f
+ nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
+ 11     | 9       | localhost | 57637    | default  | f           | t        | primary  | default     | f              | t
+ 12     | 10      | localhost | 57638    | default  | f           | t        | primary  | default     | f              | t
 (2 rows)
 
 -- check that mixed add/remove node commands work fine inside transaction
@ -522,7 +523,6 @@ WHERE
 4
 (1 row)
 
-
 DROP TABLE temp;
 \c - - - :worker_1_port
 DELETE FROM pg_dist_partition;
@ -608,11 +608,11 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a
 INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
 VALUES ('localhost', 5000, 1000, 'primary', 'olap');
 ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f).
+DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f, t).
 UPDATE pg_dist_node SET nodecluster = 'olap'
 WHERE nodeport = :worker_1_port;
 ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f).
+DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f, t).
 -- check that you /can/ add a secondary node to a non-default cluster
 SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
 SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap');
@ -635,9 +635,9 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole =
 (1 row)
 
 SELECT * FROM pg_dist_node WHERE nodeport=8887;
- nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+----------------
- 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+----------------+------------------
+ 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t
 (1 row)
 
 -- don't remove the secondary and unavailable nodes, check that no commands are sent to
@ -678,9 +678,9 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000);
 (1 row)
 
 SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
- nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+----------+----------+----------+-------------+----------+----------+-------------+----------------
- 16 | 14 | somehost | 9000 | default | f | t | primary | default | f
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
+ 16 | 14 | somehost | 9000 | default | f | t | primary | default | f | t
 (1 row)
 
 -- cleanup
@ -691,8 +691,196 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
 (1 row)
 
 SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
- nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
- 16 | 14 | localhost | 57637 | default | f | t | primary | default | f
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
+ 16 | 14 | localhost | 57637 | default | f | t | primary | default | f | t
 (1 row)
 
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE test_dist (x int, y int);
+SELECT create_distributed_table('test_dist', 'x');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- testing behaviour when setting shouldhaveshards to false on partially empty node
+SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+ master_set_node_property
+--------------------------
+
+(1 row)
+
+CREATE TABLE test_dist_colocated (x int, y int);
+CREATE TABLE test_dist_non_colocated (x int, y int);
+CREATE TABLE test_dist_colocated_with_non_colocated (x int, y int);
+CREATE TABLE test_ref (a int, b int);
+SELECT create_distributed_table('test_dist_colocated', 'x');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('test_dist_colocated_with_non_colocated', 'x', colocate_with => 'test_dist_non_colocated');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_reference_table('test_ref');
+ create_reference_table
+------------------------
+
+(1 row)
+
+-- colocated tables should still be placed on shouldhaveshards false nodes for safety
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 2
+ 57638 | 2
+(2 rows)
+
+-- non colocated tables should not be placed on shouldhaveshards false nodes anymore
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 4
+(1 row)
+
+-- this table should be colocated with the test_dist_non_colocated table
+-- correctly only on nodes with shouldhaveshards true
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist_colocated_with_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 4
+(1 row)
+
+-- reference tables should be placed on nodes with shouldhaveshards false
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 1
+ 57638 | 1
+(2 rows)
+
+-- cleanup for next test
+DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated, test_dist_colocated_with_non_colocated;
+-- testing behaviour when setting shouldhaveshards to false on fully empty node
+SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+ master_set_node_property
+--------------------------
+
+(1 row)
+
+CREATE TABLE test_dist (x int, y int);
+CREATE TABLE test_dist_colocated (x int, y int);
+CREATE TABLE test_dist_non_colocated (x int, y int);
+CREATE TABLE test_ref (a int, b int);
+SELECT create_distributed_table('test_dist', 'x');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_reference_table('test_ref');
+ create_reference_table
+------------------------
+
+(1 row)
+
+-- distributed tables should not be placed on nodes with shouldhaveshards false
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 4
+(1 row)
+
+-- reference tables should be placed on nodes with shouldhaveshards false
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 1
+ 57638 | 1
+(2 rows)
+
+SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
+ master_set_node_property
+--------------------------
+
+(1 row)
+
+-- distributed tables should still not be placed on nodes that were switched to
+-- shouldhaveshards true
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 4
+(1 row)
+
+-- reference tables should still be placed on all nodes with shouldhaveshards 'true'
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 1
+ 57638 | 1
+(2 rows)
+
+SELECT create_distributed_table('test_dist_colocated', 'x');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- colocated tables should not be placed on nodes that were switched to
+-- shouldhaveshards true
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 4
+(1 row)
+
+-- non colocated tables should be placed on nodes that were switched to
+-- shouldhaveshards true
+SELECT nodeport, count(*)
+FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
+ nodeport | count
+----------+-------
+ 57637 | 2
+ 57638 | 2
+(2 rows)
+
+SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
+ERROR: only the 'shouldhaveshards' property can be set using this function
+DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
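Condensed, the workflow this new expected output exercises is short. A sketch with hypothetical table names (events, countries), assuming workers on ports 57637 and 57638 as in the test harness:

SET citus.shard_replication_factor TO 1;
SELECT * FROM master_set_node_property('localhost', 57638, 'shouldhaveshards', false);

-- a new colocation group avoids the flagged node entirely
CREATE TABLE events (id bigint, payload text);
SELECT create_distributed_table('events', 'id', colocate_with => 'none');

-- a reference table still gets a placement on every node
CREATE TABLE countries (code text PRIMARY KEY);
SELECT create_reference_table('countries');

-- verify: events placements only on 57637, countries on both nodes
SELECT logicalrelid, nodeport, count(*)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid IN ('events'::regclass, 'countries'::regclass)
GROUP BY logicalrelid, nodeport ORDER BY 1, 2;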
@ -1,15 +1,14 @@
 -- if the output of following query changes, we might need to change
--- some heap_getattr() calls to heap_deform_tuple().
+-- some heap_getattr() calls to heap_deform_tuple(). This errors out in
+-- postgres versions before 11. If this test fails check out
+-- https://github.com/citusdata/citus/pull/2464 for an explanation of what to
+-- do. Once you've used the new code for the table you can add it to the NOT IN
+-- part of the query so new changes to it won't affect this test.
 SELECT attrelid::regclass, attname, atthasmissing, attmissingval
 FROM pg_attribute
-WHERE atthasmissing
+WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass)
 ORDER BY attrelid, attname;
 attrelid | attname | atthasmissing | attmissingval
---------------+----------------+---------------+---------------
+----------+---------+---------------+---------------
- pg_dist_node | hasmetadata | t | {f}
- pg_dist_node | isactive | t | {t}
- pg_dist_node | metadatasynced | t | {f}
- pg_dist_node | nodecluster | t | {default}
- pg_dist_node | noderole | t | {primary}
-(5 rows)
+(0 rows)
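For context on atthasmissing: PostgreSQL 11's "fast default" feature lets ALTER TABLE ... ADD COLUMN with a constant DEFAULT skip the table rewrite; the default is recorded in pg_attribute.attmissingval instead and filled in when old tuples are read. That is what produces the pg_dist_node rows removed above, and why the test now excludes that catalog table. A standalone illustration with a hypothetical table t:

CREATE TABLE t (a int);
INSERT INTO t VALUES (1);
-- no table rewrite on PostgreSQL 11+, just catalog bookkeeping:
ALTER TABLE t ADD COLUMN b bool NOT NULL DEFAULT true;
SELECT attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE attrelid = 't'::regclass AND atthasmissing;
-- attname | atthasmissing | attmissingval
-- b       | t             | {t}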
@ -14,10 +14,8 @@ CREATE FUNCTION master_metadata_snapshot()
 RETURNS text[]
 LANGUAGE C STRICT
 AS 'citus';
-
 COMMENT ON FUNCTION master_metadata_snapshot()
 IS 'commands to create the metadata snapshot';
-
 -- Show that none of the existing tables are qualified to be MX tables
 SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
 logicalrelid | partmethod | partkey | colocationid | repmodel
@ -79,7 +77,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
 ALTER TABLE public.mx_test_table OWNER TO postgres
 ALTER TABLE public.mx_test_table OWNER TO postgres
-CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default
+CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
 CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
 INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
@ -103,7 +101,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
-CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
+CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
 CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
 INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
@ -131,7 +129,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
-CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
+CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
 CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
 INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
@ -152,7 +150,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
 ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
-CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
+CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
 CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
 INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
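The only change in these four snapshot hunks is that the deparsed CREATE INDEX commands no longer append a redundant TABLESPACE pg_default clause. That presumably aligns the generated DDL with PostgreSQL's own pg_get_indexdef(), which spells out the tablespace only when it differs from the default; a quick check with a hypothetical index:

CREATE TABLE idx_demo (col_2 text);
CREATE INDEX demo_index ON idx_demo USING btree (col_2);
SELECT pg_get_indexdef('demo_index'::regclass);
-- CREATE INDEX demo_index ON public.idx_demo USING btree (col_2)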
@ -233,12 +231,12 @@ SELECT * FROM pg_dist_local_group;
 (1 row)
 
 SELECT * FROM pg_dist_node ORDER BY nodeid;
- nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
- 1 | 1 | localhost | 57637 | default | t | t | primary | default | f
- 2 | 2 | localhost | 57638 | default | f | t | primary | default | f
- 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
- 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------+------------------
+ 1 | 1 | localhost | 57637 | default | t | t | primary | default | f | t
+ 2 | 2 | localhost | 57638 | default | f | t | primary | default | f | t
+ 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f | t
+ 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
 (4 rows)
 
 SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
@ -330,7 +328,6 @@ SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1');
 
 (1 row)
 
-
 SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
 -----------------------------
@ -341,8 +338,8 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 \c - - - :worker_1_port
 SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_testing_schema_2.fk_test_2'::regclass;
 Constraint | Definition
----------------------+-----------------------------------------------------------------------------
- fk_test_2_col1_fkey | FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3)
+--------------------------+-----------------------------------------------------------------------------
+ fk_test_2_col1_col2_fkey | FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3)
 (1 row)
 
 \c - - - :master_port
@ -372,12 +369,12 @@ SELECT * FROM pg_dist_local_group;
 (1 row)
 
 SELECT * FROM pg_dist_node ORDER BY nodeid;
- nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
---------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
- 1 | 1 | localhost | 57637 | default | t | t | primary | default | t
- 2 | 2 | localhost | 57638 | default | f | t | primary | default | f
- 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
- 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------+------------------
+ 1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
+ 2 | 2 | localhost | 57638 | default | f | t | primary | default | f | t
+ 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f | t
+ 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
 (4 rows)
 
 SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
@ -645,7 +642,6 @@ ORDER BY
 mx_test_schema_2.mx_table_2 | 1310029 | localhost | 57637
 (10 rows)
 
-
 -- Check that metadata of MX tables exists on the metadata worker
 \c - - - :worker_1_port
 -- Check that tables are created
@ -829,7 +825,6 @@ ORDER BY logicalrelid;
 mx_colocation_test_2 | 10000
 (2 rows)
 
-
 -- Reset the colocation IDs of the test tables
 DELETE FROM
 pg_dist_colocation
@ -889,7 +884,6 @@ DROP TABLE mx_colocation_test_2;
 \c - - - :worker_1_port
 \d mx_colocation_test_1
 \d mx_colocation_test_2
-
 -- Check that dropped MX table can be recreated again
 \c - - - :master_port
 SET citus.shard_count TO 7;
@ -1308,7 +1302,6 @@ ORDER BY
 mx_ref | n | t | 1310072 | 100073 | localhost | 57638
 (2 rows)
 
-
 SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
 -- Check that DDL commands are propagated to reference tables on workers
 \c - - - :master_port
@ -1345,7 +1338,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
 col_1 | integer | col_1
 (1 row)
 
-
 -- Check that metadata is cleaned successfully upon drop table
 \c - - - :master_port
 DROP TABLE mx_ref;
@ -1448,6 +1440,63 @@ UPDATE pg_dist_placement
 UPDATE pg_dist_placement
 SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
 WHERE groupid = :old_worker_2_group;
+-- Confirm that shouldhaveshards is 'true'
+\c - - - :master_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ t
+(1 row)
+
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ t
+(1 row)
+
+-- Check that setting shouldhaveshards to false is correctly transferred to other mx nodes
+\c - - - :master_port
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false);
+ master_set_node_property
+--------------------------
+
+(1 row)
+
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ f
+(1 row)
+
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ f
+(1 row)
+
+-- Check that setting shouldhaveshards to true is correctly transferred to other mx nodes
+\c - postgres - :master_port
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true);
+ master_set_node_property
+--------------------------
+
+(1 row)
+
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ t
+(1 row)
+
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+ shouldhaveshards
+------------------
+ t
+(1 row)
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
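Because MX (metadata-synced) workers carry a replicated copy of pg_dist_node, the flag flips on the worker as soon as the coordinator's change is propagated, which is exactly what the \c hops above verify. A shorter way to check every worker at once is Citus's run_command_on_workers() UDF; a sketch, assuming the workers have synced metadata:

SELECT nodename, nodeport, result
FROM run_command_on_workers(
  $$ SELECT shouldhaveshards FROM pg_dist_node WHERE nodeport = 8888 $$);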
@ -2,7 +2,6 @@
 -- MULTI_MX_CREATE_TABLE
 --
 ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
-ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
 SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
 -----------------------------
@ -459,28 +458,28 @@ FROM pg_dist_partition NATURAL JOIN shard_counts
 ORDER BY colocationid, logicalrelid;
 logicalrelid | colocationid | shard_count | partmethod | repmodel
 --------------------------------------------------------+--------------+-------------+------------+----------
- nation_hash | 1390000 | 16 | h | s
- citus_mx_test_schema.nation_hash | 1390000 | 16 | h | s
- citus_mx_test_schema_join_1.nation_hash | 1390001 | 4 | h | s
- citus_mx_test_schema_join_1.nation_hash_2 | 1390001 | 4 | h | s
- citus_mx_test_schema_join_2.nation_hash | 1390001 | 4 | h | s
- citus_mx_test_schema.nation_hash_collation_search_path | 1390001 | 4 | h | s
- citus_mx_test_schema.nation_hash_composite_types | 1390001 | 4 | h | s
- mx_ddl_table | 1390001 | 4 | h | s
- app_analytics_events_mx | 1390001 | 4 | h | s
- company_employees_mx | 1390001 | 4 | h | s
- lineitem_mx | 1390002 | 16 | h | s
- orders_mx | 1390002 | 16 | h | s
- customer_mx | 1390003 | 1 | n | t
- nation_mx | 1390003 | 1 | n | t
- part_mx | 1390003 | 1 | n | t
- supplier_mx | 1390003 | 1 | n | t
- limit_orders_mx | 1390004 | 2 | h | s
- articles_hash_mx | 1390004 | 2 | h | s
- multiple_hash_mx | 1390005 | 2 | h | s
- researchers_mx | 1390006 | 2 | h | s
- labs_mx | 1390007 | 1 | h | s
- objects_mx | 1390007 | 1 | h | s
- articles_single_shard_hash_mx | 1390007 | 1 | h | s
+ citus_mx_test_schema_join_1.nation_hash | 1390002 | 4 | h | s
+ citus_mx_test_schema_join_1.nation_hash_2 | 1390002 | 4 | h | s
+ citus_mx_test_schema_join_2.nation_hash | 1390002 | 4 | h | s
+ citus_mx_test_schema.nation_hash_collation_search_path | 1390002 | 4 | h | s
+ citus_mx_test_schema.nation_hash_composite_types | 1390002 | 4 | h | s
+ mx_ddl_table | 1390002 | 4 | h | s
+ app_analytics_events_mx | 1390002 | 4 | h | s
+ company_employees_mx | 1390002 | 4 | h | s
+ customer_mx | 1390004 | 1 | n | t
+ nation_mx | 1390004 | 1 | n | t
+ part_mx | 1390004 | 1 | n | t
+ supplier_mx | 1390004 | 1 | n | t
+ nation_hash | 1390006 | 16 | h | s
+ citus_mx_test_schema.nation_hash | 1390006 | 16 | h | s
+ lineitem_mx | 1390007 | 16 | h | s
+ orders_mx | 1390007 | 16 | h | s
+ limit_orders_mx | 1390008 | 2 | h | s
+ articles_hash_mx | 1390008 | 2 | h | s
+ multiple_hash_mx | 1390009 | 2 | h | s
+ researchers_mx | 1390010 | 2 | h | s
+ labs_mx | 1390011 | 1 | h | s
+ objects_mx | 1390011 | 1 | h | s
+ articles_single_shard_hash_mx | 1390011 | 1 | h | s
 (23 rows)
@ -59,7 +59,6 @@ SELECT * FROM citus_shard_indexes_on_worker;
 -- now show that we see the shards, but not the
 -- indexes as there are no indexes
 \c - - - :worker_1_port
-SET citus.next_shard_id TO 1330000;
 SET search_path TO 'mx_hide_shard_names';
 SELECT * FROM citus_shards_on_worker ORDER BY 2;
 Schema | Name | Type | Owner
@ -112,6 +112,13 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
 DROP TABLE mx_ref_table;
 CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1);
 \c - - - :worker_1_port
+-- changing shouldhaveshards
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false);
+ERROR: operation is not allowed on this node
+HINT: Connect to the coordinator and run it again.
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true);
+ERROR: operation is not allowed on this node
+HINT: Connect to the coordinator and run it again.
 -- DDL commands
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
 Column | Type | Modifiers
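The two errors above pin down that master_set_node_property() is coordinator-only; invoked on a worker it refuses before changing anything, as the HINT says. The working form of the same call is simply issued from the coordinator:

\c - - - :master_port
SELECT * FROM master_set_node_property('localhost', 8888, 'shouldhaveshards', false);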
@ -5,6 +5,7 @@ test: isolation_add_node_vs_reference_table_operations
 test: isolation_create_table_vs_add_remove_node
 test: isolation_master_update_node
 test: isolation_ensure_dependency_activate_node
+test: isolation_shouldhaveshards
 
 # tests that change node metadata should precede
 # isolation_cluster_management such that tests
@ -0,0 +1,73 @@
+# the test expects to have zero nodes in pg_dist_node at the beginning
+# add single one of the nodes for the purpose of the test
+setup
+{
+	SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
+	SELECT 1 FROM master_add_node('localhost', 57637);
+}
+
+teardown
+{
+	DROP TABLE IF EXISTS t1 CASCADE;
+	SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
+}
+
+session "s1"
+
+step "s1-add-second-node" {
+	SELECT 1 FROM master_add_node('localhost', 57638);
+}
+
+step "s1-begin"
+{
+	BEGIN;
+}
+
+step "s1-noshards"
+{
+	SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false);
+}
+
+step "s1-commit"
+{
+	COMMIT;
+}
+
+session "s2"
+
+step "s2-begin"
+{
+	BEGIN;
+}
+
+step "s2-create-distributed-table"
+{
+	CREATE TABLE t1 (a int);
+	-- session needs to have replication factor set to 1, can't do in setup
+	SET citus.shard_replication_factor TO 1;
+	SELECT create_distributed_table('t1', 'a');
+}
+
+step "s2-update-node"
+{
+	select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638)
+}
+
+step "s2-commit"
+{
+	COMMIT;
+}
+
+step "s2-shardcounts"
+{
+	SELECT nodeport, count(*)
+	FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
+	WHERE logicalrelid = 't1'::regclass GROUP BY nodeport ORDER BY nodeport;
+}
+
+permutation "s1-add-second-node" "s1-begin" "s2-begin" "s2-create-distributed-table" "s1-noshards" "s2-commit" "s1-commit" "s2-shardcounts"
+permutation "s1-add-second-node" "s1-begin" "s2-begin" "s1-noshards" "s2-create-distributed-table" "s1-commit" "s2-commit" "s2-shardcounts"
+permutation "s1-begin" "s2-begin" "s1-noshards" "s2-update-node" "s1-commit" "s2-commit"
+permutation "s1-begin" "s2-begin" "s2-update-node" "s1-noshards" "s2-commit" "s1-commit"
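The last two permutations race master_set_node_property() against master_update_node() on the same pg_dist_node row. In one ordering the property change waits and then succeeds; in the other it waits and then fails, because by the time it acquires the lock the node has been renamed away from localhost:57637. The failing ordering, replayed by hand in two psql sessions against the coordinator:

-- session 2
BEGIN;
SELECT * FROM master_update_node(
  (SELECT nodeid FROM pg_dist_node WHERE nodeport = 57637),
  'localhost', 57638);

-- session 1: blocks on the row updated above
SELECT * FROM master_set_node_property('localhost', 57637, 'shouldhaveshards', false);

-- session 2
COMMIT;
-- session 1 now errors: node at "localhost:57637" does not exist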
@ -1,4 +1,5 @@
 SET citus.next_shard_id TO 1220000;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
 
 -- Tests functions related to cluster membership
 
|
@ -295,3 +296,96 @@ SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
|
||||||
-- cleanup
|
-- cleanup
|
||||||
SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
|
SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
|
||||||
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
|
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
|
||||||
|
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE test_dist (x int, y int);
|
||||||
|
SELECT create_distributed_table('test_dist', 'x');
|
||||||
|
|
||||||
|
-- testing behaviour when setting shouldhaveshards to false on partially empty node
|
||||||
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
|
||||||
|
CREATE TABLE test_dist_colocated (x int, y int);
|
||||||
|
CREATE TABLE test_dist_non_colocated (x int, y int);
|
||||||
|
CREATE TABLE test_dist_colocated_with_non_colocated (x int, y int);
|
||||||
|
CREATE TABLE test_ref (a int, b int);
|
||||||
|
SELECT create_distributed_table('test_dist_colocated', 'x');
|
||||||
|
SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none');
|
||||||
|
SELECT create_distributed_table('test_dist_colocated_with_non_colocated', 'x', colocate_with => 'test_dist_non_colocated');
|
||||||
|
SELECT create_reference_table('test_ref');
|
||||||
|
|
||||||
|
-- colocated tables should still be placed on shouldhaveshards false nodes for safety
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- non colocated tables should not be placed on shouldhaveshards false nodes anymore
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- this table should be colocated with the test_dist_non_colocated table
|
||||||
|
-- correctly only on nodes with shouldhaveshards true
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist_colocated_with_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- reference tables should be placed on with shouldhaveshards false
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- cleanup for next test
|
||||||
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated, test_dist_colocated_with_non_colocated;
|
||||||
|
|
||||||
|
-- testing behaviour when setting shouldhaveshards to false on fully empty node
|
||||||
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
|
||||||
|
CREATE TABLE test_dist (x int, y int);
|
||||||
|
CREATE TABLE test_dist_colocated (x int, y int);
|
||||||
|
CREATE TABLE test_dist_non_colocated (x int, y int);
|
||||||
|
CREATE TABLE test_ref (a int, b int);
|
||||||
|
SELECT create_distributed_table('test_dist', 'x');
|
||||||
|
SELECT create_reference_table('test_ref');
|
||||||
|
|
||||||
|
-- distributed tables should not be placed on nodes with shouldhaveshards false
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- reference tables should be placed on nodes with shouldhaveshards false
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
|
||||||
|
|
||||||
|
-- distributed tables should still not be placed on nodes that were switched to
|
||||||
|
-- shouldhaveshards true
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
-- reference tables should still be placed on all nodes with isdatanode 'true'
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
SELECT create_distributed_table('test_dist_colocated', 'x');
|
||||||
|
SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none');
|
||||||
|
|
||||||
|
-- colocated tables should not be placed on nodedes that were switched to
|
||||||
|
-- shouldhaveshards true
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
|
||||||
|
-- non colocated tables should be placed on nodedes that were switched to
|
||||||
|
-- shouldhaveshards true
|
||||||
|
SELECT nodeport, count(*)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport;
|
||||||
|
|
||||||
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
|
||||||
|
|
||||||
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
||||||
|
|
|
@ -1,7 +1,11 @@
 
 -- if the output of following query changes, we might need to change
--- some heap_getattr() calls to heap_deform_tuple().
+-- some heap_getattr() calls to heap_deform_tuple(). This errors out in
+-- postgres versions before 11. If this test fails check out
+-- https://github.com/citusdata/citus/pull/2464 for an explanation of what to
+-- do. Once you've used the new code for the table you can add it to the NOT IN
+-- part of the query so new changes to it won't affect this test.
 SELECT attrelid::regclass, attname, atthasmissing, attmissingval
 FROM pg_attribute
-WHERE atthasmissing
+WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass)
 ORDER BY attrelid, attname;
@ -678,6 +678,30 @@ UPDATE pg_dist_placement
 SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
 WHERE groupid = :old_worker_2_group;
+
+-- Confirm that shouldhaveshards is 'true'
+\c - - - :master_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+
+-- Check that setting shouldhaveshards to false is correctly transferred to other mx nodes
+\c - - - :master_port
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false);
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+
+-- Check that setting shouldhaveshards to true is correctly transferred to other mx nodes
+\c - postgres - :master_port
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true);
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+
+\c - postgres - :worker_1_port
+select shouldhaveshards from pg_dist_node where nodeport = 8888;
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@ -3,7 +3,6 @@
 --
 
 ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
-ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
 
 SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
@ -39,7 +39,6 @@ SELECT * FROM citus_shard_indexes_on_worker;
 -- now show that we see the shards, but not the
 -- indexes as there are no indexes
 \c - - - :worker_1_port
-SET citus.next_shard_id TO 1330000;
 SET search_path TO 'mx_hide_shard_names';
 SELECT * FROM citus_shards_on_worker ORDER BY 2;
 SELECT * FROM citus_shard_indexes_on_worker ORDER BY 2;
@ -84,6 +84,10 @@ DROP TABLE mx_ref_table;
 CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1);
 \c - - - :worker_1_port
+
+-- changing shouldhaveshards
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false);
+SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true);
 
 -- DDL commands
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
 CREATE INDEX mx_test_index ON mx_table(col_2);
@ -9,7 +9,7 @@ BEFORE_CITUS_UPGRADE_COORD_SCHEDULE = './before_citus_upgrade_coord_schedule'
 
 MASTER = 'master'
 # This should be updated when citus version changes
-MASTER_VERSION = '9.0'
+MASTER_VERSION = '9.1'
 
 HOME = expanduser("~")