diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 541358ca5..963e17213 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 6.1-18 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -129,6 +129,8 @@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16. cat $^ > $@ $(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql cat $^ > $@ +$(EXTENSION)--6.1-18.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.1-18.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-17--6.1-18.sql b/src/backend/distributed/citus--6.1-17--6.1-18.sql new file mode 100644 index 000000000..7a37b1637 --- /dev/null +++ b/src/backend/distributed/citus--6.1-17--6.1-18.sql @@ -0,0 +1,24 @@ +/* citus--6.1-17--6.1-18.sql */ + +SET search_path = 'pg_catalog'; + +ALTER TABLE pg_dist_node ADD COLUMN noderole "char" NOT NULL DEFAULT 'p'; + +DROP FUNCTION IF EXISTS master_add_node(text, integer); + +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT noderole "char") + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer) + IS 'add node to the cluster'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 57d2534ec..4fed5458a 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-17' +default_version = '6.1-18' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 46610c407..d999e93de 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -399,7 +399,7 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS) functionContext->max_calls = ShardReplicationFactor; /* if enough live nodes, return an extra candidate node as backup */ - liveNodeCount = WorkerGetLiveNodeCount(); + liveNodeCount = WorkerGetLiveGroupCount(); if (liveNodeCount > ShardReplicationFactor) { functionContext->max_calls = ShardReplicationFactor + 1; diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 00561a943..760e1edb5 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -141,7 +141,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) /* if enough live nodes, add an extra candidate node as backup */ attemptableNodeCount = 
ShardReplicationFactor; - liveNodeCount = WorkerGetLiveNodeCount(); + liveNodeCount = WorkerGetLiveGroupCount(); if (liveNodeCount > ShardReplicationFactor) { attemptableNodeCount = ShardReplicationFactor + 1; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index a1daddb11..c4294a07a 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -18,6 +18,7 @@ #include "distributed/worker_manager.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/pg_dist_node.h" #include "libpq/hba.h" #include "libpq/ip.h" #include "libpq/libpq-be.h" @@ -37,11 +38,13 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size * /* Local functions forward declarations */ static char * ClientHostAddress(StringInfo remoteHostStringInfo); -static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, - List *currentNodeList); +static WorkerNode * FindRandomNodeFromList(List *workerNodeList); +static WorkerNode * WorkerGetNodeWithName(const char *hostname); static bool OddNumber(uint32 number); static bool ListMember(List *currentList, WorkerNode *workerNode); +static List * PrimaryNodesNotInList(List *currentList); + /* ------------------------------------------------------------ * Worker node selection functions follow @@ -50,8 +53,8 @@ static bool ListMember(List *currentList, WorkerNode *workerNode); /* * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates - * a new worker node. The allocation is performed by randomly picking a worker node - * which is not in currentNodeList. + * a new worker node. The allocation is performed by randomly picking a primary worker + * node which is not in currentNodeList. * * Note that the function returns null if the worker membership list does not * contain enough nodes to allocate a new worker node. @@ -64,16 +67,15 @@ WorkerGetRandomCandidateNode(List *currentNodeList) uint32 tryCount = WORKER_RACK_TRIES; uint32 tryIndex = 0; - HTAB *workerNodeHash = GetWorkerNodeHash(); + uint32 currentNodeCount = list_length(currentNodeList); + List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList); /* * We check if the shard has already been placed on all nodes known to us. * This check is rather defensive, and has the drawback of performing a full - * scan over the worker node hash for determining the number of live nodes. + * scan over the worker node hash. 
*/ - uint32 currentNodeCount = list_length(currentNodeList); - uint32 liveNodeCount = WorkerGetLiveNodeCount(); - if (currentNodeCount >= liveNodeCount) + if (list_length(candidateWorkerNodeList) == 0) { return NULL; } @@ -81,7 +83,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) /* if current node list is empty, randomly pick one node and return */ if (currentNodeCount == 0) { - workerNode = FindRandomNodeNotInList(workerNodeHash, NIL); + workerNode = FindRandomNodeFromList(candidateWorkerNodeList); return workerNode; } @@ -111,7 +113,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) char *workerRack = NULL; bool sameRack = false; - workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); + workerNode = FindRandomNodeFromList(candidateWorkerNodeList); workerRack = workerNode->workerRack; sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); @@ -156,7 +158,7 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, /* * WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then - * allocates a new worker node. The allocation is performed according to the + * allocates a new primary worker node. The allocation is performed according to the * following policy: if the list is empty, the node where the caller is connecting * from is allocated; if the list is not empty, a node is allocated according * to random policy. @@ -269,10 +271,10 @@ ClientHostAddress(StringInfo clientHostStringInfo) /* - * WorkerGetNodeWithName finds and returns a node from the membership list that + * WorkerGetNodeWithName finds and returns a primary node from the membership list that * has the given hostname. The function returns null if no such node exists. */ -WorkerNode * +static WorkerNode * WorkerGetNodeWithName(const char *hostname) { WorkerNode *workerNode = NULL; @@ -284,7 +286,7 @@ WorkerGetNodeWithName(const char *hostname) while ((workerNode = hash_seq_search(&status)) != NULL) { int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); - if (nameCompare == 0) + if (nameCompare == 0 && workerNode->nodeRole == NODE_ROLE_PRIMARY) { /* we need to terminate the scan since we break */ hash_seq_term(&status); @@ -297,13 +299,26 @@ WorkerGetNodeWithName(const char *hostname) /* - * WorkerGetLiveNodeCount returns the number of live nodes in the cluster. - * */ + * WorkerGetLiveGroupCount returns the number of groups which have a primary capable of + * accepting writes. + */ uint32 -WorkerGetLiveNodeCount(void) +WorkerGetLiveGroupCount(void) { HTAB *workerNodeHash = GetWorkerNodeHash(); - uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash); + uint32 liveWorkerCount = 0; + HASH_SEQ_STATUS status; + WorkerNode *workerNode = NULL; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + if (workerNode->nodeRole == NODE_ROLE_PRIMARY) + { + liveWorkerCount++; + } + } return liveWorkerCount; } @@ -312,6 +327,8 @@ WorkerGetLiveNodeCount(void) /* * WorkerNodeList iterates over the hash table that includes the worker nodes, and adds * them to a list which is returned. + * + * It only returns nodes which are primaries. 
*/ List * WorkerNodeList(void) @@ -325,7 +342,14 @@ while ((workerNode = hash_seq_search(&status)) != NULL) { - WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); + WorkerNode *workerNodeCopy; + + if (workerNode->nodeRole != NODE_ROLE_PRIMARY) + { + continue; + } + + workerNodeCopy = palloc0(sizeof(WorkerNode)); memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); workerNodeList = lappend(workerNodeList, workerNodeCopy); } @@ -335,70 +359,50 @@ /* - * FindRandomNodeNotInList finds a random node from the shared hash that is not - * a member of the current node list. The caller is responsible for making the - * necessary node count checks to ensure that such a node exists. - * - * Note that this function has a selection bias towards nodes whose positions in - * the shared hash are sequentially adjacent to the positions of nodes that are - * in the current node list. This bias follows from our decision to first pick a - * random node in the hash, and if that node is a member of the current list, to - * simply iterate to the next node in the hash. Overall, this approach trades in - * some selection bias for simplicity in design and for bounded execution time. + * PrimaryNodesNotInList scans through the worker node hash and returns a list + * of all primary nodes which are not in currentList. */ -static WorkerNode * -FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) +static List * +PrimaryNodesNotInList(List *currentList) { + List *workerNodeList = NIL; + HTAB *workerNodeHash = GetWorkerNodeHash(); WorkerNode *workerNode = NULL; HASH_SEQ_STATUS status; - uint32 workerNodeCount = 0; - uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0; - bool lookForWorkerNode = true; - uint32 workerPosition = 0; - uint32 workerIndex = 0; - workerNodeCount = hash_get_num_entries(WorkerNodesHash); - currentNodeCount = list_length(currentNodeList); - Assert(workerNodeCount > currentNodeCount); + hash_seq_init(&status, workerNodeHash); - /* - * We determine a random position within the worker hash between [1, N], - * assuming that the number of elements in the hash is N. We then get to - * this random position by iterating over the worker hash. Please note that - * the random seed has already been set by the postmaster when starting up. 
- */ - workerPosition = (random() % workerNodeCount) + 1; - hash_seq_init(&status, WorkerNodesHash); - - for (workerIndex = 0; workerIndex < workerPosition; workerIndex++) + /* this is O(n*m) but there usually aren't many nodes in currentList */ + while ((workerNode = hash_seq_search(&status)) != NULL) { - workerNode = (WorkerNode *) hash_seq_search(&status); + WorkerNode *workerNodeCopy; + + if ((workerNode->nodeRole != NODE_ROLE_PRIMARY) || + ListMember(currentList, workerNode)) + { + continue; + } + + workerNodeCopy = palloc0(sizeof(WorkerNode)); + memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); + workerNodeList = lappend(workerNodeList, workerNodeCopy); } - while (lookForWorkerNode) - { - bool listMember = ListMember(currentNodeList, workerNode); + return workerNodeList; +} - if (!listMember) - { - lookForWorkerNode = false; - } - else - { - /* iterate to the next worker node in the hash */ - workerNode = (WorkerNode *) hash_seq_search(&status); - /* reached end of hash; start from the beginning */ - if (workerNode == NULL) - { - hash_seq_init(&status, WorkerNodesHash); - workerNode = (WorkerNode *) hash_seq_search(&status); - } - } - } +/* FindRandomNodeFromList picks a random node from the list provided to it */ +static WorkerNode * +FindRandomNodeFromList(List *candidateWorkerNodeList) +{ + uint32 candidateNodeCount = list_length(candidateWorkerNodeList); - /* we stopped scanning before completion; therefore clean up scan */ - hash_seq_term(&status); + /* nb, the random seed has already been set by the postmaster when starting up. */ + uint32 workerPosition = (random() % candidateNodeCount); + + WorkerNode *workerNode = (WorkerNode *) list_nth(candidateWorkerNodeList, + workerPosition); return workerNode; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e965f362e..67f6d1b01 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -402,7 +402,7 @@ NodeListInsertCommand(List *workerNodeList) /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node " - "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) " + "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) " "VALUES "); /* iterate over the worker nodes, add the values */ @@ -412,13 +412,14 @@ NodeListInsertCommand(List *workerNodeList) char *hasMetadaString = workerNode->hasMetadata ? 
"TRUE" : "FALSE"; appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, %s, %s)", + "(%d, %d, %s, %d, %s, %s, '%c')", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, quote_literal_cstr(workerNode->workerRack), - hasMetadaString); + hasMetadaString, + workerNode->nodeRole); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 22ee7aa60..d6340172f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1834,7 +1834,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 nodeCount = WorkerGetLiveNodeCount(); + uint32 nodeCount = WorkerGetLiveGroupCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 2939ade41..1ae2b51d1 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1625,6 +1625,7 @@ InitializeWorkerNodeCache(void) workerNode->nodeId = currentNode->nodeId; strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); workerNode->hasMetadata = currentNode->hasMetadata; + workerNode->nodeRole = currentNode->nodeRole; if (handleFound) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 7e26f17a6..06a3e6267 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -53,13 +53,14 @@ int GroupSize = 1; /* local function forward declarations */ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove); static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); + char *nodeRack, bool hasMetadata, char noderole, + bool *nodeAlreadyExists); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, - char *nodeRack, bool hasMetadata); + char *nodeRack, bool hasMetadata, char noderole); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -88,7 +89,8 @@ master_add_node(PG_FUNCTION_ARGS) bool nodeAlreadyExists = false; Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, &nodeAlreadyExists); + hasMetadata, NODE_ROLE_PRIMARY, + &nodeAlreadyExists); /* * After adding new node, if the node is not already exist, we replicate all existing @@ -166,7 +168,8 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false, &nodeAlreadyExists); + workerNode->workerRack, false, NODE_ROLE_PRIMARY, + &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -414,7 +417,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool 
forceRemove) */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata, bool *nodeAlreadyExists) + bool hasMetadata, char noderole, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -465,7 +468,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); - InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, + nodeRack, hasMetadata, noderole); workerNode = FindWorkerNode(nodeName, nodePort); @@ -512,6 +516,7 @@ GenerateNodeTuple(WorkerNode *workerNode) values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata); + values[Anum_pg_dist_node_noderole - 1] = CharGetDatum(workerNode->nodeRole); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); @@ -650,7 +655,7 @@ EnsureCoordinator(void) */ static void InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, - bool hasMetadata) + bool hasMetadata, char noderole) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -668,6 +673,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); + values[Anum_pg_dist_node_noderole - 1] = CharGetDatum(noderole); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -867,6 +873,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; workerNode->hasMetadata = false; + workerNode->nodeRole = NODE_ROLE_PRIMARY; workerNodeList = lappend(workerNodeList, workerNode); } @@ -906,6 +913,14 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata, tupleDescriptor, &isNull); + Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole, + tupleDescriptor, &isNull); + + /* since the column doesn't exist yet assume it references a primary */ + if (HeapTupleHeaderGetNatts(heapTuple->t_data) < Anum_pg_dist_node_noderole) + { + nodeRole = CharGetDatum(NODE_ROLE_PRIMARY); + } Assert(!HeapTupleHasNulls(heapTuple)); @@ -916,6 +931,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); workerNode->hasMetadata = DatumGetBool(hasMetadata); + workerNode->nodeRole = DatumGetChar(nodeRole); return workerNode; } diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index c5db5129c..2972858ae 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -23,6 +23,7 @@ typedef struct FormData_pg_dist_node text nodename; int nodeport; bool hasmetadata; + char noderole; /* see codes below */ #endif } FormData_pg_dist_node; @@ -37,15 +38,19 @@ 
typedef FormData_pg_dist_node *Form_pg_dist_node; * compiler constants for pg_dist_node * ---------------- */ -#define Natts_pg_dist_node 6 +#define Natts_pg_dist_node 7 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodeport 4 #define Anum_pg_dist_node_noderack 5 #define Anum_pg_dist_node_hasmetadata 6 +#define Anum_pg_dist_node_noderole 7 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" +#define NODE_ROLE_PRIMARY 'p' +#define NODE_ROLE_SECONDARY 's' + #endif /* PG_DIST_NODE_H */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 01ff53c4f..122a3d22d 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -43,6 +43,7 @@ typedef struct WorkerNode uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ char workerRack[WORKER_LENGTH]; /* node's network location */ bool hasMetadata; /* node gets metadata changes */ + char nodeRole; /* whether the node is a primary or secondary */ } WorkerNode; @@ -57,8 +58,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern WorkerNode * WorkerGetNodeWithName(const char *hostname); -extern uint32 WorkerGetLiveNodeCount(void); +extern uint32 WorkerGetLiveGroupCount(void); extern List * WorkerNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(void); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 12495eb82..0d3d41d75 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -9,15 +9,15 @@ ERROR: cannot create reference table "test_reference_table" DETAIL: There are no active worker nodes. 
-- add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,p) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node +----------------------------------- + (2,2,localhost,57638,default,f,p) (1 row) -- get the active nodes @@ -30,9 +30,9 @@ SELECT master_get_active_worker_nodes(); -- try to add a node that is already in the cluster SELECT * FROM master_add_node('localhost', :worker_1_port); - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | f | p (1 row) -- get the active nodes @@ -59,9 +59,9 @@ SELECT master_get_active_worker_nodes(); -- try to disable a node with no placements see that node is removed SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (3,3,localhost,57638,default,f) + master_add_node +----------------------------------- + (3,3,localhost,57638,default,f,p) (1 row) SELECT master_disable_node('localhost', :worker_2_port); @@ -78,9 +78,9 @@ SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (4,4,localhost,57638,default,f) + master_add_node +----------------------------------- + (4,4,localhost,57638,default,f,p) (1 row) CREATE TABLE cluster_management_test (col_1 text, col_2 int); @@ -139,9 +139,9 @@ SELECT master_get_active_worker_nodes(); -- restore the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (5,5,localhost,57638,default,f) + master_add_node +----------------------------------- + (5,5,localhost,57638,default,f,p) (1 row) -- try to remove a node with active placements and see that node removal is failed @@ -177,9 +177,9 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (6,6,localhost,57638,default,f) + master_add_node +----------------------------------- + (6,6,localhost,57638,default,f,p) (1 row) UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; @@ -193,9 +193,9 @@ SELECT master_remove_node('localhost', :worker_2_port); UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (7,7,localhost,57638,default,f) + master_add_node +----------------------------------- + (7,7,localhost,57638,default,f,p) (1 row) \c - - - :worker_1_port @@ -222,9 +222,9 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep -- check that added nodes are not propagated to nodes with hasmetadata=false UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (8,8,localhost,57638,default,f) 
+ master_add_node +----------------------------------- + (8,8,localhost,57638,default,f,p) (1 row) \c - - - :worker_1_port @@ -244,24 +244,24 @@ SELECT (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+----------+----------+----------+------------- + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+----------+----------+----------+-------------+---------- (0 rows) -- check that adding two nodes in the same transaction works SELECT master_add_node('localhost', :worker_1_port), master_add_node('localhost', :worker_2_port); - master_add_node | master_add_node ----------------------------------+----------------------------------- - (9,9,localhost,57637,default,f) | (10,10,localhost,57638,default,f) + master_add_node | master_add_node +-----------------------------------+------------------------------------- + (9,9,localhost,57637,default,f,p) | (10,10,localhost,57638,default,f,p) (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 9 | 9 | localhost | 57637 | default | f - 10 | 10 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+-----------+----------+----------+-------------+---------- + 9 | 9 | localhost | 57637 | default | f | p + 10 | 10 | localhost | 57638 | default | f | p (2 rows) -- check that mixed add/remove node commands work fine inside transaction @@ -273,9 +273,9 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (11,11,localhost,57638,default,f) + master_add_node +------------------------------------- + (11,11,localhost,57638,default,f,p) (1 row) SELECT master_remove_node('localhost', :worker_2_port); @@ -293,9 +293,9 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; BEGIN; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (12,12,localhost,57638,default,f) + master_add_node +------------------------------------- + (12,12,localhost,57638,default,f,p) (1 row) SELECT master_remove_node('localhost', :worker_2_port); @@ -305,9 +305,9 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (13,13,localhost,57638,default,f) + master_add_node +------------------------------------- + (13,13,localhost,57638,default,f,p) (1 row) COMMIT; @@ -333,15 +333,15 @@ SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; (2 rows) SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------------------ - (14,14,localhost,57637,default,f) + master_add_node +------------------------------------- + (14,14,localhost,57637,default,f,p) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (15,15,localhost,57638,default,f) + master_add_node +------------------------------------- + (15,15,localhost,57638,default,f,p) (1 row) -- check that a distributed table can be created after adding a node in a transaction @@ -353,9 +353,9 @@ SELECT 
master_remove_node('localhost', :worker_2_port); BEGIN; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (16,16,localhost,57638,default,f) + master_add_node +------------------------------------- + (16,16,localhost,57638,default,f,p) (1 row) CREATE TABLE temp(col1 text, col2 int); diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 645a7a935..cd4f752b8 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -21,15 +21,15 @@ RESET client_min_messages; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,p) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node +----------------------------------- + (2,2,localhost,57638,default,f,p) (1 row) -- verify that a table can be created after the extension has been dropped and recreated diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 294f70af7..4763b0c5e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.1-18'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index a297de3fa..40543ae25 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -28,11 +28,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; -- Show that, with no MX tables, metadata snapshot contains only the delete commands, -- pg_dist_node entries and reference tables SELECT unnest(master_metadata_snapshot()); - unnest ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + unnest +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') (3 rows) -- Create a test table with constraints and SERIAL @@ -58,7 +58,7 @@ SELECT unnest(master_metadata_snapshot()); 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE @@ -80,7 +80,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE @@ -104,7 +104,7 @@ SELECT unnest(master_metadata_snapshot()); 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -134,7 +134,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -157,7 +157,7 @@ SELECT unnest(master_metadata_snapshot()); 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, noderole) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, 'p'),(1, 1, 'localhost', 57637, 'default', FALSE, 'p') CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -203,10 +203,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | t - 2 | 2 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | t | p + 2 | 2 | localhost | 57638 | default | f | p (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -333,10 +333,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | t - 2 | 2 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | t | p + 2 | 2 | localhost | 57638 | default | f | p (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -1133,9 +1133,9 @@ SELECT create_distributed_table('mx_table', 'a'); \c - postgres - :master_port SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (4,4,localhost,57638,default,f) + master_add_node +----------------------------------- + (4,4,localhost,57638,default,f,p) (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -1317,9 +1317,9 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "mx_ref" to all workers - master_add_node ---------------------------------- - 
(5,5,localhost,57638,default,f) + master_add_node +----------------------------------- + (5,5,localhost,57638,default,f,p) (1 row) SELECT shardid, nodename, nodeport diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index 0f6934fe4..0d14e7e80 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -43,9 +43,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1380000,1380000,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380000,1380000,localhost,57638,default,f,p) (1 row) -- remove a node with reference table @@ -165,9 +165,9 @@ ERROR: could not find valid entry for node "localhost:57638" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380001,1380001,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380001,1380001,localhost,57638,default,f,p) (1 row) -- remove node in a transaction and ROLLBACK @@ -388,9 +388,9 @@ WHERE -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380002,1380002,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380002,1380002,localhost,57638,default,f,p) (1 row) -- test inserting a value then removing a node in a transaction @@ -517,9 +517,9 @@ SELECT * FROM remove_node_reference_table; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380003,1380003,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380003,1380003,localhost,57638,default,f,p) (1 row) -- test executing DDL command then removing a node in a transaction @@ -642,9 +642,9 @@ Table "public.remove_node_reference_table" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380004,1380004,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380004,1380004,localhost,57638,default,f,p) (1 row) -- test DROP table after removing a node in a transaction @@ -710,9 +710,9 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1380005,1380005,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380005,1380005,localhost,57638,default,f,p) (1 row) -- re-create remove_node_reference_table @@ -845,9 +845,9 @@ WHERE SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating 
reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "table1" to all workers - master_add_node ---------------------------------------------- - (1380006,1380006,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380006,1380006,localhost,57638,default,f,p) (1 row) -- test with master_disable_node @@ -962,9 +962,9 @@ WHERE SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "table1" to all workers - master_add_node ---------------------------------------------- - (1380007,1380007,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380007,1380007,localhost,57638,default,f,p) (1 row) -- DROP tables to clean workspace diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 615eb09ad..11da64165 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -25,9 +25,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1370000,1370000,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370000,1370000,localhost,57638,default,f,p) (1 row) -- verify node is added @@ -124,9 +124,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers - master_add_node ---------------------------------------------- - (1370002,1370002,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370002,1370002,localhost,57638,default,f,p) (1 row) -- status after master_add_node @@ -177,9 +177,9 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1370002,1370002,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370002,1370002,localhost,57638,default,f,p) (1 row) -- status after master_add_node @@ -245,9 +245,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers - master_add_node ---------------------------------------------- - (1370003,1370003,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370003,1370003,localhost,57638,default,f,p) (1 row) ROLLBACK; @@ -307,9 +307,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers - master_add_node ---------------------------------------------- - (1370004,1370004,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370004,1370004,localhost,57638,default,f,p) (1 row) COMMIT; @@ -402,9 +402,9 @@ ORDER BY logicalrelid; BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers - master_add_node ---------------------------------------------- - 
(1370005,1370005,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370005,1370005,localhost,57638,default,f,p) (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); @@ -551,9 +551,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers - master_add_node ---------------------------------------------- - (1370009,1370009,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370009,1370009,localhost,57638,default,f,p) (1 row) DROP TABLE replicate_reference_table_drop; @@ -613,9 +613,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "table1" to all workers - master_add_node ---------------------------------------------- - (1370010,1370010,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370010,1370010,localhost,57638,default,f,p) (1 row) -- status after master_add_node diff --git a/src/test/regress/expected/multi_stage_protocol.out b/src/test/regress/expected/multi_stage_protocol.out new file mode 100644 index 000000000..48675149b --- /dev/null +++ b/src/test/regress/expected/multi_stage_protocol.out @@ -0,0 +1,116 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000; +-- run master_create_empty_shard with differing placement policies +CREATE TABLE append_table (a int); +SELECT master_create_distributed_table('append_table', 'a', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SET citus.shard_placement_policy TO 'round-robin'; +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420000 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420001 +(1 row) + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1420000 | 1 | 0 | localhost | 57638 | 311 + 1420000 | 1 | 0 | localhost | 57637 | 312 + 1420001 | 1 | 0 | localhost | 57637 | 313 + 1420001 | 1 | 0 | localhost | 57638 | 314 +(4 rows) + +UPDATE pg_dist_node SET noderole = 's' WHERE nodeport = :worker_2_port; +-- round robin only considers primary nodes +SELECT master_create_empty_shard('append_table'); +ERROR: could only find 1 of 2 possible nodes +SET citus.shard_replication_factor = 1; +SET citus.shard_placement_policy TO 'random'; +-- make sure it works when there's only one primary +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420003 +(1 row) + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1420000 | 1 | 0 | localhost | 57638 | 311 + 1420000 | 1 | 0 | localhost | 57637 | 312 + 1420001 | 1 | 0 | localhost | 57637 | 313 + 1420001 | 1 | 0 | localhost | 57638 | 314 + 1420003 | 1 | 0 | localhost | 57637 | 315 +(5 rows) + +SELECT 
setseed(0.5); + setseed +--------- + +(1 row) + +UPDATE pg_dist_node SET noderole = 'p' WHERE nodeport = :worker_2_port; +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420004 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420005 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420006 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420007 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420008 +(1 row) + +SELECT master_create_empty_shard('append_table'); + master_create_empty_shard +--------------------------- + 1420009 +(1 row) + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1420000 | 1 | 0 | localhost | 57638 | 311 + 1420000 | 1 | 0 | localhost | 57637 | 312 + 1420001 | 1 | 0 | localhost | 57637 | 313 + 1420001 | 1 | 0 | localhost | 57638 | 314 + 1420003 | 1 | 0 | localhost | 57637 | 315 + 1420004 | 1 | 0 | localhost | 57638 | 316 + 1420005 | 1 | 0 | localhost | 57638 | 317 + 1420006 | 1 | 0 | localhost | 57638 | 318 + 1420007 | 1 | 0 | localhost | 57638 | 319 + 1420008 | 1 | 0 | localhost | 57637 | 320 + 1420009 | 1 | 0 | localhost | 57637 | 321 +(11 rows) + +-- clean up +DROP TABLE append_table; diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index a5e2f4330..a57851515 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -78,15 +78,15 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,p) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node +----------------------------------- + (2,2,localhost,57638,default,f,p) (1 row) -- create a table with a SERIAL column diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index a5165c40a..894c4de52 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -217,16 +217,16 @@ SELECT master_add_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. 
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+----------+----------+----------+------------- + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +--------+---------+----------+----------+----------+-------------+---------- (0 rows) -- master_remove_node \c - - - :master_port SELECT master_add_node('localhost', 5432); - master_add_node --------------------------------------------- - (1370000,1370000,localhost,5432,default,f) + master_add_node +---------------------------------------------- + (1370000,1370000,localhost,5432,default,f,p) (1 row) \c - - - :worker_1_port @@ -234,9 +234,9 @@ SELECT master_remove_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ----------+---------+-----------+----------+----------+------------- - 1370000 | 1370000 | localhost | 5432 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | noderole +---------+---------+-----------+----------+----------+-------------+---------- + 1370000 | 1370000 | localhost | 5432 | default | f | p (1 row) \c - - - :master_port diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 39593a977..550cbb16e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -31,6 +31,7 @@ test: multi_master_protocol test: multi_load_data test: multi_insert_select +test: multi_stage_protocol # ---------- # Miscellaneous tests to check our query planning behavior diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index e04c99149..7fc1f5bf5 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -769,9 +769,9 @@ SELECT shardid, nodename, nodeport SELECT master_add_node('localhost', :worker_1_port); NOTICE: Replicating reference table "nation" to all workers NOTICE: Replicating reference table "supplier" to all workers - master_add_node ---------------------------------- - (3,3,localhost,57637,default,f) + master_add_node +----------------------------------- + (3,3,localhost,57637,default,f,p) (1 row) RESET citus.shard_replication_factor; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 647cedde4..0f7584aa1 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.1-18'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_stage_protocol.sql b/src/test/regress/sql/multi_stage_protocol.sql new file mode 100644 index 000000000..164a3c1ab --- /dev/null +++ b/src/test/regress/sql/multi_stage_protocol.sql @@ -0,0 +1,41 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000; + +-- run master_create_empty_shard with differing placement policies + +CREATE TABLE append_table (a int); +SELECT master_create_distributed_table('append_table', 'a', 'append'); + +SET citus.shard_placement_policy TO 
'round-robin'; +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + +UPDATE pg_dist_node SET noderole = 's' WHERE nodeport = :worker_2_port; + +-- round robin only considers primary nodes +SELECT master_create_empty_shard('append_table'); + +SET citus.shard_replication_factor = 1; +SET citus.shard_placement_policy TO 'random'; + +-- make sure it works when there's only one primary +SELECT master_create_empty_shard('append_table'); + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + +SELECT setseed(0.5); +UPDATE pg_dist_node SET noderole = 'p' WHERE nodeport = :worker_2_port; + +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); +SELECT master_create_empty_shard('append_table'); + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass); + +-- clean up +DROP TABLE append_table;
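
A minimal usage sketch of the new noderole column, mirroring the multi_stage_protocol test above (not part of the patch; the worker port 57638 is the regression-test value and purely illustrative):

-- after ALTER EXTENSION citus UPDATE TO '6.1-18', existing rows pick up the default primary role ('p')
SELECT nodeid, nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeid;

-- mark a worker as a secondary ('s'); shard placement (e.g. master_create_empty_shard)
-- then only considers the remaining primaries
UPDATE pg_dist_node SET noderole = 's' WHERE nodeport = 57638;

-- restore it to a primary
UPDATE pg_dist_node SET noderole = 'p' WHERE nodeport = 57638;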