Add nodeRole column

- master_add_node enforces that there is only one primary per group
- there's also a trigger on pg_dist_node to prevent multiple primaries
  per group
- functions in the metadata cache only return primary nodes
- Rename ActiveWorkerNodeList -> ActivePrimaryNodeList
- Rename WorkerGetLive{Node->Group}Count()
- Refactor WorkerGetRandomCandidateNode
- master_remove_node only complains about active shard placements if the
  node being removed is a primary.
- master_remove_node only deletes all reference table placements in the
  group if the node being removed is the primary.
- Rename {Node->NodeGroup}HasShardPlacements; this reflects the behavior it
  already had.
- Rename DeleteAllReferenceTablePlacementsFrom{Node->NodeGroup}. This also
  reflects the behavior it already had, but the new signature forces the
  caller to pass in a groupId
- Rename {WorkerGetLiveGroup->ActivePrimaryNode}Count
pull/1505/merge
Brian Cloutier 2017-07-03 18:55:37 +03:00 committed by Brian Cloutier
parent e6c375eb81
commit ec99f8f983
39 changed files with 795 additions and 312 deletions
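A minimal sketch of the interface described in the commit message above; the node names, ports, and psql variable below are illustrative and not part of the change:

-- register a primary; groupid defaults to 0 (a new group is allocated) and
-- noderole defaults to 'primary'
SELECT master_add_node('worker-1', 5432);
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodename = 'worker-1' \gset

-- a readable standby can be registered as a secondary in the same group
SELECT master_add_node('worker-1-standby', 5432, groupid => :worker_1_group, noderole => 'secondary');

-- a second primary in the same group is rejected by master_add_node, and a trigger on
-- pg_dist_node rejects manual INSERTs/UPDATEs that would create one
SELECT master_add_node('worker-1-other', 5432, groupid => :worker_1_group, noderole => 'primary');
-- ERROR:  group <worker_1_group> already has a primary node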



@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -147,6 +147,8 @@ $(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql
cat $^ > $@
$(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql
cat $^ > $@
$(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql
cat $^ > $@
NO_PGXS = 1


@ -22,4 +22,5 @@ CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT database_id oid, OUT
AS 'MODULE_PATHNAME', $$get_all_active_transactions$$;
COMMENT ON FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns distributed transaction ids of active distributed transactions';
RESET search_path;


@ -0,0 +1,97 @@
/* citus--7.0-4--7.0-5.sql */
SET search_path = 'pg_catalog';
CREATE TYPE pg_catalog.noderole AS ENUM (
'primary', -- node is available and accepting writes
'secondary', -- node is available but only accepts reads
'unavailable' -- node is in recovery or otherwise not usable
-- adding new values to a type inside of a transaction (such as during an ALTER EXTENSION
-- citus UPDATE) isn't allowed in PG 9.6, and only allowed in PG10 if you don't use the
-- new values inside of the same transaction. You might need to replace this type with a
-- new one and then change the column type in pg_dist_node. There's a list of
-- alternatives here:
-- https://stackoverflow.com/questions/1771543/postgresql-updating-an-enum-type/41696273
);
ALTER TABLE pg_dist_node ADD COLUMN noderole noderole NOT NULL DEFAULT 'primary';
-- we're now allowed to have more than one node per group
ALTER TABLE pg_catalog.pg_dist_node DROP CONSTRAINT pg_dist_node_groupid_unique;
-- so make sure pg_dist_shard_placement only returns writable placements
CREATE OR REPLACE VIEW pg_catalog.pg_dist_shard_placement AS
SELECT shardid, shardstate, shardlength, nodename, nodeport, placementid
FROM pg_dist_placement placement INNER JOIN pg_dist_node node ON (
placement.groupid = node.groupid AND node.noderole = 'primary'
);
CREATE OR REPLACE FUNCTION citus.pg_dist_node_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE;
IF (TG_OP = 'INSERT') THEN
IF NEW.noderole = 'primary'
AND EXISTS (SELECT 1 FROM pg_dist_node WHERE groupid = NEW.groupid AND
noderole = 'primary' AND
nodeid <> NEW.nodeid) THEN
RAISE EXCEPTION 'there cannot be two primary nodes in a group';
END IF;
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF NEW.noderole = 'primary'
AND EXISTS (SELECT 1 FROM pg_dist_node WHERE groupid = NEW.groupid AND
noderole = 'primary' AND
nodeid <> NEW.nodeid) THEN
RAISE EXCEPTION 'there cannot be two primary nodes in a group';
END IF;
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER pg_dist_node_trigger
BEFORE INSERT OR UPDATE ON pg_dist_node
FOR EACH ROW EXECUTE PROCEDURE citus.pg_dist_node_trigger_func();
DROP FUNCTION master_add_node(text, integer);
CREATE FUNCTION master_add_node(nodename text,
nodeport integer,
groupid integer default 0,
noderole noderole default 'primary',
OUT nodeid integer,
OUT groupid integer,
OUT nodename text,
OUT nodeport integer,
OUT noderack text,
OUT hasmetadata boolean,
OUT isactive bool,
OUT noderole noderole)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_add_node$$;
COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer,
groupid integer, noderole noderole)
IS 'add node to the cluster';
DROP FUNCTION master_add_inactive_node(text, integer);
CREATE FUNCTION master_add_inactive_node(nodename text,
nodeport integer,
groupid integer default 0,
noderole noderole default 'primary',
OUT nodeid integer,
OUT groupid integer,
OUT nodename text,
OUT nodeport integer,
OUT noderack text,
OUT hasmetadata boolean,
OUT isactive bool,
OUT noderole noderole)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$master_add_inactive_node$$;
COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer,
groupid integer, noderole noderole)
IS 'prepare node by adding it to pg_dist_node';
RESET search_path;
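As the comment at the top of this script notes, new labels cannot be added to the noderole enum during ALTER EXTENSION citus UPDATE on 9.6 (or used in the same transaction on PG 10). A rough sketch of the "replace the type" alternative it mentions, using a hypothetical future label; the pg_dist_shard_placement view above and the functions whose signatures use noderole would have to be dropped and recreated around these statements:

-- hypothetical future upgrade step, for illustration only
CREATE TYPE pg_catalog.noderole_new AS ENUM ('primary', 'secondary', 'unavailable', 'new-role');
ALTER TABLE pg_dist_node ALTER COLUMN noderole DROP DEFAULT;
ALTER TABLE pg_dist_node
  ALTER COLUMN noderole TYPE pg_catalog.noderole_new
  USING noderole::text::pg_catalog.noderole_new;
ALTER TABLE pg_dist_node ALTER COLUMN noderole SET DEFAULT 'primary';
DROP TYPE pg_catalog.noderole;
ALTER TYPE pg_catalog.noderole_new RENAME TO noderole;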


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-4'
default_version = '7.0-5'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog


@ -252,7 +252,7 @@ CreateReferenceTable(Oid relationId)
EnsureCoordinator();
CheckCitusVersion(ERROR);
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
replicationFactor = list_length(workerNodeList);
/* if there are no workers, error out */
@ -720,7 +720,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
static void
EnsureSchemaExistsOnAllNodes(Oid relationId)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
StringInfo applySchemaCreationDDL = makeStringInfo();


@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job)
const char *workerHashName = "Worker node hash";
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList);
/* initialize task execution structures for remote execution */


@ -71,7 +71,7 @@ JobExecutorType(MultiPlan *multiPlan)
" queries on the workers.")));
}
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
workerNodeCount = list_length(workerNodeList);
taskCount = list_length(job->taskList);
tasksPerNode = taskCount / ((double) workerNodeCount);


@ -190,7 +190,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node.
*/
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
taskTrackerCount = (uint32) list_length(workerNodeList);
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);


@ -163,7 +163,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
/* load and sort the worker node list for deterministic placement */
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/* make sure we don't process cancel signals until all shards are created */
@ -382,7 +382,7 @@ CreateReferenceTableShard(Oid distributedTableId)
}
/* load and sort the worker node list for deterministic placement */
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/* get the next shard id */


@ -60,7 +60,7 @@ master_expire_table_cache(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
cacheEntry = DistributedTableCacheEntry(relationId);
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
shardCount = cacheEntry->shardIntervalArrayLength;
shardIntervalArray = cacheEntry->sortedShardIntervalArray;


@ -160,7 +160,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
foreach(workerNodeCell, workerNodeList)
{
@ -606,11 +606,10 @@ ShardLength(uint64 shardId)
/*
* NodeHasShardPlacements returns whether any active shards are placed on the group
* this node is a part of.
* NodeGroupHasShardPlacements returns whether any active shards are placed on the group
*/
bool
NodeHasShardPlacements(char *nodeName, int32 nodePort, bool onlyConsiderActivePlacements)
NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements)
{
const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1);
const bool indexOK = false;
@ -621,8 +620,6 @@ NodeHasShardPlacements(char *nodeName, int32 nodePort, bool onlyConsiderActivePl
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
uint32 groupId = GroupForNode(nodeName, nodePort);
Relation pgPlacement = heap_open(DistPlacementRelationId(),
AccessShareLock);


@ -399,7 +399,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
/* switch to memory context appropriate for multiple function calls */
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
workerNodeList = ActiveWorkerNodeList();
workerNodeList = ActivePrimaryNodeList();
workerNodeCount = (uint32) list_length(workerNodeList);
functionContext->user_fctx = workerNodeList;


@ -72,7 +72,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText);
uint64 shardId = INVALID_SHARD_ID;
uint32 attemptableNodeCount = 0;
uint32 liveNodeCount = 0;
uint32 candidateNodeIndex = 0;
List *candidateNodeList = NIL;
@ -133,13 +132,16 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
/* generate new and unique shardId from sequence */
shardId = GetNextShardId();
/* if enough live nodes, add an extra candidate node as backup */
/* if enough live groups, add an extra candidate node as backup */
{
uint32 primaryNodeCount = ActivePrimaryNodeCount();
attemptableNodeCount = ShardReplicationFactor;
liveNodeCount = WorkerGetLiveNodeCount();
if (liveNodeCount > ShardReplicationFactor)
if (primaryNodeCount > ShardReplicationFactor)
{
attemptableNodeCount = ShardReplicationFactor + 1;
}
}
/* first retrieve a list of random nodes for shard placements */
while (candidateNodeIndex < attemptableNodeCount)
@ -152,7 +154,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
}
else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
candidateNodeIndex);
}


@ -42,8 +42,8 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size *
/* Local functions forward declarations */
static WorkerNode * WorkerGetNodeWithName(const char *hostname);
static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
List *currentNodeList);
static List * PrimaryNodesNotInList(List *currentList);
static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList);
static bool OddNumber(uint32 number);
static bool ListMember(List *currentList, WorkerNode *workerNode);
@ -54,9 +54,8 @@ static bool ListMember(List *currentList, WorkerNode *workerNode);
*/
/*
* WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates
* a new worker node. The allocation is performed by randomly picking a worker node
* which is not in currentNodeList.
* WorkerGetRandomCandidateNode accepts a list of WorkerNode's and returns a random
* primary node which is not in that list.
*
* Note that the function returns null if the worker membership list does not
* contain enough nodes to allocate a new worker node.
@ -69,16 +68,11 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
uint32 tryCount = WORKER_RACK_TRIES;
uint32 tryIndex = 0;
HTAB *workerNodeHash = GetWorkerNodeHash();
/*
* We check if the shard has already been placed on all nodes known to us.
* This check is rather defensive, and has the drawback of performing a full
* scan over the worker node hash for determining the number of live nodes.
*/
uint32 currentNodeCount = list_length(currentNodeList);
uint32 liveNodeCount = WorkerGetLiveNodeCount();
if (currentNodeCount >= liveNodeCount)
List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList);
/* we check if the shard has already been placed on all nodes known to us */
if (list_length(candidateWorkerNodeList) == 0)
{
return NULL;
}
@ -86,7 +80,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
/* if current node list is empty, randomly pick one node and return */
if (currentNodeCount == 0)
{
workerNode = FindRandomNodeNotInList(workerNodeHash, NIL);
workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
return workerNode;
}
@ -116,7 +110,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
char *workerRack = NULL;
bool sameRack = false;
workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList);
workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
workerRack = workerNode->workerRack;
sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0);
@ -302,12 +296,12 @@ WorkerGetNodeWithName(const char *hostname)
/*
* WorkerGetLiveNodeCount returns the number of live nodes in the cluster.
* */
* ActivePrimaryNodeCount returns the number of groups with a primary in the cluster.
*/
uint32
WorkerGetLiveNodeCount(void)
ActivePrimaryNodeCount(void)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount;
@ -315,11 +309,10 @@ WorkerGetLiveNodeCount(void)
/*
* ActiveWorkerNodeList iterates over the hash table that includes the worker
* nodes and adds active nodes to a list, which is returned.
* ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash
*/
List *
ActiveWorkerNodeList(void)
ActivePrimaryNodeList(void)
{
List *workerNodeList = NIL;
WorkerNode *workerNode = NULL;
@ -330,7 +323,7 @@ ActiveWorkerNodeList(void)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->isActive)
if (workerNode->isActive && WorkerNodeIsPrimary(workerNode))
{
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
@ -343,70 +336,48 @@ ActiveWorkerNodeList(void)
/*
* FindRandomNodeNotInList finds a random node from the shared hash that is not
* a member of the current node list. The caller is responsible for making the
* necessary node count checks to ensure that such a node exists.
*
* Note that this function has a selection bias towards nodes whose positions in
* the shared hash are sequentially adjacent to the positions of nodes that are
* in the current node list. This bias follows from our decision to first pick a
* random node in the hash, and if that node is a member of the current list, to
* simply iterate to the next node in the hash. Overall, this approach trades in
* some selection bias for simplicity in design and for bounded execution time.
* PrimaryNodesNotInList scans through the worker node hash and returns a list of all
* primary nodes which are not in currentList. It runs in O(n*m) but currentList is
* quite small.
*/
static WorkerNode *
FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
static List *
PrimaryNodesNotInList(List *currentList)
{
List *workerNodeList = NIL;
HTAB *workerNodeHash = GetWorkerNodeHash();
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
uint32 workerNodeCount = 0;
uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool lookForWorkerNode = true;
uint32 workerPosition = 0;
uint32 workerIndex = 0;
workerNodeCount = hash_get_num_entries(WorkerNodesHash);
currentNodeCount = list_length(currentNodeList);
Assert(workerNodeCount > currentNodeCount);
hash_seq_init(&status, workerNodeHash);
/*
* We determine a random position within the worker hash between [1, N],
* assuming that the number of elements in the hash is N. We then get to
* this random position by iterating over the worker hash. Please note that
* the random seed has already been set by the postmaster when starting up.
*/
workerPosition = (random() % workerNodeCount) + 1;
hash_seq_init(&status, WorkerNodesHash);
for (workerIndex = 0; workerIndex < workerPosition; workerIndex++)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
workerNode = (WorkerNode *) hash_seq_search(&status);
if (ListMember(currentList, workerNode))
{
continue;
}
while (lookForWorkerNode)
if (WorkerNodeIsPrimary(workerNode))
{
bool listMember = ListMember(currentNodeList, workerNode);
if (!listMember)
{
lookForWorkerNode = false;
}
else
{
/* iterate to the next worker node in the hash */
workerNode = (WorkerNode *) hash_seq_search(&status);
/* reached end of hash; start from the beginning */
if (workerNode == NULL)
{
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
}
workerNodeList = lappend(workerNodeList, workerNode);
}
}
/* we stopped scanning before completion; therefore clean up scan */
hash_seq_term(&status);
return workerNodeList;
}
/* FindRandomNodeFromList picks a random node from the list provided to it. */
static WorkerNode *
FindRandomNodeFromList(List *candidateWorkerNodeList)
{
uint32 candidateNodeCount = list_length(candidateWorkerNodeList);
/* nb, the random seed has already been set by the postmaster when starting up */
uint32 workerPosition = (random() % candidateNodeCount);
WorkerNode *workerNode =
(WorkerNode *) list_nth(candidateWorkerNodeList, workerPosition);
return workerNode;
}
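For illustration, the candidate set built by PrimaryNodesNotInList and the uniform pick made by FindRandomNodeFromList correspond roughly to the query below (Citus computes this in C over its in-memory worker node hash, and WorkerGetRandomCandidateNode additionally applies the rack-placement retries; the excluded names are placeholders for nodes that already hold the placement):

SELECT nodename, nodeport
FROM pg_dist_node
WHERE noderole = 'primary'
  AND (nodename, nodeport) NOT IN (VALUES ('worker-1', 5432), ('worker-2', 5432))
ORDER BY random()
LIMIT 1;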


@ -217,7 +217,7 @@ MetadataCreateCommands(void)
List *metadataSnapshotCommandList = NIL;
List *distributedTableList = DistributedTableList();
List *propagatedTableList = NIL;
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL;
bool includeSequenceDefaults = true;
@ -398,6 +398,7 @@ NodeListInsertCommand(List *workerNodeList)
StringInfo nodeListInsertCommand = makeStringInfo();
int workerCount = list_length(workerNodeList);
int processedWorkerNodeCount = 0;
Oid primaryRole = PrimaryNodeRoleId();
/* if there are no workers, return NULL */
if (workerCount == 0)
@ -405,10 +406,18 @@ NodeListInsertCommand(List *workerNodeList)
return nodeListInsertCommand->data;
}
if (primaryRole == InvalidOid)
{
ereport(ERROR, (errmsg("bad metadata, noderole does not exist"),
errdetail("you should never see this, please submit "
"a bug report"),
errhint("run ALTER EXTENSION citus UPDATE and try again")));
}
/* generate the query without any values yet */
appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
"noderack, hasmetadata, isactive) VALUES ");
"noderack, hasmetadata, isactive, noderole) VALUES ");
/* iterate over the worker nodes, add the values */
foreach(workerNodeCell, workerNodeList)
@ -417,15 +426,20 @@ NodeListInsertCommand(List *workerNodeList)
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, %s, %s, %s)",
"(%d, %d, %s, %d, %s, %s, %s, '%s'::noderole)",
workerNode->nodeId,
workerNode->groupId,
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort,
quote_literal_cstr(workerNode->workerRack),
hasMetadataString,
isActiveString);
isActiveString,
nodeRoleString);
processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount)
@ -1014,7 +1028,7 @@ SchemaOwnerName(Oid objectId)
static bool
HasMetadataWorkers(void)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, workerNodeList)


@ -1863,10 +1863,10 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
static uint32
HashPartitionCount(void)
{
uint32 nodeCount = WorkerGetLiveNodeCount();
uint32 groupCount = ActivePrimaryNodeCount();
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode);
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
return partitionCount;
}
@ -4791,7 +4791,7 @@ GreedyAssignTaskList(List *taskList)
uint32 taskCount = list_length(taskList);
/* get the worker node list and sort the list */
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*
@ -5223,7 +5223,7 @@ AssignDualHashTaskList(List *taskList)
* if subsequent jobs have a small number of tasks, we won't allocate the
* tasks to the same worker repeatedly.
*/
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
uint32 beginningNodeIndex = jobId % workerNodeCount;


@ -1535,7 +1535,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
}
else if (replacePrunedQueryWithDummy)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
if (workerNodeList != NIL)
{
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);


@ -127,7 +127,7 @@ RecoverPreparedTransactions(void)
*/
LockRelationOid(DistTransactionRelationId(), ExclusiveLock);
workerList = ActiveWorkerNodeList();
workerList = ActivePrimaryNodeList();
foreach(workerNodeCell, workerList)
{


@ -78,7 +78,7 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command)
void
SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
{
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
ListCell *commandCell = NULL;
@ -128,7 +128,7 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
{
List *connectionList = NIL;
ListCell *connectionCell = NULL;
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();


@ -40,7 +40,9 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
@ -110,6 +112,9 @@ typedef struct MetadataCacheData
Oid extraDataContainerFuncId;
Oid workerHashFunctionId;
Oid extensionOwner;
Oid primaryNodeRoleId;
Oid secondaryNodeRoleId;
Oid unavailableNodeRoleId;
} MetadataCacheData;
@ -422,18 +427,24 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
int shardIndex = shardEntry->shardIndex;
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
bool groupContainsNodes = false;
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
uint32 groupId = groupShardPlacement->groupId;
WorkerNode *workerNode = NodeForGroup(groupId);
WorkerNode *workerNode = PrimaryNodeForGroup(groupId, &groupContainsNodes);
if (workerNode == NULL)
if (workerNode == NULL && !groupContainsNodes)
{
ereport(ERROR, (errmsg("the metadata is inconsistent"),
errdetail("there is a placement in group %u but "
"there are no nodes in that group", groupId)));
}
if (workerNode == NULL && groupContainsNodes)
{
ereport(ERROR, (errmsg("node group %u does not have a primary node", groupId)));
}
/* copy everything into shardPlacement but preserve the header */
memcpy((((CitusNode *) shardPlacement) + 1),
(((CitusNode *) groupShardPlacement) + 1),
@ -1840,6 +1851,99 @@ CurrentUserName(void)
}
/*
* LookupNodeRoleTypeOid returns the Oid of the "pg_catalog.noderole" type, or InvalidOid
* if it does not exist.
*/
static Oid
LookupNodeRoleTypeOid()
{
Value *schemaName = makeString("pg_catalog");
Value *typeName = makeString("noderole");
List *qualifiedName = list_make2(schemaName, typeName);
TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);
Oid nodeRoleTypId;
/* typenameTypeId but instead of raising an error return InvalidOid */
Type tup = LookupTypeName(NULL, enumTypeName, NULL, false);
if (tup == NULL)
{
return InvalidOid;
}
nodeRoleTypId = HeapTupleGetOid(tup);
ReleaseSysCache(tup);
return nodeRoleTypId;
}
/*
* LookupNodeRoleValueId returns the Oid of the value in "pg_catalog.noderole" which
* matches the provided name, or InvalidOid if the noderole enum doesn't exist yet.
*/
static Oid
LookupNodeRoleValueId(char *valueName)
{
Oid nodeRoleTypId = LookupNodeRoleTypeOid();
if (nodeRoleTypId == InvalidOid)
{
return InvalidOid;
}
else
{
Datum nodeRoleIdDatum = ObjectIdGetDatum(nodeRoleTypId);
Datum valueDatum = CStringGetDatum(valueName);
Datum valueIdDatum = DirectFunctionCall2(enum_in, valueDatum, nodeRoleIdDatum);
Oid valueId = DatumGetObjectId(valueIdDatum);
return valueId;
}
}
/* return the Oid of the 'primary' nodeRole enum value */
Oid
PrimaryNodeRoleId(void)
{
if (!MetadataCache.primaryNodeRoleId)
{
MetadataCache.primaryNodeRoleId = LookupNodeRoleValueId("primary");
}
return MetadataCache.primaryNodeRoleId;
}
/* return the Oid of the 'secondary' nodeRole enum value */
Oid
SecondaryNodeRoleId(void)
{
if (!MetadataCache.secondaryNodeRoleId)
{
MetadataCache.secondaryNodeRoleId = LookupNodeRoleValueId("secondary");
}
return MetadataCache.secondaryNodeRoleId;
}
/* return the Oid of the 'unavailable' nodeRole enum value */
Oid
UnavailableNodeRoleId(void)
{
if (!MetadataCache.unavailableNodeRoleId)
{
MetadataCache.unavailableNodeRoleId = LookupNodeRoleValueId("unavailable");
}
return MetadataCache.unavailableNodeRoleId;
}
/*
* master_dist_partition_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_partition are changed
@ -2244,6 +2348,7 @@ InitializeWorkerNodeCache(void)
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
workerNode->hasMetadata = currentNode->hasMetadata;
workerNode->isActive = currentNode->isActive;
workerNode->nodeRole = currentNode->nodeRole;
if (handleFound)
{
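The nodeRole accessors added above cache the pg_enum Oids of the noderole labels; LookupNodeRoleValueId resolves them through enum_in and returns InvalidOid on clusters where the type has not been created yet. The same lookup, expressed as a catalog query for illustration:

-- one row per noderole label, or no rows if the type does not exist yet
SELECT e.oid AS value_oid, e.enumlabel
FROM pg_enum e
JOIN pg_type t ON t.oid = e.enumtypid
JOIN pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = 'pg_catalog' AND t.typname = 'noderole'
ORDER BY e.enumsortorder;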


@ -18,6 +18,7 @@
#include "access/tupmacs.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "commands/sequence.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
@ -53,7 +54,8 @@ static Datum ActivateNode(char *nodeName, int nodePort);
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack, bool hasMetadata, bool isActive,
bool *nodeAlreadyExists);
Oid nodeRole, bool *nodeAlreadyExists);
static uint32 CountPrimariesWithMetadata();
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
@ -61,7 +63,7 @@ static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
static int GetNextNodeId(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
char *nodeRack, bool hasMetadata, bool isActive);
char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -86,7 +88,8 @@ master_add_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0;
int32 groupId = PG_GETARG_INT32(2);
Oid nodeRole = InvalidOid;
char *nodeRack = WORKER_DEFAULT_RACK;
bool hasMetadata = false;
bool isActive = false;
@ -95,8 +98,18 @@ master_add_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
/* during tests this function is called before nodeRole has been created */
if (PG_NARGS() == 3)
{
nodeRole = InvalidOid;
}
else
{
nodeRole = PG_GETARG_OID(3);
}
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
hasMetadata, isActive, nodeRole, &nodeAlreadyExists);
/*
* After adding new node, if the node did not already exist, we will activate
@ -123,7 +136,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0;
int32 groupId = PG_GETARG_INT32(2);
Oid nodeRole = PG_GETARG_OID(3);
char *nodeRack = WORKER_DEFAULT_RACK;
bool hasMetadata = false;
bool isActive = false;
@ -133,7 +147,7 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
hasMetadata, isActive, nodeRole, &nodeAlreadyExists);
PG_RETURN_CSTRING(nodeRecord);
}
@ -183,16 +197,21 @@ master_disable_node(PG_FUNCTION_ARGS)
int32 nodePort = PG_GETARG_INT32(1);
char *nodeName = text_to_cstring(nodeNameText);
bool hasActiveShardPlacements = false;
bool isActive = false;
WorkerNode *workerNode = NULL;
CheckCitusVersion(ERROR);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
workerNode = FindWorkerNode(nodeName, nodePort);
hasActiveShardPlacements = NodeHasShardPlacements(nodeName, nodePort,
onlyConsiderActivePlacements);
if (hasActiveShardPlacements)
if (WorkerNodeIsPrimary(workerNode))
{
DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
}
if (WorkerNodeIsPrimary(workerNode) &&
NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements))
{
ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries "
"may fail after this operation. Use "
@ -246,11 +265,31 @@ GroupForNode(char *nodeName, int nodePort)
/*
* NodeForGroup returns the (unique) node which is in this group.
* In a future where we have nodeRole this will return the primary node.
* WorkerNodeIsPrimary returns whether the argument represents a primary node.
*/
bool
WorkerNodeIsPrimary(WorkerNode *worker)
{
Oid primaryRole = PrimaryNodeRoleId();
/* if nodeRole does not yet exist, all nodes are primary nodes */
if (primaryRole == InvalidOid)
{
return true;
}
return worker->nodeRole == primaryRole;
}
/*
* PrimaryNodeForGroup returns the (unique) primary in the specified group.
*
* If there are any nodes in the requested group and groupContainsNodes is not NULL,
* this sets *groupContainsNodes to true.
*/
WorkerNode *
NodeForGroup(uint32 groupId)
PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
@ -261,8 +300,17 @@ NodeForGroup(uint32 groupId)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId != groupId)
{
continue;
}
if (workerNodeGroupId == groupId)
if (groupContainsNodes != NULL)
{
*groupContainsNodes = true;
}
if (WorkerNodeIsPrimary(workerNode))
{
hash_seq_term(&status);
return workerNode;
@ -300,9 +348,13 @@ ActivateNode(char *nodeName, int nodePort)
SetNodeState(nodeName, nodePort, isActive);
ReplicateAllReferenceTablesToNode(nodeName, nodePort);
workerNode = FindWorkerNode(nodeName, nodePort);
if (WorkerNodeIsPrimary(workerNode))
{
ReplicateAllReferenceTablesToNode(nodeName, nodePort);
}
nodeRecord = GenerateNodeTuple(workerNode);
heap_close(pgDistNode, AccessShareLock);
@ -322,6 +374,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
ListCell *workerNodeCell = NULL;
List *workerNodes = NULL;
bool nodeAlreadyExists = false;
Oid nodeRole = InvalidOid; /* nodeRole doesn't exist when this function is called */
CheckCitusVersion(ERROR);
@ -332,7 +385,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack, false, workerNode->isActive,
&nodeAlreadyExists);
nodeRole, &nodeAlreadyExists);
}
PG_RETURN_BOOL(true);
@ -509,7 +562,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
{
const bool onlyConsiderActivePlacements = false;
char *nodeDeleteCommand = NULL;
bool hasAnyShardPlacements = false;
WorkerNode *workerNode = NULL;
List *referenceTableList = NIL;
uint32 deletedNodeId = INVALID_PLACEMENT_ID;
@ -518,19 +570,26 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
EnsureSuperUser();
workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
}
if (workerNode != NULL)
{
deletedNodeId = workerNode->nodeId;
}
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
hasAnyShardPlacements = NodeHasShardPlacements(nodeName, nodePort,
onlyConsiderActivePlacements);
if (hasAnyShardPlacements)
if (WorkerNodeIsPrimary(workerNode))
{
ereport(ERROR, (errmsg("you cannot remove a node which has shard placements")));
DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
}
if (WorkerNodeIsPrimary(workerNode) &&
NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements))
{
ereport(ERROR, (errmsg("you cannot remove the primary node of a node group "
"which has shard placements")));
}
DeleteNodeRow(nodeName, nodePort);
@ -540,16 +599,20 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
* column for colocation group of reference tables so that replication factor will
* be equal to worker count.
*/
if (WorkerNodeIsPrimary(workerNode))
{
referenceTableList = ReferenceTableOidList();
if (list_length(referenceTableList) != 0)
{
Oid firstReferenceTableId = linitial_oid(referenceTableList);
uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId);
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
int workerCount = list_length(workerNodeList);
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
UpdateColocationGroupReplicationFactor(referenceTableColocationId,
workerCount);
}
}
nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
@ -561,6 +624,30 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
}
/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
static uint32
CountPrimariesWithMetadata()
{
uint32 primariesWithMetadata = 0;
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->hasMetadata && WorkerNodeIsPrimary(workerNode))
{
primariesWithMetadata++;
}
}
return primariesWithMetadata;
}
/*
* AddNodeMetadata checks the given node information and adds the specified node to the
* pg_dist_node table of the master and workers with metadata.
@ -572,15 +659,14 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
*/
static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
bool hasMetadata, bool isActive, bool *nodeAlreadyExists)
bool hasMetadata, bool isActive, Oid nodeRole, bool *nodeAlreadyExists)
{
Relation pgDistNode = NULL;
int nextNodeIdInt = 0;
Datum returnData = 0;
WorkerNode *workerNode = NULL;
char *nodeDeleteCommand = NULL;
char *nodeInsertCommand = NULL;
List *workerNodeList = NIL;
uint32 primariesWithMetadata = 0;
EnsureCoordinator();
EnsureSuperUser();
@ -620,27 +706,43 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
}
}
/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
if (nodeRole != InvalidOid)
{
if (nodeRole == PrimaryNodeRoleId())
{
WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(groupId, NULL);
if (existingPrimaryNode != NULL)
{
ereport(ERROR, (errmsg("group %d already has a primary node", groupId)));
}
}
}
/* generate the new node id from the sequence */
nextNodeIdInt = GetNextNodeId();
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata,
isActive);
isActive, nodeRole);
workerNode = FindWorkerNode(nodeName, nodePort);
/* send the delete command all nodes with metadata */
/* send the delete command to all primary nodes with metadata */
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
/* finally prepare the insert command and send it to all primary nodes */
workerNodeList = list_make1(workerNode);
nodeInsertCommand = NodeListInsertCommand(workerNodeList);
primariesWithMetadata = CountPrimariesWithMetadata();
if (primariesWithMetadata != 0)
{
List *workerNodeList = list_make1(workerNode);
char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
}
heap_close(pgDistNode, AccessExclusiveLock);
heap_close(pgDistNode, NoLock);
/* fetch the worker node, and generate the output */
workerNode = FindWorkerNode(nodeName, nodePort);
returnData = GenerateNodeTuple(workerNode);
return returnData;
@ -751,6 +853,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata);
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(workerNode->isActive);
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(workerNode->nodeRole);
/* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
@ -888,7 +991,7 @@ EnsureCoordinator(void)
*/
static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
bool hasMetadata, bool isActive)
bool hasMetadata, bool isActive, Oid nodeRole)
{
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
@ -907,6 +1010,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole);
/* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
@ -917,7 +1021,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
CatalogTupleInsert(pgDistNode, heapTuple);
/* close relation and invalidate previous cache entry */
heap_close(pgDistNode, AccessExclusiveLock);
heap_close(pgDistNode, NoLock);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
@ -1147,6 +1251,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
tupleDescriptor, &isNull);
Datum isActive = heap_getattr(heapTuple, Anum_pg_dist_node_isactive,
tupleDescriptor, &isNull);
Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole,
tupleDescriptor, &isNull);
Assert(!HeapTupleHasNulls(heapTuple));
@ -1158,6 +1264,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
workerNode->hasMetadata = DatumGetBool(hasMetadata);
workerNode->isActive = DatumGetBool(isActive);
workerNode->nodeRole = DatumGetObjectId(nodeRole);
return workerNode;
}


@ -127,7 +127,7 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
uint32 workerCount = 0;
Oid firstReferenceTableId = InvalidOid;
uint32 referenceTableColocationId = INVALID_COLOCATION_ID;
@ -228,7 +228,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
{
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
/*
@ -364,7 +364,7 @@ uint32
CreateReferenceTableColocationId()
{
uint32 colocationId = INVALID_COLOCATION_ID;
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = ActivePrimaryNodeList();
int shardCount = 1;
int replicationFactor = list_length(workerNodeList);
Oid distributionColumnType = InvalidOid;
@ -382,13 +382,13 @@ CreateReferenceTableColocationId()
/*
* DeleteAllReferenceTablePlacementsFromNode function iterates over list of reference
* DeleteAllReferenceTablePlacementsFromNodeGroup function iterates over list of reference
* tables and deletes all reference table placements from pg_dist_placement table
* for given worker node. However, it does not modify replication factor of the colocation
* for given group. However, it does not modify replication factor of the colocation
* group of reference tables. It is caller's responsibility to do that if it is necessary.
*/
void
DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId)
{
List *referenceTableList = ReferenceTableOidList();
ListCell *referenceTableCell = NULL;
@ -401,7 +401,7 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
/*
* We sort the reference table list to prevent deadlocks in concurrent
* DeleteAllReferenceTablePlacementsFromNode calls.
* DeleteAllReferenceTablePlacementsFromNodeGroup calls.
*/
referenceTableList = SortList(referenceTableList, CompareOids);
foreach(referenceTableCell, referenceTableList)
@ -409,11 +409,9 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
GroupShardPlacement *placement = NULL;
StringInfo deletePlacementCommand = makeStringInfo();
uint32 workerGroup = GroupForNode(workerName, workerPort);
Oid referenceTableId = lfirst_oid(referenceTableCell);
List *placements = GroupShardPlacementsForTableOnGroup(referenceTableId,
workerGroup);
groupId);
if (list_length(placements) == 0)
{
/* this happens if the node was previously disabled */
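Roughly, DeleteAllReferenceTablePlacementsFromNodeGroup now has the local effect sketched below, with the group id as a placeholder and reference tables assumed to be the partmethod = 'n' entries in pg_dist_partition; the C implementation works placement by placement and also propagates the deletes to workers that have metadata:

DELETE FROM pg_dist_placement
WHERE groupid = 14  -- placeholder group id
  AND shardid IN (SELECT shardid
                  FROM pg_dist_shard JOIN pg_dist_partition USING (logicalrelid)
                  WHERE partmethod = 'n');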


@ -115,8 +115,8 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt
extern void CopyShardPlacement(ShardPlacement *srcPlacement,
ShardPlacement *destPlacement);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeHasShardPlacements(char *nodeName, int32 nodePort,
bool onlyLookForActivePlacements);
extern bool NodeGroupHasShardPlacements(uint32 groupId,
bool onlyConsiderActivePlacements);
extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);


@ -112,6 +112,11 @@ extern Oid DistPlacementGroupidIndexId(void);
extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusWorkerHashFunctionId(void);
/* nodeRole enum oids */
extern Oid PrimaryNodeRoleId(void);
extern Oid SecondaryNodeRoleId(void);
extern Oid UnavailableNodeRoleId(void);
/* user related functions */
extern Oid CitusExtensionOwner(void);
extern char * CitusExtensionOwnerName(void);


@ -24,6 +24,7 @@ typedef struct FormData_pg_dist_node
int nodeport;
bool hasmetadata;
bool isactive;
Oid noderole;
#endif
} FormData_pg_dist_node;
@ -38,7 +39,7 @@ typedef FormData_pg_dist_node *Form_pg_dist_node;
* compiler constants for pg_dist_node
* ----------------
*/
#define Natts_pg_dist_node 7
#define Natts_pg_dist_node 8
#define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3
@ -46,6 +47,7 @@ typedef FormData_pg_dist_node *Form_pg_dist_node;
#define Anum_pg_dist_node_noderack 5
#define Anum_pg_dist_node_hasmetadata 6
#define Anum_pg_dist_node_isactive 7
#define Anum_pg_dist_node_noderole 8
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"


@ -14,8 +14,7 @@
extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName,
uint32 workerPort);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId);
extern List * ReferenceTableOidList(void);
extern int CompareOids(const void *leftElement, const void *rightElement);


@ -43,6 +43,7 @@ typedef struct WorkerNode
char workerRack[WORKER_LENGTH]; /* node's network location */
bool hasMetadata; /* node gets metadata changes */
bool isActive; /* node's state */
Oid nodeRole; /* the node's role in its group */
} WorkerNode;
@ -57,13 +58,14 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
uint64 shardId,
uint32 placementIndex);
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 WorkerGetLiveNodeCount(void);
extern List * ActiveWorkerNodeList(void);
extern uint32 ActivePrimaryNodeCount(void);
extern List * ActivePrimaryNodeList(void);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(void);
extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
extern WorkerNode * NodeForGroup(uint32 groupId);
extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes);
extern bool WorkerNodeIsPrimary(WorkerNode *worker);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);


@ -29,9 +29,9 @@ SELECT master_get_active_worker_nodes();
-- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port);
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+-----------+----------+----------+-------------+----------
1 | 1 | localhost | 57637 | default | f | t
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
--------+---------+-----------+----------+----------+-------------+----------+----------
1 | 1 | localhost | 57637 | default | f | t | primary
(1 row)
-- get the active nodes
@ -78,8 +78,8 @@ SELECT master_get_active_worker_nodes();
-- add some shard placements to the cluster
SELECT master_activate_node('localhost', :worker_2_port);
master_activate_node
-----------------------------------
(3,3,localhost,57638,default,f,t)
-------------------------------------------
(3,3,localhost,57638,default,f,t,primary)
(1 row)
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
@ -111,7 +111,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
-- try to remove a node with active placements and see that node removal is failed
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has shard placements
ERROR: you cannot remove the primary node of a node group which has shard placements
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
@ -139,13 +139,13 @@ SELECT master_get_active_worker_nodes();
-- restore the node for next tests
SELECT master_activate_node('localhost', :worker_2_port);
master_activate_node
-----------------------------------
(3,3,localhost,57638,default,f,t)
-------------------------------------------
(3,3,localhost,57638,default,f,t,primary)
(1 row)
-- try to remove a node with active placements and see that node removal is failed
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has shard placements
ERROR: you cannot remove the primary node of a node group which has shard placements
-- mark all placements in the candidate node as inactive
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
UPDATE pg_dist_placement SET shardstate=3 WHERE groupid=:worker_2_group;
@ -164,7 +164,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
-- try to remove a node with only inactive placements and see that removal still fails
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has shard placements
ERROR: you cannot remove the primary node of a node group which has shard placements
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
@ -180,6 +180,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
(1 row)
UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
-- when there is no primary we should get a pretty error
UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport=:worker_2_port;
SELECT * FROM cluster_management_test;
ERROR: node group 3 does not have a primary node
-- when there is no node at all in the group we should get a different error
DELETE FROM pg_dist_node WHERE nodeport=:worker_2_port;
SELECT * FROM cluster_management_test;
ERROR: the metadata is inconsistent
DETAIL: there is a placement in group 3 but there are no nodes in that group
-- clean-up
SELECT groupid as new_group FROM master_add_node('localhost', :worker_2_port) \gset
UPDATE pg_dist_placement SET groupid = :new_group WHERE groupid = :worker_2_group;
-- test that you are allowed to remove secondary nodes even if there are placements
SELECT master_add_node('localhost', 9990, groupid => :new_group, noderole => 'secondary');
master_add_node
--------------------------------------------
(5,4,localhost,9990,default,f,t,secondary)
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove the primary node of a node group which has shard placements
SELECT master_remove_node('localhost', 9990);
master_remove_node
--------------------
(1 row)
-- clean-up
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
SELECT master_remove_node('localhost', :worker_2_port);
@ -241,8 +269,8 @@ SELECT
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+----------+----------+----------+-------------+----------
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
--------+---------+----------+----------+----------+-------------+----------+----------
(0 rows)
-- check that adding two nodes in the same transaction works
@ -250,15 +278,15 @@ SELECT
master_add_node('localhost', :worker_1_port),
master_add_node('localhost', :worker_2_port);
master_add_node | master_add_node
-----------------------------------+-----------------------------------
(6,6,localhost,57637,default,f,t) | (7,7,localhost,57638,default,f,t)
-------------------------------------------+-------------------------------------------
(8,7,localhost,57637,default,f,t,primary) | (9,8,localhost,57638,default,f,t,primary)
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+-----------+----------+----------+-------------+----------
6 | 6 | localhost | 57637 | default | f | t
7 | 7 | localhost | 57638 | default | f | t
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
--------+---------+-----------+----------+----------+-------------+----------+----------
8 | 7 | localhost | 57637 | default | f | t | primary
9 | 8 | localhost | 57638 | default | f | t | primary
(2 rows)
-- check that mixed add/remove node commands work fine inside transaction
@ -405,3 +433,39 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
-- check that you can't add more than one primary to a group
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
SELECT master_add_node('localhost', 9999, groupid => :worker_1_group, noderole => 'primary');
ERROR: group 12 already has a primary node
-- check that you can add secondaries and unavailable nodes to a group
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_add_node('localhost', 9998, groupid => :worker_1_group, noderole => 'secondary');
master_add_node
----------------------------------------------
(16,12,localhost,9998,default,f,t,secondary)
(1 row)
SELECT master_add_node('localhost', 9997, groupid => :worker_1_group, noderole => 'unavailable');
master_add_node
------------------------------------------------
(17,12,localhost,9997,default,f,t,unavailable)
(1 row)
-- add_inactive_node also works with secondaries
SELECT master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary');
master_add_inactive_node
----------------------------------------------
(18,14,localhost,9996,default,f,f,secondary)
(1 row)
-- check that you can't manually add two primaries to a group
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
VALUES ('localhost', 5000, :worker_1_group, 'primary');
ERROR: there cannot be two primary nodes in a group
CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 9 at RAISE
UPDATE pg_dist_node SET noderole = 'primary'
WHERE groupid = :worker_1_group AND nodeport = 9998;
ERROR: there cannot be two primary nodes in a group
CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 17 at RAISE
-- don't remove the secondary and unavailable nodes, check that no commands are sent to
-- them in any of the remaining tests


@ -21,14 +21,14 @@ CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------------------
(1,1,localhost,57637,default,f,t)
-------------------------------------------
(1,1,localhost,57637,default,f,t,primary)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(2,2,localhost,57638,default,f,t)
-------------------------------------------
(2,2,localhost,57638,default,f,t,primary)
(1 row)
-- verify that a table can be created after the extension has been dropped and recreated


@ -28,10 +28,10 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot());
unnest
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
(3 rows)
-- Create a test table with constraints and SERIAL
@ -57,7 +57,7 @@ SELECT unnest(master_metadata_snapshot());
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -78,7 +78,7 @@ SELECT unnest(master_metadata_snapshot());
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -101,7 +101,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
@ -130,7 +130,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
@ -152,7 +152,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
@ -197,10 +197,10 @@ SELECT * FROM pg_dist_local_group;
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+-----------+----------+----------+-------------+----------
1 | 1 | localhost | 57637 | default | t | t
2 | 2 | localhost | 57638 | default | f | t
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
--------+---------+-----------+----------+----------+-------------+----------+----------
1 | 1 | localhost | 57637 | default | t | t | primary
2 | 2 | localhost | 57638 | default | f | t | primary
(2 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
@ -334,10 +334,10 @@ SELECT * FROM pg_dist_local_group;
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+-----------+----------+----------+-------------+----------
1 | 1 | localhost | 57637 | default | t | t
2 | 2 | localhost | 57638 | default | f | t
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
--------+---------+-----------+----------+----------+-------------+----------+----------
1 | 1 | localhost | 57637 | default | t | t | primary
2 | 2 | localhost | 57638 | default | f | t | primary
(2 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
@ -1130,8 +1130,8 @@ SELECT create_distributed_table('mx_table', 'a');
\c - postgres - :master_port
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(4,4,localhost,57638,default,f,t)
-------------------------------------------
(4,4,localhost,57638,default,f,t,primary)
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
@ -1343,8 +1343,8 @@ WHERE logicalrelid='mx_ref'::regclass;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "mx_ref" to the node localhost:57638
master_add_node
-----------------------------------
(5,5,localhost,57638,default,f,t)
-------------------------------------------
(5,5,localhost,57638,default,f,t,primary)
(1 row)
SELECT shardid, nodename, nodeport

@ -481,7 +481,7 @@ INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNIN
SELECT setval('app_analytics_events_mx_id_seq'::regclass, :last_value);
setval
------------------
3659174697238529
3940649673949185
(1 row)
ALTER SEQUENCE app_analytics_events_mx_id_seq

@ -41,10 +41,12 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
(1 row)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------------------
(1380000,1380000,localhost,57638,default,f,t)
SELECT groupid AS worker_2_group FROM master_add_node('localhost', :worker_2_port) \gset
-- add a secondary to check we don't attempt to replicate the table to it
SELECT isactive FROM master_add_node('localhost', 9000, groupid=>:worker_2_group, noderole=>'secondary');
isactive
----------
t
(1 row)
-- remove a node with reference table
@ -55,6 +57,64 @@ SELECT create_reference_table('remove_node_reference_table');
(1 row)
-- make sure when we add a secondary we don't attempt to add placements to it
SELECT isactive FROM master_add_node('localhost', 9001, groupid=>:worker_2_group, noderole=>'secondary');
isactive
----------
t
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
count
-------
1
(1 row)
-- make sure when we disable a secondary we don't remove any placements
SELECT master_disable_node('localhost', 9001);
master_disable_node
---------------------
(1 row)
SELECT isactive FROM pg_dist_node WHERE nodeport = 9001;
isactive
----------
f
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
count
-------
1
(1 row)
-- make sure when we activate a secondary we don't add any placements
SELECT master_activate_node('localhost', 9001);
master_activate_node
--------------------------------------------------------
(1380002,1380000,localhost,9001,default,f,t,secondary)
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
count
-------
1
(1 row)
-- make sure when we remove a secondary we don't remove any placements
SELECT master_remove_node('localhost', 9001);
master_remove_node
--------------------
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
count
-------
1
(1 row)
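The placement counts above stay at 1 throughout because shard placements are recorded per group (pg_dist_placement.groupid) rather than per node; a secondary only joins a group that already holds the placement, so adding, disabling, activating, or removing it never touches pg_dist_placement. A query along these lines (an illustrative sketch using the catalogs and the :worker_2_group psql variable from this test) makes the group-level relationship visible:

SELECT p.shardid, p.shardstate, n.nodename, n.nodeport, n.noderole
FROM pg_dist_placement p
JOIN pg_dist_node n ON n.groupid = p.groupid
WHERE p.groupid = :worker_2_group
ORDER BY p.shardid, n.nodeport;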
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
@ -165,8 +225,8 @@ ERROR: node at "localhost:57638" does not exist
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380001,1380001,localhost,57638,default,f,t)
-------------------------------------------------------
(1380003,1380001,localhost,57638,default,f,t,primary)
(1 row)
-- try to disable the node before removing it (this used to crash)
@ -186,8 +246,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380002,1380002,localhost,57638,default,f,t)
-------------------------------------------------------
(1380004,1380002,localhost,57638,default,f,t,primary)
(1 row)
-- remove node in a transaction and ROLLBACK
@ -409,8 +469,8 @@ WHERE
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380003,1380003,localhost,57638,default,f,t)
-------------------------------------------------------
(1380005,1380003,localhost,57638,default,f,t,primary)
(1 row)
-- test inserting a value then removing a node in a transaction
@ -538,8 +598,8 @@ SELECT * FROM remove_node_reference_table;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380004,1380004,localhost,57638,default,f,t)
-------------------------------------------------------
(1380006,1380004,localhost,57638,default,f,t,primary)
(1 row)
-- test executing DDL command then removing a node in a transaction
@ -663,8 +723,8 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.remove_
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380005,1380005,localhost,57638,default,f,t)
-------------------------------------------------------
(1380007,1380005,localhost,57638,default,f,t,primary)
(1 row)
-- test DROP table after removing a node in a transaction
@ -731,8 +791,8 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000;
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------------------
(1380006,1380006,localhost,57638,default,f,t)
-------------------------------------------------------
(1380008,1380006,localhost,57638,default,f,t,primary)
(1 row)
-- re-create remove_node_reference_table
@ -866,8 +926,8 @@ SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_add_node
-----------------------------------------------
(1380007,1380007,localhost,57638,default,f,t)
-------------------------------------------------------
(1380009,1380007,localhost,57638,default,f,t,primary)
(1 row)
-- test with master_disable_node
@ -915,7 +975,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid ASC;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638
@ -983,8 +1044,8 @@ SELECT master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_activate_node
-----------------------------------------------
(1380007,1380007,localhost,57638,default,f,t)
-------------------------------------------------------
(1380009,1380007,localhost,57638,default,f,t,primary)
(1 row)
-- DROP tables to clean workspace

@ -25,8 +25,8 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------------------
(1370000,1370000,localhost,57638,default,f,t)
-------------------------------------------------------
(1370000,1370000,localhost,57638,default,f,t,primary)
(1 row)
-- verify node is added
@ -123,8 +123,8 @@ WHERE colocationid IN
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370002,1370002,localhost,57638,default,f,t)
-------------------------------------------------------
(1370002,1370002,localhost,57638,default,f,t,primary)
(1 row)
-- status after master_add_node
@ -176,8 +176,8 @@ WHERE colocationid IN
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------------------
(1370002,1370002,localhost,57638,default,f,t)
-------------------------------------------------------
(1370002,1370002,localhost,57638,default,f,t,primary)
(1 row)
-- status after master_add_node
@ -244,8 +244,8 @@ BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370003,1370003,localhost,57638,default,f,t)
-------------------------------------------------------
(1370003,1370003,localhost,57638,default,f,t,primary)
(1 row)
ROLLBACK;
@ -306,8 +306,8 @@ BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370004,1370004,localhost,57638,default,f,t)
-------------------------------------------------------
(1370004,1370004,localhost,57638,default,f,t,primary)
(1 row)
COMMIT;
@ -401,8 +401,8 @@ BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370005,1370005,localhost,57638,default,f,t)
-------------------------------------------------------
(1370005,1370005,localhost,57638,default,f,t,primary)
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
@ -551,8 +551,8 @@ BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370009,1370009,localhost,57638,default,f,t)
-------------------------------------------------------
(1370009,1370009,localhost,57638,default,f,t,primary)
(1 row)
DROP TABLE replicate_reference_table_drop;
@ -613,8 +613,8 @@ WHERE colocationid IN
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to the node localhost:57638
master_add_node
-----------------------------------------------
(1370010,1370010,localhost,57638,default,f,t)
-------------------------------------------------------
(1370010,1370010,localhost,57638,default,f,t,primary)
(1 row)
-- status after master_add_node
@ -658,8 +658,8 @@ SELECT create_reference_table('initially_not_replicated_reference_table');
SELECT master_add_inactive_node('localhost', :worker_2_port);
master_add_inactive_node
-----------------------------------------------
(1370011,1370011,localhost,57638,default,f,f)
-------------------------------------------------------
(1370011,1370011,localhost,57638,default,f,f,primary)
(1 row)
-- we should see only one shard placements
@ -684,8 +684,8 @@ ORDER BY 1,4,5;
SELECT master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638
master_activate_node
-----------------------------------------------
(1370011,1370011,localhost,57638,default,f,t)
-------------------------------------------------------
(1370011,1370011,localhost,57638,default,f,t,primary)
(1 row)
SELECT
@ -709,8 +709,8 @@ ORDER BY 1,4,5;
-- this should have no effect
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------------------
(1370011,1370011,localhost,57638,default,f,t)
-------------------------------------------------------
(1370011,1370011,localhost,57638,default,f,t,primary)
(1 row)
-- drop unnecessary tables

@ -78,14 +78,14 @@ CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------------------
(1,1,localhost,57637,default,f,t)
-------------------------------------------
(1,1,localhost,57637,default,f,t,primary)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(2,2,localhost,57638,default,f,t)
-------------------------------------------
(2,2,localhost,57638,default,f,t,primary)
(1 row)
-- create a table with a SERIAL column

@ -219,10 +219,11 @@ SELECT count(*) FROM mx_table;
SELECT master_add_node('localhost', 5432);
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+----------+----------+----------+-------------+----------
(0 rows)
SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
count
-------
0
(1 row)
-- master_remove_node
\c - - - :master_port
@ -231,8 +232,8 @@ NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_add_node('localhost', 5432);
master_add_node
----------------------------------------------
(1370000,1370000,localhost,5432,default,f,t)
------------------------------------------------------
(1370000,1370000,localhost,5432,default,f,t,primary)
(1 row)
\c - - - :worker_1_port
@ -240,9 +241,9 @@ SELECT master_remove_node('localhost', 5432);
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
---------+---------+-----------+----------+----------+-------------+----------
1370000 | 1370000 | localhost | 5432 | default | f | t
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole
---------+---------+-----------+----------+----------+-------------+----------+----------
1370000 | 1370000 | localhost | 5432 | default | f | t | primary
(1 row)
\c - - - :master_port

@ -768,8 +768,8 @@ SELECT master_activate_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "nation" to the node localhost:57637
NOTICE: Replicating reference table "supplier" to the node localhost:57637
master_activate_node
-----------------------------------
(1,1,localhost,57637,default,f,t)
-------------------------------------------
(1,1,localhost,57637,default,f,t,primary)
(1 row)
RESET citus.shard_replication_factor;

@ -71,6 +71,24 @@ SELECT master_get_active_worker_nodes();
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
-- when there is no primary we should get a pretty error
UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport=:worker_2_port;
SELECT * FROM cluster_management_test;
-- when there is no node at all in the group we should get a different error
DELETE FROM pg_dist_node WHERE nodeport=:worker_2_port;
SELECT * FROM cluster_management_test;
-- clean-up
SELECT groupid as new_group FROM master_add_node('localhost', :worker_2_port) \gset
UPDATE pg_dist_placement SET groupid = :new_group WHERE groupid = :worker_2_group;
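-- (the re-added worker received a fresh groupid, so the placements created under the
--  old group are repointed to the new one before the tests below run)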
-- test that you are allowed to remove secondary nodes even if there are placements
SELECT master_add_node('localhost', 9990, groupid => :new_group, noderole => 'secondary');
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', 9990);
-- clean-up
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
@ -162,3 +180,23 @@ DELETE FROM pg_dist_node;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
-- check that you can't add more than one primary to a group
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
SELECT master_add_node('localhost', 9999, groupid => :worker_1_group, noderole => 'primary');
-- check that you can add secondaries and unavailable nodes to a group
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_add_node('localhost', 9998, groupid => :worker_1_group, noderole => 'secondary');
SELECT master_add_node('localhost', 9997, groupid => :worker_1_group, noderole => 'unavailable');
-- add_inactive_node also works with secondaries
SELECT master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary');
-- check that you can't manually add two primaries to a group
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
VALUES ('localhost', 5000, :worker_1_group, 'primary');
UPDATE pg_dist_node SET noderole = 'primary'
WHERE groupid = :worker_1_group AND nodeport = 9998;
-- don't remove the secondary and unavailable nodes, check that no commands are sent to
-- them in any of the remaining tests

@ -31,12 +31,28 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
SELECT groupid AS worker_2_group FROM master_add_node('localhost', :worker_2_port) \gset
-- add a secondary to check we don't attempt to replicate the table to it
SELECT isactive FROM master_add_node('localhost', 9000, groupid=>:worker_2_group, noderole=>'secondary');
-- remove a node with reference table
CREATE TABLE remove_node_reference_table(column1 int);
SELECT create_reference_table('remove_node_reference_table');
-- make sure when we add a secondary we don't attempt to add placements to it
SELECT isactive FROM master_add_node('localhost', 9001, groupid=>:worker_2_group, noderole=>'secondary');
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
-- make sure when we disable a secondary we don't remove any placements
SELECT master_disable_node('localhost', 9001);
SELECT isactive FROM pg_dist_node WHERE nodeport = 9001;
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
-- make sure when we activate a secondary we don't add any placements
SELECT master_activate_node('localhost', 9001);
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
-- make sure when we remove a secondary we don't remove any placements
SELECT master_remove_node('localhost', 9001);
SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group;
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
@ -544,7 +560,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid ASC;
\c - - - :master_port

@ -119,7 +119,7 @@ SELECT count(*) FROM mx_table;
-- master_add_node
SELECT master_add_node('localhost', 5432);
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
-- master_remove_node
\c - - - :master_port