Add master_move_node function

pull/1710/head^2
Brian Cloutier 2017-10-10 16:26:34 -07:00 committed by Brian Cloutier
parent 4a155bfccf
commit ebcb2b65e9
12 changed files with 184 additions and 4 deletions

View File

@ -11,7 +11,8 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
7.1-1
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -169,6 +170,8 @@ $(EXTENSION)--7.0-14.sql: $(EXTENSION)--7.0-13.sql $(EXTENSION)--7.0-13--7.0-14.
cat $^ > $@
$(EXTENSION)--7.0-15.sql: $(EXTENSION)--7.0-14.sql $(EXTENSION)--7.0-14--7.0-15.sql
cat $^ > $@
$(EXTENSION)--7.1-1.sql: $(EXTENSION)--7.0-15.sql $(EXTENSION)--7.0-15--7.1-1.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,14 @@
/* citus--7.0-15--7.1-1 */
SET search_path = 'pg_catalog';
CREATE OR REPLACE FUNCTION master_update_node(node_id int,
new_node_name text,
new_node_port int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_update_node$$;
COMMENT ON FUNCTION master_update_node(node_id int, new_node_name text, new_node_port int)
IS 'change the location of a node';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-15'
default_version = '7.1-1'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -99,6 +99,7 @@ typedef struct MetadataCacheData
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distNodeRelationId;
Oid distNodeNodeIdIndexId;
Oid distLocalGroupRelationId;
Oid distColocationRelationId;
Oid distColocationConfigurationIndexId;
@ -1654,6 +1655,17 @@ DistNodeRelationId(void)
}
/* return oid of pg_dist_node's primary key index */
Oid
DistNodeNodeIdIndexId(void)
{
CachedRelationLookup("pg_dist_node_pkey",
&MetadataCache.distNodeNodeIdIndexId);
return MetadataCache.distNodeNodeIdIndexId;
}
/* return oid of pg_dist_local_group relation */
Oid
DistLocalGroupIdRelationId(void)

View File

@ -71,6 +71,7 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 gro
static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_add_node);
@ -79,6 +80,7 @@ PG_FUNCTION_INFO_V1(master_add_secondary_node);
PG_FUNCTION_INFO_V1(master_remove_node);
PG_FUNCTION_INFO_V1(master_disable_node);
PG_FUNCTION_INFO_V1(master_activate_node);
PG_FUNCTION_INFO_V1(master_update_node);
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
@ -453,6 +455,103 @@ ActivateNode(char *nodeName, int nodePort)
}
/*
* master_update_node moves the requested node to a different nodename and nodeport. It
* locks to ensure no queries are running concurrently; and is intended for customers who
* are running their own failover solution.
*/
Datum
master_update_node(PG_FUNCTION_ARGS)
{
int32 nodeId = PG_GETARG_INT32(0);
text *newNodeName = PG_GETARG_TEXT_P(1);
int32 newNodePort = PG_GETARG_INT32(2);
char *newNodeNameString = text_to_cstring(newNodeName);
CheckCitusVersion(ERROR);
/*
* This lock has two purposes:
* - Ensure buggy code in Citus doesn't cause failures when the nodename/nodeport of
* a node changes mid-query
* - Provide fencing during failover, after this function returns all connections
* will use the new node location.
*
* Drawback:
* - This function blocks until all previous queries have finished. This means that
* long-running queries will prevent failover.
*/
LockRelationOid(DistNodeRelationId(), AccessExclusiveLock);
if (FindWorkerNodeAnyCluster(newNodeNameString, newNodePort) != NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" already exists",
newNodeNameString,
newNodePort)));
}
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
PG_RETURN_VOID();
}
static void
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
{
const bool indexOK = true;
const int scanKeyCount = 1;
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistNode);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodeId));
scanDescriptor = systable_beginscan(pgDistNode, DistNodeNodeIdIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
newNodeName, newNodePort)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
isnull[Anum_pg_dist_node_nodeport - 1] = false;
replace[Anum_pg_dist_node_nodeport - 1] = true;
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(newNodeName);
isnull[Anum_pg_dist_node_nodename - 1] = false;
replace[Anum_pg_dist_node_nodename - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
CommandCounterIncrement();
systable_endscan(scanDescriptor);
heap_close(pgDistNode, NoLock);
}
/*
* master_initialize_node_metadata is run once, when upgrading citus. It ingests the
* existing pg_worker_list.conf into pg_dist_node, then adds a header to the file stating

View File

@ -109,6 +109,7 @@ extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
/* index oids */
extern Oid DistNodeNodeIdIndexId(void);
extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);

View File

@ -213,7 +213,7 @@ step s1-drop: DROP TABLE drop_hash;
step s2-distribute-table: SELECT create_distributed_table('drop_hash', 'id'); <waiting ...>
step s1-commit: COMMIT;
step s2-distribute-table: <... completed>
error in steps s1-commit s2-distribute-table: ERROR: could not open relation with OID 23248
error in steps s1-commit s2-distribute-table: ERROR: could not open relation with OID 23249
step s2-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM drop_hash;
ERROR: relation "drop_hash" does not exist

View File

@ -213,7 +213,7 @@ step s1-drop: DROP TABLE drop_hash;
step s2-distribute-table: SELECT create_distributed_table('drop_hash', 'id'); <waiting ...>
step s1-commit: COMMIT;
step s2-distribute-table: <... completed>
error in steps s1-commit s2-distribute-table: ERROR: could not open relation with OID 22204
error in steps s1-commit s2-distribute-table: ERROR: could not open relation with OID 22205
step s2-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM drop_hash;
ERROR: relation "drop_hash" does not exist

View File

@ -557,3 +557,36 @@ SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port,
(24,12,localhost,9992,default,f,t,secondary,second-cluster)
(1 row)
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- master_update_node checks node exists
SELECT master_update_node(100, 'localhost', 8000);
ERROR: could not find valid entry for node "localhost:8000"
-- master_update_node disallows aliasing existing node
SELECT master_update_node(:worker_1_node, 'localhost', :worker_2_port);
ERROR: node at "localhost:57638" already exists
-- master_update_node moves a node
SELECT master_update_node(:worker_1_node, 'somehost', 9000);
master_update_node
--------------------
(1 row)
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
13 | 12 | somehost | 9000 | default | f | t | primary | default
(1 row)
-- cleanup
SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
master_update_node
--------------------
(1 row)
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
13 | 12 | localhost | 57637 | default | f | t | primary | default
(1 row)

View File

@ -125,6 +125,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-12';
ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
ALTER EXTENSION citus UPDATE TO '7.0-15';
ALTER EXTENSION citus UPDATE TO '7.1-1';
-- show running version
SHOW citus.version;
citus.version

View File

@ -238,3 +238,19 @@ SELECT master_add_secondary_node('localhost', 9995, 'localhost', :worker_1_port)
SELECT master_add_secondary_node('localhost', 9994, primaryname => 'localhost', primaryport => :worker_2_port);
SELECT master_add_secondary_node('localhost', 9993, 'localhost', 2000);
SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- master_update_node checks node exists
SELECT master_update_node(100, 'localhost', 8000);
-- master_update_node disallows aliasing existing node
SELECT master_update_node(:worker_1_node, 'localhost', :worker_2_port);
-- master_update_node moves a node
SELECT master_update_node(:worker_1_node, 'somehost', 9000);
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
-- cleanup
SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;

View File

@ -125,6 +125,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-12';
ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
ALTER EXTENSION citus UPDATE TO '7.0-15';
ALTER EXTENSION citus UPDATE TO '7.1-1';
-- show running version
SHOW citus.version;