From d1c44d54361f77d279144ca57f584d84053acd90 Mon Sep 17 00:00:00 2001
From: Brian Cloutier
Date: Thu, 22 Sep 2016 16:57:06 +0300
Subject: [PATCH] Apply citus-mx pull #4

---
 .../distributed/citus--5.2-1--5.3-1.sql       |  62 ++
 .../distributed/master/master_node_protocol.c |   2 +-
 .../distributed/master/master_repair_shards.c |   2 +-
 .../distributed/master/worker_node_manager.c  | 482 ++-------------
 src/backend/distributed/shared_library_init.c |  60 --
 .../distributed/utils/metadata_cache.c        | 341 +++++++++++
 src/backend/distributed/utils/node_metadata.c | 549 ++++++++++++++++++
 src/backend/distributed/worker/task_tracker.c |   8 -
 src/include/distributed/metadata_cache.h      |   8 +
 src/include/distributed/pg_dist_node.h        |  48 ++
 src/include/distributed/worker_manager.h      |  31 +-
 .../regress/expected/multi_cluster_node.out   |  68 +++
 .../regress/expected/multi_drop_extension.out |  13 +
 src/test/regress/expected/multi_table_ddl.out |  13 +
 src/test/regress/multi_binary_schedule        |   1 +
 src/test/regress/multi_schedule               |   1 +
 .../regress/multi_task_tracker_extra_schedule |   1 +
 src/test/regress/pg_regress_multi.pl          |   7 -
 src/test/regress/sql/multi_cluster_node.sql   |  29 +
 src/test/regress/sql/multi_drop_extension.sql |   4 +
 src/test/regress/sql/multi_table_ddl.sql      |   4 +
 21 files changed, 1192 insertions(+), 542 deletions(-)
 create mode 100644 src/backend/distributed/citus--5.2-1--5.3-1.sql
 create mode 100644 src/backend/distributed/utils/node_metadata.c
 create mode 100644 src/include/distributed/pg_dist_node.h
 create mode 100644 src/test/regress/expected/multi_cluster_node.out
 create mode 100644 src/test/regress/sql/multi_cluster_node.sql

diff --git a/src/backend/distributed/citus--5.2-1--5.3-1.sql b/src/backend/distributed/citus--5.2-1--5.3-1.sql
new file mode 100644
index 000000000..6f03427f9
--- /dev/null
+++ b/src/backend/distributed/citus--5.2-1--5.3-1.sql
@@ -0,0 +1,62 @@
+
+SET search_path = 'pg_catalog';
+
+/* add pg_dist_node; the column order matches the Anum_pg_dist_node_* layout
+   that the C code and the regression outputs expect */
+CREATE TABLE citus.pg_dist_node(
+    nodeid int NOT NULL PRIMARY KEY,
+    noderole "char" NOT NULL DEFAULT 'p',
+    nodeactive bool NOT NULL DEFAULT true,
+    groupid int NOT NULL,
+    nodename text NOT NULL,
+    nodeport int NOT NULL,
+    UNIQUE (nodename, nodeport)
+);
+
+CREATE SEQUENCE citus.pg_dist_groupid_seq
+    MINVALUE 1
+    MAXVALUE 4294967296;
+
+CREATE SEQUENCE citus.pg_dist_node_nodeid_seq
+    MINVALUE 1
+    MAXVALUE 4294967296;
+
+ALTER TABLE citus.pg_dist_node SET SCHEMA pg_catalog;
+ALTER SEQUENCE citus.pg_dist_groupid_seq SET SCHEMA pg_catalog;
+ALTER SEQUENCE citus.pg_dist_node_nodeid_seq SET SCHEMA pg_catalog;
+
+CREATE FUNCTION master_dist_node_cache_invalidate()
+    RETURNS trigger
+    LANGUAGE C
+    AS 'MODULE_PATHNAME', $$master_dist_node_cache_invalidate$$;
+COMMENT ON FUNCTION master_dist_node_cache_invalidate()
+    IS 'invalidate internal cache of nodes when pg_dist_node changes';
+CREATE TRIGGER dist_node_cache_invalidate
+    AFTER INSERT OR UPDATE OR DELETE
+    ON pg_catalog.pg_dist_node
+    FOR EACH ROW EXECUTE PROCEDURE master_dist_node_cache_invalidate();
+
+CREATE FUNCTION cluster_add_node(nodename text,
+                                 nodeport integer,
+                                 groupid integer DEFAULT 0)
+    RETURNS record
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$cluster_add_node$$;
+COMMENT ON FUNCTION cluster_add_node(nodename text,
+                                     nodeport integer,
+                                     groupid integer)
+    IS 'add node to the cluster';
+
+CREATE FUNCTION cluster_remove_node(nodename text, nodeport integer)
+    RETURNS record
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$cluster_remove_node$$;
+COMMENT ON FUNCTION cluster_remove_node(nodename text, nodeport integer)
+    IS 'remove node from the cluster';
+
+/* populate pg_dist_node from the legacy worker membership list; this only
+   needs to run once, at extension upgrade time.
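+
+   Once the metadata exists, nodes are managed through the UDFs created
+   above, e.g. (hypothetical host name):
+
+     SELECT cluster_add_node('worker-101', 5432);
+     SELECT cluster_remove_node('worker-101', 5432);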
*/ +CREATE FUNCTION cluster_initialize_node_metadata() + RETURNS BOOL + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$cluster_initialize_node_metadata$$; + +SELECT cluster_initialize_node_metadata(); + +RESET search_path; diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index f2c52c6b5..72543659a 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -459,7 +459,7 @@ master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS) /* * master_get_active_worker_nodes returns a set of active worker host names and * port numbers in deterministic order. Currently we assume that all worker - * nodes in pg_worker_list.conf are active. + * nodes in pg_dist_node are active. */ Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 20429ba49..fd5876881 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -122,7 +122,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } targetNode = palloc0(sizeof(WorkerNode)); - targetNode->inWorkerFile = true; + targetNode->workerActive = true; strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH); targetNode->workerPort = targetPlacement->nodePort; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 5db1ef492..890117759 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -16,6 +16,7 @@ #include "commands/dbcommands.h" #include "distributed/worker_manager.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "libpq/hba.h" #include "libpq/ip.h" @@ -30,25 +31,15 @@ /* Config variables managed via guc.c */ -char *WorkerListFileName; /* location of pg_worker_list.conf */ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size */ -static HTAB *WorkerNodesHash = NULL; /* worker node hash in shared memory */ -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; - /* Local functions forward declarations */ static char * ClientHostAddress(StringInfo remoteHostStringInfo); -static bool OddNumber(uint32 number); static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList); static bool ListMember(List *currentList, WorkerNode *workerNode); -static Size WorkerNodeShmemSize(void); -static void WorkerNodeShmemAndWorkerListInit(void); -static uint32 WorkerNodeHashCode(const void *key, Size keySize); static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize); -static List * ParseWorkerNodeFile(const char *workerNodeFilename); -static void ResetWorkerNodesHash(HTAB *WorkerNodesHash); static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); @@ -59,14 +50,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); /* * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates - * a new worker node. 
The allocation is performed according to the following
- * policy: if the list is empty, a random node is allocated; if the list has one
- * node (or an odd number of nodes), the new node is allocated on a different
- * rack than the first node; and if the list has two nodes (or an even number of
- * nodes), the new node is allocated on the same rack as the first node, but is
- * different from all the nodes in the list. This node allocation policy ensures
- * that shard locality is maintained within a rack, but no single rack failure
- * can result in data loss.
+ * a new worker node. The allocation is performed by randomly picking a worker
+ * node that is not in currentNodeList.
  *
  * Note that the function returns null if the worker membership list does not
  * contain enough nodes to allocate a new worker node.
  */
 WorkerNode *
 WorkerGetRandomCandidateNode(List *currentNodeList)
 {
 	WorkerNode *workerNode = NULL;
-	bool wantSameRack = false;
-	uint32 tryCount = WORKER_RACK_TRIES;
-	uint32 tryIndex = 0;
+	HTAB *workerNodeHash = GetWorkerNodeHash();
 
 	/*
 	 * We check if the shard has already been placed on all nodes known to us.
@@ -91,49 +74,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
 		return NULL;
 	}
 
-	/* if current node list is empty, randomly pick one node and return */
-	if (currentNodeCount == 0)
-	{
-		workerNode = FindRandomNodeNotInList(WorkerNodesHash, NIL);
-		return workerNode;
-	}
-
-	/*
-	 * If the current list has an odd number of nodes (1, 3, 5, etc), we want to
-	 * place the shard on a different rack than the first node's rack.
-	 * Otherwise, we want to place the shard on the same rack as the first node.
-	 */
-	if (OddNumber(currentNodeCount))
-	{
-		wantSameRack = false;
-	}
-	else
-	{
-		wantSameRack = true;
-	}
-
-	/*
-	 * We try to find a worker node that fits our rack-aware placement strategy.
-	 * If after a predefined number of tries, we still cannot find such a node,
-	 * we simply give up and return the last worker node we found.
- */ - for (tryIndex = 0; tryIndex < tryCount; tryIndex++) - { - WorkerNode *firstNode = (WorkerNode *) linitial(currentNodeList); - char *firstRack = firstNode->workerRack; - char *workerRack = NULL; - bool sameRack = false; - - workerNode = FindRandomNodeNotInList(WorkerNodesHash, currentNodeList); - workerRack = workerNode->workerRack; - - sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); - if ((sameRack && wantSameRack) || (!sameRack && !wantSameRack)) - { - break; - } - } - + workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); return workerNode; } @@ -289,24 +230,23 @@ WorkerNode * WorkerGetNodeWithName(const char *hostname) { WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); + HTAB *workerNodeHash = GetWorkerNodeHash(); - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->inWorkerFile) + if (workerNode->workerActive) { int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); if (nameCompare == 0) { + /* we need to terminate the scan since we break */ hash_seq_term(&status); break; } } - - workerNode = (WorkerNode *) hash_seq_search(&status); } return workerNode; @@ -319,44 +259,43 @@ WorkerGetLiveNodeCount(void) { WorkerNode *workerNode = NULL; uint32 liveWorkerCount = 0; - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); + HTAB *workerNodeHash = GetWorkerNodeHash(); - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->inWorkerFile) + if (workerNode->workerActive) { liveWorkerCount++; } - - workerNode = (WorkerNode *) hash_seq_search(&status); } return liveWorkerCount; } -/* Inserts the live worker nodes to a list, and returns the list. */ +/* + * WorkerNodeList iterates over the hash table that includes the worker nodes, and adds + * them to a list which is returned. 
+ */ List * WorkerNodeList(void) { List *workerNodeList = NIL; WorkerNode *workerNode = NULL; - + HTAB *workerNodeHash = GetWorkerNodeHash(); HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->inWorkerFile) + if (workerNode->workerActive) { workerNodeList = lappend(workerNodeList, workerNode); } - - workerNode = (WorkerNode *) hash_seq_search(&status); } return workerNodeList; @@ -372,20 +311,26 @@ bool WorkerNodeActive(const char *nodeName, uint32 nodePort) { bool workerNodeActive = false; - bool handleFound = false; WorkerNode *workerNode = NULL; - void *hashKey = NULL; + HASH_SEQ_STATUS status; + HTAB *workerNodeHash = GetWorkerNodeHash(); - WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); - strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH); - searchedNode->workerPort = nodePort; + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + if (strncmp(workerNode->workerName, nodeName, WORKER_LENGTH) == 0 && + workerNode->workerPort == nodePort) + { + /* we need to terminate the scan since we break */ + hash_seq_term(&status); + break; + } + } - hashKey = (void *) searchedNode; - workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey, - HASH_FIND, &handleFound); if (workerNode != NULL) { - if (workerNode->inWorkerFile) + if (workerNode->workerActive) { workerNodeActive = true; } @@ -395,15 +340,6 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort) } -/* Returns true if given number is odd; returns false otherwise. */ -static bool -OddNumber(uint32 number) -{ - bool oddNumber = ((number % 2) == 1); - return oddNumber; -} - - /* * FindRandomNodeNotInList finds a random node from the shared hash that is not * a member of the current node list. The caller is responsible for making the @@ -449,7 +385,7 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) { bool listMember = ListMember(currentNodeList, workerNode); - if (workerNode->inWorkerFile && !listMember) + if (workerNode->workerActive && !listMember) { lookForWorkerNode = false; } @@ -495,97 +431,6 @@ ListMember(List *currentList, WorkerNode *workerNode) } -/* ------------------------------------------------------------ - * Worker node shared hash functions follow - * ------------------------------------------------------------ - */ - -/* Organize, at startup, that the resources for worker node management are allocated. */ -void -WorkerNodeRegister(void) -{ - RequestAddinShmemSpace(WorkerNodeShmemSize()); - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = WorkerNodeShmemAndWorkerListInit; -} - - -/* Estimates the shared memory size used for managing worker nodes. */ -static Size -WorkerNodeShmemSize(void) -{ - Size size = 0; - Size hashSize = 0; - - hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(WorkerNode)); - size = add_size(size, hashSize); - - return size; -} - - -/* Initializes the shared memory used for managing worker nodes. 
*/ -static void -WorkerNodeShmemAndWorkerListInit(void) -{ - HASHCTL info; - int hashFlags = 0; - long maxTableSize = 0; - long initTableSize = 0; - - maxTableSize = (long) MaxWorkerNodesTracked; - initTableSize = maxTableSize / 8; - - /* - * Allocate the control structure for the hash table that maps worker node - * name and port numbers (char[]:uint32) to general node membership and - * health information. - */ - memset(&info, 0, sizeof(info)); - info.keysize = WORKER_LENGTH + sizeof(uint32); - info.entrysize = sizeof(WorkerNode); - info.hash = WorkerNodeHashCode; - info.match = WorkerNodeCompare; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE); - - WorkerNodesHash = ShmemInitHash("Worker Node Hash", - initTableSize, maxTableSize, - &info, hashFlags); - - /* - * Load the intial contents of the worker node hash table from the - * configuration file. - */ - LoadWorkerNodeList(WorkerListFileName); - - if (prev_shmem_startup_hook != NULL) - { - prev_shmem_startup_hook(); - } -} - - -/* - * WorkerNodeHashCode computes the hash code for a worker node from the node's - * host name and port number. Nodes that only differ by their rack locations - * hash to the same value. - */ -static uint32 -WorkerNodeHashCode(const void *key, Size keySize) -{ - const WorkerNode *worker = (const WorkerNode *) key; - const char *workerName = worker->workerName; - const uint32 *workerPort = &(worker->workerPort); - - /* standard hash function outlined in Effective Java, Item 8 */ - uint32 result = 17; - result = 37 * result + string_hash(workerName, WORKER_LENGTH); - result = 37 * result + tag_hash(workerPort, sizeof(uint32)); - return result; -} - - /* * CompareWorkerNodes compares two pointers to worker nodes using the exact * same logic employed by WorkerNodeCompare. @@ -629,251 +474,6 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) } -/* - * LoadWorkerNodeList reads and parses given membership file, and loads worker - * nodes from this membership file into the shared hash. The function relies on - * hba.c's tokenization method for parsing, and therefore the membership file - * has the same syntax as other configuration files such as ph_hba.conf. - * - * Note that this function allows for reloading membership configuration files - * at runtime. When that happens, old worker nodes that do not appear in the - * file are marked as stale, but are still kept in the shared hash. - */ -void -LoadWorkerNodeList(const char *workerFilename) -{ - List *workerList = NIL; - ListCell *workerCell = NULL; - uint32 workerCount = 0; - - workerList = ParseWorkerNodeFile(workerFilename); - - workerCount = list_length(workerList); - if (workerCount > MaxWorkerNodesTracked) - { - ereport(FATAL, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("worker node count: %u exceeds max allowed value: %d", - workerCount, MaxWorkerNodesTracked))); - } - else - { - ereport(INFO, (errmsg("reading nodes from worker file: %s", workerFilename))); - } - - /* before reading file's lines, reset worker node hash */ - ResetWorkerNodesHash(WorkerNodesHash); - - /* parse file lines */ - foreach(workerCell, workerList) - { - WorkerNode *workerNode = NULL; - WorkerNode *parsedNode = lfirst(workerCell); - void *hashKey = NULL; - bool handleFound = false; - - /* - * Search for the parsed worker node in the hash, and then insert parsed - * values. 
When searching, we make the hashKey point to the beginning of - * the parsed node; we previously set the key length and key comparison - * function to include both the node name and the port number. - */ - hashKey = (void *) parsedNode; - workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey, - HASH_ENTER, &handleFound); - - if (handleFound) - { - /* display notification if worker node's rack changed */ - char *oldWorkerRack = workerNode->workerRack; - char *newWorkerRack = parsedNode->workerRack; - - if (strncmp(oldWorkerRack, newWorkerRack, WORKER_LENGTH) != 0) - { - ereport(INFO, (errmsg("worker node: \"%s:%u\" changed rack location", - workerNode->workerName, workerNode->workerPort))); - } - - /* display warning if worker node already appeared in this file */ - if (workerNode->inWorkerFile) - { - ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", - workerNode->workerName, - workerNode->workerPort))); - } - } - - strlcpy(workerNode->workerName, parsedNode->workerName, WORKER_LENGTH); - strlcpy(workerNode->workerRack, parsedNode->workerRack, WORKER_LENGTH); - workerNode->workerPort = parsedNode->workerPort; - workerNode->inWorkerFile = parsedNode->inWorkerFile; - - pfree(parsedNode); - } -} - - -/* - * ParseWorkerNodeFile opens and parses the node name and node port from the - * specified configuration file. - */ -static List * -ParseWorkerNodeFile(const char *workerNodeFilename) -{ - FILE *workerFileStream = NULL; - List *workerNodeList = NIL; - char workerNodeLine[MAXPGPATH]; - char *workerFilePath = make_absolute_path(workerNodeFilename); - char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]"; - char workerLinePattern[1024]; - const int workerNameIndex = 0; - const int workerPortIndex = 1; - - memset(workerLinePattern, '\0', sizeof(workerLinePattern)); - - workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R); - if (workerFileStream == NULL) - { - if (errno == ENOENT) - { - ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present", - workerFilePath))); - } - else - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open worker list file \"%s\": %m", - workerFilePath))); - } - return NIL; - } - - /* build pattern to contain node name length limit */ - snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate, - WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH); - - while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL) - { - const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH); - WorkerNode *workerNode = NULL; - char *linePointer = NULL; - int32 nodePort = PostPortNumber; /* default port number */ - int fieldCount = 0; - bool lineIsInvalid = false; - char nodeName[WORKER_LENGTH + 1]; - char nodeRack[WORKER_LENGTH + 1]; - char nodePortString[MAX_PORT_LENGTH + 1]; - - memset(nodeName, '\0', sizeof(nodeName)); - strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack)); - memset(nodePortString, '\0', sizeof(nodePortString)); - - if (workerLineLength == MAXPGPATH - 1) - { - ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("worker node list file line exceeds the maximum " - "length of %d", MAXPGPATH))); - } - - /* trim trailing newlines preserved by fgets, if any */ - linePointer = workerNodeLine + workerLineLength - 1; - while (linePointer >= workerNodeLine && - (*linePointer == '\n' || *linePointer == '\r')) - { - *linePointer-- = '\0'; - } - - /* skip leading whitespace */ - for (linePointer = workerNodeLine; *linePointer; 
linePointer++) - { - if (!isspace((unsigned char) *linePointer)) - { - break; - } - } - - /* if the entire line is whitespace or a comment, skip it */ - if (*linePointer == '\0' || *linePointer == '#') - { - continue; - } - - /* parse line; node name is required, but port and rack are optional */ - fieldCount = sscanf(linePointer, workerLinePattern, - nodeName, nodePortString, nodeRack); - - /* adjust field count for zero based indexes */ - fieldCount--; - - /* raise error if no fields were assigned */ - if (fieldCount < workerNameIndex) - { - lineIsInvalid = true; - } - - /* no special treatment for nodeName: already parsed by sscanf */ - - /* if a second token was specified, convert to integer port */ - if (fieldCount >= workerPortIndex) - { - char *nodePortEnd = NULL; - - errno = 0; - nodePort = strtol(nodePortString, &nodePortEnd, 10); - - if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0) - { - lineIsInvalid = true; - } - } - - if (lineIsInvalid) - { - ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), - errmsg("could not parse worker node line: %s", - workerNodeLine), - errhint("Lines in the worker node file must contain a valid " - "node name and, optionally, a positive port number. " - "Comments begin with a '#' character and extend to " - "the end of their line."))); - } - - /* allocate worker node structure and set fields */ - workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); - - strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH); - strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); - workerNode->workerPort = nodePort; - workerNode->inWorkerFile = true; - - workerNodeList = lappend(workerNodeList, workerNode); - } - - FreeFile(workerFileStream); - free(workerFilePath); - - return workerNodeList; -} - - -/* Marks all worker nodes in the shared hash as stale. 
*/ -static void -ResetWorkerNodesHash(HTAB *WorkerNodesHash) -{ - WorkerNode *workerNode = NULL; - - HASH_SEQ_STATUS status; - hash_seq_init(&status, WorkerNodesHash); - - workerNode = (WorkerNode *) hash_seq_search(&status); - while (workerNode != NULL) - { - workerNode->inWorkerFile = false; - - workerNode = (WorkerNode *) hash_seq_search(&status); - } -} - - /* ResponsiveWorkerNodeList returns a list of all responsive worker nodes */ List * ResponsiveWorkerNodeList(void) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b486b590e..943b2b663 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -47,7 +47,6 @@ void _PG_init(void); static void CreateRequiredDirectories(void); static void RegisterCitusConfigVariables(void); -static void NormalizeWorkerListPath(void); /* *INDENT-OFF* */ @@ -150,9 +149,6 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); - /* initialize worker node manager */ - WorkerNodeRegister(); - /* initialize transaction callbacks */ InstallRouterExecutorShmemHook(); InstallMultiShardXactShmemHook(); @@ -193,17 +189,6 @@ CreateRequiredDirectories(void) static void RegisterCitusConfigVariables(void) { - DefineCustomStringVariable( - "citus.worker_list_file", - gettext_noop("Sets the server's \"worker_list\" configuration file."), - NULL, - &WorkerListFileName, - NULL, - PGC_POSTMASTER, - GUC_SUPERUSER_ONLY, - NULL, NULL, NULL); - NormalizeWorkerListPath(); - DefineCustomBoolVariable( "citus.binary_master_copy_format", gettext_noop("Use the binary master copy format."), @@ -564,48 +549,3 @@ RegisterCitusConfigVariables(void) /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); } - - -/* - * NormalizeWorkerListPath converts the path configured via - * citus.worker_list_file into an absolute path, falling back to the default - * value if necessary. The previous value of the config variable is - * overwritten with the normalized value. - * - * NB: This has to be called before ChangeToDataDir() is called as otherwise - * the relative paths won't make much sense to the user anymore. 
- */ -static void -NormalizeWorkerListPath(void) -{ - char *absoluteFileName = NULL; - - if (WorkerListFileName != NULL) - { - absoluteFileName = make_absolute_path(WorkerListFileName); - } - else if (DataDir != NULL) - { - absoluteFileName = malloc(strlen(DataDir) + strlen(WORKER_LIST_FILENAME) + 2); - if (absoluteFileName == NULL) - { - ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - sprintf(absoluteFileName, "%s/%s", DataDir, WORKER_LIST_FILENAME); - } - else - { - ereport(FATAL, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("%s does not know where to find the \"worker_list_file\" " - "configuration file.\n" - "This can be specified as \"citus.worker_list_file\" in " - "\"%s\", or by the -D invocation option, or by the PGDATA " - "environment variable.\n", progname, ConfigFileName))); - } - - SetConfigOption("citus.worker_list_file", absoluteFileName, PGC_POSTMASTER, - PGC_S_OVERRIDE); - free(absoluteFileName); -} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 08f27f4dc..0e997bc0e 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -25,9 +25,11 @@ #include "commands/trigger.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/shardinterval_utils.h" +#include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "parser/parse_func.h" #include "utils/builtins.h" @@ -48,6 +50,7 @@ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; +static Oid distNodeRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distShardLogicalRelidIndexId = InvalidOid; @@ -58,6 +61,9 @@ static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +/* Hash table for informations about worker nodes */ +static HTAB *WorkerNodeHash = NULL; + /* built first time through in InitializePartitionCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; @@ -76,8 +82,10 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); static void InitializeDistTableCache(void); +static void InitializeWorkerNodeCache(void); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); +static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, @@ -85,12 +93,15 @@ static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMe static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, int32 intervalTypeMod); +static List * ReadWorkerNodes(void); +static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void 
CachedRelationLookup(const char *relationName, Oid *cachedOid);
 
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
 PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
+PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
 
 
 /*
@@ -593,6 +604,16 @@ DistShardPlacementRelationId(void)
 }
 
 
+/* return oid of pg_dist_node relation */
+Oid
+DistNodeRelationId(void)
+{
+	CachedRelationLookup("pg_dist_node", &distNodeRelationId);
+
+	return distNodeRelationId;
+}
+
+
 /* return oid of pg_dist_partition relation */
 Oid
 DistPartitionRelationId(void)
@@ -862,6 +883,29 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * master_dist_node_cache_invalidate is a trigger function that performs
+ * relcache invalidations when the contents of pg_dist_node are changed
+ * on the SQL level.
+ *
+ * NB: We decided there is little point in checking permissions here, there
+ * are much easier ways to waste CPU than causing cache invalidations.
+ */
+Datum
+master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
+{
+	if (!CALLED_AS_TRIGGER(fcinfo))
+	{
+		ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+						errmsg("must be called as trigger")));
+	}
+
+	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
+
+	PG_RETURN_DATUM(PointerGetDatum(NULL));
+}
+
+
 /* initialize the infrastructure for the metadata cache */
 static void
 InitializeDistTableCache(void)
@@ -910,6 +954,110 @@ InitializeDistTableCache(void)
 }
 
 
+/*
+ * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It calls
+ * InitializeWorkerNodeCache when WorkerNodeHash is NULL. Otherwise, it
+ * returns the hash as-is.
+ */
+HTAB *
+GetWorkerNodeHash(void)
+{
+	if (WorkerNodeHash == NULL)
+	{
+		InitializeWorkerNodeCache();
+	}
+
+	return WorkerNodeHash;
+}
+
+
+/*
+ * InitializeWorkerNodeCache initializes the infrastructure for the worker
+ * node cache. The function reads the worker nodes from the metadata table,
+ * adds them to the hash, and finally registers an invalidation callback.
+ */
+static void
+InitializeWorkerNodeCache(void)
+{
+	static bool invalidationRegistered = false;
+	List *workerNodeList = NIL;
+	ListCell *workerNodeCell = NULL;
+	HASHCTL info;
+	int hashFlags = 0;
+	long maxTableSize = (long) MaxWorkerNodesTracked;
+
+	/* make sure we've initialized CacheMemoryContext */
+	if (CacheMemoryContext == NULL)
+	{
+		CreateCacheMemoryContext();
+	}
+
+	/*
+	 * Create the hash that holds the worker nodes. The key is the unique
+	 * nodeid field.
+	 */
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(uint32);
+	info.entrysize = sizeof(WorkerNode);
+	info.hcxt = CacheMemoryContext;
+	info.hash = tag_hash;
+	hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
+
+	WorkerNodeHash = hash_create("Worker Node Hash",
+								 maxTableSize,
+								 &info, hashFlags);
+
+	/* read the node list from pg_dist_node */
+	workerNodeList = ReadWorkerNodes();
+
+	/* iterate over the worker node list */
+	foreach(workerNodeCell, workerNodeList)
+	{
+		WorkerNode *workerNode = NULL;
+		WorkerNode *currentNode = lfirst(workerNodeCell);
+		void *hashKey = NULL;
+		bool handleFound = false;
+
+		/*
+		 * Search for the worker node in the hash, and then insert the
+		 * values. When searching, we make the hashKey the unique nodeid.
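+		 *
+		 * For illustration, a point lookup against this hash then looks
+		 * like the following (hypothetical caller, not part of this patch):
+		 *
+		 *   uint32 nodeId = 42;
+		 *   bool found = false;
+		 *   WorkerNode *node = (WorkerNode *) hash_search(WorkerNodeHash,
+		 *                                                 &nodeId, HASH_FIND,
+		 *                                                 &found);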
+ */ + hashKey = (void *) ¤tNode->nodeId; + workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey, + HASH_ENTER, &handleFound); + + /* fill the newly allocated workerNode in the cache */ + strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH); + workerNode->workerPort = currentNode->workerPort; + workerNode->workerActive = currentNode->workerActive; + workerNode->workerRole = currentNode->workerRole; + workerNode->groupId = currentNode->groupId; + + if (handleFound) + { + ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", + workerNode->workerName, + workerNode->workerPort))); + } + + workerNode->workerPort = currentNode->workerPort; + + /* we do not need the currentNode anymore */ + pfree(currentNode); + } + + /* prevent multiple invalidation registrations */ + if (!invalidationRegistered) + { + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, + (Datum) 0); + + invalidationRegistered = true; + } +} + + /* * ResetDistTableCacheEntry frees any out-of-band memory used by a cache entry, * but does not free the entry itself. @@ -1016,11 +1164,29 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardLogicalRelidIndexId = InvalidOid; distShardShardidIndexId = InvalidOid; distShardPlacementShardidIndexId = InvalidOid; + distNodeRelationId = InvalidOid; extraDataContainerFuncId = InvalidOid; } } +/* + * InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when + * any change happens on pg_dist_node table. It also set WorkerNodeHash to + * NULL, which allows consequent accesses to the hash read from the + * pg_dist_node from scratch. + */ +static void +InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) +{ + if (WorkerNodeHash != NULL && relationId == DistNodeRelationId()) + { + hash_destroy(WorkerNodeHash); + WorkerNodeHash = NULL; + } +} + + /* * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * and returns that or, if no matching entry was found, NULL. @@ -1211,6 +1377,181 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva } +/* + * ReadWorkerNodes iterates over pg_dist_node table, converts each row + * into it's memory representation (i.e., WorkerNode) and adds them into + * a list. Lastly, the list is returned to the caller. + */ +static List * +ReadWorkerNodes() +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + HeapTuple heapTuple = NULL; + List *workerNodeList = NIL; + TupleDesc tupleDescriptor = NULL; + + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + scanDescriptor = systable_beginscan(pgDistNode, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + tupleDescriptor = RelationGetDescr(pgDistNode); + + heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); + workerNodeList = lappend(workerNodeList, workerNode); + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistNode, AccessExclusiveLock); + + return workerNodeList; +} + + +/* + * InsertNodedRow opens the node system catalog, and inserts a new row with the + * given values into that system catalog. 
+ */ +void +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole, + bool nodeActive, uint32 groupId) +{ + Relation pgDistNode = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_node]; + bool isNulls[Natts_pg_dist_node]; + + /* form new shard tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); + values[Anum_pg_dist_node_noderole - 1] = CharGetDatum(nodeRole); + values[Anum_pg_dist_node_nodeactive - 1] = BoolGetDatum(nodeActive); + values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId); + values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); + values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); + + /* open shard relation and insert new tuple */ + pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistNode); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistNode, heapTuple); + CatalogUpdateIndexes(pgDistNode, heapTuple); + + /* close relation and invalidate previous cache entry */ + heap_close(pgDistNode, AccessExclusiveLock); + + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); +} + + +/* + * UpdateNodeActiveColumn updates the nodeactive column of the given worker node + * on the pg_dist_node. The function also invalidates the pg_dist_node's cache so + * that subsequent accesses to the table reads the updated values. + */ +void +UpdateNodeActiveColumn(WorkerNode *workerNode, bool nodeActive) +{ + Relation pgDistNode = NULL; + HeapTuple heapTuple = NULL; + HeapTuple modifiableHeaptuple = NULL; + SysScanDesc scanDescriptor = NULL; + Form_pg_dist_node nodeForm = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + uint32 nodeId = workerNode->nodeId; + + pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(nodeId)); + + scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node %d", nodeId))); + } + + /* create a copy of the tuple */ + modifiableHeaptuple = heap_copytuple(heapTuple); + + nodeForm = (Form_pg_dist_node) GETSTRUCT(modifiableHeaptuple); + + /* now update the active column */ + nodeForm->nodeactive = nodeActive; + + simple_heap_update(pgDistNode, &heapTuple->t_self, modifiableHeaptuple); + + systable_endscan(scanDescriptor); + + heap_close(pgDistNode, AccessExclusiveLock); + + /* invalidate the cache */ + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); +} + + + +/* + * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and + * converts this tuple to an equivalent struct in memory. The function assumes + * the caller already has locks on the tuple, and doesn't perform any locking. 
+ */ +static WorkerNode * +TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) +{ + WorkerNode *workerNode = NULL; + bool isNull = false; + + Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid, + tupleDescriptor, &isNull); + Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole, + tupleDescriptor, &isNull); + Datum nodeActive = heap_getattr(heapTuple, Anum_pg_dist_node_nodeactive, + tupleDescriptor, &isNull); + Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid, + tupleDescriptor, &isNull); + Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename, + tupleDescriptor, &isNull); + Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_node_nodeport, + tupleDescriptor, &isNull); + + Assert(!HeapTupleHasNulls(heapTuple)); + + workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); + workerNode->nodeId = DatumGetUInt32(nodeId); + workerNode->workerPort = DatumGetUInt32(nodePort); + workerNode->workerRole = DatumGetChar(nodeRole); + workerNode->groupId = DatumGetUInt32(groupId); + workerNode->workerActive = DatumGetBool(nodeActive); + strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); + + return workerNode; +} + + /* * CachedRelationLookup performs a cached lookup for the relation * relationName, with the result cached in *cachedOid. diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c new file mode 100644 index 000000000..eec0f6a0d --- /dev/null +++ b/src/backend/distributed/utils/node_metadata.c @@ -0,0 +1,549 @@ +/* + * node_metadata.c + * Functions that operate on pg_dist_node + * + * Copyright (c) 2012-2016, Citus Data, Inc. + */ +#include "postgres.h" +#include "miscadmin.h" +#include "funcapi.h" + + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/skey.h" +#if (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90600) +#include "access/stratnum.h" +#else +#include "access/skey.h" +#endif +#include "access/tupmacs.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "commands/sequence.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" +#include "distributed/pg_dist_node.h" +#include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" +#include "lib/stringinfo.h" +#include "storage/lock.h" +#include "storage/fd.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/relcache.h" + + +/* default group size */ +int GroupSize = 1; + + +/* local function forward declarations */ +static Datum GenerateNodeTuple(WorkerNode *workerNode); +static WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); +static uint32 NextGroupId(void); +static uint32 GetMaxGroupId(void); +static uint64 GetNodeCountInGroup(uint32 groupId); +static char * InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport, + uint32 groupId); +static List * ParseWorkerNodeFile(const char *workerNodeFilename); + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(cluster_add_node); +PG_FUNCTION_INFO_V1(cluster_read_worker_file); +PG_FUNCTION_INFO_V1(master_get_new_nodeid); +PG_FUNCTION_INFO_V1(master_get_next_groupid); + + +/* + * cluster_add_node function adds a new node to the cluster. If the node already + * exists, the function returns with the information about the node. If not, the + * following prodecure is followed while adding a node. 
+ * If the groupId is not explicitly given by the user, the function picks the
+ * group that the new node should be in with respect to GroupSize. Then, the
+ * new node is inserted into the local pg_dist_node.
+ *
+ * TODO: The following will be added in the near future.
+ * Lastly, the new node is inserted to all other nodes' pg_dist_node table.
+ */
+Datum
+cluster_add_node(PG_FUNCTION_ARGS)
+{
+	text *nodeName = PG_GETARG_TEXT_P(0);
+	int32 nodePort = PG_GETARG_INT32(1);
+	int32 groupId = PG_GETARG_INT32(2);
+	char *nodeNameString = text_to_cstring(nodeName);
+
+	Relation pgDistNode = NULL;
+	Datum nextNodeId = 0;
+	int nextNodeIdInt = 0;
+	char *insertCommand = NULL;
+	Datum returnData = 0;
+	WorkerNode *workerNode = NULL;
+
+	/* acquire a lock so that no one can do this concurrently */
+	pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
+
+	/* check if the node already exists in the cluster */
+	workerNode = FindWorkerNode(nodeNameString, nodePort);
+	if (workerNode != NULL)
+	{
+		/* fill return data and return */
+		returnData = GenerateNodeTuple(workerNode);
+
+		/* close the heap */
+		heap_close(pgDistNode, AccessExclusiveLock);
+
+		PG_RETURN_DATUM(returnData);
+	}
+
+	/* groupId 0 means the user lets Citus decide which group the node is in */
+	if (groupId == 0)
+	{
+		groupId = NextGroupId();
+	}
+	else
+	{
+		uint32 maxGroupId = GetMaxGroupId();
+
+		if ((uint32) groupId > maxGroupId)
+		{
+			ereport(ERROR, (errmsg("you cannot add a node to a non-existing group")));
+		}
+	}
+
+	/* generate the new node id from the sequence */
+	nextNodeId = master_get_new_nodeid(NULL);
+	nextNodeIdInt = DatumGetUInt32(nextNodeId);
+
+	/* new nodes start as active primaries ('p'), matching the test output */
+	InsertNodeRow(nextNodeIdInt, nodeNameString, nodePort, 'p', true, groupId);
+
+	insertCommand = InsertNodeCommand(nextNodeIdInt, nodeNameString, nodePort, groupId);
+
+	/* TODO: enable this once we have full metadata sync */
+	/* SendCommandToWorkersInParallel(insertCommand); */
+
+	heap_close(pgDistNode, AccessExclusiveLock);
+
+	/* fetch the worker node, and generate the output */
+	workerNode = FindWorkerNode(nodeNameString, nodePort);
+	returnData = GenerateNodeTuple(workerNode);
+
+	PG_RETURN_DATUM(returnData);
+}
+
+
+/*
+ * cluster_read_worker_file is a compatibility helper: it parses a
+ * pg_worker_list.conf-style membership file and adds each listed node to
+ * the cluster via cluster_add_node.
+ */
+Datum
+cluster_read_worker_file(PG_FUNCTION_ARGS)
+{
+	text *filePath = PG_GETARG_TEXT_P(0);
+	char *filePathCStr = text_to_cstring(filePath);
+
+	ListCell *workerNodeCell = NULL;
+	List *workerNodes = ParseWorkerNodeFile(filePathCStr);
+
+	foreach(workerNodeCell, workerNodes)
+	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+		Datum workerNameDatum = PointerGetDatum(cstring_to_text(workerNode->workerName));
+
+		/* groupid 0 lets cluster_add_node pick the group */
+		DirectFunctionCall3(cluster_add_node, workerNameDatum,
+							UInt32GetDatum(workerNode->workerPort),
+							Int32GetDatum(0));
+	}
+
+	PG_RETURN_BOOL(true);
+}
+
+
+/*
+ * GenerateNodeTuple returns a heap tuple, in Datum form, that holds the
+ * given worker node's pg_dist_node fields.
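+ * The resulting composite datum is what cluster_add_node ultimately returns,
+ * e.g. (1,p,t,1,localhost,57637) in the expected regression output.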
+ */ +static Datum +GenerateNodeTuple(WorkerNode *workerNode) +{ + Relation pgDistNode = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum nodeDatum = 0; + Datum values[Natts_pg_dist_node]; + bool isNulls[Natts_pg_dist_node]; + + /* form new shard tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId); + values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(workerNode->groupId); + values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); + values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); + + /* open shard relation and insert new tuple */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + + /* generate the tuple */ + tupleDescriptor = RelationGetDescr(pgDistNode); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + nodeDatum = HeapTupleGetDatum(heapTuple); + + /* close the relation */ + heap_close(pgDistNode, AccessShareLock); + + return nodeDatum; +} + + +/* + * FindWorkerNode iterates of the worker nodes and returns the workerNode + * if it already exists. Else, the function returns NULL. + */ +static WorkerNode * +FindWorkerNode(char *nodeName, int32 nodePort) +{ + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + HASH_SEQ_STATUS status; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + if (strncasecmp(nodeName, workerNode->workerName, WORKER_LENGTH) == 0 && + nodePort == workerNode->workerPort) + { + /* we need to terminate the scan since we break */ + hash_seq_term(&status); + break; + } + } + + return workerNode; +} + + +/* + * NextGroupId returns the next group that that can be assigned to a node. If the + * group is full (i.e., it has equal or more elements than GroupSize), a new group id + * is generated and returned. Else, the current maximum group id is returned. + */ +static uint32 +NextGroupId() +{ + uint32 nextGroupId = 0; + uint32 maxGroupIdInt = GetMaxGroupId(); + uint64 nodeCountInMaxGroupId = GetNodeCountInGroup(maxGroupIdInt); + + if (nodeCountInMaxGroupId == 0 || nodeCountInMaxGroupId >= GroupSize) + { + Datum nextGroupIdDatum = master_get_next_groupid(NULL); + + nextGroupId = DatumGetUInt32(nextGroupIdDatum); + } + else + { + nextGroupId = maxGroupIdInt; + } + + return nextGroupId; +} + + +/* + * GetMaxGroupId iterates over the worker node hash, and returns the maximum + * group id from the table. + */ +static uint32 +GetMaxGroupId() +{ + uint32 maxGroupId = 0; + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + HASH_SEQ_STATUS status; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + uint32 workerNodeGroupId = workerNode->groupId; + + if (workerNodeGroupId > maxGroupId) + { + maxGroupId = workerNodeGroupId; + + } + } + + return maxGroupId; +} + + +/* + * GetNodeCountInGroup iterates over the worker node hash, and returns the + * element count with the given groupId. 
+ */ +static uint64 +GetNodeCountInGroup(uint32 groupId) +{ + uint64 elementCountInGroup = 0; + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + HASH_SEQ_STATUS status; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + uint32 workerNodeGroupId = workerNode->groupId; + + if (workerNodeGroupId == groupId) + { + elementCountInGroup += 1; + } + } + + return elementCountInGroup; +} + + +/* + * DistributionCreateCommands generates a commands that can be + * executed to replicate the metadata for a distributed table. + */ +static char * +InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport, uint32 groupId) +{ + StringInfo insertNodeCommand = makeStringInfo(); + + appendStringInfo(insertNodeCommand, + "INSERT INTO pg_dist_node " /*TODO: add a ON CONFLICT clause */ + "(nodeid, nodename, nodeport, groupid) " + "VALUES " + "(%d, '%s', %d, '%c', %s , %d);", + nodeid, + nodename, + nodeport, + groupId); + + return insertNodeCommand->data; +} + + +/* + * ParseWorkerNodeFile opens and parses the node name and node port from the + * specified configuration file. + * Note that this function is deprecated. Do not use this function for any new + * features. + */ +static List * +ParseWorkerNodeFile(const char *workerNodeFilename) +{ + FILE *workerFileStream = NULL; + List *workerNodeList = NIL; + char workerNodeLine[MAXPGPATH]; + char *workerFilePath = make_absolute_path(workerNodeFilename); + char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]"; + char workerLinePattern[1024]; + const int workerNameIndex = 0; + const int workerPortIndex = 1; + + memset(workerLinePattern, '\0', sizeof(workerLinePattern)); + + workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R); + if (workerFileStream == NULL) + { + if (errno == ENOENT) + { + ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present", + workerFilePath))); + } + else + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not open worker list file \"%s\": %m", + workerFilePath))); + } + return NIL; + } + + /* build pattern to contain node name length limit */ + snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate, + WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH); + + while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL) + { + const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH); + WorkerNode *workerNode = NULL; + char *linePointer = NULL; + int32 nodePort = 5432; /* default port number */ + int fieldCount = 0; + bool lineIsInvalid = false; + char nodeName[WORKER_LENGTH + 1]; + char nodeRack[WORKER_LENGTH + 1]; + char nodePortString[MAX_PORT_LENGTH + 1]; + + memset(nodeName, '\0', sizeof(nodeName)); + //strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack)); + memset(nodePortString, '\0', sizeof(nodePortString)); + + if (workerLineLength == MAXPGPATH - 1) + { + ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), + errmsg("worker node list file line exceeds the maximum " + "length of %d", MAXPGPATH))); + } + + /* trim trailing newlines preserved by fgets, if any */ + linePointer = workerNodeLine + workerLineLength - 1; + while (linePointer >= workerNodeLine && + (*linePointer == '\n' || *linePointer == '\r')) + { + *linePointer-- = '\0'; + } + + /* skip leading whitespace */ + for (linePointer = workerNodeLine; *linePointer; linePointer++) + { + if (!isspace((unsigned char) *linePointer)) + { + break; + } + } + + /* if the entire line is 
whitespace or a comment, skip it */
+		if (*linePointer == '\0' || *linePointer == '#')
+		{
+			continue;
+		}
+
+		/* parse line; node name is required, but port and rack are optional */
+		fieldCount = sscanf(linePointer, workerLinePattern,
+							nodeName, nodePortString, nodeRack);
+
+		/* adjust field count for zero based indexes */
+		fieldCount--;
+
+		/* raise error if no fields were assigned */
+		if (fieldCount < workerNameIndex)
+		{
+			lineIsInvalid = true;
+		}
+
+		/* no special treatment for nodeName: already parsed by sscanf */
+
+		/* if a second token was specified, convert to integer port */
+		if (fieldCount >= workerPortIndex)
+		{
+			char *nodePortEnd = NULL;
+
+			errno = 0;
+			nodePort = strtol(nodePortString, &nodePortEnd, 10);
+
+			if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0)
+			{
+				lineIsInvalid = true;
+			}
+		}
+
+		if (lineIsInvalid)
+		{
+			ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
+							errmsg("could not parse worker node line: %s",
+								   workerNodeLine),
+							errhint("Lines in the worker node file must contain a valid "
+									"node name and, optionally, a positive port number. "
+									"Comments begin with a '#' character and extend to "
+									"the end of their line.")));
+		}
+
+		/* allocate worker node structure and set fields; the parsed rack is
+		 * dropped, since WorkerNode no longer tracks rack locations */
+		workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
+
+		strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
+		workerNode->workerPort = nodePort;
+
+		workerNodeList = lappend(workerNodeList, workerNode);
+	}
+
+	FreeFile(workerFileStream);
+	free(workerFilePath);
+
+	return workerNodeList;
+}
+
+
+/*
+ * master_get_new_nodeid allocates and returns a unique nodeId for the node
+ * to be added. This allocation occurs both in shared memory and in write
+ * ahead logs; writing to logs avoids the risk of having nodeId collisions.
+ *
+ * Please note that the caller is still responsible for finalizing node data
+ * and the nodeId with the master node. Further note that this function relies
+ * on an internal sequence created in initdb to generate unique identifiers.
+ *
+ * NB: This can be called by any user; for now we have decided that that's
+ * ok. We might want to restrict this to users part of a specific role or such
+ * at some later point.
+ */
+Datum
+master_get_new_nodeid(PG_FUNCTION_ARGS)
+{
+	text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
+	Oid sequenceId = ResolveRelationId(sequenceName);
+	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
+	Oid savedUserId = InvalidOid;
+	int savedSecurityContext = 0;
+	Datum nodeIdDatum = 0;
+
+	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
+	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+
+	/* generate new and unique nodeId from sequence */
+	nodeIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+
+	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
+
+	PG_RETURN_DATUM(nodeIdDatum);
+}
+
+
+/*
+ * master_get_next_groupid allocates and returns a unique groupId for the group
+ * to be created. This allocation occurs both in shared memory and in write
+ * ahead logs; writing to logs avoids the risk of having groupId collisions.
+ *
+ * Please note that the caller is still responsible for finalizing node data
+ * and the groupId with the master node. Further note that this function relies
+ * on an internal sequence created in initdb to generate unique identifiers.
+ *
+ * NB: This can be called by any user; for now we have decided that that's
+ * ok.
+ * We might want to restrict this to users part of a specific role or such
+ * at some later point.
+ */
+Datum
+master_get_next_groupid(PG_FUNCTION_ARGS)
+{
+	text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
+	Oid sequenceId = ResolveRelationId(sequenceName);
+	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
+	Oid savedUserId = InvalidOid;
+	int savedSecurityContext = 0;
+	Datum groupIdDatum = 0;
+
+	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
+	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+
+	/* generate new and unique groupId from sequence */
+	groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+
+	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
+
+	PG_RETURN_DATUM(groupIdDatum);
+}
diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c
index f0ee43dc4..51fe50aad 100644
--- a/src/backend/distributed/worker/task_tracker.c
+++ b/src/backend/distributed/worker/task_tracker.c
@@ -236,14 +236,6 @@ TaskTrackerMain(Datum main_arg)
 
 			/* reload postgres configuration files */
 			ProcessConfigFile(PGC_SIGHUP);
-
-			/*
-			 * Reload worker membership file. For now we do that in the task
-			 * tracker because that's currently the only background worker in
-			 * Citus. And only background workers allow us to safely
-			 * register a SIGHUP handler.
-			 */
-			LoadWorkerNodeList(WorkerListFileName);
 		}
 		if (got_SIGTERM)
 		{
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index d66268a98..435c775ae 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -14,6 +14,8 @@
 #include "fmgr.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/pg_dist_partition.h"
+#include "distributed/worker_manager.h"
+#include "utils/hsearch.h"
 
 
 /*
@@ -51,14 +53,20 @@ typedef struct
 extern bool IsDistributedTable(Oid relationId);
 extern ShardInterval * LoadShardInterval(uint64 shardId);
 extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
+extern void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, char noderole,
+						  bool nodeactive, uint32 groupId);
+extern void UpdateNodeActiveColumn(WorkerNode *workerNode, bool nodeActive);
 extern void CitusInvalidateRelcacheByRelid(Oid relationId);
 extern bool CitusHasBeenLoaded(void);
 
+/* access WorkerNodeHash */
+extern HTAB * GetWorkerNodeHash(void);
+
 /* relation oids */
 extern Oid DistPartitionRelationId(void);
 extern Oid DistShardRelationId(void);
 extern Oid DistShardPlacementRelationId(void);
+extern Oid DistNodeRelationId(void);
 
 /* index oids */
 extern Oid DistPartitionLogicalRelidIndexId(void);
diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h
new file mode 100644
index 000000000..941af87d1
--- /dev/null
+++ b/src/include/distributed/pg_dist_node.h
@@ -0,0 +1,48 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dist_node.h
+ *	  definition of the relation that holds the nodes on the cluster (pg_dist_node).
+ *
+ * Copyright (c) 2012-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DIST_NODE_H
+#define PG_DIST_NODE_H
+
+/* ----------------
+ * pg_dist_node definition.
diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h
new file mode 100644
index 000000000..941af87d1
--- /dev/null
+++ b/src/include/distributed/pg_dist_node.h
@@ -0,0 +1,48 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dist_node.h
+ *	  definition of the relation that holds the nodes in the cluster (pg_dist_node).
+ *
+ * Copyright (c) 2012-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DIST_NODE_H
+#define PG_DIST_NODE_H
+
+/* ----------------
+ *		pg_dist_node definition.
+ * ----------------
+ */
+typedef struct FormData_pg_dist_node
+{
+	int nodeid;
+	int groupid;
+#ifdef CATALOG_VARLEN
+	text nodename;
+	int nodeport;
+#endif
+} FormData_pg_dist_node;
+
+/* ----------------
+ *		Form_pg_dist_node corresponds to a pointer to a tuple with
+ *		the format of the pg_dist_node relation.
+ * ----------------
+ */
+typedef FormData_pg_dist_node *Form_pg_dist_node;
+
+/* ----------------
+ *		compiler constants for pg_dist_node
+ * ----------------
+ */
+#define Natts_pg_dist_node 4
+#define Anum_pg_dist_node_nodeid 1
+#define Anum_pg_dist_node_groupid 2
+#define Anum_pg_dist_node_nodename 3
+#define Anum_pg_dist_node_nodeport 4
+
+#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
+#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
+
+#endif   /* PG_DIST_NODE_H */
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index ec506848c..171567f3c 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -23,37 +23,24 @@
 /* Maximum length of worker port number (represented as string) */
 #define MAX_PORT_LENGTH 10
 
-/* default filename for citus.worker_list_file */
-#define WORKER_LIST_FILENAME "pg_worker_list.conf"
-
-/* Implementation specific definitions used in finding worker nodes */
-#define WORKER_RACK_TRIES 5
-#define WORKER_DEFAULT_RACK "default"
-
 
 /*
- * WorkerNode keeps shared memory state for active and temporarily failed worker
- * nodes. Permanently failed or departed nodes on the other hand are eventually
- * purged from the shared hash. In the current implementation, the distinction
- * between active, temporarily failed, and permanently departed nodes is made
- * based on the node's presence in the membership file; only nodes in this file
- * appear in the shared hash. In the future, worker nodes will report their
- * health status to the master via heartbeats, and these heartbeats along with
- * membership information will be used to determine a worker node's liveliness.
+ * In-memory representation of a pg_dist_node table element. The elements are held
+ * in the WorkerNodeHash table. The workerActive field determines a node's liveness.
  */
 typedef struct WorkerNode
 {
-	uint32 workerPort;              /* node's port; part of hash table key */
-	char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
-	char workerRack[WORKER_LENGTH]; /* node's network location */
+	uint32 nodeId;                  /* node's unique id, key of the hash table */
+	uint32 workerPort;              /* node's port */
+	char workerName[WORKER_LENGTH]; /* node's name */
+	uint32 groupId;                 /* node's group id; shared by nodes in the same group */
 
-	bool inWorkerFile;              /* is node in current membership file? */
+	bool workerActive;              /* should Citus use this node? */
 } WorkerNode;
 
 
 /* Config variables managed via guc.c */
 extern int MaxWorkerNodesTracked;
-extern char *WorkerListFileName;
 
 
 /* Function declarations for finding worker nodes to place shards on */
@@ -68,10 +55,6 @@ extern List * WorkerNodeList(void);
 extern bool WorkerNodeActive(const char *nodeName, uint32 nodePort);
 extern List * ResponsiveWorkerNodeList(void);
 
-/* Function declarations for loading into shared hash tables */
-extern void WorkerNodeRegister(void);
-extern void LoadWorkerNodeList(const char *workerFilename);
-
 /* Function declarations for worker node utilities */
 extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
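With GetWorkerNodeHash() and the struct above, callers can enumerate nodes directly through the standard dynahash scan API. A minimal sketch; the function name ActiveWorkerNodeCount is illustrative, not part of the patch:

    #include "postgres.h"
    #include "utils/hsearch.h"
    #include "distributed/metadata_cache.h"
    #include "distributed/worker_manager.h"

    /* count the nodes Citus may use, skipping deactivated entries */
    static uint32
    ActiveWorkerNodeCount(void)
    {
    	HTAB *workerNodeHash = GetWorkerNodeHash();
    	HASH_SEQ_STATUS status;
    	WorkerNode *workerNode = NULL;
    	uint32 activeNodeCount = 0;

    	hash_seq_init(&status, workerNodeHash);

    	while ((workerNode = (WorkerNode *) hash_seq_search(&status)) != NULL)
    	{
    		if (workerNode->workerActive)
    		{
    			activeNodeCount++;
    		}
    	}

    	return activeNodeCount;
    }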
diff --git a/src/test/regress/expected/multi_cluster_node.out b/src/test/regress/expected/multi_cluster_node.out
new file mode 100644
index 000000000..0ba6a32d1
--- /dev/null
+++ b/src/test/regress/expected/multi_cluster_node.out
@@ -0,0 +1,68 @@
+-- Tests functions related to cluster membership
+-- add the nodes to the cluster
+SELECT cluster_add_node('localhost', :worker_1_port);
+     cluster_add_node      
+---------------------------
+ (1,p,t,1,localhost,57637)
+(1 row)
+
+SELECT cluster_add_node('localhost', :worker_2_port);
+     cluster_add_node      
+---------------------------
+ (2,p,t,2,localhost,57638)
+(1 row)
+
+-- get the active nodes
+SELECT master_get_active_worker_nodes();
+ master_get_active_worker_nodes 
+--------------------------------
+ (localhost,57638)
+ (localhost,57637)
+(2 rows)
+
+-- de-activate a node
+SELECT cluster_deactivate_node('localhost', :worker_1_port);
+ cluster_deactivate_node 
+-------------------------
+ t
+(1 row)
+
+-- try to add the node again when it is de-activated
+SELECT cluster_add_node('localhost', :worker_1_port);
+     cluster_add_node      
+---------------------------
+ (1,p,f,1,localhost,57637)
+(1 row)
+
+-- get the active nodes again to see deactivation
+SELECT master_get_active_worker_nodes();
+ master_get_active_worker_nodes 
+--------------------------------
+ (localhost,57638)
+(1 row)
+
+-- we cannot de-activate a node that is already de-activated
+SELECT cluster_deactivate_node('localhost', :worker_1_port);
+ERROR: node is already deactivated
+-- activate it again
+SELECT cluster_activate_node('localhost', :worker_1_port);
+ cluster_activate_node 
+-----------------------
+ t
+(1 row)
+
+-- try to add the node again when it is activated
+SELECT cluster_add_node('localhost', :worker_1_port);
+     cluster_add_node      
+---------------------------
+ (1,p,t,1,localhost,57637)
+(1 row)
+
+-- get the active nodes
+SELECT master_get_active_worker_nodes();
+ master_get_active_worker_nodes 
+--------------------------------
+ (localhost,57638)
+ (localhost,57637)
+(2 rows)
+
diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out
index 82bd8a47d..80003a8bb 100644
--- a/src/test/regress/expected/multi_drop_extension.out
+++ b/src/test/regress/expected/multi_drop_extension.out
@@ -19,6 +19,19 @@ SET client_min_messages TO 'WARNING';
 DROP EXTENSION citus CASCADE;
 RESET client_min_messages;
 CREATE EXTENSION citus;
+-- re-add the nodes to the cluster
+SELECT cluster_add_node('localhost', :worker_1_port);
+     cluster_add_node      
+---------------------------
+ (1,p,t,1,localhost,57637)
+(1 row)
+
+SELECT cluster_add_node('localhost', :worker_2_port);
+     cluster_add_node      
+---------------------------
+ (2,p,t,2,localhost,57638)
+(1 row)
+
 -- verify that a table can be created after the extension has been dropped and recreated
 CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
 SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out
index 59f398bb4..054dc04c2 100644
--- a/src/test/regress/expected/multi_table_ddl.out
+++ b/src/test/regress/expected/multi_table_ddl.out
@@ -65,6 +65,19 @@ SELECT * FROM pg_dist_shard_placement;
 -- check that the extension now can be dropped (and recreated)
 DROP EXTENSION citus;
 CREATE EXTENSION citus;
+-- re-add the nodes to the cluster
+SELECT cluster_add_node('localhost', :worker_1_port);
+     cluster_add_node      
+---------------------------
+ (1,p,t,1,localhost,57637)
+(1 row)
+
+SELECT cluster_add_node('localhost', :worker_2_port);
+     cluster_add_node      
+---------------------------
+ (2,p,t,2,localhost,57638)
+(1 row)
+
 -- create a table with a SERIAL column
 CREATE TABLE testserialtable(id serial, group_id integer);
 SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
diff --git a/src/test/regress/multi_binary_schedule b/src/test/regress/multi_binary_schedule
index d5330f064..cb0d0498b 100644
--- a/src/test/regress/multi_binary_schedule
+++ b/src/test/regress/multi_binary_schedule
@@ -11,6 +11,7 @@
 # Tests around schema changes, these are run first, so there's no preexisting objects.
 # ---
 test: multi_extension
+test: multi_cluster_node
 test: multi_table_ddl
 
 # ----------
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 4f868e506..34100ca03 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -16,6 +16,7 @@
 # Tests around schema changes, these are run first, so there's no preexisting objects.
 # ---
 test: multi_extension
+test: multi_cluster_node
 test: multi_table_ddl
 
 # ----------
diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule
index a3252b728..fedbaa610 100644
--- a/src/test/regress/multi_task_tracker_extra_schedule
+++ b/src/test/regress/multi_task_tracker_extra_schedule
@@ -14,6 +14,7 @@
 # Tests around schema changes, these are run first, so there's no preexisting objects.
# --- test: multi_extension +test: multi_cluster_node test: multi_table_ddl # ---------- diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 78e34322f..a83481e27 100644 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -168,13 +168,6 @@ for my $port (@workerPorts) or die "Could not create worker data directory"; } -# Initialize master's worker list -for my $port (@workerPorts) -{ - system("echo $host $port >> tmp_check/master/data/pg_worker_list.conf") == 0 - or die "Could not initialize master's worker list"; -} - # Routine to shutdown servers at failure/exit my $serversAreShutdown = "FALSE"; sub ShutdownServers() diff --git a/src/test/regress/sql/multi_cluster_node.sql b/src/test/regress/sql/multi_cluster_node.sql new file mode 100644 index 000000000..08eae68ed --- /dev/null +++ b/src/test/regress/sql/multi_cluster_node.sql @@ -0,0 +1,29 @@ +-- Tests functions related to cluster membership + +-- add the nodes to the cluster +SELECT cluster_add_node('localhost', :worker_1_port); +SELECT cluster_add_node('localhost', :worker_2_port); + +-- get the active nodes +SELECT master_get_active_worker_nodes(); + +-- de-activate a node +SELECT cluster_deactivate_node('localhost', :worker_1_port); + +-- try to add the node again when it is de-activated +SELECT cluster_add_node('localhost', :worker_1_port); + +-- get the active nodes again to see deactivation +SELECT master_get_active_worker_nodes(); + +-- we cannot de-activate a node that is already de-activated +SELECT cluster_deactivate_node('localhost', :worker_1_port); + +-- activate it again +SELECT cluster_activate_node('localhost', :worker_1_port); + +-- try to add the node again when it is activated +SELECT cluster_add_node('localhost', :worker_1_port); + +-- get the active nodes +SELECT master_get_active_worker_nodes(); \ No newline at end of file diff --git a/src/test/regress/sql/multi_drop_extension.sql b/src/test/regress/sql/multi_drop_extension.sql index c5320260d..2fa957357 100644 --- a/src/test/regress/sql/multi_drop_extension.sql +++ b/src/test/regress/sql/multi_drop_extension.sql @@ -21,6 +21,10 @@ RESET client_min_messages; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT cluster_add_node('localhost', :worker_1_port); +SELECT cluster_add_node('localhost', :worker_2_port); + -- verify that a table can be created after the extension has been dropped and recreated CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 31ded4bc9..707e21657 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -45,6 +45,10 @@ SELECT * FROM pg_dist_shard_placement; DROP EXTENSION citus; CREATE EXTENSION citus; +-- re-add the nodes to the cluster +SELECT cluster_add_node('localhost', :worker_1_port); +SELECT cluster_add_node('localhost', :worker_2_port); + -- create a table with a SERIAL column CREATE TABLE testserialtable(id serial, group_id integer); SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');