From 9d6699b07c55485e899e494ea073699b6f1b6daa Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Thu, 22 Sep 2016 16:57:06 +0300 Subject: [PATCH] Switch from pg_worker_list.conf file to pg_dist_node metadata table. Related to #786 This change adds the `pg_dist_node` table, which contains information about the worker nodes in the cluster and replaces the previously used `pg_worker_list.conf` file (or the file specified with `citus.worker_list_file`). During the extension update, the `pg_worker_list.conf` file is read and the `pg_dist_node` table is populated with its content. After that, `pg_worker_list.conf` is renamed to `pg_worker_list.conf.obsolete`. For adding and removing nodes, the change also includes two new UDFs, `master_add_node` and `master_remove_node`, which require superuser permissions (a short usage sketch is appended after the patch). The `citus.worker_list_file` GUC is kept for update purposes, but it is no longer used once the update is finished. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.0-3--6.0-4.sql | 61 ++ src/backend/distributed/citus.control | 2 +- .../master/master_metadata_utility.c | 54 ++ .../distributed/master/master_node_protocol.c | 2 +- .../distributed/master/master_repair_shards.c | 1 - .../distributed/master/worker_node_manager.c | 488 ++----- .../planner/multi_physical_planner.c | 6 +- src/backend/distributed/shared_library_init.c | 6 +- .../distributed/utils/metadata_cache.c | 200 ++++++ src/backend/distributed/utils/node_metadata.c | 665 ++++++++++++++++++ src/backend/distributed/worker/task_tracker.c | 8 - .../distributed/master_metadata_utility.h | 2 + src/include/distributed/metadata_cache.h | 8 + src/include/distributed/pg_dist_node.h | 49 ++ src/include/distributed/worker_manager.h | 28 +- .../expected/multi_cluster_management.out | 134 ++++ .../regress/expected/multi_drop_extension.out | 13 + src/test/regress/expected/multi_extension.out | 1 + src/test/regress/expected/multi_table_ddl.out | 13 + src/test/regress/multi_binary_schedule | 1 + src/test/regress/multi_schedule | 1 + .../regress/multi_task_tracker_extra_schedule | 1 + src/test/regress/pg_regress_multi.pl | 7 - .../regress/sql/multi_cluster_management.sql | 49 ++ src/test/regress/sql/multi_drop_extension.sql | 4 + src/test/regress/sql/multi_extension.sql | 1 + src/test/regress/sql/multi_table_ddl.sql | 4 + 28 files changed, 1328 insertions(+), 485 deletions(-) create mode 100644 src/backend/distributed/citus--6.0-3--6.0-4.sql create mode 100644 src/backend/distributed/utils/node_metadata.c create mode 100644 src/include/distributed/pg_dist_node.h create mode 100644 src/test/regress/expected/multi_cluster_management.out create mode 100644 src/test/regress/sql/multi_cluster_management.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index ae9f079c7..390849e39 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 + 6.0-1 6.0-2 6.0-3 6.0-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -64,6 +64,8 @@ $(EXTENSION)--6.0-2.sql: $(EXTENSION)--6.0-1.sql $(EXTENSION)--6.0-1--6.0-2.sql cat $^ > $@ $(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql cat $^ > $@ +$(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git
a/src/backend/distributed/citus--6.0-3--6.0-4.sql b/src/backend/distributed/citus--6.0-3--6.0-4.sql new file mode 100644 index 000000000..076f8e876 --- /dev/null +++ b/src/backend/distributed/citus--6.0-3--6.0-4.sql @@ -0,0 +1,61 @@ +SET search_path = 'pg_catalog'; + +CREATE SEQUENCE citus.pg_dist_groupid_seq + MINVALUE 1 + MAXVALUE 4294967296; + +CREATE SEQUENCE citus.pg_dist_node_nodeid_seq + MINVALUE 1 + MAXVALUE 4294967296; + +ALTER SEQUENCE citus.pg_dist_groupid_seq SET SCHEMA pg_catalog; +ALTER SEQUENCE citus.pg_dist_node_nodeid_seq SET SCHEMA pg_catalog; + +/* add pg_dist_node */ +CREATE TABLE citus.pg_dist_node( + nodeid int NOT NULL DEFAULT nextval('pg_dist_node_nodeid_seq') PRIMARY KEY, + groupid int NOT NULL DEFAULT nextval('pg_dist_groupid_seq'), + nodename text NOT NULL, + nodeport int NOT NULL DEFAULT 5432, + noderack text NOT NULL DEFAULT 'default', + UNIQUE (nodename, nodeport) +); + +ALTER TABLE citus.pg_dist_node SET SCHEMA pg_catalog; + +CREATE FUNCTION master_dist_node_cache_invalidate() + RETURNS trigger + LANGUAGE C + AS 'MODULE_PATHNAME', $$master_dist_node_cache_invalidate$$; +COMMENT ON FUNCTION master_dist_node_cache_invalidate() + IS 'invalidate internal cache of nodes when pg_dist_node changes'; +CREATE TRIGGER dist_node_cache_invalidate + AFTER INSERT OR UPDATE OR DELETE + ON pg_catalog.pg_dist_node + FOR EACH ROW EXECUTE PROCEDURE master_dist_node_cache_invalidate(); + +CREATE FUNCTION master_add_node(nodename text, + nodeport integer) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, + nodeport integer) + IS 'add node to the cluster'; + +CREATE FUNCTION master_remove_node(nodename text, nodeport integer) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_remove_node$$; +COMMENT ON FUNCTION master_remove_node(nodename text, nodeport integer) + IS 'remove node from the cluster'; + +/* this only needs to run once, during the extension update.
*/ +CREATE FUNCTION master_initialize_node_metadata() + RETURNS BOOL + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_initialize_node_metadata$$; + +SELECT master_initialize_node_metadata(); + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index c9b545b6d..732479937 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-3' +default_version = '6.0-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 7ec879ff9..769fa2d81 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -26,6 +26,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard_placement.h" +#include "distributed/relay_utility.h" #include "distributed/worker_manager.h" #include "nodes/makefuncs.h" #include "parser/scansup.h" @@ -199,6 +200,45 @@ ShardLength(uint64 shardId) } +/* + * NodeHasActiveShardPlacements returns whether any active shards are placed on this node + */ +bool +NodeHasActiveShardPlacements(char *nodeName, int32 nodePort) +{ + const int scanKeyCount = 3; + const bool indexOK = false; + + bool hasFinalizedPlacements = false; + + HeapTuple heapTuple = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[scanKeyCount]; + + Relation pgShardPlacement = heap_open(DistShardPlacementRelationId(), + AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_nodename, + BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_shard_placement_nodeport, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_shard_placement_shardstate, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED)); + + scanDescriptor = systable_beginscan(pgShardPlacement, + DistShardPlacementNodeidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + hasFinalizedPlacements = HeapTupleIsValid(heapTuple); + + systable_endscan(scanDescriptor); + heap_close(pgShardPlacement, AccessShareLock); + + return hasFinalizedPlacements; +} + + /* * FinalizedShardPlacementList finds shard placements for the given shardId from * system catalogs, chooses placements that are in finalized state, and returns @@ -589,6 +629,20 @@ EnsureTableOwner(Oid relationId) } +/* + * EnsureSuperUser check that the current user is a superuser and errors out if not. + */ +void +EnsureSuperUser(void) +{ + if (!superuser()) + { + ereport(ERROR, (errmsg("operation is not allowed"), + errhint("Run the command with a superuser."))); + } +} + + /* * Return a table's owner as a string. */ diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index f2c52c6b5..72543659a 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -459,7 +459,7 @@ master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS) /* * master_get_active_worker_nodes returns a set of active worker host names and * port numbers in deterministic order. 
Currently we assume that all worker - * nodes in pg_worker_list.conf are active. + * nodes in pg_dist_node are active. */ Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 20429ba49..f7e94d51d 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -122,7 +122,6 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } targetNode = palloc0(sizeof(WorkerNode)); - targetNode->inWorkerFile = true; strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH); targetNode->workerPort = targetPlacement->nodePort; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 5db1ef492..70d28d383 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -16,6 +16,7 @@ #include "commands/dbcommands.h" #include "distributed/worker_manager.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "libpq/hba.h" #include "libpq/ip.h" @@ -30,25 +31,16 @@ /* Config variables managed via guc.c */ -char *WorkerListFileName; /* location of pg_worker_list.conf */ +char *WorkerListFileName; int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size */ -static HTAB *WorkerNodesHash = NULL; /* worker node hash in shared memory */ -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; - /* Local functions forward declarations */ static char * ClientHostAddress(StringInfo remoteHostStringInfo); -static bool OddNumber(uint32 number); static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList); +static bool OddNumber(uint32 number); static bool ListMember(List *currentList, WorkerNode *workerNode); -static Size WorkerNodeShmemSize(void); -static void WorkerNodeShmemAndWorkerListInit(void); -static uint32 WorkerNodeHashCode(const void *key, Size keySize); -static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize); -static List * ParseWorkerNodeFile(const char *workerNodeFilename); -static void ResetWorkerNodesHash(HTAB *WorkerNodesHash); static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); @@ -59,14 +51,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); /* * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates - * a new worker node. The allocation is performed according to the following - * policy: if the list is empty, a random node is allocated; if the list has one - * node (or an odd number of nodes), the new node is allocated on a different - * rack than the first node; and if the list has two nodes (or an even number of - * nodes), the new node is allocated on the same rack as the first node, but is - * different from all the nodes in the list. This node allocation policy ensures - * that shard locality is maintained within a rack, but no single rack failure - * can result in data loss. + * a new worker node. The allocation is performed by randomly picking a worker node + * which is not in currentNodeList. * * Note that the function returns null if the worker membership list does not * contain enough nodes to allocate a new worker node. 
@@ -79,6 +65,8 @@ WorkerGetRandomCandidateNode(List *currentNodeList) uint32 tryCount = WORKER_RACK_TRIES; uint32 tryIndex = 0; + HTAB *workerNodeHash = GetWorkerNodeHash(); + /* * We check if the shard has already been placed on all nodes known to us. * This check is rather defensive, and has the drawback of performing a full @@ -94,7 +82,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) /* if current node list is empty, randomly pick one node and return */ if (currentNodeCount == 0) { - workerNode = FindRandomNodeNotInList(WorkerNodesHash, NIL); + workerNode = FindRandomNodeNotInList(workerNodeHash, NIL); return workerNode; } @@ -124,7 +112,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList) char *workerRack = NULL; bool sameRack = false; - workerNode = FindRandomNodeNotInList(WorkerNodesHash, currentNodeList); + workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); workerRack = workerNode->workerRack; sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); @@ -289,121 +277,64 @@ WorkerNode * WorkerGetNodeWithName(const char *hostname) { WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); + HTAB *workerNodeHash = GetWorkerNodeHash(); - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->inWorkerFile) + int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); + if (nameCompare == 0) { - int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); - if (nameCompare == 0) - { - hash_seq_term(&status); - break; - } + /* we need to terminate the scan since we break */ + hash_seq_term(&status); + break; } - - workerNode = (WorkerNode *) hash_seq_search(&status); } return workerNode; } -/* Returns the number of live nodes in the cluster. */ +/* + * WorkerGetLiveNodeCount returns the number of live nodes in the cluster. + * */ uint32 WorkerGetLiveNodeCount(void) { - WorkerNode *workerNode = NULL; - uint32 liveWorkerCount = 0; - - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); - - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) - { - if (workerNode->inWorkerFile) - { - liveWorkerCount++; - } - - workerNode = (WorkerNode *) hash_seq_search(&status); - } + HTAB *workerNodeHash = GetWorkerNodeHash(); + uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash); return liveWorkerCount; } -/* Inserts the live worker nodes to a list, and returns the list. */ +/* + * WorkerNodeList iterates over the hash table that includes the worker nodes, and adds + * them to a list which is returned. 
+ */ List * WorkerNodeList(void) { List *workerNodeList = NIL; WorkerNode *workerNode = NULL; - + HTAB *workerNodeHash = GetWorkerNodeHash(); HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->inWorkerFile) - { - workerNodeList = lappend(workerNodeList, workerNode); - } - - workerNode = (WorkerNode *) hash_seq_search(&status); + WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); + memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); + workerNodeList = lappend(workerNodeList, workerNodeCopy); } return workerNodeList; } -/* - * WorkerNodeActive looks up a worker node with the given name and port number - * in the current membership list. If such a worker node exists, the function - * returns true. - */ -bool -WorkerNodeActive(const char *nodeName, uint32 nodePort) -{ - bool workerNodeActive = false; - bool handleFound = false; - WorkerNode *workerNode = NULL; - void *hashKey = NULL; - - WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); - strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH); - searchedNode->workerPort = nodePort; - - hashKey = (void *) searchedNode; - workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey, - HASH_FIND, &handleFound); - if (workerNode != NULL) - { - if (workerNode->inWorkerFile) - { - workerNodeActive = true; - } - } - - return workerNodeActive; -} - - -/* Returns true if given number is odd; returns false otherwise. */ -static bool -OddNumber(uint32 number) -{ - bool oddNumber = ((number % 2) == 1); - return oddNumber; -} - - /* * FindRandomNodeNotInList finds a random node from the shared hash that is not * a member of the current node list. The caller is responsible for making the @@ -449,7 +380,7 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) { bool listMember = ListMember(currentNodeList, workerNode); - if (workerNode->inWorkerFile && !listMember) + if (!listMember) { lookForWorkerNode = false; } @@ -474,6 +405,17 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) } +/* + * OddNumber function returns true if given number is odd; returns false otherwise. + */ +static bool +OddNumber(uint32 number) +{ + bool oddNumber = ((number % 2) == 1); + return oddNumber; +} + + /* Checks if given worker node is a member of the current list. */ static bool ListMember(List *currentList, WorkerNode *workerNode) @@ -495,97 +437,6 @@ ListMember(List *currentList, WorkerNode *workerNode) } -/* ------------------------------------------------------------ - * Worker node shared hash functions follow - * ------------------------------------------------------------ - */ - -/* Organize, at startup, that the resources for worker node management are allocated. */ -void -WorkerNodeRegister(void) -{ - RequestAddinShmemSpace(WorkerNodeShmemSize()); - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = WorkerNodeShmemAndWorkerListInit; -} - - -/* Estimates the shared memory size used for managing worker nodes. */ -static Size -WorkerNodeShmemSize(void) -{ - Size size = 0; - Size hashSize = 0; - - hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(WorkerNode)); - size = add_size(size, hashSize); - - return size; -} - - -/* Initializes the shared memory used for managing worker nodes. 
*/ -static void -WorkerNodeShmemAndWorkerListInit(void) -{ - HASHCTL info; - int hashFlags = 0; - long maxTableSize = 0; - long initTableSize = 0; - - maxTableSize = (long) MaxWorkerNodesTracked; - initTableSize = maxTableSize / 8; - - /* - * Allocate the control structure for the hash table that maps worker node - * name and port numbers (char[]:uint32) to general node membership and - * health information. - */ - memset(&info, 0, sizeof(info)); - info.keysize = WORKER_LENGTH + sizeof(uint32); - info.entrysize = sizeof(WorkerNode); - info.hash = WorkerNodeHashCode; - info.match = WorkerNodeCompare; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE); - - WorkerNodesHash = ShmemInitHash("Worker Node Hash", - initTableSize, maxTableSize, - &info, hashFlags); - - /* - * Load the intial contents of the worker node hash table from the - * configuration file. - */ - LoadWorkerNodeList(WorkerListFileName); - - if (prev_shmem_startup_hook != NULL) - { - prev_shmem_startup_hook(); - } -} - - -/* - * WorkerNodeHashCode computes the hash code for a worker node from the node's - * host name and port number. Nodes that only differ by their rack locations - * hash to the same value. - */ -static uint32 -WorkerNodeHashCode(const void *key, Size keySize) -{ - const WorkerNode *worker = (const WorkerNode *) key; - const char *workerName = worker->workerName; - const uint32 *workerPort = &(worker->workerPort); - - /* standard hash function outlined in Effective Java, Item 8 */ - uint32 result = 17; - result = 37 * result + string_hash(workerName, WORKER_LENGTH); - result = 37 * result + tag_hash(workerPort, sizeof(uint32)); - return result; -} - - /* * CompareWorkerNodes compares two pointers to worker nodes using the exact * same logic employed by WorkerNodeCompare. @@ -609,7 +460,7 @@ CompareWorkerNodes(const void *leftElement, const void *rightElement) * number. Two nodes that only differ by their rack locations are considered to * be equal to each other. */ -static int +int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) { const WorkerNode *workerLhs = (const WorkerNode *) lhsKey; @@ -629,251 +480,6 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) } -/* - * LoadWorkerNodeList reads and parses given membership file, and loads worker - * nodes from this membership file into the shared hash. The function relies on - * hba.c's tokenization method for parsing, and therefore the membership file - * has the same syntax as other configuration files such as ph_hba.conf. - * - * Note that this function allows for reloading membership configuration files - * at runtime. When that happens, old worker nodes that do not appear in the - * file are marked as stale, but are still kept in the shared hash. 
- */ -void -LoadWorkerNodeList(const char *workerFilename) -{ - List *workerList = NIL; - ListCell *workerCell = NULL; - uint32 workerCount = 0; - - workerList = ParseWorkerNodeFile(workerFilename); - - workerCount = list_length(workerList); - if (workerCount > MaxWorkerNodesTracked) - { - ereport(FATAL, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("worker node count: %u exceeds max allowed value: %d", - workerCount, MaxWorkerNodesTracked))); - } - else - { - ereport(INFO, (errmsg("reading nodes from worker file: %s", workerFilename))); - } - - /* before reading file's lines, reset worker node hash */ - ResetWorkerNodesHash(WorkerNodesHash); - - /* parse file lines */ - foreach(workerCell, workerList) - { - WorkerNode *workerNode = NULL; - WorkerNode *parsedNode = lfirst(workerCell); - void *hashKey = NULL; - bool handleFound = false; - - /* - * Search for the parsed worker node in the hash, and then insert parsed - * values. When searching, we make the hashKey point to the beginning of - * the parsed node; we previously set the key length and key comparison - * function to include both the node name and the port number. - */ - hashKey = (void *) parsedNode; - workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey, - HASH_ENTER, &handleFound); - - if (handleFound) - { - /* display notification if worker node's rack changed */ - char *oldWorkerRack = workerNode->workerRack; - char *newWorkerRack = parsedNode->workerRack; - - if (strncmp(oldWorkerRack, newWorkerRack, WORKER_LENGTH) != 0) - { - ereport(INFO, (errmsg("worker node: \"%s:%u\" changed rack location", - workerNode->workerName, workerNode->workerPort))); - } - - /* display warning if worker node already appeared in this file */ - if (workerNode->inWorkerFile) - { - ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", - workerNode->workerName, - workerNode->workerPort))); - } - } - - strlcpy(workerNode->workerName, parsedNode->workerName, WORKER_LENGTH); - strlcpy(workerNode->workerRack, parsedNode->workerRack, WORKER_LENGTH); - workerNode->workerPort = parsedNode->workerPort; - workerNode->inWorkerFile = parsedNode->inWorkerFile; - - pfree(parsedNode); - } -} - - -/* - * ParseWorkerNodeFile opens and parses the node name and node port from the - * specified configuration file. 
- */ -static List * -ParseWorkerNodeFile(const char *workerNodeFilename) -{ - FILE *workerFileStream = NULL; - List *workerNodeList = NIL; - char workerNodeLine[MAXPGPATH]; - char *workerFilePath = make_absolute_path(workerNodeFilename); - char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]"; - char workerLinePattern[1024]; - const int workerNameIndex = 0; - const int workerPortIndex = 1; - - memset(workerLinePattern, '\0', sizeof(workerLinePattern)); - - workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R); - if (workerFileStream == NULL) - { - if (errno == ENOENT) - { - ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present", - workerFilePath))); - } - else - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open worker list file \"%s\": %m", - workerFilePath))); - } - return NIL; - } - - /* build pattern to contain node name length limit */ - snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate, - WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH); - - while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL) - { - const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH); - WorkerNode *workerNode = NULL; - char *linePointer = NULL; - int32 nodePort = PostPortNumber; /* default port number */ - int fieldCount = 0; - bool lineIsInvalid = false; - char nodeName[WORKER_LENGTH + 1]; - char nodeRack[WORKER_LENGTH + 1]; - char nodePortString[MAX_PORT_LENGTH + 1]; - - memset(nodeName, '\0', sizeof(nodeName)); - strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack)); - memset(nodePortString, '\0', sizeof(nodePortString)); - - if (workerLineLength == MAXPGPATH - 1) - { - ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("worker node list file line exceeds the maximum " - "length of %d", MAXPGPATH))); - } - - /* trim trailing newlines preserved by fgets, if any */ - linePointer = workerNodeLine + workerLineLength - 1; - while (linePointer >= workerNodeLine && - (*linePointer == '\n' || *linePointer == '\r')) - { - *linePointer-- = '\0'; - } - - /* skip leading whitespace */ - for (linePointer = workerNodeLine; *linePointer; linePointer++) - { - if (!isspace((unsigned char) *linePointer)) - { - break; - } - } - - /* if the entire line is whitespace or a comment, skip it */ - if (*linePointer == '\0' || *linePointer == '#') - { - continue; - } - - /* parse line; node name is required, but port and rack are optional */ - fieldCount = sscanf(linePointer, workerLinePattern, - nodeName, nodePortString, nodeRack); - - /* adjust field count for zero based indexes */ - fieldCount--; - - /* raise error if no fields were assigned */ - if (fieldCount < workerNameIndex) - { - lineIsInvalid = true; - } - - /* no special treatment for nodeName: already parsed by sscanf */ - - /* if a second token was specified, convert to integer port */ - if (fieldCount >= workerPortIndex) - { - char *nodePortEnd = NULL; - - errno = 0; - nodePort = strtol(nodePortString, &nodePortEnd, 10); - - if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0) - { - lineIsInvalid = true; - } - } - - if (lineIsInvalid) - { - ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("could not parse worker node line: %s", - workerNodeLine), - errhint("Lines in the worker node file must contain a valid " - "node name and, optionally, a positive port number. 
" - "Comments begin with a '#' character and extend to " - "the end of their line."))); - } - - /* allocate worker node structure and set fields */ - workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); - - strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH); - strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); - workerNode->workerPort = nodePort; - workerNode->inWorkerFile = true; - - workerNodeList = lappend(workerNodeList, workerNode); - } - - FreeFile(workerFileStream); - free(workerFilePath); - - return workerNodeList; -} - - -/* Marks all worker nodes in the shared hash as stale. */ -static void -ResetWorkerNodesHash(HTAB *WorkerNodesHash) -{ - WorkerNode *workerNode = NULL; - - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); - - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) - { - workerNode->inWorkerFile = false; - - workerNode = (WorkerNode *) hash_seq_search(&status); - } -} - - /* ResponsiveWorkerNodeList returns a list of all responsive worker nodes */ List * ResponsiveWorkerNodeList(void) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3bbe0c1d2..c0c7b833a 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5166,11 +5166,11 @@ ActivePlacementList(List *placementList) foreach(placementCell, placementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - bool workerNodeActive = false; + WorkerNode *workerNode = NULL; /* check if the worker node for this shard placement is active */ - workerNodeActive = WorkerNodeActive(placement->nodeName, placement->nodePort); - if (workerNodeActive) + workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); + if (workerNode != NULL) { activePlacementList = lappend(activePlacementList, placement); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 98c88c184..cd07cea6d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -150,9 +150,6 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); - /* initialize worker node manager */ - WorkerNodeRegister(); - /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); @@ -193,6 +190,7 @@ CreateRequiredDirectories(void) static void RegisterCitusConfigVariables(void) { + /* keeping temporarily for updates from pre-6.0 versions */ DefineCustomStringVariable( "citus.worker_list_file", gettext_noop("Sets the server's \"worker_list\" configuration file."), @@ -200,7 +198,7 @@ RegisterCitusConfigVariables(void) &WorkerListFileName, NULL, PGC_POSTMASTER, - GUC_SUPERUSER_ONLY, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, NULL, NULL, NULL); NormalizeWorkerListPath(); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b6c14caf7..01e37c7db 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -26,9 +26,11 @@ #include "distributed/colocation_utils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include 
"distributed/shardinterval_utils.h" +#include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "parser/parse_func.h" #include "utils/builtins.h" @@ -49,17 +51,23 @@ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; +static Oid distNodeRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; static Oid distShardLogicalRelidIndexId = InvalidOid; static Oid distShardShardidIndexId = InvalidOid; static Oid distShardPlacementShardidIndexId = InvalidOid; +static Oid distShardPlacementNodeidIndexId = InvalidOid; static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +/* Hash table for informations about worker nodes */ +static HTAB *WorkerNodeHash = NULL; +static bool workerNodeHashValid = false; + /* built first time through in InitializePartitionCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; @@ -78,8 +86,11 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); static void InitializeDistTableCache(void); +static void InitializeWorkerNodeCache(void); +static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); +static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, @@ -93,6 +104,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); +PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); /* @@ -616,6 +628,16 @@ DistShardPlacementRelationId(void) } +/* return oid of pg_dist_node relation */ +Oid +DistNodeRelationId(void) +{ + CachedRelationLookup("pg_dist_node", &distNodeRelationId); + + return distNodeRelationId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -680,6 +702,17 @@ DistShardPlacementShardidIndexId(void) } +/* return oid of pg_dist_shard_placement_nodeid_index */ +Oid +DistShardPlacementNodeidIndexId(void) +{ + CachedRelationLookup("pg_dist_shard_placement_nodeid_index", + &distShardPlacementNodeidIndexId); + + return distShardPlacementNodeidIndexId; +} + + /* return oid of the citus_extradata_container(internal) function */ Oid CitusExtraDataContainerFuncId(void) @@ -896,6 +929,29 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * master_dist_node_cache_invalidate is a trigger function that performs + * relcache invalidations when the contents of pg_dist_node are changed + * on the SQL level. + * + * NB: We decided there is little point in checking permissions here, there + * are much easier ways to waste CPU than causing cache invalidations. 
+ */ +Datum +master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) +{ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("must be called as trigger"))); + } + + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + PG_RETURN_DATUM(PointerGetDatum(NULL)); +} + + /* initialize the infrastructure for the metadata cache */ static void InitializeDistTableCache(void) @@ -944,6 +1000,133 @@ InitializeDistTableCache(void) } +/* + * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It + * triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise, + * it returns the hash. + */ +HTAB * +GetWorkerNodeHash(void) +{ + if (!workerNodeHashValid) + { + InitializeWorkerNodeCache(); + + workerNodeHashValid = true; + } + + return WorkerNodeHash; +} + + +/* + * InitializeWorkerNodeCache initialize the infrastructure for the worker node cache. + * The function reads the worker nodes from the metadata table, adds them to the hash and + * finally registers an invalidation callback. + */ +static void +InitializeWorkerNodeCache(void) +{ + static bool invalidationRegistered = false; + HTAB *oldWorkerNodeHash = NULL; + List *workerNodeList = NIL; + ListCell *workerNodeCell = NULL; + HASHCTL info; + int hashFlags = 0; + long maxTableSize = (long) MaxWorkerNodesTracked; + + /* make sure we've initialized CacheMemoryContext */ + if (CacheMemoryContext == NULL) + { + CreateCacheMemoryContext(); + } + + /* + * Create the hash that holds the worker nodes. The key is the combination of + * nodename and nodeport, instead of the unique nodeid because worker nodes are + * searched by the nodename and nodeport in every physical plan creation. + */ + memset(&info, 0, sizeof(info)); + info.keysize = +sizeof(uint32) + WORKER_LENGTH + sizeof(uint32); + info.entrysize = sizeof(WorkerNode); + info.hcxt = CacheMemoryContext; + info.hash = WorkerNodeHashCode; + info.match = WorkerNodeCompare; + hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE; + + oldWorkerNodeHash = WorkerNodeHash; + WorkerNodeHash = hash_create("Worker Node Hash", + maxTableSize, + &info, hashFlags); + + /* read the list from pg_dist_node */ + workerNodeList = ReadWorkerNodes(); + + /* iterate over the worker node list */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = NULL; + WorkerNode *currentNode = lfirst(workerNodeCell); + void *hashKey = NULL; + bool handleFound = false; + + /* search for the worker node in the hash, and then insert the values */ + hashKey = (void *) currentNode; + workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey, + HASH_ENTER, &handleFound); + + /* fill the newly allocated workerNode in the cache */ + strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH); + workerNode->workerPort = currentNode->workerPort; + workerNode->groupId = currentNode->groupId; + strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); + + if (handleFound) + { + ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", + workerNode->workerName, + workerNode->workerPort))); + } + + /* we do not need the currentNode anymore */ + pfree(currentNode); + } + + /* now, safe to destroy the old hash */ + hash_destroy(oldWorkerNodeHash); + + /* prevent multiple invalidation registrations */ + if (!invalidationRegistered) + { + /* Watch for invalidation events. 
*/ + CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, + (Datum) 0); + + invalidationRegistered = true; + } +} + + +/* + * WorkerNodeHashCode computes the hash code for a worker node from the node's + * host name and port number. Nodes that only differ by their rack locations + * hash to the same value. + */ +static uint32 +WorkerNodeHashCode(const void *key, Size keySize) +{ + const WorkerNode *worker = (const WorkerNode *) key; + const char *workerName = worker->workerName; + const uint32 *workerPort = &(worker->workerPort); + + /* standard hash function outlined in Effective Java, Item 8 */ + uint32 result = 17; + result = 37 * result + string_hash(workerName, WORKER_LENGTH); + result = 37 * result + tag_hash(workerPort, sizeof(uint32)); + return result; +} + + /* * ResetDistTableCacheEntry frees any out-of-band memory used by a cache entry, * but does not free the entry itself. @@ -1051,11 +1234,28 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardLogicalRelidIndexId = InvalidOid; distShardShardidIndexId = InvalidOid; distShardPlacementShardidIndexId = InvalidOid; + distNodeRelationId = InvalidOid; extraDataContainerFuncId = InvalidOid; } } +/* + * InvalidateNodeRelationCacheCallback marks the worker node hash as invalid when + * any change happens on the pg_dist_node table, so that subsequent accesses + * rebuild the hash from pg_dist_node from scratch. + */ +static void +InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) +{ + if (relationId == InvalidOid || relationId == distNodeRelationId) + { + workerNodeHashValid = false; + } +} + + /* * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * and returns that or, if no matching entry was found, NULL. diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c new file mode 100644 index 000000000..78fd3df83 --- /dev/null +++ b/src/backend/distributed/utils/node_metadata.c @@ -0,0 +1,665 @@ +/* + * node_metadata.c + * Functions that operate on pg_dist_node + * + * Copyright (c) 2012-2016, Citus Data, Inc.
+ */ +#include "postgres.h" +#include "miscadmin.h" +#include "funcapi.h" + + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/skey.h" +#if (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90600) +#include "access/stratnum.h" +#else +#include "access/skey.h" +#endif +#include "access/tupmacs.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "commands/sequence.h" +#include "distributed/master_protocol.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/pg_dist_node.h" +#include "distributed/worker_manager.h" +#include "lib/stringinfo.h" +#include "storage/lock.h" +#include "storage/fd.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/relcache.h" + + +/* default group size */ +int GroupSize = 1; + + +/* local function forward declarations */ +static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, + char *nodeRack); +static Datum GenerateNodeTuple(WorkerNode *workerNode); +static int32 GetNextGroupId(void); +static uint32 GetMaxGroupId(void); +static int GetNextNodeId(void); +static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, + char *nodeRack); +static void DeleteNodeRow(char *nodename, int32 nodeport); +static List * ParseWorkerNodeFileAndRename(void); +static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(master_add_node); +PG_FUNCTION_INFO_V1(master_remove_node); +PG_FUNCTION_INFO_V1(master_initialize_node_metadata); + + +/* + * master_add_node function adds a new node to the cluster and returns its data. + */ +Datum +master_add_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + int32 groupId = 0; + char *nodeRack = WORKER_DEFAULT_RACK; + + Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack); + + PG_RETURN_DATUM(returnData); +} + + +/* + * master_remove_node function removes the provided node from the pg_dist_node table. + * master_remove_node must be called by a superuser, and the specified node must not + * have any active shard placements. + */ +Datum +master_remove_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + bool hasShardPlacements = false; + + EnsureSuperUser(); + + hasShardPlacements = NodeHasActiveShardPlacements(nodeNameString, nodePort); + if (hasShardPlacements) + { + ereport(ERROR, (errmsg("you cannot remove a node which has active " + "shard placements"))); + } + + DeleteNodeRow(nodeNameString, nodePort); + + PG_RETURN_VOID(); +} + + +/* + * master_initialize_node_metadata is run once, when upgrading Citus. It ingests the + * existing pg_worker_list.conf into pg_dist_node, then renames the file to mark it + * as no longer used.
+ */ +Datum +master_initialize_node_metadata(PG_FUNCTION_ARGS) +{ + ListCell *workerNodeCell = NULL; + List *workerNodes = ParseWorkerNodeFileAndRename(); + + foreach(workerNodeCell, workerNodes) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + + AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, + workerNode->workerRack); + } + + PG_RETURN_BOOL(true); +} + + +/* + * FindWorkerNode searches the worker node hash and returns the matching WorkerNode + * if it exists. Otherwise, the function returns NULL. + */ +WorkerNode * +FindWorkerNode(char *nodeName, int32 nodePort) +{ + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + bool handleFound = false; + void *hashKey = NULL; + + WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); + strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH); + searchedNode->workerPort = nodePort; + + hashKey = (void *) searchedNode; + workerNode = (WorkerNode *) hash_search(workerNodeHash, hashKey, + HASH_FIND, &handleFound); + + return workerNode; +} + + +/* + * ReadWorkerNodes iterates over the pg_dist_node table, converts each row + * into its in-memory representation (i.e., WorkerNode) and adds it to + * a list. Lastly, the list is returned to the caller. + */ +List * +ReadWorkerNodes() +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + HeapTuple heapTuple = NULL; + List *workerNodeList = NIL; + TupleDesc tupleDescriptor = NULL; + + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + scanDescriptor = systable_beginscan(pgDistNode, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + tupleDescriptor = RelationGetDescr(pgDistNode); + + heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); + workerNodeList = lappend(workerNodeList, workerNode); + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistNode, AccessExclusiveLock); + + return workerNodeList; +} + + +/* + * AddNodeMetadata checks the given node information and adds the specified node to the + * pg_dist_node table. If the node already exists, the function returns with the + * information about the node. If not, the following procedure is followed while adding + * the node. + * If the groupId is not explicitly given by the user, the function picks the + * group that the new node should be in with respect to GroupSize. Then, the + * new node is inserted into the local pg_dist_node.
+ */ +static Datum +AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) +{ + Relation pgDistNode = NULL; + int nextNodeIdInt = 0; + Datum returnData = 0; + WorkerNode *workerNode = NULL; + + EnsureSuperUser(); + + /* acquire a lock so that no one can do this concurrently */ + pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + /* check if the node already exists in the cluster */ + workerNode = FindWorkerNode(nodeName, nodePort); + if (workerNode != NULL) + { + /* fill return data and return */ + returnData = GenerateNodeTuple(workerNode); + + /* close the heap */ + heap_close(pgDistNode, AccessExclusiveLock); + + PG_RETURN_DATUM(returnData); + } + + /* the user lets Citus decide on the group that the newly added node should be in */ + if (groupId == 0) + { + groupId = GetNextGroupId(); + } + else + { + uint32 maxGroupId = GetMaxGroupId(); + + if (groupId > maxGroupId) + { + ereport(ERROR, (errmsg("you cannot add a node to a non-existing group"))); + } + } + + /* generate the new node id from the sequence */ + nextNodeIdInt = GetNextNodeId(); + + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack); + + heap_close(pgDistNode, AccessExclusiveLock); + + /* fetch the worker node, and generate the output */ + workerNode = FindWorkerNode(nodeName, nodePort); + returnData = GenerateNodeTuple(workerNode); + + return returnData; +} + + +/* + * GenerateNodeTuple takes a worker node and returns a heap tuple for the + * given worker node. + */ +static Datum +GenerateNodeTuple(WorkerNode *workerNode) +{ + Relation pgDistNode = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum nodeDatum = 0; + Datum values[Natts_pg_dist_node]; + bool isNulls[Natts_pg_dist_node]; + + /* form new node tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId); + values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(workerNode->groupId); + values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); + values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); + values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); + + /* open the node relation to get its tuple descriptor */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + + /* generate the tuple */ + tupleDescriptor = RelationGetDescr(pgDistNode); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + nodeDatum = HeapTupleGetDatum(heapTuple); + + /* close the relation */ + heap_close(pgDistNode, AccessShareLock); + + return nodeDatum; +} + + +/* + * GetNextGroupId allocates and returns a unique groupId for the group + * to be created. This allocation occurs both in shared memory and in write + * ahead logs; writing to logs avoids the risk of having groupId collisions. + * + * Please note that the caller is still responsible for finalizing node data + * and the groupId with the master node. Further note that this function relies + * on an internal sequence created by the extension to generate unique identifiers.
+ */ +int32 +GetNextGroupId() +{ + text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum groupIdDatum = 0; + int32 groupId = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique groupId from sequence */ + groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + groupId = DatumGetUInt32(groupIdDatum); + + return groupId; +} + + +/* + * GetMaxGroupId iterates over the worker node hash, and returns the maximum + * group id from the table. + */ +static uint32 +GetMaxGroupId() +{ + uint32 maxGroupId = 0; + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + HASH_SEQ_STATUS status; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + uint32 workerNodeGroupId = workerNode->groupId; + + if (workerNodeGroupId > maxGroupId) + { + maxGroupId = workerNodeGroupId; + } + } + + return maxGroupId; +} + + +/* + * GetNextNodeId allocates and returns a unique nodeId for the node + * to be added. This allocation occurs both in shared memory and in write + * ahead logs; writing to logs avoids the risk of having nodeId collisions. + * + * Please note that the caller is still responsible for finalizing node data + * and the nodeId with the master node. Further note that this function relies + * on an internal sequence created by the extension to generate unique identifiers. + */ +int +GetNextNodeId() +{ + text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum nextNodeIdDatum = 0; + int nextNodeId = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique nodeId from sequence */ + nextNodeIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + nextNodeId = DatumGetUInt32(nextNodeIdDatum); + + return nextNodeId; +} + + +/* + * InsertNodeRow opens the node system catalog, and inserts a new row with the + * given values into that system catalog.
+ */ +static void +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack) +{ + Relation pgDistNode = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_node]; + bool isNulls[Natts_pg_dist_node]; + + /* form new node tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); + values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId); + values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); + values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); + values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); + + /* open the node relation and insert the new tuple */ + pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistNode); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistNode, heapTuple); + CatalogUpdateIndexes(pgDistNode, heapTuple); + + /* close relation and invalidate previous cache entry */ + heap_close(pgDistNode, AccessExclusiveLock); + + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); +} + + +/* + * DeleteNodeRow removes the requested row from pg_dist_node table if it exists. + */ +static void +DeleteNodeRow(char *nodeName, int32 nodePort) +{ + const int scanKeyCount = 2; + bool indexOK = false; + + HeapTuple heapTuple = NULL; + SysScanDesc heapScan = NULL; + ScanKeyData scanKey[scanKeyCount]; + + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, + BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort)); + + heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(heapScan); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + nodeName, nodePort))); + } + + simple_heap_delete(pgDistNode, &(heapTuple->t_self)); + + systable_endscan(heapScan); + heap_close(pgDistNode, AccessExclusiveLock); + + /* ensure future commands don't use the node we just removed */ + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + /* increment the counter so that next command won't see the row */ + CommandCounterIncrement(); +} + + +/* + * ParseWorkerNodeFileAndRename opens and parses the node name and node port from the + * specified configuration file and, after that, renames the file to mark it as no + * longer used. Note that this function only exists to support upgrades from pre-6.0 + * versions; do not use it for any new features.
+ */
+static List *
+ParseWorkerNodeFileAndRename(void)
+{
+	FILE *workerFileStream = NULL;
+	List *workerNodeList = NIL;
+	char workerNodeLine[MAXPGPATH];
+	char *workerFilePath = make_absolute_path(WorkerListFileName);
+	StringInfo renamedWorkerFilePath = makeStringInfo();
+	char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]";
+	char workerLinePattern[1024];
+	const int workerNameIndex = 0;
+	const int workerPortIndex = 1;
+
+	memset(workerLinePattern, '\0', sizeof(workerLinePattern));
+
+	workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R);
+	if (workerFileStream == NULL)
+	{
+		if (errno == ENOENT)
+		{
+			ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present",
+									workerFilePath)));
+		}
+		else
+		{
+			ereport(ERROR, (errcode_for_file_access(),
+							errmsg("could not open worker list file \"%s\": %m",
+								   workerFilePath)));
+		}
+		return NIL;
+	}
+
+	/* build the parse pattern, embedding the node name and port length limits */
+	snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate,
+			 WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH);
+
+	while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL)
+	{
+		const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH);
+		WorkerNode *workerNode = NULL;
+		char *linePointer = NULL;
+		int32 nodePort = 5432;    /* default port number */
+		int fieldCount = 0;
+		bool lineIsInvalid = false;
+		char nodeName[WORKER_LENGTH + 1];
+		char nodeRack[WORKER_LENGTH + 1];
+		char nodePortString[MAX_PORT_LENGTH + 1];
+
+		memset(nodeName, '\0', sizeof(nodeName));
+		strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack));
+		memset(nodePortString, '\0', sizeof(nodePortString));
+
+		if (workerLineLength == MAXPGPATH - 1)
+		{
+			ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
+							errmsg("worker node list file line exceeds the maximum "
+								   "length of %d", MAXPGPATH)));
+		}
+
+		/* trim trailing newlines preserved by fgets, if any */
+		linePointer = workerNodeLine + workerLineLength - 1;
+		while (linePointer >= workerNodeLine &&
+			   (*linePointer == '\n' || *linePointer == '\r'))
+		{
+			*linePointer-- = '\0';
+		}
+
+		/* skip leading whitespace */
+		for (linePointer = workerNodeLine; *linePointer; linePointer++)
+		{
+			if (!isspace((unsigned char) *linePointer))
+			{
+				break;
+			}
+		}
+
+		/* if the entire line is whitespace or a comment, skip it */
+		if (*linePointer == '\0' || *linePointer == '#')
+		{
+			continue;
+		}
+
+		/* parse line; node name is required, but port and rack are optional */
+		fieldCount = sscanf(linePointer, workerLinePattern,
+							nodeName, nodePortString, nodeRack);
+
+		/* adjust field count for zero based indexes */
+		fieldCount--;
+
+		/* mark the line invalid if no fields were assigned */
+		if (fieldCount < workerNameIndex)
+		{
+			lineIsInvalid = true;
+		}
+
+		/* no special treatment for nodeName: already parsed by sscanf */
+
+		/* if a second token was specified, convert to integer port */
+		if (fieldCount >= workerPortIndex)
+		{
+			char *nodePortEnd = NULL;
+
+			errno = 0;
+			nodePort = strtol(nodePortString, &nodePortEnd, 10);
+
+			if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0)
+			{
+				lineIsInvalid = true;
+			}
+		}
+
+		if (lineIsInvalid)
+		{
+			ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
+							errmsg("could not parse worker node line: %s",
+								   workerNodeLine),
+							errhint("Lines in the worker node file must contain a valid "
+									"node name and, optionally, a positive port number. "
+									"Comments begin with a '#' character and extend to "
+									"the end of their line.")));
+		}
+
+		/* allocate worker node structure and set fields */
+		workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
+
+		strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
+		strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
+		workerNode->workerPort = nodePort;
+
+		workerNodeList = lappend(workerNodeList, workerNode);
+	}
+
+	FreeFile(workerFileStream);
+
+	/* rename the file to mark it as no longer in use, then free the path */
+	appendStringInfo(renamedWorkerFilePath, "%s", workerFilePath);
+	appendStringInfo(renamedWorkerFilePath, ".obsolete");
+	rename(workerFilePath, renamedWorkerFilePath->data);
+	free(workerFilePath);
+
+	return workerNodeList;
+}
+
+
+/*
+ * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
+ * converts this tuple to an equivalent struct in memory. The function assumes
+ * the caller already has locks on the tuple, and doesn't perform any locking.
+ */
+static WorkerNode *
+TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
+{
+	WorkerNode *workerNode = NULL;
+	bool isNull = false;
+
+	Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid,
+								tupleDescriptor, &isNull);
+	Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid,
+								 tupleDescriptor, &isNull);
+	Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename,
+								  tupleDescriptor, &isNull);
+	Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_node_nodeport,
+								  tupleDescriptor, &isNull);
+	Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack,
+								  tupleDescriptor, &isNull);
+
+	Assert(!HeapTupleHasNulls(heapTuple));
+
+	workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
+	workerNode->nodeId = DatumGetUInt32(nodeId);
+	workerNode->workerPort = DatumGetUInt32(nodePort);
+	workerNode->groupId = DatumGetUInt32(groupId);
+	strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
+	strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
+
+	return workerNode;
+}
diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c
index 3f8b994f2..fe2cf3af9 100644
--- a/src/backend/distributed/worker/task_tracker.c
+++ b/src/backend/distributed/worker/task_tracker.c
@@ -235,14 +235,6 @@ TaskTrackerMain(Datum main_arg)
 			/* reload postgres configuration files */
 			ProcessConfigFile(PGC_SIGHUP);
-
-			/*
-			 * Reload worker membership file. For now we do that in the task
-			 * tracker because that's currently the only background worker in
-			 * Citus. And only background workers allow us to safely
-			 * register a SIGHUP handler.
-			 */
-			LoadWorkerNodeList(WorkerListFileName);
 		}
 
 		if (got_SIGTERM)
 		{
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index dd38ae22b..01a6d7c4a 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -62,6 +62,7 @@ extern int ShardIntervalCount(Oid relationId);
 extern List * LoadShardList(Oid relationId);
 extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
 extern uint64 ShardLength(uint64 shardId);
+extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort);
 extern List * FinalizedShardPlacementList(uint64 shardId);
 extern List * ShardPlacementList(uint64 shardId);
 extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
@@ -81,5 +82,6 @@ extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
 extern char * TableOwner(Oid relationId);
 extern void EnsureTablePermissions(Oid relationId, AclMode mode);
 extern void EnsureTableOwner(Oid relationId);
+extern void EnsureSuperUser(void);
 
 #endif /* MASTER_METADATA_UTILITY_H */
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index ad166105f..20731eca8 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -14,6 +14,8 @@
 #include "fmgr.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/pg_dist_partition.h"
+#include "distributed/worker_manager.h"
+#include "utils/hsearch.h"
 
 
 /*
@@ -54,13 +56,18 @@
 extern bool IsDistributedTable(Oid relationId);
 extern ShardInterval * LoadShardInterval(uint64 shardId);
 extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
 extern void CitusInvalidateRelcacheByRelid(Oid relationId);
+extern void CitusInvalidateNodeCache(void);
 extern bool CitusHasBeenLoaded(void);
 
+/* access WorkerNodeHash */
+extern HTAB * GetWorkerNodeHash(void);
+
 /* relation oids */
 extern Oid DistPartitionRelationId(void);
 extern Oid DistShardRelationId(void);
 extern Oid DistShardPlacementRelationId(void);
+extern Oid DistNodeRelationId(void);
 
 /* index oids */
 extern Oid DistPartitionLogicalRelidIndexId(void);
@@ -68,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void);
 extern Oid DistShardLogicalRelidIndexId(void);
 extern Oid DistShardShardidIndexId(void);
 extern Oid DistShardPlacementShardidIndexId(void);
+extern Oid DistShardPlacementNodeidIndexId(void);
 
 /* function oids */
 extern Oid CitusExtraDataContainerFuncId(void);
diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h
new file mode 100644
index 000000000..0276dcef7
--- /dev/null
+++ b/src/include/distributed/pg_dist_node.h
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dist_node.h
+ *	  definition of the relation that holds the nodes in the cluster (pg_dist_node).
+ *
+ * Copyright (c) 2012-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DIST_NODE_H
+#define PG_DIST_NODE_H
+
+/* ----------------
+ * pg_dist_node definition.
+ * ----------------
+ */
+typedef struct FormData_pg_dist_node
+{
+	int nodeid;
+	int groupid;
+#ifdef CATALOG_VARLEN
+	text nodename;
+	int nodeport;
+#endif
+} FormData_pg_dist_node;
+
+/* ----------------
+ * Form_pg_dist_node corresponds to a pointer to a tuple with
+ * the format of the pg_dist_node relation.
+ * ----------------
+ */
+typedef FormData_pg_dist_node *Form_pg_dist_node;
+
+/* ----------------
+ * compiler constants for pg_dist_node
+ * ----------------
+ */
+#define Natts_pg_dist_node 5
+#define Anum_pg_dist_node_nodeid 1
+#define Anum_pg_dist_node_groupid 2
+#define Anum_pg_dist_node_nodename 3
+#define Anum_pg_dist_node_nodeport 4
+#define Anum_pg_dist_node_noderack 5
+
+#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
+#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
+
+#endif /* PG_DIST_NODE_H */
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index ec506848c..b10ae5f4f 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -32,22 +32,16 @@
 
 /*
- * WorkerNode keeps shared memory state for active and temporarily failed worker
- * nodes. Permanently failed or departed nodes on the other hand are eventually
- * purged from the shared hash. In the current implementation, the distinction
- * between active, temporarily failed, and permanently departed nodes is made
- * based on the node's presence in the membership file; only nodes in this file
- * appear in the shared hash. In the future, worker nodes will report their
- * health status to the master via heartbeats, and these heartbeats along with
- * membership information will be used to determine a worker node's liveliness.
+ * WorkerNode is the in-memory representation of a row in the pg_dist_node table.
+ * The rows are held in the WorkerNodeHash table.
  */
 typedef struct WorkerNode
 {
-	uint32 workerPort;				/* node's port; part of hash table key */
-	char workerName[WORKER_LENGTH];	/* node's name; part of hash table key */
-	char workerRack[WORKER_LENGTH];	/* node's network location */
-
-	bool inWorkerFile;				/* is node in current membership file?
*/ + uint32 nodeId; /* node's unique id, key of the hash table */ + uint32 workerPort; /* node's port */ + char workerName[WORKER_LENGTH]; /* node's name */ + uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ + char workerRack[WORKER_LENGTH]; /* node's network location */ } WorkerNode; @@ -65,14 +59,12 @@ extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetNodeWithName(const char *hostname); extern uint32 WorkerGetLiveNodeCount(void); extern List * WorkerNodeList(void); -extern bool WorkerNodeActive(const char *nodeName, uint32 nodePort); extern List * ResponsiveWorkerNodeList(void); - -/* Function declarations for loading into shared hash tables */ -extern void WorkerNodeRegister(void); -extern void LoadWorkerNodeList(const char *workerFilename); +extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); +extern List * ReadWorkerNodes(void); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); +extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize); #endif /* WORKER_MANAGER_H */ diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out new file mode 100644 index 000000000..45c143cfc --- /dev/null +++ b/src/test/regress/expected/multi_cluster_management.out @@ -0,0 +1,134 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; +-- Tests functions related to cluster membership +-- add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +------------------------------- + (1,1,localhost,57637,default) +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +------------------------------- + (2,2,localhost,57638,default) +(1 row) + +-- get the active nodes +SELECT master_get_active_worker_nodes(); + master_get_active_worker_nodes +-------------------------------- + (localhost,57638) + (localhost,57637) +(2 rows) + +-- try to add the node again when it is activated +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +------------------------------- + (1,1,localhost,57637,default) +(1 row) + +-- get the active nodes +SELECT master_get_active_worker_nodes(); + master_get_active_worker_nodes +-------------------------------- + (localhost,57638) + (localhost,57637) +(2 rows) + +-- try to remove a node (with no placements) +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify that the node has been deleted +SELECT master_get_active_worker_nodes(); + master_get_active_worker_nodes +-------------------------------- + (localhost,57637) +(1 row) + +-- add some shard placements to the cluster +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +------------------------------- + (3,3,localhost,57638,default) +(1 row) + +CREATE TABLE cluster_management_test (col_1 text, col_2 int); +SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('cluster_management_test', 16, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- see that there are some active placements in the candidate node +SELECT * FROM 
pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1220001 | 1 | 0 | localhost | 57638 + 1220003 | 1 | 0 | localhost | 57638 + 1220005 | 1 | 0 | localhost | 57638 + 1220007 | 1 | 0 | localhost | 57638 + 1220009 | 1 | 0 | localhost | 57638 + 1220011 | 1 | 0 | localhost | 57638 + 1220013 | 1 | 0 | localhost | 57638 + 1220015 | 1 | 0 | localhost | 57638 +(8 rows) + +-- try to remove a node with active placements and see that node removal is failed +SELECT master_remove_node('localhost', :worker_2_port); +ERROR: you cannot remove a node which has active shard placements +SELECT master_get_active_worker_nodes(); + master_get_active_worker_nodes +-------------------------------- + (localhost,57638) + (localhost,57637) +(2 rows) + +-- mark all placements in the candidate node as inactive +UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; +SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1220001 | 3 | 0 | localhost | 57638 + 1220003 | 3 | 0 | localhost | 57638 + 1220005 | 3 | 0 | localhost | 57638 + 1220007 | 3 | 0 | localhost | 57638 + 1220009 | 3 | 0 | localhost | 57638 + 1220011 | 3 | 0 | localhost | 57638 + 1220013 | 3 | 0 | localhost | 57638 + 1220015 | 3 | 0 | localhost | 57638 +(8 rows) + +-- try to remove a node with only inactive placements and see that node is removed +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +SELECT master_get_active_worker_nodes(); + master_get_active_worker_nodes +-------------------------------- + (localhost,57637) +(1 row) + +-- clean-up +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +------------------------------- + (4,4,localhost,57638,default) +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; +DROP TABLE cluster_management_test; diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 82bd8a47d..9fd10d0d6 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -19,6 +19,19 @@ SET client_min_messages TO 'WARNING'; DROP EXTENSION citus CASCADE; RESET client_min_messages; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +------------------------------- + (1,1,localhost,57637,default) +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +------------------------------- + (2,2,localhost,57638,default) +(1 row) + -- verify that a table can be created after the extension has been dropped and recreated CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 1789c6373..2084a1d94 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -29,6 +29,7 @@ ALTER EXTENSION citus UPDATE TO '5.2-4'; ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; +ALTER EXTENSION citus UPDATE TO 
'6.0-4'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 8e8b49692..52940388d 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -65,6 +65,19 @@ SELECT * FROM pg_dist_shard_placement; -- check that the extension now can be dropped (and recreated) DROP EXTENSION citus; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +------------------------------- + (1,1,localhost,57637,default) +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +------------------------------- + (2,2,localhost,57638,default) +(1 row) + -- create a table with a SERIAL column CREATE TABLE testserialtable(id serial, group_id integer); SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash'); diff --git a/src/test/regress/multi_binary_schedule b/src/test/regress/multi_binary_schedule index d5330f064..d09cc9a1e 100644 --- a/src/test/regress/multi_binary_schedule +++ b/src/test/regress/multi_binary_schedule @@ -11,6 +11,7 @@ # Tests around schema changes, these are run first, so there's no preexisting objects. # --- test: multi_extension +test: multi_cluster_management test: multi_table_ddl # ---------- diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1233063e7..799783713 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -16,6 +16,7 @@ # Tests around schema changes, these are run first, so there's no preexisting objects. # --- test: multi_extension +test: multi_cluster_management test: multi_table_ddl test: multi_name_lengths diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index a3252b728..30c0a3653 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -14,6 +14,7 @@ # Tests around schema changes, these are run first, so there's no preexisting objects. 
# --- test: multi_extension +test: multi_cluster_management test: multi_table_ddl # ---------- diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 0b116da5c..bf3764a8a 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -168,13 +168,6 @@ for my $port (@workerPorts) or die "Could not create worker data directory"; } -# Initialize master's worker list -for my $port (@workerPorts) -{ - system("echo $host $port >> tmp_check/master/data/pg_worker_list.conf") == 0 - or die "Could not initialize master's worker list"; -} - # Routine to shutdown servers at failure/exit my $serversAreShutdown = "FALSE"; sub ShutdownServers() diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql new file mode 100644 index 000000000..b85188896 --- /dev/null +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -0,0 +1,49 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; + +-- Tests functions related to cluster membership + +-- add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); +SELECT master_add_node('localhost', :worker_2_port); + +-- get the active nodes +SELECT master_get_active_worker_nodes(); + +-- try to add the node again when it is activated +SELECT master_add_node('localhost', :worker_1_port); + +-- get the active nodes +SELECT master_get_active_worker_nodes(); + +-- try to remove a node (with no placements) +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify that the node has been deleted +SELECT master_get_active_worker_nodes(); + +-- add some shard placements to the cluster +SELECT master_add_node('localhost', :worker_2_port); +CREATE TABLE cluster_management_test (col_1 text, col_2 int); +SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash'); +SELECT master_create_worker_shards('cluster_management_test', 16, 1); + +-- see that there are some active placements in the candidate node +SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + +-- try to remove a node with active placements and see that node removal is failed +SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_get_active_worker_nodes(); + +-- mark all placements in the candidate node as inactive +UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; +SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + +-- try to remove a node with only inactive placements and see that node is removed +SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_get_active_worker_nodes(); + +-- clean-up +SELECT master_add_node('localhost', :worker_2_port); +UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; +DROP TABLE cluster_management_test; \ No newline at end of file diff --git a/src/test/regress/sql/multi_drop_extension.sql b/src/test/regress/sql/multi_drop_extension.sql index c5320260d..9a78ed609 100644 --- a/src/test/regress/sql/multi_drop_extension.sql +++ b/src/test/regress/sql/multi_drop_extension.sql @@ -21,6 +21,10 @@ RESET client_min_messages; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); +SELECT master_add_node('localhost', :worker_2_port); + -- verify that a table can be created after the extension has been dropped and recreated CREATE TABLE 
testtableddl(somecol int, distributecol text NOT NULL); SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index db3d6ad5e..ddcbb2cf5 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -34,6 +34,7 @@ ALTER EXTENSION citus UPDATE TO '5.2-4'; ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; +ALTER EXTENSION citus UPDATE TO '6.0-4'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 31ded4bc9..8cb2ddbf3 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -45,6 +45,10 @@ SELECT * FROM pg_dist_shard_placement; DROP EXTENSION citus; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT master_add_node('localhost', :worker_1_port); +SELECT master_add_node('localhost', :worker_2_port); + -- create a table with a SERIAL column CREATE TABLE testserialtable(id serial, group_id integer); SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
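
As a quick sketch of the workflow this patch enables (illustrative only: the host names and the 5432 ports below are placeholders, not values taken from the patch, and the statements are meant to be run on the master), cluster membership is now managed through SQL rather than by editing pg_worker_list.conf:

  SELECT master_add_node('worker-1.example.com', 5432);
  SELECT master_add_node('worker-2.example.com', 5432);

  -- membership is now recorded in the pg_dist_node catalog table
  SELECT nodeid, groupid, nodename, nodeport, noderack FROM pg_dist_node;

  -- removal is refused while the node still has active shard placements
  SELECT master_remove_node('worker-2.example.com', 5432);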