From ec99f8f98356477e83e3e7e00b2e08f4b67ffea4 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Mon, 3 Jul 2017 18:55:37 +0300 Subject: [PATCH] Add nodeRole column - master_add_node enforces that there is only one primary per group - there's also a trigger on pg_dist_node to prevent multiple primaries per group - functions in metadata cache only return primary nodes - Rename ActiveWorkerNodeList -> ActivePrimaryNodeList - Rename WorkerGetLive{Node->Group}Count() - Refactor WorkerGetRandomCandidateNode - master_remove_node only complains about active shard placements if the node being removed is a primary. - master_remove_node only deletes all reference table placements in the group if the node being removed is the primary. - Rename {Node->NodeGroup}HasShardPlacements, this reflects the behavior it already had. - Rename DeleteAllReferenceTablePlacementsFrom{Node->NodeGroup}. This also reflects the behavior it already had, but the new signature forces the caller to pass in a groupId - Rename {WorkerGetLiveGroup->ActivePrimaryNode}Count --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.0-3--7.0-4.sql | 1 + .../distributed/citus--7.0-4--7.0-5.sql | 97 +++++++++ src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 4 +- .../executor/multi_real_time_executor.c | 2 +- .../executor/multi_server_executor.c | 2 +- .../executor/multi_task_tracker_executor.c | 2 +- .../distributed/master/master_create_shards.c | 4 +- .../master/master_expire_table_cache.c | 2 +- .../master/master_metadata_utility.c | 9 +- .../distributed/master/master_node_protocol.c | 2 +- .../master/master_stage_protocol.c | 16 +- .../distributed/master/worker_node_manager.c | 121 ++++------- .../distributed/metadata/metadata_sync.c | 24 ++- .../planner/multi_physical_planner.c | 8 +- .../planner/multi_router_planner.c | 2 +- .../transaction/transaction_recovery.c | 2 +- .../transaction/worker_transaction.c | 4 +- .../distributed/utils/metadata_cache.c | 111 +++++++++- src/backend/distributed/utils/node_metadata.c | 197 ++++++++++++++---- .../distributed/utils/reference_table_utils.c | 18 +- .../distributed/master_metadata_utility.h | 4 +- src/include/distributed/metadata_cache.h | 5 + src/include/distributed/pg_dist_node.h | 4 +- .../distributed/reference_table_utils.h | 3 +- src/include/distributed/worker_manager.h | 8 +- .../expected/multi_cluster_management.out | 106 ++++++++-- .../regress/expected/multi_drop_extension.out | 12 +- .../regress/expected/multi_metadata_sync.out | 44 ++-- .../expected/multi_mx_modifications.out | 2 +- .../multi_remove_node_reference_table.out | 119 ++++++++--- .../multi_replicate_reference_table.out | 66 +++--- src/test/regress/expected/multi_table_ddl.out | 12 +- .../multi_unsupported_worker_operations.out | 21 +- src/test/regress/output/multi_copy.source | 6 +- .../regress/sql/multi_cluster_management.sql | 38 ++++ .../sql/multi_remove_node_reference_table.sql | 21 +- .../multi_unsupported_worker_operations.sql | 2 +- 39 files changed, 795 insertions(+), 312 deletions(-) create mode 100644 src/backend/distributed/citus--7.0-4--7.0-5.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 44f052ba2..f7f094846 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 
6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 7.0-2 7.0-3 7.0-4 + 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -147,6 +147,8 @@ $(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql cat $^ > $@ $(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql cat $^ > $@ +$(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-3--7.0-4.sql b/src/backend/distributed/citus--7.0-3--7.0-4.sql index a7a00ad09..c7e3f690f 100644 --- a/src/backend/distributed/citus--7.0-3--7.0-4.sql +++ b/src/backend/distributed/citus--7.0-3--7.0-4.sql @@ -22,4 +22,5 @@ CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT database_id oid, OUT AS 'MODULE_PATHNAME', $$get_all_active_transactions$$; COMMENT ON FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) IS 'returns distributed transaction ids of active distributed transactions'; + RESET search_path; diff --git a/src/backend/distributed/citus--7.0-4--7.0-5.sql b/src/backend/distributed/citus--7.0-4--7.0-5.sql new file mode 100644 index 000000000..8baff3017 --- /dev/null +++ b/src/backend/distributed/citus--7.0-4--7.0-5.sql @@ -0,0 +1,97 @@ +/* citus--7.0-4--7.0-5.sql */ + +SET search_path = 'pg_catalog'; + +CREATE TYPE pg_catalog.noderole AS ENUM ( + 'primary', -- node is available and accepting writes + 'secondary', -- node is available but only accepts reads + 'unavailable' -- node is in recovery or otherwise not usable +-- adding new values to a type inside of a transaction (such as during an ALTER EXTENSION +-- citus UPDATE) isn't allowed in PG 9.6, and only allowed in PG10 if you don't use the +-- new values inside of the same transaction. You might need to replace this type with a +-- new one and then change the column type in pg_dist_node. 
There's a list of +-- alternatives here: +-- https://stackoverflow.com/questions/1771543/postgresql-updating-an-enum-type/41696273 +); + +ALTER TABLE pg_dist_node ADD COLUMN noderole noderole NOT NULL DEFAULT 'primary'; + +-- we're now allowed to have more than one node per group +ALTER TABLE pg_catalog.pg_dist_node DROP CONSTRAINT pg_dist_node_groupid_unique; + +-- so make sure pg_dist_shard_placement only returns writable placements +CREATE OR REPLACE VIEW pg_catalog.pg_dist_shard_placement AS + SELECT shardid, shardstate, shardlength, nodename, nodeport, placementid + FROM pg_dist_placement placement INNER JOIN pg_dist_node node ON ( + placement.groupid = node.groupid AND node.noderole = 'primary' + ); + +CREATE OR REPLACE FUNCTION citus.pg_dist_node_trigger_func() +RETURNS TRIGGER AS $$ + BEGIN + LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE; + IF (TG_OP = 'INSERT') THEN + IF NEW.noderole = 'primary' + AND EXISTS (SELECT 1 FROM pg_dist_node WHERE groupid = NEW.groupid AND + noderole = 'primary' AND + nodeid <> NEW.nodeid) THEN + RAISE EXCEPTION 'there cannot be two primary nodes in a group'; + END IF; + RETURN NEW; + ELSIF (TG_OP = 'UPDATE') THEN + IF NEW.noderole = 'primary' + AND EXISTS (SELECT 1 FROM pg_dist_node WHERE groupid = NEW.groupid AND + noderole = 'primary' AND + nodeid <> NEW.nodeid) THEN + RAISE EXCEPTION 'there cannot be two primary nodes in a group'; + END IF; + RETURN NEW; + END IF; + END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER pg_dist_node_trigger + BEFORE INSERT OR UPDATE ON pg_dist_node + FOR EACH ROW EXECUTE PROCEDURE citus.pg_dist_node_trigger_func(); + +DROP FUNCTION master_add_node(text, integer); +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + groupid integer default 0, + noderole noderole default 'primary', + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT isactive bool, + OUT noderole noderole) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer, + groupid integer, noderole noderole) + IS 'add node to the cluster'; + +DROP FUNCTION master_add_inactive_node(text, integer); +CREATE FUNCTION master_add_inactive_node(nodename text, + nodeport integer, + groupid integer default 0, + noderole noderole default 'primary', + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT isactive bool, + OUT noderole noderole) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_add_inactive_node$$; +COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer, + groupid integer, noderole noderole) + IS 'prepare node by adding it to pg_dist_node'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ac5f945a7..661f7164b 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-4' +default_version = '7.0-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 557f0b932..0aeec0165 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ 
b/src/backend/distributed/commands/create_distributed_table.c @@ -252,7 +252,7 @@ CreateReferenceTable(Oid relationId) EnsureCoordinator(); CheckCitusVersion(ERROR); - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); replicationFactor = list_length(workerNodeList); /* if there are no workers, error out */ @@ -720,7 +720,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, static void EnsureSchemaExistsOnAllNodes(Oid relationId) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; StringInfo applySchemaCreationDDL = makeStringInfo(); diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 911408c96..0a0adc02f 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job) const char *workerHashName = "Worker node hash"; WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList)); - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); /* initialize task execution structures for remote execution */ diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index c57be21d2..79249f54a 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -71,7 +71,7 @@ JobExecutorType(MultiPlan *multiPlan) " queries on the workers."))); } - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); workerNodeCount = list_length(workerNodeList); taskCount = list_length(job->taskList); tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 307a82f1a..0a8b4e3d1 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -190,7 +190,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. 
*/ - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); taskTrackerCount = (uint32) list_length(workerNodeList); taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList); diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index ee1bf458c..19f334092 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -163,7 +163,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; /* load and sort the worker node list for deterministic placement */ - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* make sure we don't process cancel signals until all shards are created */ @@ -382,7 +382,7 @@ CreateReferenceTableShard(Oid distributedTableId) } /* load and sort the worker node list for deterministic placement */ - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* get the next shard id */ diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index ce090a495..d983325b7 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -60,7 +60,7 @@ master_expire_table_cache(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); cacheEntry = DistributedTableCacheEntry(relationId); - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); shardCount = cacheEntry->shardIntervalArrayLength; shardIntervalArray = cacheEntry->sortedShardIntervalArray; diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 1f1f98c98..76c1dd3da 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -160,7 +160,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery) pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); foreach(workerNodeCell, workerNodeList) { @@ -606,11 +606,10 @@ ShardLength(uint64 shardId) /* - * NodeHasShardPlacements returns whether any active shards are placed on the group - * this node is a part of. + * NodeGroupHasShardPlacements returns whether any active shards are placed on the group */ bool -NodeHasShardPlacements(char *nodeName, int32 nodePort, bool onlyConsiderActivePlacements) +NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements) { const int scanKeyCount = (onlyConsiderActivePlacements ? 
2 : 1); const bool indexOK = false; @@ -621,8 +620,6 @@ NodeHasShardPlacements(char *nodeName, int32 nodePort, bool onlyConsiderActivePl SysScanDesc scanDescriptor = NULL; ScanKeyData scanKey[scanKeyCount]; - uint32 groupId = GroupForNode(nodeName, nodePort); - Relation pgPlacement = heap_open(DistPlacementRelationId(), AccessShareLock); diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 7d18b9dd8..699de72db 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -399,7 +399,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) /* switch to memory context appropriate for multiple function calls */ oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - workerNodeList = ActiveWorkerNodeList(); + workerNodeList = ActivePrimaryNodeList(); workerNodeCount = (uint32) list_length(workerNodeList); functionContext->user_fctx = workerNodeList; diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 0ea42e770..765fd3084 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -72,7 +72,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS) char *relationName = text_to_cstring(relationNameText); uint64 shardId = INVALID_SHARD_ID; uint32 attemptableNodeCount = 0; - uint32 liveNodeCount = 0; uint32 candidateNodeIndex = 0; List *candidateNodeList = NIL; @@ -133,12 +132,15 @@ master_create_empty_shard(PG_FUNCTION_ARGS) /* generate new and unique shardId from sequence */ shardId = GetNextShardId(); - /* if enough live nodes, add an extra candidate node as backup */ - attemptableNodeCount = ShardReplicationFactor; - liveNodeCount = WorkerGetLiveNodeCount(); - if (liveNodeCount > ShardReplicationFactor) + /* if enough live groups, add an extra candidate node as backup */ { - attemptableNodeCount = ShardReplicationFactor + 1; + uint32 primaryNodeCount = ActivePrimaryNodeCount(); + + attemptableNodeCount = ShardReplicationFactor; + if (primaryNodeCount > ShardReplicationFactor) + { + attemptableNodeCount = ShardReplicationFactor + 1; + } } /* first retrieve a list of random nodes for shard placements */ @@ -152,7 +154,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) } else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId, candidateNodeIndex); } diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 1d93b49b1..aab1f7f4c 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -42,8 +42,8 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size * /* Local functions forward declarations */ static WorkerNode * WorkerGetNodeWithName(const char *hostname); static char * ClientHostAddress(StringInfo remoteHostStringInfo); -static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, - List *currentNodeList); +static List * PrimaryNodesNotInList(List *currentList); +static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList); static bool OddNumber(uint32 number); static bool ListMember(List *currentList, WorkerNode *workerNode); @@ -54,9 +54,8 
@@ static bool ListMember(List *currentList, WorkerNode *workerNode); */ /* - * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates - * a new worker node. The allocation is performed by randomly picking a worker node - * which is not in currentNodeList. + * WorkerGetRandomCandidateNode accepts a list of WorkerNode's and returns a random + * primary node which is not in that list. * * Note that the function returns null if the worker membership list does not * contain enough nodes to allocate a new worker node. @@ -69,16 +68,11 @@ WorkerGetRandomCandidateNode(List *currentNodeList) uint32 tryCount = WORKER_RACK_TRIES; uint32 tryIndex = 0; - HTAB *workerNodeHash = GetWorkerNodeHash(); - - /* - * We check if the shard has already been placed on all nodes known to us. - * This check is rather defensive, and has the drawback of performing a full - * scan over the worker node hash for determining the number of live nodes. - */ uint32 currentNodeCount = list_length(currentNodeList); - uint32 liveNodeCount = WorkerGetLiveNodeCount(); - if (currentNodeCount >= liveNodeCount) + List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList); + + /* we check if the shard has already been placed on all nodes known to us */ + if (list_length(candidateWorkerNodeList) == 0) { return NULL; } @@ -86,7 +80,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) /* if current node list is empty, randomly pick one node and return */ if (currentNodeCount == 0) { - workerNode = FindRandomNodeNotInList(workerNodeHash, NIL); + workerNode = FindRandomNodeFromList(candidateWorkerNodeList); return workerNode; } @@ -116,7 +110,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) char *workerRack = NULL; bool sameRack = false; - workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); + workerNode = FindRandomNodeFromList(candidateWorkerNodeList); workerRack = workerNode->workerRack; sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); @@ -302,12 +296,12 @@ WorkerGetNodeWithName(const char *hostname) /* - * WorkerGetLiveNodeCount returns the number of live nodes in the cluster. - * */ + * ActivePrimaryNodeCount returns the number of groups with a primary in the cluster. + */ uint32 -WorkerGetLiveNodeCount(void) +ActivePrimaryNodeCount(void) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -315,11 +309,10 @@ WorkerGetLiveNodeCount(void) /* - * ActiveWorkerNodeList iterates over the hash table that includes the worker - * nodes and adds active nodes to a list, which is returned. + * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash */ List * -ActiveWorkerNodeList(void) +ActivePrimaryNodeList(void) { List *workerNodeList = NIL; WorkerNode *workerNode = NULL; @@ -330,7 +323,7 @@ ActiveWorkerNodeList(void) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->isActive) + if (workerNode->isActive && WorkerNodeIsPrimary(workerNode)) { WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); @@ -343,70 +336,48 @@ ActiveWorkerNodeList(void) /* - * FindRandomNodeNotInList finds a random node from the shared hash that is not - * a member of the current node list. The caller is responsible for making the - * necessary node count checks to ensure that such a node exists. 
- * - * Note that this function has a selection bias towards nodes whose positions in - * the shared hash are sequentially adjacent to the positions of nodes that are - * in the current node list. This bias follows from our decision to first pick a - * random node in the hash, and if that node is a member of the current list, to - * simply iterate to the next node in the hash. Overall, this approach trades in - * some selection bias for simplicity in design and for bounded execution time. + * PrimaryNodesNotInList scans through the worker node hash and returns a list of all + * primary nodes which are not in currentList. It runs in O(n*m) but currentList is + * quite small. */ -static WorkerNode * -FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) +static List * +PrimaryNodesNotInList(List *currentList) { + List *workerNodeList = NIL; + HTAB *workerNodeHash = GetWorkerNodeHash(); WorkerNode *workerNode = NULL; HASH_SEQ_STATUS status; - uint32 workerNodeCount = 0; - uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0; - bool lookForWorkerNode = true; - uint32 workerPosition = 0; - uint32 workerIndex = 0; - workerNodeCount = hash_get_num_entries(WorkerNodesHash); - currentNodeCount = list_length(currentNodeList); - Assert(workerNodeCount > currentNodeCount); + hash_seq_init(&status, workerNodeHash); - /* - * We determine a random position within the worker hash between [1, N], - * assuming that the number of elements in the hash is N. We then get to - * this random position by iterating over the worker hash. Please note that - * the random seed has already been set by the postmaster when starting up. - */ - workerPosition = (random() % workerNodeCount) + 1; - hash_seq_init(&status, WorkerNodesHash); - - for (workerIndex = 0; workerIndex < workerPosition; workerIndex++) + while ((workerNode = hash_seq_search(&status)) != NULL) { - workerNode = (WorkerNode *) hash_seq_search(&status); - } - - while (lookForWorkerNode) - { - bool listMember = ListMember(currentNodeList, workerNode); - - if (!listMember) + if (ListMember(currentList, workerNode)) { - lookForWorkerNode = false; + continue; } - else - { - /* iterate to the next worker node in the hash */ - workerNode = (WorkerNode *) hash_seq_search(&status); - /* reached end of hash; start from the beginning */ - if (workerNode == NULL) - { - hash_seq_init(&status, WorkerNodesHash); - workerNode = (WorkerNode *) hash_seq_search(&status); - } + if (WorkerNodeIsPrimary(workerNode)) + { + workerNodeList = lappend(workerNodeList, workerNode); } } - /* we stopped scanning before completion; therefore clean up scan */ - hash_seq_term(&status); + return workerNodeList; +} + + +/* FindRandomNodeFromList picks a random node from the list provided to it. 
*/ +static WorkerNode * +FindRandomNodeFromList(List *candidateWorkerNodeList) +{ + uint32 candidateNodeCount = list_length(candidateWorkerNodeList); + + /* nb, the random seed has already been set by the postmaster when starting up */ + uint32 workerPosition = (random() % candidateNodeCount); + + WorkerNode *workerNode = + (WorkerNode *) list_nth(candidateWorkerNodeList, workerPosition); return workerNode; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 344089655..631f18c8f 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -217,7 +217,7 @@ MetadataCreateCommands(void) List *metadataSnapshotCommandList = NIL; List *distributedTableList = DistributedTableList(); List *propagatedTableList = NIL; - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; bool includeSequenceDefaults = true; @@ -398,6 +398,7 @@ NodeListInsertCommand(List *workerNodeList) StringInfo nodeListInsertCommand = makeStringInfo(); int workerCount = list_length(workerNodeList); int processedWorkerNodeCount = 0; + Oid primaryRole = PrimaryNodeRoleId(); /* if there are no workers, return NULL */ if (workerCount == 0) @@ -405,10 +406,18 @@ NodeListInsertCommand(List *workerNodeList) return nodeListInsertCommand->data; } + if (primaryRole == InvalidOid) + { + ereport(ERROR, (errmsg("bad metadata, noderole does not exist"), + errdetail("you should never see this, please submit " + "a bug report"), + errhint("run ALTER EXTENSION citus UPDATE and try again"))); + } + /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " - "noderack, hasmetadata, isactive) VALUES "); + "noderack, hasmetadata, isactive, noderole) VALUES "); /* iterate over the worker nodes, add the values */ foreach(workerNodeCell, workerNodeList) @@ -417,15 +426,20 @@ NodeListInsertCommand(List *workerNodeList) char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE"; char *isActiveString = workerNode->isActive ? 
"TRUE" : "FALSE"; + Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole); + Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum); + char *nodeRoleString = DatumGetCString(nodeRoleStringDatum); + appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, %s, %s, %s)", + "(%d, %d, %s, %d, %s, %s, %s, '%s'::noderole)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, quote_literal_cstr(workerNode->workerRack), hasMetadataString, - isActiveString); + isActiveString, + nodeRoleString); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) @@ -1014,7 +1028,7 @@ SchemaOwnerName(Oid objectId) static bool HasMetadataWorkers(void) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; foreach(workerNodeCell, workerNodeList) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 18f6b842d..4b8ed9468 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1863,10 +1863,10 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 nodeCount = WorkerGetLiveNodeCount(); + uint32 groupCount = ActivePrimaryNodeCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; - uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode); + uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); return partitionCount; } @@ -4791,7 +4791,7 @@ GreedyAssignTaskList(List *taskList) uint32 taskCount = list_length(taskList); /* get the worker node list and sort the list */ - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* @@ -5223,7 +5223,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. 
*/ - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c71075d32..9cf75e2b7 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1535,7 +1535,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC } else if (replacePrunedQueryWithDummy) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); if (workerNodeList != NIL) { WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList); diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 52c800c3b..cb6801b1b 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -127,7 +127,7 @@ RecoverPreparedTransactions(void) */ LockRelationOid(DistTransactionRelationId(), ExclusiveLock); - workerList = ActiveWorkerNodeList(); + workerList = ActivePrimaryNodeList(); foreach(workerNodeCell, workerList) { diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 6cf1333c2..a695437cd 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -78,7 +78,7 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) { - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); ListCell *commandCell = NULL; @@ -128,7 +128,7 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, { List *connectionList = NIL; ListCell *connectionCell = NULL; - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 99764a1bd..fcd107b75 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -40,7 +40,9 @@ #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "executor/executor.h" +#include "nodes/makefuncs.h" #include "parser/parse_func.h" +#include "parser/parse_type.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/datum.h" @@ -110,6 +112,9 @@ typedef struct MetadataCacheData Oid extraDataContainerFuncId; Oid workerHashFunctionId; Oid extensionOwner; + Oid primaryNodeRoleId; + Oid secondaryNodeRoleId; + Oid unavailableNodeRoleId; } MetadataCacheData; @@ -422,18 +427,24 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, DistTableCacheEntry *tableEntry = shardEntry->tableEntry; int shardIndex = shardEntry->shardIndex; ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; + bool groupContainsNodes = false; ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); uint32 groupId = 
groupShardPlacement->groupId; - WorkerNode *workerNode = NodeForGroup(groupId); + WorkerNode *workerNode = PrimaryNodeForGroup(groupId, &groupContainsNodes); - if (workerNode == NULL) + if (workerNode == NULL && !groupContainsNodes) { ereport(ERROR, (errmsg("the metadata is inconsistent"), errdetail("there is a placement in group %u but " "there are no nodes in that group", groupId))); } + if (workerNode == NULL && groupContainsNodes) + { + ereport(ERROR, (errmsg("node group %u does not have a primary node", groupId))); + } + /* copy everything into shardPlacement but preserve the header */ memcpy((((CitusNode *) shardPlacement) + 1), (((CitusNode *) groupShardPlacement) + 1), @@ -1830,7 +1841,7 @@ CitusExtensionOwnerName(void) } -/* return the username of the currently active role */ +/* return the username of the currently active role */ char * CurrentUserName(void) { @@ -1840,6 +1851,99 @@ CurrentUserName(void) } +/* + * LookupNodeRoleValueId returns the Oid of the "pg_catalog.noderole" type, or InvalidOid + * if it does not exist. + */ +static Oid +LookupNodeRoleTypeOid() +{ + Value *schemaName = makeString("pg_catalog"); + Value *typeName = makeString("noderole"); + List *qualifiedName = list_make2(schemaName, typeName); + TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName); + + Oid nodeRoleTypId; + + /* typenameTypeId but instead of raising an error return InvalidOid */ + Type tup = LookupTypeName(NULL, enumTypeName, NULL, false); + if (tup == NULL) + { + return InvalidOid; + } + + nodeRoleTypId = HeapTupleGetOid(tup); + ReleaseSysCache(tup); + + return nodeRoleTypId; +} + + +/* + * LookupNodeRoleValueId returns the Oid of the value in "pg_catalog.noderole" which + * matches the provided name, or InvalidOid if the noderole enum doesn't exist yet. 
+ */ +static Oid +LookupNodeRoleValueId(char *valueName) +{ + Oid nodeRoleTypId = LookupNodeRoleTypeOid(); + + if (nodeRoleTypId == InvalidOid) + { + return InvalidOid; + } + else + { + Datum nodeRoleIdDatum = ObjectIdGetDatum(nodeRoleTypId); + Datum valueDatum = CStringGetDatum(valueName); + + Datum valueIdDatum = DirectFunctionCall2(enum_in, valueDatum, nodeRoleIdDatum); + + Oid valueId = DatumGetObjectId(valueIdDatum); + return valueId; + } +} + + +/* return the Oid of the 'primary' nodeRole enum value */ +Oid +PrimaryNodeRoleId(void) +{ + if (!MetadataCache.primaryNodeRoleId) + { + MetadataCache.primaryNodeRoleId = LookupNodeRoleValueId("primary"); + } + + return MetadataCache.primaryNodeRoleId; +} + + +/* return the Oid of the 'secodary' nodeRole enum value */ +Oid +SecondaryNodeRoleId(void) +{ + if (!MetadataCache.secondaryNodeRoleId) + { + MetadataCache.secondaryNodeRoleId = LookupNodeRoleValueId("secondary"); + } + + return MetadataCache.secondaryNodeRoleId; +} + + +/* return the Oid of the 'unavailable' nodeRole enum value */ +Oid +UnavailableNodeRoleId(void) +{ + if (!MetadataCache.unavailableNodeRoleId) + { + MetadataCache.unavailableNodeRoleId = LookupNodeRoleValueId("unavailable"); + } + + return MetadataCache.unavailableNodeRoleId; +} + + /* * master_dist_partition_cache_invalidate is a trigger function that performs * relcache invalidations when the contents of pg_dist_partition are changed @@ -2244,6 +2348,7 @@ InitializeWorkerNodeCache(void) strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); workerNode->hasMetadata = currentNode->hasMetadata; workerNode->isActive = currentNode->isActive; + workerNode->nodeRole = currentNode->nodeRole; if (handleFound) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index e72dd99e9..6ff4f8b37 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -18,6 +18,7 @@ #include "access/tupmacs.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "commands/sequence.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" @@ -53,7 +54,8 @@ static Datum ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, bool hasMetadata, bool isActive, - bool *nodeAlreadyExists); + Oid nodeRole, bool *nodeAlreadyExists); +static uint32 CountPrimariesWithMetadata(); static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); static Datum GenerateNodeTuple(WorkerNode *workerNode); @@ -61,7 +63,7 @@ static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, - char *nodeRack, bool hasMetadata, bool isActive); + char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -86,7 +88,8 @@ master_add_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - int32 groupId = 0; + int32 groupId = PG_GETARG_INT32(2); + Oid 
nodeRole = InvalidOid; char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; bool isActive = false; @@ -95,8 +98,18 @@ master_add_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* during tests this function is called before nodeRole has been created */ + if (PG_NARGS() == 3) + { + nodeRole = InvalidOid; + } + else + { + nodeRole = PG_GETARG_OID(3); + } + nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + hasMetadata, isActive, nodeRole, &nodeAlreadyExists); /* * After adding new node, if the node did not already exist, we will activate @@ -123,7 +136,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - int32 groupId = 0; + int32 groupId = PG_GETARG_INT32(2); + Oid nodeRole = PG_GETARG_OID(3); char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; bool isActive = false; @@ -133,7 +147,7 @@ master_add_inactive_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + hasMetadata, isActive, nodeRole, &nodeAlreadyExists); PG_RETURN_CSTRING(nodeRecord); } @@ -183,16 +197,21 @@ master_disable_node(PG_FUNCTION_ARGS) int32 nodePort = PG_GETARG_INT32(1); char *nodeName = text_to_cstring(nodeNameText); - bool hasActiveShardPlacements = false; bool isActive = false; + WorkerNode *workerNode = NULL; + CheckCitusVersion(ERROR); - DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); + workerNode = FindWorkerNode(nodeName, nodePort); - hasActiveShardPlacements = NodeHasShardPlacements(nodeName, nodePort, - onlyConsiderActivePlacements); - if (hasActiveShardPlacements) + if (WorkerNodeIsPrimary(workerNode)) + { + DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId); + } + + if (WorkerNodeIsPrimary(workerNode) && + NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements)) { ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries " "may fail after this operation. Use " @@ -246,11 +265,31 @@ GroupForNode(char *nodeName, int nodePort) /* - * NodeForGroup returns the (unique) node which is in this group. - * In a future where we have nodeRole this will return the primary node. + * WorkerNodeIsPrimary returns whether the argument represents a primary node. + */ +bool +WorkerNodeIsPrimary(WorkerNode *worker) +{ + Oid primaryRole = PrimaryNodeRoleId(); + + /* if nodeRole does not yet exist, all nodes are primary nodes */ + if (primaryRole == InvalidOid) + { + return true; + } + + return worker->nodeRole == primaryRole; +} + + +/* + * PrimaryNodeForGroup returns the (unique) primary in the specified group. + * + * If there are any nodes in the requested group and groupContainsNodes is not NULL + * it will set the bool groupContainsNodes references to true. 
*/ WorkerNode * -NodeForGroup(uint32 groupId) +PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes) { WorkerNode *workerNode = NULL; HASH_SEQ_STATUS status; @@ -261,8 +300,17 @@ NodeForGroup(uint32 groupId) while ((workerNode = hash_seq_search(&status)) != NULL) { uint32 workerNodeGroupId = workerNode->groupId; + if (workerNodeGroupId != groupId) + { + continue; + } - if (workerNodeGroupId == groupId) + if (groupContainsNodes != NULL) + { + *groupContainsNodes = true; + } + + if (WorkerNodeIsPrimary(workerNode)) { hash_seq_term(&status); return workerNode; @@ -300,9 +348,13 @@ ActivateNode(char *nodeName, int nodePort) SetNodeState(nodeName, nodePort, isActive); - ReplicateAllReferenceTablesToNode(nodeName, nodePort); - workerNode = FindWorkerNode(nodeName, nodePort); + + if (WorkerNodeIsPrimary(workerNode)) + { + ReplicateAllReferenceTablesToNode(nodeName, nodePort); + } + nodeRecord = GenerateNodeTuple(workerNode); heap_close(pgDistNode, AccessShareLock); @@ -322,6 +374,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) ListCell *workerNodeCell = NULL; List *workerNodes = NULL; bool nodeAlreadyExists = false; + Oid nodeRole = InvalidOid; /* nodeRole doesn't exist when this function is called */ CheckCitusVersion(ERROR); @@ -332,7 +385,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, workerNode->workerRack, false, workerNode->isActive, - &nodeAlreadyExists); + nodeRole, &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -509,7 +562,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) { const bool onlyConsiderActivePlacements = false; char *nodeDeleteCommand = NULL; - bool hasAnyShardPlacements = false; WorkerNode *workerNode = NULL; List *referenceTableList = NIL; uint32 deletedNodeId = INVALID_PLACEMENT_ID; @@ -518,19 +570,26 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) EnsureSuperUser(); workerNode = FindWorkerNode(nodeName, nodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); + } if (workerNode != NULL) { deletedNodeId = workerNode->nodeId; } - DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); - - hasAnyShardPlacements = NodeHasShardPlacements(nodeName, nodePort, - onlyConsiderActivePlacements); - if (hasAnyShardPlacements) + if (WorkerNodeIsPrimary(workerNode)) { - ereport(ERROR, (errmsg("you cannot remove a node which has shard placements"))); + DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId); + } + + if (WorkerNodeIsPrimary(workerNode) && + NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements)) + { + ereport(ERROR, (errmsg("you cannot remove the primary node of a node group " + "which has shard placements"))); } DeleteNodeRow(nodeName, nodePort); @@ -540,16 +599,20 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) * column for colocation group of reference tables so that replication factor will * be equal to worker count. 
*/ - referenceTableList = ReferenceTableOidList(); - if (list_length(referenceTableList) != 0) + if (WorkerNodeIsPrimary(workerNode)) { - Oid firstReferenceTableId = linitial_oid(referenceTableList); - uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId); + referenceTableList = ReferenceTableOidList(); + if (list_length(referenceTableList) != 0) + { + Oid firstReferenceTableId = linitial_oid(referenceTableList); + uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId); - List *workerNodeList = ActiveWorkerNodeList(); - int workerCount = list_length(workerNodeList); + List *workerNodeList = ActivePrimaryNodeList(); + int workerCount = list_length(workerNodeList); - UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + UpdateColocationGroupReplicationFactor(referenceTableColocationId, + workerCount); + } } nodeDeleteCommand = NodeDeleteCommand(deletedNodeId); @@ -561,6 +624,30 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) } +/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */ +static uint32 +CountPrimariesWithMetadata() +{ + uint32 primariesWithMetadata = 0; + WorkerNode *workerNode = NULL; + + HASH_SEQ_STATUS status; + HTAB *workerNodeHash = GetWorkerNodeHash(); + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + if (workerNode->hasMetadata && WorkerNodeIsPrimary(workerNode)) + { + primariesWithMetadata++; + } + } + + return primariesWithMetadata; +} + + /* * AddNodeMetadata checks the given node information and adds the specified node to the * pg_dist_node table of the master and workers with metadata. @@ -572,15 +659,14 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata, bool isActive, bool *nodeAlreadyExists) + bool hasMetadata, bool isActive, Oid nodeRole, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; Datum returnData = 0; WorkerNode *workerNode = NULL; char *nodeDeleteCommand = NULL; - char *nodeInsertCommand = NULL; - List *workerNodeList = NIL; + uint32 primariesWithMetadata = 0; EnsureCoordinator(); EnsureSuperUser(); @@ -620,27 +706,43 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, } } + /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */ + if (nodeRole != InvalidOid) + { + if (nodeRole == PrimaryNodeRoleId()) + { + WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(groupId, NULL); + + if (existingPrimaryNode != NULL) + { + ereport(ERROR, (errmsg("group %d already has a primary node", groupId))); + } + } + } + /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata, - isActive); + isActive, nodeRole); workerNode = FindWorkerNode(nodeName, nodePort); - /* send the delete command all nodes with metadata */ + /* send the delete command to all primary nodes with metadata */ nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); /* finally prepare the insert command and send it to all primary nodes */ - workerNodeList = list_make1(workerNode); - nodeInsertCommand = NodeListInsertCommand(workerNodeList); - SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); + primariesWithMetadata = 
CountPrimariesWithMetadata(); + if (primariesWithMetadata != 0) + { + List *workerNodeList = list_make1(workerNode); + char *nodeInsertCommand = NodeListInsertCommand(workerNodeList); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); + } - heap_close(pgDistNode, AccessExclusiveLock); + heap_close(pgDistNode, NoLock); - /* fetch the worker node, and generate the output */ - workerNode = FindWorkerNode(nodeName, nodePort); returnData = GenerateNodeTuple(workerNode); return returnData; @@ -751,6 +853,7 @@ GenerateNodeTuple(WorkerNode *workerNode) values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata); values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(workerNode->isActive); + values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(workerNode->nodeRole); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); @@ -888,7 +991,7 @@ EnsureCoordinator(void) */ static void InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, - bool hasMetadata, bool isActive) + bool hasMetadata, bool isActive, Oid nodeRole) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -907,6 +1010,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive); + values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -917,7 +1021,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * CatalogTupleInsert(pgDistNode, heapTuple); /* close relation and invalidate previous cache entry */ - heap_close(pgDistNode, AccessExclusiveLock); + heap_close(pgDistNode, NoLock); CitusInvalidateRelcacheByRelid(DistNodeRelationId()); @@ -1147,6 +1251,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum isActive = heap_getattr(heapTuple, Anum_pg_dist_node_isactive, tupleDescriptor, &isNull); + Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole, + tupleDescriptor, &isNull); Assert(!HeapTupleHasNulls(heapTuple)); @@ -1158,6 +1264,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); workerNode->hasMetadata = DatumGetBool(hasMetadata); workerNode->isActive = DatumGetBool(isActive); + workerNode->nodeRole = DatumGetObjectId(nodeRole); return workerNode; } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index c87b7f31d..6ea1d6eb6 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -127,7 +127,7 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { List *referenceTableList = ReferenceTableOidList(); ListCell *referenceTableCell = NULL; - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); uint32 workerCount = 0; Oid firstReferenceTableId = InvalidOid; uint32 referenceTableColocationId = INVALID_COLOCATION_ID; @@ -228,7 +228,7 @@ 
ReplicateShardToAllWorkers(ShardInterval *shardInterval) { /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; /* @@ -364,7 +364,7 @@ uint32 CreateReferenceTableColocationId() { uint32 colocationId = INVALID_COLOCATION_ID; - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = ActivePrimaryNodeList(); int shardCount = 1; int replicationFactor = list_length(workerNodeList); Oid distributionColumnType = InvalidOid; @@ -382,13 +382,13 @@ CreateReferenceTableColocationId() /* - * DeleteAllReferenceTablePlacementsFromNode function iterates over list of reference + * DeleteAllReferenceTablePlacementsFromNodeGroup function iterates over list of reference * tables and deletes all reference table placements from pg_dist_placement table - * for given worker node. However, it does not modify replication factor of the colocation + * for given group. However, it does not modify replication factor of the colocation * group of reference tables. It is caller's responsibility to do that if it is necessary. */ void -DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) +DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId) { List *referenceTableList = ReferenceTableOidList(); ListCell *referenceTableCell = NULL; @@ -401,7 +401,7 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) /* * We sort the reference table list to prevent deadlocks in concurrent - * DeleteAllReferenceTablePlacementsFromNode calls. + * DeleteAllReferenceTablePlacementsFromNodeGroup calls. 
*/ referenceTableList = SortList(referenceTableList, CompareOids); foreach(referenceTableCell, referenceTableList) @@ -409,11 +409,9 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) GroupShardPlacement *placement = NULL; StringInfo deletePlacementCommand = makeStringInfo(); - uint32 workerGroup = GroupForNode(workerName, workerPort); - Oid referenceTableId = lfirst_oid(referenceTableCell); List *placements = GroupShardPlacementsForTableOnGroup(referenceTableId, - workerGroup); + groupId); if (list_length(placements) == 0) { /* this happens if the node was previously disabled */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 66dcf91ff..93e7767a1 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -115,8 +115,8 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt extern void CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId); -extern bool NodeHasShardPlacements(char *nodeName, int32 nodePort, - bool onlyLookForActivePlacements); +extern bool NodeGroupHasShardPlacements(uint32 groupId, + bool onlyConsiderActivePlacements); extern List * FinalizedShardPlacementList(uint64 shardId); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern List * BuildShardPlacementList(ShardInterval *shardInterval); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 603801284..089915ae1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -112,6 +112,11 @@ extern Oid DistPlacementGroupidIndexId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); +/* nodeRole enum oids */ +extern Oid PrimaryNodeRoleId(void); +extern Oid SecondaryNodeRoleId(void); +extern Oid UnavailableNodeRoleId(void); + /* user related functions */ extern Oid CitusExtensionOwner(void); extern char * CitusExtensionOwnerName(void); diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index 391f7a909..001c1da7b 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -24,6 +24,7 @@ typedef struct FormData_pg_dist_node int nodeport; bool hasmetadata; bool isactive + Oid noderole; #endif } FormData_pg_dist_node; @@ -38,7 +39,7 @@ typedef FormData_pg_dist_node *Form_pg_dist_node; * compiler constants for pg_dist_node * ---------------- */ -#define Natts_pg_dist_node 7 +#define Natts_pg_dist_node 8 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 @@ -46,6 +47,7 @@ typedef FormData_pg_dist_node *Form_pg_dist_node; #define Anum_pg_dist_node_noderack 5 #define Anum_pg_dist_node_hasmetadata 6 #define Anum_pg_dist_node_isactive 7 +#define Anum_pg_dist_node_noderole 8 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index feed8fe32..3e1365844 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -14,8 +14,7 @@ extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int 
nodePort); -extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName, - uint32 workerPort); +extern void DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId); extern List * ReferenceTableOidList(void); extern int CompareOids(const void *leftElement, const void *rightElement); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index fe385bc8f..7ab318087 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -43,6 +43,7 @@ typedef struct WorkerNode char workerRack[WORKER_LENGTH]; /* node's network location */ bool hasMetadata; /* node gets metadata changes */ bool isActive; /* node's state */ + Oid nodeRole; /* the node's role in its group */ } WorkerNode; @@ -57,13 +58,14 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern uint32 WorkerGetLiveNodeCount(void); -extern List * ActiveWorkerNodeList(void); +extern uint32 ActivePrimaryNodeCount(void); +extern List * ActivePrimaryNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(void); extern void EnsureCoordinator(void); extern uint32 GroupForNode(char *nodeName, int32 nodePorT); -extern WorkerNode * NodeForGroup(uint32 groupId); +extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes); +extern bool WorkerNodeIsPrimary(WorkerNode *worker); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 77438b28f..0cb4dc769 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -29,9 +29,9 @@ SELECT master_get_active_worker_nodes(); -- try to add a node that is already in the cluster SELECT * FROM master_add_node('localhost', :worker_1_port); - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+-----------+----------+----------+-------------+---------- - 1 | 1 | localhost | 57637 | default | f | t + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +--------+---------+-----------+----------+----------+-------------+----------+---------- + 1 | 1 | localhost | 57637 | default | f | t | primary (1 row) -- get the active nodes @@ -77,9 +77,9 @@ SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster SELECT master_activate_node('localhost', :worker_2_port); - master_activate_node ------------------------------------ - (3,3,localhost,57638,default,f,t) + master_activate_node +------------------------------------------- + (3,3,localhost,57638,default,f,t,primary) (1 row) CREATE TABLE cluster_management_test (col_1 text, col_2 int); @@ -111,7 +111,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER -- try to remove a node with active placements and see that node removal is failed SELECT master_remove_node('localhost', :worker_2_port); -ERROR: you cannot remove a node which has shard placements +ERROR: you cannot remove the primary node of a node group which has shard placements SELECT master_get_active_worker_nodes(); master_get_active_worker_nodes -------------------------------- @@ 
-138,14 +138,14 @@ SELECT master_get_active_worker_nodes(); -- restore the node for next tests SELECT master_activate_node('localhost', :worker_2_port); - master_activate_node ------------------------------------ - (3,3,localhost,57638,default,f,t) + master_activate_node +------------------------------------------- + (3,3,localhost,57638,default,f,t,primary) (1 row) -- try to remove a node with active placements and see that node removal is failed SELECT master_remove_node('localhost', :worker_2_port); -ERROR: you cannot remove a node which has shard placements +ERROR: you cannot remove the primary node of a node group which has shard placements -- mark all placements in the candidate node as inactive SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset UPDATE pg_dist_placement SET shardstate=3 WHERE groupid=:worker_2_group; @@ -164,7 +164,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER -- try to remove a node with only inactive placements and see that removal still fails SELECT master_remove_node('localhost', :worker_2_port); -ERROR: you cannot remove a node which has shard placements +ERROR: you cannot remove the primary node of a node group which has shard placements SELECT master_get_active_worker_nodes(); master_get_active_worker_nodes -------------------------------- @@ -180,6 +180,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); (1 row) UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group; +-- when there is no primary we should get a pretty error +UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport=:worker_2_port; +SELECT * FROM cluster_management_test; +ERROR: node group 3 does not have a primary node +-- when there is no node at all in the group we should get a different error +DELETE FROM pg_dist_node WHERE nodeport=:worker_2_port; +SELECT * FROM cluster_management_test; +ERROR: the metadata is inconsistent +DETAIL: there is a placement in group 3 but there are no nodes in that group +-- clean-up +SELECT groupid as new_group FROM master_add_node('localhost', :worker_2_port) \gset +UPDATE pg_dist_placement SET groupid = :new_group WHERE groupid = :worker_2_group; +-- test that you are allowed to remove secondary nodes even if there are placements +SELECT master_add_node('localhost', 9990, groupid => :new_group, noderole => 'secondary'); + master_add_node +-------------------------------------------- + (5,4,localhost,9990,default,f,t,secondary) +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); +ERROR: you cannot remove the primary node of a node group which has shard placements +SELECT master_remove_node('localhost', 9990); + master_remove_node +-------------------- + +(1 row) + +-- clean-up DROP TABLE cluster_management_test; -- check that adding/removing nodes are propagated to nodes with hasmetadata=true SELECT master_remove_node('localhost', :worker_2_port); @@ -241,24 +269,24 @@ SELECT (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+----------+----------+----------+-------------+---------- + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +--------+---------+----------+----------+----------+-------------+----------+---------- (0 rows) -- check that adding two nodes in the same transaction works SELECT master_add_node('localhost', :worker_1_port), master_add_node('localhost', :worker_2_port); - 
master_add_node | master_add_node ------------------------------------+----------------------------------- - (6,6,localhost,57637,default,f,t) | (7,7,localhost,57638,default,f,t) + master_add_node | master_add_node +-------------------------------------------+------------------------------------------- + (8,7,localhost,57637,default,f,t,primary) | (9,8,localhost,57638,default,f,t,primary) (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+-----------+----------+----------+-------------+---------- - 6 | 6 | localhost | 57637 | default | f | t - 7 | 7 | localhost | 57638 | default | f | t + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +--------+---------+-----------+----------+----------+-------------+----------+---------- + 8 | 7 | localhost | 57637 | default | f | t | primary + 9 | 8 | localhost | 57638 | default | f | t | primary (2 rows) -- check that mixed add/remove node commands work fine inside transaction @@ -405,3 +433,39 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); (1 row) +-- check that you can't add more than one primary to a group +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset +SELECT master_add_node('localhost', 9999, groupid => :worker_1_group, noderole => 'primary'); +ERROR: group 12 already has a primary node +-- check that you can add secondaries and unavailable nodes to a group +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset +SELECT master_add_node('localhost', 9998, groupid => :worker_1_group, noderole => 'secondary'); + master_add_node +---------------------------------------------- + (16,12,localhost,9998,default,f,t,secondary) +(1 row) + +SELECT master_add_node('localhost', 9997, groupid => :worker_1_group, noderole => 'unavailable'); + master_add_node +------------------------------------------------ + (17,12,localhost,9997,default,f,t,unavailable) +(1 row) + +-- add_inactive_node also works with secondaries +SELECT master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary'); + master_add_inactive_node +---------------------------------------------- + (18,14,localhost,9996,default,f,f,secondary) +(1 row) + +-- check that you can't manually add two primaries to a group +INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole) + VALUES ('localhost', 5000, :worker_1_group, 'primary'); +ERROR: there cannot be two primary nodes in a group +CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 9 at RAISE +UPDATE pg_dist_node SET noderole = 'primary' + WHERE groupid = :worker_1_group AND nodeport = 9998; +ERROR: there cannot be two primary nodes in a group +CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 17 at RAISE +-- don't remove the secondary and unavailable nodes, check that no commands are sent to +-- them in any of the remaining tests diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 462d35d64..8775964f0 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -20,15 +20,15 @@ RESET client_min_messages; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------------------ - (1,1,localhost,57637,default,f,t) + 
master_add_node +------------------------------------------- + (1,1,localhost,57637,default,f,t,primary) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (2,2,localhost,57638,default,f,t) + master_add_node +------------------------------------------- + (2,2,localhost,57638,default,f,t,primary) (1 row) -- verify that a table can be created after the extension has been dropped and recreated diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 290c46c9b..4d364178a 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -27,11 +27,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; -- Show that, with no MX tables, metadata snapshot contains only the delete commands, -- pg_dist_node entries and reference tables SELECT unnest(master_metadata_snapshot()); - unnest --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + unnest +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) (3 rows) -- Create a test table with constraints and SERIAL @@ -57,7 +57,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 
9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -78,7 +78,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -101,7 +101,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS 
mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -130,7 +130,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -152,7 +152,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition TRUNCATE pg_dist_node - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq 
INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -197,10 +197,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+-----------+----------+----------+-------------+---------- - 1 | 1 | localhost | 57637 | default | t | t - 2 | 2 | localhost | 57638 | default | f | t + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +--------+---------+-----------+----------+----------+-------------+----------+---------- + 1 | 1 | localhost | 57637 | default | t | t | primary + 2 | 2 | localhost | 57638 | default | f | t | primary (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -334,10 +334,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+-----------+----------+----------+-------------+---------- - 1 | 1 | localhost | 57637 | default | t | t - 2 | 2 | localhost | 57638 | default | f | t + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +--------+---------+-----------+----------+----------+-------------+----------+---------- + 1 | 1 | localhost | 57637 | default | t | t | primary + 2 | 2 | localhost | 57638 | default | f | t | primary (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -1129,9 +1129,9 @@ SELECT create_distributed_table('mx_table', 'a'); \c - postgres - :master_port SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (4,4,localhost,57638,default,f,t) + master_add_node +------------------------------------------- + (4,4,localhost,57638,default,f,t,primary) (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -1342,9 +1342,9 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "mx_ref" to the node localhost:57638 - master_add_node ------------------------------------ - (5,5,localhost,57638,default,f,t) + master_add_node +------------------------------------------- + (5,5,localhost,57638,default,f,t,primary) (1 row) SELECT shardid, nodename, nodeport diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index ae14b8c7c..56e7cd59d 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -481,7 +481,7 @@ INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNIN SELECT setval('app_analytics_events_mx_id_seq'::regclass, :last_value); setval ------------------ - 3659174697238529 + 3940649673949185 (1 row) ALTER SEQUENCE app_analytics_events_mx_id_seq diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index f2a1e7740..082d39a5f 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -41,10 +41,12 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; (1 row) -- re-add the node for next tests -SELECT master_add_node('localhost', :worker_2_port); - master_add_node 
------------------------------------------------ - (1380000,1380000,localhost,57638,default,f,t) +SELECT groupid AS worker_2_group FROM master_add_node('localhost', :worker_2_port) \gset +-- add a secondary to check we don't attempt to replicate the table to it +SELECT isactive FROM master_add_node('localhost', 9000, groupid=>:worker_2_group, noderole=>'secondary'); + isactive +---------- + t (1 row) -- remove a node with reference table @@ -55,6 +57,64 @@ SELECT create_reference_table('remove_node_reference_table'); (1 row) +-- make sure when we add a secondary we don't attempt to add placements to it +SELECT isactive FROM master_add_node('localhost', 9001, groupid=>:worker_2_group, noderole=>'secondary'); + isactive +---------- + t +(1 row) + +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; + count +------- + 1 +(1 row) + +-- make sure when we disable a secondary we don't remove any placements +SELECT master_disable_node('localhost', 9001); + master_disable_node +--------------------- + +(1 row) + +SELECT isactive FROM pg_dist_node WHERE nodeport = 9001; + isactive +---------- + f +(1 row) + +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; + count +------- + 1 +(1 row) + +-- make sure when we activate a secondary we don't add any placements +SELECT master_activate_node('localhost', 9001); + master_activate_node +-------------------------------------------------------- + (1380002,1380000,localhost,9001,default,f,t,secondary) +(1 row) + +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; + count +------- + 1 +(1 row) + +-- make sure when we remove a secondary we don't remove any placements +SELECT master_remove_node('localhost', 9001); + master_remove_node +-------------------- + +(1 row) + +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; + count +------- + 1 +(1 row) + -- status before master_remove_node SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; count @@ -164,9 +224,9 @@ ERROR: node at "localhost:57638" does not exist -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380001,1380001,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380003,1380001,localhost,57638,default,f,t,primary) (1 row) -- try to disable the node before removing it (this used to crash) @@ -185,9 +245,9 @@ SELECT master_remove_node('localhost', :worker_2_port); -- re-add the node for the next test SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380002,1380002,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380004,1380002,localhost,57638,default,f,t,primary) (1 row) -- remove node in a transaction and ROLLBACK @@ -408,9 +468,9 @@ WHERE -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380003,1380003,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + 
(1380005,1380003,localhost,57638,default,f,t,primary) (1 row) -- test inserting a value then removing a node in a transaction @@ -537,9 +597,9 @@ SELECT * FROM remove_node_reference_table; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380004,1380004,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380006,1380004,localhost,57638,default,f,t,primary) (1 row) -- test executing DDL command then removing a node in a transaction @@ -662,9 +722,9 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.remove_ -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380005,1380005,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380007,1380005,localhost,57638,default,f,t,primary) (1 row) -- test DROP table after removing a node in a transaction @@ -730,9 +790,9 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------------------ - (1380006,1380006,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380008,1380006,localhost,57638,default,f,t,primary) (1 row) -- re-create remove_node_reference_table @@ -865,9 +925,9 @@ WHERE SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 NOTICE: Replicating reference table "table1" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1380007,1380007,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1380009,1380007,localhost,57638,default,f,t,primary) (1 row) -- test with master_disable_node @@ -915,7 +975,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid ASC; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380001 | 1 | 0 | localhost | 57638 @@ -982,9 +1043,9 @@ WHERE SELECT master_activate_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 NOTICE: Replicating reference table "table1" to the node localhost:57638 - master_activate_node ------------------------------------------------ - (1380007,1380007,localhost,57638,default,f,t) + master_activate_node +------------------------------------------------------- + (1380009,1380007,localhost,57638,default,f,t,primary) (1 row) -- DROP tables to clean workspace diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index bffb670cd..42701eb45 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -24,9 +24,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; (1 row) SELECT 
master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------------------ - (1370000,1370000,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370000,1370000,localhost,57638,default,f,t,primary) (1 row) -- verify node is added @@ -122,9 +122,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370002,1370002,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370002,1370002,localhost,57638,default,f,t,primary) (1 row) -- status after master_add_node @@ -175,9 +175,9 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------------------ - (1370002,1370002,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370002,1370002,localhost,57638,default,f,t,primary) (1 row) -- status after master_add_node @@ -243,9 +243,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370003,1370003,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370003,1370003,localhost,57638,default,f,t,primary) (1 row) ROLLBACK; @@ -305,9 +305,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370004,1370004,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370004,1370004,localhost,57638,default,f,t,primary) (1 row) COMMIT; @@ -400,9 +400,9 @@ ORDER BY logicalrelid; BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370005,1370005,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370005,1370005,localhost,57638,default,f,t,primary) (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); @@ -550,9 +550,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370009,1370009,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370009,1370009,localhost,57638,default,f,t,primary) (1 row) DROP TABLE replicate_reference_table_drop; @@ -612,9 +612,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "table1" to the node localhost:57638 - master_add_node ------------------------------------------------ - (1370010,1370010,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + 
(1370010,1370010,localhost,57638,default,f,t,primary) (1 row) -- status after master_add_node @@ -657,9 +657,9 @@ SELECT create_reference_table('initially_not_replicated_reference_table'); (1 row) SELECT master_add_inactive_node('localhost', :worker_2_port); - master_add_inactive_node ------------------------------------------------ - (1370011,1370011,localhost,57638,default,f,f) + master_add_inactive_node +------------------------------------------------------- + (1370011,1370011,localhost,57638,default,f,f,primary) (1 row) -- we should see only one shard placements @@ -683,9 +683,9 @@ ORDER BY 1,4,5; -- we should see the two shard placements after activation SELECT master_activate_node('localhost', :worker_2_port); NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638 - master_activate_node ------------------------------------------------ - (1370011,1370011,localhost,57638,default,f,t) + master_activate_node +------------------------------------------------------- + (1370011,1370011,localhost,57638,default,f,t,primary) (1 row) SELECT @@ -708,9 +708,9 @@ ORDER BY 1,4,5; -- this should have no effect SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------------------ - (1370011,1370011,localhost,57638,default,f,t) + master_add_node +------------------------------------------------------- + (1370011,1370011,localhost,57638,default,f,t,primary) (1 row) -- drop unnecassary tables diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 0930d2a0a..a1113eb7e 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -77,15 +77,15 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------------------ - (1,1,localhost,57637,default,f,t) + master_add_node +------------------------------------------- + (1,1,localhost,57637,default,f,t,primary) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (2,2,localhost,57638,default,f,t) + master_add_node +------------------------------------------- + (2,2,localhost,57638,default,f,t,primary) (1 row) -- create a table with a SERIAL column diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 5901aa1d8..237454a5b 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -219,10 +219,11 @@ SELECT count(*) FROM mx_table; SELECT master_add_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. 
-SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ---------+---------+----------+----------+----------+-------------+---------- -(0 rows) +SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; + count +------- + 0 +(1 row) -- master_remove_node \c - - - :master_port @@ -230,9 +231,9 @@ DROP INDEX mx_test_uniq_index; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', 5432); - master_add_node ----------------------------------------------- - (1370000,1370000,localhost,5432,default,f,t) + master_add_node +------------------------------------------------------ + (1370000,1370000,localhost,5432,default,f,t,primary) (1 row) \c - - - :worker_1_port @@ -240,9 +241,9 @@ SELECT master_remove_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive ----------+---------+-----------+----------+----------+-------------+---------- - 1370000 | 1370000 | localhost | 5432 | default | f | t + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole +---------+---------+-----------+----------+----------+-------------+----------+---------- + 1370000 | 1370000 | localhost | 5432 | default | f | t | primary (1 row) \c - - - :master_port diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 4edb74b48..ccb16b2ab 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -767,9 +767,9 @@ SELECT shardid, nodename, nodeport SELECT master_activate_node('localhost', :worker_1_port); NOTICE: Replicating reference table "nation" to the node localhost:57637 NOTICE: Replicating reference table "supplier" to the node localhost:57637 - master_activate_node ------------------------------------ - (1,1,localhost,57637,default,f,t) + master_activate_node +------------------------------------------- + (1,1,localhost,57637,default,f,t,primary) (1 row) RESET citus.shard_replication_factor; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index bd282da1c..1ad8c8552 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -71,6 +71,24 @@ SELECT master_get_active_worker_nodes(); SELECT 1 FROM master_add_node('localhost', :worker_2_port); UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group; +-- when there is no primary we should get a pretty error +UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport=:worker_2_port; +SELECT * FROM cluster_management_test; + +-- when there is no node at all in the group we should get a different error +DELETE FROM pg_dist_node WHERE nodeport=:worker_2_port; +SELECT * FROM cluster_management_test; + +-- clean-up +SELECT groupid as new_group FROM master_add_node('localhost', :worker_2_port) \gset +UPDATE pg_dist_placement SET groupid = :new_group WHERE groupid = :worker_2_group; + +-- test that you are allowed to remove secondary nodes even if there are placements +SELECT master_add_node('localhost', 9990, groupid => :new_group, noderole 
=> 'secondary'); +SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', 9990); + +-- clean-up DROP TABLE cluster_management_test; -- check that adding/removing nodes are propagated to nodes with hasmetadata=true @@ -162,3 +180,23 @@ DELETE FROM pg_dist_node; \c - - - :master_port SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + +-- check that you can't add more than one primary to a group +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset +SELECT master_add_node('localhost', 9999, groupid => :worker_1_group, noderole => 'primary'); + +-- check that you can add secondaries and unavailable nodes to a group +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset +SELECT master_add_node('localhost', 9998, groupid => :worker_1_group, noderole => 'secondary'); +SELECT master_add_node('localhost', 9997, groupid => :worker_1_group, noderole => 'unavailable'); +-- add_inactive_node also works with secondaries +SELECT master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary'); + +-- check that you can't manually add two primaries to a group +INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole) + VALUES ('localhost', 5000, :worker_1_group, 'primary'); +UPDATE pg_dist_node SET noderole = 'primary' + WHERE groupid = :worker_1_group AND nodeport = 9998; + +-- don't remove the secondary and unavailable nodes, check that no commands are sent to +-- them in any of the remaining tests diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 3f0eb9751..abe21203c 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -31,12 +31,28 @@ SELECT master_remove_node('localhost', :worker_2_port); SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; -- re-add the node for next tests -SELECT master_add_node('localhost', :worker_2_port); +SELECT groupid AS worker_2_group FROM master_add_node('localhost', :worker_2_port) \gset +-- add a secondary to check we don't attempt to replicate the table to it +SELECT isactive FROM master_add_node('localhost', 9000, groupid=>:worker_2_group, noderole=>'secondary'); -- remove a node with reference table CREATE TABLE remove_node_reference_table(column1 int); SELECT create_reference_table('remove_node_reference_table'); +-- make sure when we add a secondary we don't attempt to add placements to it +SELECT isactive FROM master_add_node('localhost', 9001, groupid=>:worker_2_group, noderole=>'secondary'); +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; +-- make sure when we disable a secondary we don't remove any placements +SELECT master_disable_node('localhost', 9001); +SELECT isactive FROM pg_dist_node WHERE nodeport = 9001; +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; +-- make sure when we activate a secondary we don't add any placements +SELECT master_activate_node('localhost', 9001); +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; +-- make sure when we remove a secondary we don't remove any placements +SELECT master_remove_node('localhost', 9001); +SELECT count(*) FROM pg_dist_placement WHERE groupid = :worker_2_group; + -- status before master_remove_node SELECT COUNT(*) FROM 
pg_dist_node WHERE nodeport = :worker_2_port; @@ -544,7 +560,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid ASC; \c - - - :master_port diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index e19835e3d..995ec3325 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -119,7 +119,7 @@ SELECT count(*) FROM mx_table; -- master_add_node SELECT master_add_node('localhost', 5432); -SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; +SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port
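
For reference, the node-role workflow exercised by the cluster management tests above boils down to a few calls. This is a minimal sketch assuming the standard regression port 57637 for the primary, throw-away ports 9400/9401, and the psql variable name demo_group; only master_add_node, its groupid/noderole parameters, and the error text come from this change.

-- a plain master_add_node creates a new group with a primary in it
SELECT groupid AS demo_group FROM master_add_node('localhost', 57637) \gset
-- a secondary (for example a hot standby) joins the same group; it never receives placements
SELECT master_add_node('localhost', 9400, groupid => :demo_group, noderole => 'secondary');
-- a second primary in the same group is rejected by master_add_node itself
SELECT master_add_node('localhost', 9401, groupid => :demo_group, noderole => 'primary');
-- ERROR:  group ... already has a primary node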
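
The same one-primary-per-group invariant is also enforced at the catalog level: the "there cannot be two primary nodes in a group" errors above come from citus.pg_dist_node_trigger_func(), which this change attaches to pg_dist_node. The following is only a rough sketch of that guard under assumed names (pg_dist_node_trigger_func_sketch, pg_dist_node_trigger_sketch); the shipped function differs in its exact shape.

CREATE FUNCTION pg_dist_node_trigger_func_sketch() RETURNS trigger AS $$
BEGIN
    -- reject an INSERT or UPDATE that would give a group a second primary
    IF NEW.noderole = 'primary' AND EXISTS (
           SELECT 1 FROM pg_dist_node
           WHERE groupid = NEW.groupid
             AND noderole = 'primary'
             AND nodeid IS DISTINCT FROM NEW.nodeid) THEN
        RAISE EXCEPTION 'there cannot be two primary nodes in a group';
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER pg_dist_node_trigger_sketch
    BEFORE INSERT OR UPDATE ON pg_dist_node
    FOR EACH ROW EXECUTE PROCEDURE pg_dist_node_trigger_func_sketch();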
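
Because placements attach to node groups while only each group's primary serves them, a quick illustrative check of the invariant the reference-table tests rely on is to resolve every placement through a primary; secondaries and unavailable nodes that share the group should contribute no rows. Only catalogs that already appear in the tests are used here.

-- every row of pg_dist_placement should resolve to exactly one primary
SELECT p.shardid, n.nodename, n.nodeport, n.noderole
FROM pg_dist_placement p
JOIN pg_dist_node n ON n.groupid = p.groupid
WHERE n.noderole = 'primary'
ORDER BY p.shardid;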