Switch from pg_worker_list.conf file to pg_dist_node metadata table.

Related to #786

This change adds the `pg_dist_node` table, which holds information about
the workers in the cluster and replaces the previously used
`pg_worker_list.conf` file (or the one specified with `citus.worker_list_file`).

Upon update, the `pg_worker_list.conf` file is read and the `pg_dist_node` table
is populated with the file's contents. After that, the `pg_worker_list.conf` file
is renamed to `pg_worker_list.conf.obsolete`.
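
As an illustration only (the hostname, port, and id values below are hypothetical),
a single-line worker file would end up as one row in the new table:

    SELECT * FROM pg_dist_node;
     nodeid | groupid | nodename  | nodeport | noderack
    --------+---------+-----------+----------+----------
          1 |       1 | localhost |     5432 | default
    (1 row)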

For adding and removing nodes, the change also includes two new UDFs:
`master_add_node` and `master_remove_node`, which require superuser
permissions.
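
For example, a superuser could manage cluster membership as follows (the hostname
and port below are placeholders):

    SELECT master_add_node('localhost', 5433);
    SELECT master_remove_node('localhost', 5433);

`master_remove_node` errors out if the given node still has active shard placements.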

The `citus.worker_list_file` GUC is kept for update purposes but is not used
after the update is finished.
pull/798/head
Brian Cloutier 2016-09-22 16:57:06 +03:00 committed by Eren Basak
parent 4fae2133f1
commit 9d6699b07c
28 changed files with 1328 additions and 485 deletions


@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3
6.0-1 6.0-2 6.0-3 6.0-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -64,6 +64,8 @@ $(EXTENSION)--6.0-2.sql: $(EXTENSION)--6.0-1.sql $(EXTENSION)--6.0-1--6.0-2.sql
cat $^ > $@
$(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql
cat $^ > $@
$(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql
cat $^ > $@
NO_PGXS = 1


@ -0,0 +1,61 @@
SET search_path = 'pg_catalog';
CREATE SEQUENCE citus.pg_dist_groupid_seq
MINVALUE 1
MAXVALUE 4294967296;
CREATE SEQUENCE citus.pg_dist_node_nodeid_seq
MINVALUE 1
MAXVALUE 4294967296;
ALTER SEQUENCE citus.pg_dist_groupid_seq SET SCHEMA pg_catalog;
ALTER SEQUENCE citus.pg_dist_node_nodeid_seq SET SCHEMA pg_catalog;
/* add pg_dist_node */
CREATE TABLE citus.pg_dist_node(
nodeid int NOT NULL DEFAULT nextval('pg_dist_node_nodeid_seq') PRIMARY KEY,
groupid int NOT NULL DEFAULT nextval('pg_dist_groupid_seq'),
nodename text NOT NULL,
nodeport int NOT NULL DEFAULT 5432,
noderack text NOT NULL DEFAULT 'default',
UNIQUE (nodename, nodeport)
);
ALTER TABLE citus.pg_dist_node SET SCHEMA pg_catalog;
CREATE FUNCTION master_dist_node_cache_invalidate()
RETURNS trigger
LANGUAGE C
AS 'MODULE_PATHNAME', $$master_dist_node_cache_invalidate$$;
COMMENT ON FUNCTION master_dist_node_cache_invalidate()
IS 'invalidate internal cache of nodes when pg_dist_node changes';
CREATE TRIGGER dist_node_cache_invalidate
AFTER INSERT OR UPDATE OR DELETE
ON pg_catalog.pg_dist_node
FOR EACH ROW EXECUTE PROCEDURE master_dist_node_cache_invalidate();
CREATE FUNCTION master_add_node(nodename text,
nodeport integer)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_add_node$$;
COMMENT ON FUNCTION master_add_node(nodename text,
nodeport integer)
IS 'add node to the cluster';
CREATE FUNCTION master_remove_node(nodename text, nodeport integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_remove_node$$;
COMMENT ON FUNCTION master_remove_node(nodename text, nodeport integer)
IS 'remove node from the cluster';
/* this only needs to run once, during the extension update */
CREATE FUNCTION master_initialize_node_metadata()
RETURNS BOOL
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_initialize_node_metadata$$;
SELECT master_initialize_node_metadata();
RESET search_path;


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-3'
default_version = '6.0-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog


@ -26,6 +26,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_manager.h"
#include "nodes/makefuncs.h"
#include "parser/scansup.h"
@ -199,6 +200,45 @@ ShardLength(uint64 shardId)
}
/*
* NodeHasActiveShardPlacements returns whether any active (finalized) shard
* placements exist on the given node.
*/
bool
NodeHasActiveShardPlacements(char *nodeName, int32 nodePort)
{
const int scanKeyCount = 3;
const bool indexOK = false;
bool hasFinalizedPlacements = false;
HeapTuple heapTuple = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
Relation pgShardPlacement = heap_open(DistShardPlacementRelationId(),
AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_nodename,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
ScanKeyInit(&scanKey[1], Anum_pg_dist_shard_placement_nodeport,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
ScanKeyInit(&scanKey[2], Anum_pg_dist_shard_placement_shardstate,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED));
scanDescriptor = systable_beginscan(pgShardPlacement,
DistShardPlacementNodeidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
hasFinalizedPlacements = HeapTupleIsValid(heapTuple);
systable_endscan(scanDescriptor);
heap_close(pgShardPlacement, AccessShareLock);
return hasFinalizedPlacements;
}
/*
* FinalizedShardPlacementList finds shard placements for the given shardId from
* system catalogs, chooses placements that are in finalized state, and returns
@ -589,6 +629,20 @@ EnsureTableOwner(Oid relationId)
}
/*
* EnsureSuperUser checks that the current user is a superuser and errors out if not.
*/
void
EnsureSuperUser(void)
{
if (!superuser())
{
ereport(ERROR, (errmsg("operation is not allowed"),
errhint("Run the command with a superuser.")));
}
}
/*
* Return a table's owner as a string.
*/


@ -459,7 +459,7 @@ master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS)
/*
* master_get_active_worker_nodes returns a set of active worker host names and
* port numbers in deterministic order. Currently we assume that all worker
* nodes in pg_worker_list.conf are active.
* nodes in pg_dist_node are active.
*/
Datum
master_get_active_worker_nodes(PG_FUNCTION_ARGS)


@ -122,7 +122,6 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
targetNode = palloc0(sizeof(WorkerNode));
targetNode->inWorkerFile = true;
strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH);
targetNode->workerPort = targetPlacement->nodePort;


@ -16,6 +16,7 @@
#include "commands/dbcommands.h"
#include "distributed/worker_manager.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "libpq/hba.h"
#include "libpq/ip.h"
@ -30,25 +31,16 @@
/* Config variables managed via guc.c */
char *WorkerListFileName; /* location of pg_worker_list.conf */
char *WorkerListFileName;
int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size */
static HTAB *WorkerNodesHash = NULL; /* worker node hash in shared memory */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* Local functions forward declarations */
static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static bool OddNumber(uint32 number);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
List *currentNodeList);
static bool OddNumber(uint32 number);
static bool ListMember(List *currentList, WorkerNode *workerNode);
static Size WorkerNodeShmemSize(void);
static void WorkerNodeShmemAndWorkerListInit(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
static List * ParseWorkerNodeFile(const char *workerNodeFilename);
static void ResetWorkerNodesHash(HTAB *WorkerNodesHash);
static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
@ -59,14 +51,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
/*
* WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates
* a new worker node. The allocation is performed according to the following
* policy: if the list is empty, a random node is allocated; if the list has one
* node (or an odd number of nodes), the new node is allocated on a different
* rack than the first node; and if the list has two nodes (or an even number of
* nodes), the new node is allocated on the same rack as the first node, but is
* different from all the nodes in the list. This node allocation policy ensures
* that shard locality is maintained within a rack, but no single rack failure
* can result in data loss.
* a new worker node. The allocation is performed by randomly picking a worker node
* which is not in currentNodeList.
*
* Note that the function returns null if the worker membership list does not
* contain enough nodes to allocate a new worker node.
@ -79,6 +65,8 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
uint32 tryCount = WORKER_RACK_TRIES;
uint32 tryIndex = 0;
HTAB *workerNodeHash = GetWorkerNodeHash();
/*
* We check if the shard has already been placed on all nodes known to us.
* This check is rather defensive, and has the drawback of performing a full
@ -94,7 +82,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
/* if current node list is empty, randomly pick one node and return */
if (currentNodeCount == 0)
{
workerNode = FindRandomNodeNotInList(WorkerNodesHash, NIL);
workerNode = FindRandomNodeNotInList(workerNodeHash, NIL);
return workerNode;
}
@ -124,7 +112,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
char *workerRack = NULL;
bool sameRack = false;
workerNode = FindRandomNodeNotInList(WorkerNodesHash, currentNodeList);
workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList);
workerRack = workerNode->workerRack;
sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0);
@ -289,121 +277,64 @@ WorkerNode *
WorkerGetNodeWithName(const char *hostname)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, WorkerNodesHash);
HTAB *workerNodeHash = GetWorkerNodeHash();
workerNode = (WorkerNode *) hash_seq_search(&status);
while (workerNode != NULL)
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->inWorkerFile)
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0)
{
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0)
{
hash_seq_term(&status);
break;
}
/* we need to terminate the scan since we break */
hash_seq_term(&status);
break;
}
workerNode = (WorkerNode *) hash_seq_search(&status);
}
return workerNode;
}
/* Returns the number of live nodes in the cluster. */
/*
* WorkerGetLiveNodeCount returns the number of live nodes in the cluster.
*/
uint32
WorkerGetLiveNodeCount(void)
{
WorkerNode *workerNode = NULL;
uint32 liveWorkerCount = 0;
HASH_SEQ_STATUS status;
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
while (workerNode != NULL)
{
if (workerNode->inWorkerFile)
{
liveWorkerCount++;
}
workerNode = (WorkerNode *) hash_seq_search(&status);
}
HTAB *workerNodeHash = GetWorkerNodeHash();
uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash);
return liveWorkerCount;
}
/* Inserts the live worker nodes to a list, and returns the list. */
/*
* WorkerNodeList iterates over the hash table that includes the worker nodes, and adds
* them to a list which is returned.
*/
List *
WorkerNodeList(void)
{
List *workerNodeList = NIL;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
while (workerNode != NULL)
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->inWorkerFile)
{
workerNodeList = lappend(workerNodeList, workerNode);
}
workerNode = (WorkerNode *) hash_seq_search(&status);
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
workerNodeList = lappend(workerNodeList, workerNodeCopy);
}
return workerNodeList;
}
/*
* WorkerNodeActive looks up a worker node with the given name and port number
* in the current membership list. If such a worker node exists, the function
* returns true.
*/
bool
WorkerNodeActive(const char *nodeName, uint32 nodePort)
{
bool workerNodeActive = false;
bool handleFound = false;
WorkerNode *workerNode = NULL;
void *hashKey = NULL;
WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH);
searchedNode->workerPort = nodePort;
hashKey = (void *) searchedNode;
workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey,
HASH_FIND, &handleFound);
if (workerNode != NULL)
{
if (workerNode->inWorkerFile)
{
workerNodeActive = true;
}
}
return workerNodeActive;
}
/* Returns true if given number is odd; returns false otherwise. */
static bool
OddNumber(uint32 number)
{
bool oddNumber = ((number % 2) == 1);
return oddNumber;
}
/*
* FindRandomNodeNotInList finds a random node from the shared hash that is not
* a member of the current node list. The caller is responsible for making the
@ -449,7 +380,7 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
{
bool listMember = ListMember(currentNodeList, workerNode);
if (workerNode->inWorkerFile && !listMember)
if (!listMember)
{
lookForWorkerNode = false;
}
@ -474,6 +405,17 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
}
/*
* OddNumber function returns true if given number is odd; returns false otherwise.
*/
static bool
OddNumber(uint32 number)
{
bool oddNumber = ((number % 2) == 1);
return oddNumber;
}
/* Checks if given worker node is a member of the current list. */
static bool
ListMember(List *currentList, WorkerNode *workerNode)
@ -495,97 +437,6 @@ ListMember(List *currentList, WorkerNode *workerNode)
}
/* ------------------------------------------------------------
* Worker node shared hash functions follow
* ------------------------------------------------------------
*/
/* Organize, at startup, that the resources for worker node management are allocated. */
void
WorkerNodeRegister(void)
{
RequestAddinShmemSpace(WorkerNodeShmemSize());
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = WorkerNodeShmemAndWorkerListInit;
}
/* Estimates the shared memory size used for managing worker nodes. */
static Size
WorkerNodeShmemSize(void)
{
Size size = 0;
Size hashSize = 0;
hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(WorkerNode));
size = add_size(size, hashSize);
return size;
}
/* Initializes the shared memory used for managing worker nodes. */
static void
WorkerNodeShmemAndWorkerListInit(void)
{
HASHCTL info;
int hashFlags = 0;
long maxTableSize = 0;
long initTableSize = 0;
maxTableSize = (long) MaxWorkerNodesTracked;
initTableSize = maxTableSize / 8;
/*
* Allocate the control structure for the hash table that maps worker node
* name and port numbers (char[]:uint32) to general node membership and
* health information.
*/
memset(&info, 0, sizeof(info));
info.keysize = WORKER_LENGTH + sizeof(uint32);
info.entrysize = sizeof(WorkerNode);
info.hash = WorkerNodeHashCode;
info.match = WorkerNodeCompare;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
WorkerNodesHash = ShmemInitHash("Worker Node Hash",
initTableSize, maxTableSize,
&info, hashFlags);
/*
* Load the intial contents of the worker node hash table from the
* configuration file.
*/
LoadWorkerNodeList(WorkerListFileName);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations
* hash to the same value.
*/
static uint32
WorkerNodeHashCode(const void *key, Size keySize)
{
const WorkerNode *worker = (const WorkerNode *) key;
const char *workerName = worker->workerName;
const uint32 *workerPort = &(worker->workerPort);
/* standard hash function outlined in Effective Java, Item 8 */
uint32 result = 17;
result = 37 * result + string_hash(workerName, WORKER_LENGTH);
result = 37 * result + tag_hash(workerPort, sizeof(uint32));
return result;
}
/*
* CompareWorkerNodes compares two pointers to worker nodes using the exact
* same logic employed by WorkerNodeCompare.
@ -609,7 +460,7 @@ CompareWorkerNodes(const void *leftElement, const void *rightElement)
* number. Two nodes that only differ by their rack locations are considered to
* be equal to each other.
*/
static int
int
WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
{
const WorkerNode *workerLhs = (const WorkerNode *) lhsKey;
@ -629,251 +480,6 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
}
/*
* LoadWorkerNodeList reads and parses given membership file, and loads worker
* nodes from this membership file into the shared hash. The function relies on
* hba.c's tokenization method for parsing, and therefore the membership file
* has the same syntax as other configuration files such as ph_hba.conf.
*
* Note that this function allows for reloading membership configuration files
* at runtime. When that happens, old worker nodes that do not appear in the
* file are marked as stale, but are still kept in the shared hash.
*/
void
LoadWorkerNodeList(const char *workerFilename)
{
List *workerList = NIL;
ListCell *workerCell = NULL;
uint32 workerCount = 0;
workerList = ParseWorkerNodeFile(workerFilename);
workerCount = list_length(workerList);
if (workerCount > MaxWorkerNodesTracked)
{
ereport(FATAL, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("worker node count: %u exceeds max allowed value: %d",
workerCount, MaxWorkerNodesTracked)));
}
else
{
ereport(INFO, (errmsg("reading nodes from worker file: %s", workerFilename)));
}
/* before reading file's lines, reset worker node hash */
ResetWorkerNodesHash(WorkerNodesHash);
/* parse file lines */
foreach(workerCell, workerList)
{
WorkerNode *workerNode = NULL;
WorkerNode *parsedNode = lfirst(workerCell);
void *hashKey = NULL;
bool handleFound = false;
/*
* Search for the parsed worker node in the hash, and then insert parsed
* values. When searching, we make the hashKey point to the beginning of
* the parsed node; we previously set the key length and key comparison
* function to include both the node name and the port number.
*/
hashKey = (void *) parsedNode;
workerNode = (WorkerNode *) hash_search(WorkerNodesHash, hashKey,
HASH_ENTER, &handleFound);
if (handleFound)
{
/* display notification if worker node's rack changed */
char *oldWorkerRack = workerNode->workerRack;
char *newWorkerRack = parsedNode->workerRack;
if (strncmp(oldWorkerRack, newWorkerRack, WORKER_LENGTH) != 0)
{
ereport(INFO, (errmsg("worker node: \"%s:%u\" changed rack location",
workerNode->workerName, workerNode->workerPort)));
}
/* display warning if worker node already appeared in this file */
if (workerNode->inWorkerFile)
{
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
workerNode->workerName,
workerNode->workerPort)));
}
}
strlcpy(workerNode->workerName, parsedNode->workerName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, parsedNode->workerRack, WORKER_LENGTH);
workerNode->workerPort = parsedNode->workerPort;
workerNode->inWorkerFile = parsedNode->inWorkerFile;
pfree(parsedNode);
}
}
/*
* ParseWorkerNodeFile opens and parses the node name and node port from the
* specified configuration file.
*/
static List *
ParseWorkerNodeFile(const char *workerNodeFilename)
{
FILE *workerFileStream = NULL;
List *workerNodeList = NIL;
char workerNodeLine[MAXPGPATH];
char *workerFilePath = make_absolute_path(workerNodeFilename);
char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]";
char workerLinePattern[1024];
const int workerNameIndex = 0;
const int workerPortIndex = 1;
memset(workerLinePattern, '\0', sizeof(workerLinePattern));
workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R);
if (workerFileStream == NULL)
{
if (errno == ENOENT)
{
ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present",
workerFilePath)));
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open worker list file \"%s\": %m",
workerFilePath)));
}
return NIL;
}
/* build pattern to contain node name length limit */
snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate,
WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH);
while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL)
{
const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH);
WorkerNode *workerNode = NULL;
char *linePointer = NULL;
int32 nodePort = PostPortNumber; /* default port number */
int fieldCount = 0;
bool lineIsInvalid = false;
char nodeName[WORKER_LENGTH + 1];
char nodeRack[WORKER_LENGTH + 1];
char nodePortString[MAX_PORT_LENGTH + 1];
memset(nodeName, '\0', sizeof(nodeName));
strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack));
memset(nodePortString, '\0', sizeof(nodePortString));
if (workerLineLength == MAXPGPATH - 1)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("worker node list file line exceeds the maximum "
"length of %d", MAXPGPATH)));
}
/* trim trailing newlines preserved by fgets, if any */
linePointer = workerNodeLine + workerLineLength - 1;
while (linePointer >= workerNodeLine &&
(*linePointer == '\n' || *linePointer == '\r'))
{
*linePointer-- = '\0';
}
/* skip leading whitespace */
for (linePointer = workerNodeLine; *linePointer; linePointer++)
{
if (!isspace((unsigned char) *linePointer))
{
break;
}
}
/* if the entire line is whitespace or a comment, skip it */
if (*linePointer == '\0' || *linePointer == '#')
{
continue;
}
/* parse line; node name is required, but port and rack are optional */
fieldCount = sscanf(linePointer, workerLinePattern,
nodeName, nodePortString, nodeRack);
/* adjust field count for zero based indexes */
fieldCount--;
/* raise error if no fields were assigned */
if (fieldCount < workerNameIndex)
{
lineIsInvalid = true;
}
/* no special treatment for nodeName: already parsed by sscanf */
/* if a second token was specified, convert to integer port */
if (fieldCount >= workerPortIndex)
{
char *nodePortEnd = NULL;
errno = 0;
nodePort = strtol(nodePortString, &nodePortEnd, 10);
if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0)
{
lineIsInvalid = true;
}
}
if (lineIsInvalid)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("could not parse worker node line: %s",
workerNodeLine),
errhint("Lines in the worker node file must contain a valid "
"node name and, optionally, a positive port number. "
"Comments begin with a '#' character and extend to "
"the end of their line.")));
}
/* allocate worker node structure and set fields */
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->inWorkerFile = true;
workerNodeList = lappend(workerNodeList, workerNode);
}
FreeFile(workerFileStream);
free(workerFilePath);
return workerNodeList;
}
/* Marks all worker nodes in the shared hash as stale. */
static void
ResetWorkerNodesHash(HTAB *WorkerNodesHash)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
while (workerNode != NULL)
{
workerNode->inWorkerFile = false;
workerNode = (WorkerNode *) hash_seq_search(&status);
}
}
/* ResponsiveWorkerNodeList returns a list of all responsive worker nodes */
List *
ResponsiveWorkerNodeList(void)


@ -5166,11 +5166,11 @@ ActivePlacementList(List *placementList)
foreach(placementCell, placementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
bool workerNodeActive = false;
WorkerNode *workerNode = NULL;
/* check if the worker node for this shard placement is active */
workerNodeActive = WorkerNodeActive(placement->nodeName, placement->nodePort);
if (workerNodeActive)
workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
if (workerNode != NULL)
{
activePlacementList = lappend(activePlacementList, placement);
}


@ -150,9 +150,6 @@ _PG_init(void)
/* organize that task tracker is started once server is up */
TaskTrackerRegister();
/* initialize worker node manager */
WorkerNodeRegister();
/* initialize transaction callbacks */
RegisterRouterExecutorXactCallbacks();
RegisterShardPlacementXactCallbacks();
@ -193,6 +190,7 @@ CreateRequiredDirectories(void)
static void
RegisterCitusConfigVariables(void)
{
/* keeping temporarily for updates from pre-6.0 versions */
DefineCustomStringVariable(
"citus.worker_list_file",
gettext_noop("Sets the server's \"worker_list\" configuration file."),
@ -200,7 +198,7 @@ RegisterCitusConfigVariables(void)
&WorkerListFileName,
NULL,
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
NormalizeWorkerListPath();


@ -26,9 +26,11 @@
#include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "parser/parse_func.h"
#include "utils/builtins.h"
@ -49,17 +51,23 @@
static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
static Oid distShardLogicalRelidIndexId = InvalidOid;
static Oid distShardShardidIndexId = InvalidOid;
static Oid distShardPlacementShardidIndexId = InvalidOid;
static Oid distShardPlacementNodeidIndexId = InvalidOid;
static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL;
/* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false;
/* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1];
static ScanKeyData DistShardScanKey[1];
@ -78,8 +86,11 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
@ -93,6 +104,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
/*
@ -616,6 +628,16 @@ DistShardPlacementRelationId(void)
}
/* return oid of pg_dist_node relation */
Oid
DistNodeRelationId(void)
{
CachedRelationLookup("pg_dist_node", &distNodeRelationId);
return distNodeRelationId;
}
/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
@ -680,6 +702,17 @@ DistShardPlacementShardidIndexId(void)
}
/* return oid of pg_dist_shard_placement_nodeid_index */
Oid
DistShardPlacementNodeidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_nodeid_index",
&distShardPlacementNodeidIndexId);
return distShardPlacementNodeidIndexId;
}
/* return oid of the citus_extradata_container(internal) function */
Oid
CitusExtraDataContainerFuncId(void)
@ -896,6 +929,29 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
}
/*
* master_dist_node_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_node are changed
* on the SQL level.
*
* NB: We decided there is little point in checking permissions here, there
* are much easier ways to waste CPU than causing cache invalidations.
*/
Datum
master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
{
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
}
/* initialize the infrastructure for the metadata cache */
static void
InitializeDistTableCache(void)
@ -944,6 +1000,133 @@ InitializeDistTableCache(void)
}
/*
* GetWorkerNodeHash returns the worker node hash. If the cached hash has been
* invalidated, the function first rebuilds it by calling InitializeWorkerNodeCache();
* otherwise, the existing hash is returned.
*/
HTAB *
GetWorkerNodeHash(void)
{
if (!workerNodeHashValid)
{
InitializeWorkerNodeCache();
workerNodeHashValid = true;
}
return WorkerNodeHash;
}
/*
* InitializeWorkerNodeCache initializes the infrastructure for the worker node cache.
* The function reads the worker nodes from the metadata table, adds them to the hash and
* finally registers an invalidation callback.
*/
static void
InitializeWorkerNodeCache(void)
{
static bool invalidationRegistered = false;
HTAB *oldWorkerNodeHash = NULL;
List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL;
HASHCTL info;
int hashFlags = 0;
long maxTableSize = (long) MaxWorkerNodesTracked;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
/*
* Create the hash that holds the worker nodes. The key is the combination of
* nodename and nodeport, instead of the unique nodeid because worker nodes are
* searched by the nodename and nodeport in every physical plan creation.
*/
memset(&info, 0, sizeof(info));
info.keysize = +sizeof(uint32) + WORKER_LENGTH + sizeof(uint32);
info.entrysize = sizeof(WorkerNode);
info.hcxt = CacheMemoryContext;
info.hash = WorkerNodeHashCode;
info.match = WorkerNodeCompare;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE;
oldWorkerNodeHash = WorkerNodeHash;
WorkerNodeHash = hash_create("Worker Node Hash",
maxTableSize,
&info, hashFlags);
/* read the list from pg_dist_node */
workerNodeList = ReadWorkerNodes();
/* iterate over the worker node list */
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = NULL;
WorkerNode *currentNode = lfirst(workerNodeCell);
void *hashKey = NULL;
bool handleFound = false;
/* search for the worker node in the hash, and then insert the values */
hashKey = (void *) currentNode;
workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey,
HASH_ENTER, &handleFound);
/* fill the newly allocated workerNode in the cache */
strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
workerNode->workerPort = currentNode->workerPort;
workerNode->groupId = currentNode->groupId;
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
if (handleFound)
{
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
workerNode->workerName,
workerNode->workerPort)));
}
/* we do not need the currentNode anymore */
pfree(currentNode);
}
/* now, safe to destroy the old hash */
hash_destroy(oldWorkerNodeHash);
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
(Datum) 0);
invalidationRegistered = true;
}
}
/*
* WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations
* hash to the same value.
*/
static uint32
WorkerNodeHashCode(const void *key, Size keySize)
{
const WorkerNode *worker = (const WorkerNode *) key;
const char *workerName = worker->workerName;
const uint32 *workerPort = &(worker->workerPort);
/* standard hash function outlined in Effective Java, Item 8 */
uint32 result = 17;
result = 37 * result + string_hash(workerName, WORKER_LENGTH);
result = 37 * result + tag_hash(workerPort, sizeof(uint32));
return result;
}
/*
* ResetDistTableCacheEntry frees any out-of-band memory used by a cache entry,
* but does not free the entry itself.
@ -1051,11 +1234,28 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardLogicalRelidIndexId = InvalidOid;
distShardShardidIndexId = InvalidOid;
distShardPlacementShardidIndexId = InvalidOid;
distNodeRelationId = InvalidOid;
extraDataContainerFuncId = InvalidOid;
}
}
/*
* InvalidateNodeRelationCacheCallback marks the cached worker node hash as invalid
* whenever the pg_dist_node table changes, so that subsequent accesses rebuild the
* hash from pg_dist_node from scratch.
*/
static void
InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
{
if (relationId == InvalidOid || relationId == distNodeRelationId)
{
workerNodeHashValid = false;
}
}
/*
* LookupDistPartitionTuple searches pg_dist_partition for relationId's entry
* and returns that or, if no matching entry was found, NULL.


@ -0,0 +1,665 @@
/*
* node_metadata.c
* Functions that operate on pg_dist_node
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*/
#include "postgres.h"
#include "miscadmin.h"
#include "funcapi.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/skey.h"
#if (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90600)
#include "access/stratnum.h"
#else
#include "access/skey.h"
#endif
#include "access/tupmacs.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "commands/sequence.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_node.h"
#include "distributed/worker_manager.h"
#include "lib/stringinfo.h"
#include "storage/lock.h"
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/relcache.h"
/* default group size */
int GroupSize = 1;
/* local function forward declarations */
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
static int GetNextNodeId(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
char *nodeRack);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_add_node);
PG_FUNCTION_INFO_V1(master_remove_node);
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
/*
* master_add_node adds a new node to the pg_dist_node table and returns the node's
* data. If the node already exists, the existing node's data is returned instead.
*/
Datum
master_add_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0;
char *nodeRack = WORKER_DEFAULT_RACK;
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack);
PG_RETURN_DATUM(returnData);
}
/*
* master_remove_node removes the provided node from the pg_dist_node table. It
* must be called by a superuser, and the specified node must not have any active
* shard placements.
*/
Datum
master_remove_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
bool hasShardPlacements = false;
EnsureSuperUser();
hasShardPlacements = NodeHasActiveShardPlacements(nodeNameString, nodePort);
if (hasShardPlacements)
{
ereport(ERROR, (errmsg("you cannot remove a node which has active "
"shard placements")));
}
DeleteNodeRow(nodeNameString, nodePort);
PG_RETURN_VOID();
}
/*
* master_initialize_node_metadata is run once, when upgrading Citus. It ingests the
* existing pg_worker_list.conf into pg_dist_node and then renames the file to mark
* that it is no longer used.
*/
Datum
master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
ListCell *workerNodeCell = NULL;
List *workerNodes = ParseWorkerNodeFileAndRename();
foreach(workerNodeCell, workerNodes)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack);
}
PG_RETURN_BOOL(true);
}
/*
* FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL.
*/
WorkerNode *
FindWorkerNode(char *nodeName, int32 nodePort)
{
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
bool handleFound = false;
void *hashKey = NULL;
WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH);
searchedNode->workerPort = nodePort;
hashKey = (void *) searchedNode;
workerNode = (WorkerNode *) hash_search(workerNodeHash, hashKey,
HASH_FIND, &handleFound);
return workerNode;
}
/*
* ReadWorkerNodes iterates over the pg_dist_node table, converts each row into
* its in-memory representation (i.e., WorkerNode), and adds it to a list. The
* resulting list is returned to the caller.
*/
List *
ReadWorkerNodes()
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
List *workerNodeList = NIL;
TupleDesc tupleDescriptor = NULL;
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
scanDescriptor = systable_beginscan(pgDistNode,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
tupleDescriptor = RelationGetDescr(pgDistNode);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
workerNodeList = lappend(workerNodeList, workerNode);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgDistNode, AccessExclusiveLock);
return workerNodeList;
}
/*
* AddNodeMetadata checks the given node information and adds the specified node to
* the pg_dist_node table. If the node already exists, the function returns the
* information about that node. Otherwise, the following procedure is followed while
* adding a node: if the groupId is not explicitly given by the user, the function
* picks the group that the new node should be in with respect to GroupSize. Then,
* the new node is inserted into the local pg_dist_node.
*/
static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack)
{
Relation pgDistNode = NULL;
int nextNodeIdInt = 0;
Datum returnData = 0;
WorkerNode *workerNode = NULL;
EnsureSuperUser();
/* acquire a lock so that no one can do this concurrently */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
/* check if the node already exists in the cluster */
workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode != NULL)
{
/* fill return data and return */
returnData = GenerateNodeTuple(workerNode);
/* close the heap */
heap_close(pgDistNode, AccessExclusiveLock);
PG_RETURN_DATUM(returnData);
}
/* the user lets Citus decide which group the newly added node should be in */
if (groupId == 0)
{
groupId = GetNextGroupId();
}
else
{
uint32 maxGroupId = GetMaxGroupId();
if (groupId > maxGroupId)
{
ereport(ERROR, (errmsg("you cannot add a node to a non-existing group")));
}
}
/* generate the new node id from the sequence */
nextNodeIdInt = GetNextNodeId();
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack);
heap_close(pgDistNode, AccessExclusiveLock);
/* fetch the worker node, and generate the output */
workerNode = FindWorkerNode(nodeName, nodePort);
returnData = GenerateNodeTuple(workerNode);
return returnData;
}
/*
* GenerateNodeTuple takes a worker node and returns a heap tuple Datum for the
* given worker node.
*/
static Datum
GenerateNodeTuple(WorkerNode *workerNode)
{
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum nodeDatum = 0;
Datum values[Natts_pg_dist_node];
bool isNulls[Natts_pg_dist_node];
/* form new node tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId);
values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(workerNode->groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
/* open pg_dist_node to get its tuple descriptor */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
/* generate the tuple */
tupleDescriptor = RelationGetDescr(pgDistNode);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
nodeDatum = HeapTupleGetDatum(heapTuple);
/* close the relation */
heap_close(pgDistNode, AccessShareLock);
return nodeDatum;
}
/*
* GetNextGroupId allocates and returns a unique groupId for the group
* to be created. This allocation occurs both in shared memory and in write
* ahead logs; writing to logs avoids the risk of having groupId collisions.
*
* Please note that the caller is still responsible for finalizing node data
* and the groupId with the master node. Further note that this function relies
* on an internal sequence created by the Citus extension to generate unique identifiers.
*/
int32
GetNextGroupId()
{
text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum groupIdDatum = 0;
int32 groupId = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate a new and unique groupId from the sequence */
groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
groupId = DatumGetUInt32(groupIdDatum);
return groupId;
}
/*
* GetMaxGroupId iterates over the worker node hash, and returns the maximum
* group id from the table.
*/
static uint32
GetMaxGroupId()
{
uint32 maxGroupId = 0;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId > maxGroupId)
{
maxGroupId = workerNodeGroupId;
}
}
return maxGroupId;
}
/*
* GetNextNodeId allocates and returns a unique nodeId for the node
* to be added. This allocation occurs both in shared memory and in write
* ahead logs; writing to logs avoids the risk of having nodeId collisions.
*
* Please note that the caller is still responsible for finalizing node data
* and the nodeId with the master node. Further note that this function relies
* on an internal sequence created by the Citus extension to generate unique identifiers.
*/
int
GetNextNodeId()
{
text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum nextNodedIdDatum = 0;
int nextNodeId = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate a new and unique nodeId from the sequence */
nextNodedIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
nextNodeId = DatumGetUInt32(nextNodedIdDatum);
return nextNodeId;
}
/*
* InsertNodeRow opens the node system catalog, and inserts a new row with the
* given values into that system catalog.
*/
static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack)
{
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isNulls[Natts_pg_dist_node];
/* form new node tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
/* open pg_dist_node and insert the new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistNode);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistNode, heapTuple);
CatalogUpdateIndexes(pgDistNode, heapTuple);
/* close relation and invalidate previous cache entry */
heap_close(pgDistNode, AccessExclusiveLock);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
}
/*
* DeleteNodeRow removes the requested row from pg_dist_node table if it exists.
*/
static void
DeleteNodeRow(char *nodeName, int32 nodePort)
{
const int scanKeyCount = 2;
bool indexOK = false;
HeapTuple heapTuple = NULL;
SysScanDesc heapScan = NULL;
ScanKeyData scanKey[scanKeyCount];
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(heapScan);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
}
simple_heap_delete(pgDistNode, &(heapTuple->t_self));
systable_endscan(heapScan);
heap_close(pgDistNode, AccessExclusiveLock);
/* ensure future commands don't use the node we just removed */
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
/* increment the counter so that next command won't see the row */
CommandCounterIncrement();
}
/*
* ParseWorkerNodeFileAndRename opens and parses the node name and node port from the
* specified configuration file and, after that, renames the file to mark that it is
* no longer used.
* Note that this function is deprecated. Do not use this function for any new
* features.
*/
static List *
ParseWorkerNodeFileAndRename()
{
FILE *workerFileStream = NULL;
List *workerNodeList = NIL;
char workerNodeLine[MAXPGPATH];
char *workerFilePath = make_absolute_path(WorkerListFileName);
StringInfo renamedWorkerFilePath = makeStringInfo();
char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]";
char workerLinePattern[1024];
const int workerNameIndex = 0;
const int workerPortIndex = 1;
memset(workerLinePattern, '\0', sizeof(workerLinePattern));
workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R);
if (workerFileStream == NULL)
{
if (errno == ENOENT)
{
ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present",
workerFilePath)));
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open worker list file \"%s\": %m",
workerFilePath)));
}
return NIL;
}
/* build pattern to contain node name length limit */
snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate,
WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH);
while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL)
{
const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH);
WorkerNode *workerNode = NULL;
char *linePointer = NULL;
int32 nodePort = 5432; /* default port number */
int fieldCount = 0;
bool lineIsInvalid = false;
char nodeName[WORKER_LENGTH + 1];
char nodeRack[WORKER_LENGTH + 1];
char nodePortString[MAX_PORT_LENGTH + 1];
memset(nodeName, '\0', sizeof(nodeName));
strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack));
memset(nodePortString, '\0', sizeof(nodePortString));
if (workerLineLength == MAXPGPATH - 1)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("worker node list file line exceeds the maximum "
"length of %d", MAXPGPATH)));
}
/* trim trailing newlines preserved by fgets, if any */
linePointer = workerNodeLine + workerLineLength - 1;
while (linePointer >= workerNodeLine &&
(*linePointer == '\n' || *linePointer == '\r'))
{
*linePointer-- = '\0';
}
/* skip leading whitespace */
for (linePointer = workerNodeLine; *linePointer; linePointer++)
{
if (!isspace((unsigned char) *linePointer))
{
break;
}
}
/* if the entire line is whitespace or a comment, skip it */
if (*linePointer == '\0' || *linePointer == '#')
{
continue;
}
/* parse line; node name is required, but port and rack are optional */
fieldCount = sscanf(linePointer, workerLinePattern,
nodeName, nodePortString, nodeRack);
/* adjust field count for zero based indexes */
fieldCount--;
/* raise error if no fields were assigned */
if (fieldCount < workerNameIndex)
{
lineIsInvalid = true;
}
/* no special treatment for nodeName: already parsed by sscanf */
/* if a second token was specified, convert to integer port */
if (fieldCount >= workerPortIndex)
{
char *nodePortEnd = NULL;
errno = 0;
nodePort = strtol(nodePortString, &nodePortEnd, 10);
if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0)
{
lineIsInvalid = true;
}
}
if (lineIsInvalid)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("could not parse worker node line: %s",
workerNodeLine),
errhint("Lines in the worker node file must contain a valid "
"node name and, optionally, a positive port number. "
"Comments begin with a '#' character and extend to "
"the end of their line.")));
}
/* allocate worker node structure and set fields */
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNodeList = lappend(workerNodeList, workerNode);
}
FreeFile(workerFileStream);
free(workerFilePath);
/* rename the file, marking that it is not used anymore */
appendStringInfo(renamedWorkerFilePath, "%s", workerFilePath);
appendStringInfo(renamedWorkerFilePath, ".obsolete");
rename(workerFilePath, renamedWorkerFilePath->data);
return workerNodeList;
}
/*
* TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
* converts this tuple to an equivalent struct in memory. The function assumes
* the caller already has locks on the tuple, and doesn't perform any locking.
*/
static WorkerNode *
TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
WorkerNode *workerNode = NULL;
bool isNull = false;
Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid,
tupleDescriptor, &isNull);
Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid,
tupleDescriptor, &isNull);
Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename,
tupleDescriptor, &isNull);
Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_node_nodeport,
tupleDescriptor, &isNull);
Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack,
tupleDescriptor, &isNull);
Assert(!HeapTupleHasNulls(heapTuple));
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
workerNode->nodeId = DatumGetUInt32(nodeId);
workerNode->workerPort = DatumGetUInt32(nodePort);
workerNode->groupId = DatumGetUInt32(groupId);
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
return workerNode;
}


@ -235,14 +235,6 @@ TaskTrackerMain(Datum main_arg)
/* reload postgres configuration files */
ProcessConfigFile(PGC_SIGHUP);
/*
* Reload worker membership file. For now we do that in the task
* tracker because that's currently the only background worker in
* Citus. And only background workers allow us to safely
* register a SIGHUP handler.
*/
LoadWorkerNodeList(WorkerListFileName);
}
if (got_SIGTERM)
{


@ -62,6 +62,7 @@ extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort);
extern List * FinalizedShardPlacementList(uint64 shardId);
extern List * ShardPlacementList(uint64 shardId);
extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
@ -81,5 +82,6 @@ extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
extern char * TableOwner(Oid relationId);
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
extern void EnsureTableOwner(Oid relationId);
extern void EnsureSuperUser(void);
#endif /* MASTER_METADATA_UTILITY_H */


@ -14,6 +14,8 @@
#include "fmgr.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_manager.h"
#include "utils/hsearch.h"
/*
@ -54,13 +56,18 @@ extern bool IsDistributedTable(Oid relationId);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateNodeCache(void);
extern bool CitusHasBeenLoaded(void);
/* access WorkerNodeHash */
extern HTAB * GetWorkerNodeHash(void);
/* relation oids */
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
/* index oids */
extern Oid DistPartitionLogicalRelidIndexId(void);
@ -68,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardPlacementShardidIndexId(void);
extern Oid DistShardPlacementNodeidIndexId(void);
/* function oids */
extern Oid CitusExtraDataContainerFuncId(void);


@ -0,0 +1,49 @@
/*-------------------------------------------------------------------------
*
* pg_dist_node.h
* definition of the relation that holds the nodes on the cluster (pg_dist_node).
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_NODE_H
#define PG_DIST_NODE_H
/* ----------------
* pg_dist_node definition.
* ----------------
*/
typedef struct FormData_pg_dist_node
{
int nodeid;
int groupid;
#ifdef CATALOG_VARLEN
text nodename;
int nodeport;
text noderack;
#endif
} FormData_pg_dist_node;
/* ----------------
* Form_pg_dist_node corresponds to a pointer to a tuple with
* the format of the pg_dist_node relation.
* ----------------
*/
typedef FormData_pg_dist_node *Form_pg_dist_node;
/* ----------------
* compiler constants for pg_dist_node
* ----------------
*/
#define Natts_pg_dist_node 5
#define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3
#define Anum_pg_dist_node_nodeport 4
#define Anum_pg_dist_node_noderack 5
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
#endif /* PG_DIST_NODE_H */


@ -32,22 +32,16 @@
/*
* WorkerNode keeps shared memory state for active and temporarily failed worker
* nodes. Permanently failed or departed nodes on the other hand are eventually
* purged from the shared hash. In the current implementation, the distinction
* between active, temporarily failed, and permanently departed nodes is made
* based on the node's presence in the membership file; only nodes in this file
* appear in the shared hash. In the future, worker nodes will report their
* health status to the master via heartbeats, and these heartbeats along with
* membership information will be used to determine a worker node's liveliness.
* In-memory representation of a pg_dist_node table entry. The entries are held in the
* WorkerNodeHash table.
*/
typedef struct WorkerNode
{
uint32 workerPort; /* node's port; part of hash table key */
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
char workerRack[WORKER_LENGTH]; /* node's network location */
bool inWorkerFile; /* is node in current membership file? */
uint32 nodeId; /* node's unique id, key of the hash table */
uint32 workerPort; /* node's port */
char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* id of the group this node belongs to; shared by all nodes in the group */
char workerRack[WORKER_LENGTH]; /* node's network location */
} WorkerNode;
@ -65,14 +59,12 @@ extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern WorkerNode * WorkerGetNodeWithName(const char *hostname);
extern uint32 WorkerGetLiveNodeCount(void);
extern List * WorkerNodeList(void);
extern bool WorkerNodeActive(const char *nodeName, uint32 nodePort);
extern List * ResponsiveWorkerNodeList(void);
/* Function declarations for loading into shared hash tables */
extern void WorkerNodeRegister(void);
extern void LoadWorkerNodeList(const char *workerFilename);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(void);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
#endif /* WORKER_MANAGER_H */
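With the membership file gone, worker lookups go through the WorkerNodeHash exposed by the metadata cache. Below is a minimal sketch, assuming GetWorkerNodeHash() from metadata_cache.h returns that hash of WorkerNode entries; the function name LogWorkerNodesSketch is hypothetical.

#include "postgres.h"
#include "utils/hsearch.h"
#include "distributed/metadata_cache.h"
#include "distributed/worker_manager.h"

/*
 * LogWorkerNodesSketch walks the worker node hash and logs each entry,
 * illustrating how the cached pg_dist_node rows can be enumerated.
 */
static void
LogWorkerNodesSketch(void)
{
	HTAB *workerNodeHash = GetWorkerNodeHash();
	HASH_SEQ_STATUS status;
	WorkerNode *workerNode = NULL;

	hash_seq_init(&status, workerNodeHash);

	while ((workerNode = (WorkerNode *) hash_seq_search(&status)) != NULL)
	{
		elog(DEBUG2, "worker %s:%u in group %u on rack %s",
			 workerNode->workerName, workerNode->workerPort,
			 workerNode->groupId, workerNode->workerRack);
	}
}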

View File

@ -0,0 +1,134 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
(1 row)
-- get the active nodes
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57638)
(localhost,57637)
(2 rows)
-- try to add the node again when it is already in the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
(1 row)
-- get the active nodes
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57638)
(localhost,57637)
(2 rows)
-- try to remove a node (with no placements)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- verify that the node has been deleted
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57637)
(1 row)
-- add some shard placements to the cluster
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(3,3,localhost,57638,default)
(1 row)
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('cluster_management_test', 16, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- see that there are some active placements in the candidate node
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1220001 | 1 | 0 | localhost | 57638
1220003 | 1 | 0 | localhost | 57638
1220005 | 1 | 0 | localhost | 57638
1220007 | 1 | 0 | localhost | 57638
1220009 | 1 | 0 | localhost | 57638
1220011 | 1 | 0 | localhost | 57638
1220013 | 1 | 0 | localhost | 57638
1220015 | 1 | 0 | localhost | 57638
(8 rows)
-- try to remove a node with active placements and see that node removal fails
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has active shard placements
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57638)
(localhost,57637)
(2 rows)
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1220001 | 3 | 0 | localhost | 57638
1220003 | 3 | 0 | localhost | 57638
1220005 | 3 | 0 | localhost | 57638
1220007 | 3 | 0 | localhost | 57638
1220009 | 3 | 0 | localhost | 57638
1220011 | 3 | 0 | localhost | 57638
1220013 | 3 | 0 | localhost | 57638
1220015 | 3 | 0 | localhost | 57638
(8 rows)
-- try to remove a node with only inactive placements and see that the node is removed
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57637)
(1 row)
-- clean-up
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(4,4,localhost,57638,default)
(1 row)
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
DROP TABLE cluster_management_test;
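The expected output above captures the membership rules: re-adding an already-registered node simply returns its existing pg_dist_node row, and master_remove_node refuses to drop a node that still has active shard placements. Below is a minimal libpq sketch of driving those UDFs from a client; it is not part of the test suite, and the connection string and worker port are assumptions for illustration.

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn *conn = PQconnectdb("host=localhost port=5432 dbname=postgres");
	PGresult *result = NULL;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* register a worker; the UDF returns the new pg_dist_node row */
	result = PQexec(conn, "SELECT master_add_node('localhost', 57638)");
	if (PQresultStatus(result) == PGRES_TUPLES_OK)
		printf("added node: %s\n", PQgetvalue(result, 0, 0));
	PQclear(result);

	/* removal fails while the node still has active shard placements */
	result = PQexec(conn, "SELECT master_remove_node('localhost', 57638)");
	if (PQresultStatus(result) != PGRES_TUPLES_OK)
		fprintf(stderr, "remove failed: %s", PQerrorMessage(conn));
	PQclear(result);

	PQfinish(conn);
	return 0;
}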

View File

@ -19,6 +19,19 @@ SET client_min_messages TO 'WARNING';
DROP EXTENSION citus CASCADE;
RESET client_min_messages;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
(1 row)
-- verify that a table can be created after the extension has been dropped and recreated
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');

View File

@ -29,6 +29,7 @@ ALTER EXTENSION citus UPDATE TO '5.2-4';
ALTER EXTENSION citus UPDATE TO '6.0-1';
ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
-- drop extension and re-create in the newest version
DROP EXTENSION citus;
\c

View File

@ -65,6 +65,19 @@ SELECT * FROM pg_dist_shard_placement;
-- check that the extension now can be dropped (and recreated)
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
(1 row)
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');

View File

@ -11,6 +11,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_cluster_management
test: multi_table_ddl
# ----------

View File

@ -16,6 +16,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_cluster_management
test: multi_table_ddl
test: multi_name_lengths

View File

@ -14,6 +14,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_cluster_management
test: multi_table_ddl
# ----------

View File

@ -168,13 +168,6 @@ for my $port (@workerPorts)
or die "Could not create worker data directory";
}
# Initialize master's worker list
for my $port (@workerPorts)
{
system("echo $host $port >> tmp_check/master/data/pg_worker_list.conf") == 0
or die "Could not initialize master's worker list";
}
# Routine to shutdown servers at failure/exit
my $serversAreShutdown = "FALSE";
sub ShutdownServers()

View File

@ -0,0 +1,49 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
-- get the active nodes
SELECT master_get_active_worker_nodes();
-- try to add the node again when it is already in the cluster
SELECT master_add_node('localhost', :worker_1_port);
-- get the active nodes
SELECT master_get_active_worker_nodes();
-- try to remove a node (with no placements)
SELECT master_remove_node('localhost', :worker_2_port);
-- verify that the node has been deleted
SELECT master_get_active_worker_nodes();
-- add some shard placements to the cluster
SELECT master_add_node('localhost', :worker_2_port);
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash');
SELECT master_create_worker_shards('cluster_management_test', 16, 1);
-- see that there are some active placements in the candidate node
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
-- try to remove a node with active placements and see that node removal fails
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
-- try to remove a node with only inactive placements and see that the node is removed
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
-- clean-up
SELECT master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
DROP TABLE cluster_management_test;

View File

@ -21,6 +21,10 @@ RESET client_min_messages;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
-- verify that a table can be created after the extension has been dropped and recreated
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');

View File

@ -34,6 +34,7 @@ ALTER EXTENSION citus UPDATE TO '5.2-4';
ALTER EXTENSION citus UPDATE TO '6.0-1';
ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
-- drop extension and re-create in the newest version
DROP EXTENSION citus;

View File

@ -45,6 +45,10 @@ SELECT * FROM pg_dist_shard_placement;
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');