Add hasmetadata column to pg_dist_node

pull/1938/head
Eren Basak 2016-10-14 12:01:31 +03:00
parent c3107b1315
commit cb1d9cba5e
10 changed files with 64 additions and 46 deletions

View File

@ -7,3 +7,5 @@ INSERT INTO citus.pg_dist_local_group VALUES (0);
ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_local_group TO public;
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN hasmetadata bool NOT NULL DEFAULT false;

View File

@ -172,21 +172,23 @@ NodeListInsertCommand(List *workerNodeList)
/* generate the query without any values yet */
appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node "
"(nodeid, groupid, nodename, nodeport, noderack) "
"(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) "
"VALUES ");
/* iterate over the worker nodes, add the values */
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE";
appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, '%s')",
"(%d, %d, %s, %d, '%s', %s)",
workerNode->nodeId,
workerNode->groupId,
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort,
workerNode->workerRack);
workerNode->workerRack,
hasMetadaString);
processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount)

View File

@ -1164,6 +1164,7 @@ InitializeWorkerNodeCache(void)
workerNode->groupId = currentNode->groupId;
workerNode->nodeId = currentNode->nodeId;
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
workerNode->hasMetadata = currentNode->hasMetadata;
if (handleFound)
{

View File

@ -43,13 +43,13 @@ int GroupSize = 1;
/* local function forward declarations */
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack);
char *nodeRack, bool hasMetadata);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
static int GetNextNodeId(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
char *nodeRack);
char *nodeRack, bool hasMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -71,8 +71,10 @@ master_add_node(PG_FUNCTION_ARGS)
char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0;
char *nodeRack = WORKER_DEFAULT_RACK;
bool hasMetadata = false;
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack);
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata);
PG_RETURN_CSTRING(returnData);
}
@ -122,7 +124,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack);
workerNode->workerRack, false);
}
PG_RETURN_BOOL(true);
@ -202,7 +204,8 @@ ReadWorkerNodes()
* new node is inserted into the local pg_dist_node.
*/
static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack)
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
bool hasMetadata)
{
Relation pgDistNode = NULL;
int nextNodeIdInt = 0;
@ -245,7 +248,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack)
/* generate the new node id from the sequence */
nextNodeIdInt = GetNextNodeId();
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack);
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata);
heap_close(pgDistNode, AccessExclusiveLock);
@ -280,6 +283,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata);
/* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
@ -400,7 +404,8 @@ GetNextNodeId()
* given values into that system catalog.
*/
static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack)
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
bool hasMetadata)
{
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
@ -417,6 +422,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
/* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
@ -614,6 +620,7 @@ ParseWorkerNodeFileAndRename()
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNodeList = lappend(workerNodeList, workerNode);
}
@ -651,6 +658,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
tupleDescriptor, &isNull);
Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack,
tupleDescriptor, &isNull);
Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata,
tupleDescriptor, &isNull);
Assert(!HeapTupleHasNulls(heapTuple));
@ -660,6 +669,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
workerNode->groupId = DatumGetUInt32(groupId);
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
workerNode->hasMetadata = DatumGetBool(hasMetadata);
return workerNode;
}

View File

@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_node
#ifdef CATALOG_VARLEN
text nodename;
int nodeport;
bool hasmetadata;
#endif
} FormData_pg_dist_node;
@ -36,12 +37,13 @@ typedef FormData_pg_dist_node *Form_pg_dist_node;
* compiler constants for pg_dist_node
* ----------------
*/
#define Natts_pg_dist_node 5
#define Natts_pg_dist_node 6
#define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3
#define Anum_pg_dist_node_nodeport 4
#define Anum_pg_dist_node_noderack 5
#define Anum_pg_dist_node_hasmetadata 6
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"

View File

@ -42,6 +42,7 @@ typedef struct WorkerNode
char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
char workerRack[WORKER_LENGTH]; /* node's network location */
bool hasMetadata; /* node gets metadata changes */
} WorkerNode;

View File

@ -3,15 +3,15 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
master_add_node
---------------------------------
(1,1,localhost,57637,default,f)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
master_add_node
---------------------------------
(2,2,localhost,57638,default,f)
(1 row)
-- get the active nodes
@ -24,9 +24,9 @@ SELECT master_get_active_worker_nodes();
-- try to add the node again when it is activated
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
master_add_node
---------------------------------
(1,1,localhost,57637,default,f)
(1 row)
-- get the active nodes
@ -53,9 +53,9 @@ SELECT master_get_active_worker_nodes();
-- add some shard placements to the cluster
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(3,3,localhost,57638,default)
master_add_node
---------------------------------
(3,3,localhost,57638,default,f)
(1 row)
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
@ -125,9 +125,9 @@ SELECT master_get_active_worker_nodes();
-- clean-up
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(4,4,localhost,57638,default)
master_add_node
---------------------------------
(4,4,localhost,57638,default,f)
(1 row)
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;

View File

@ -21,15 +21,15 @@ RESET client_min_messages;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
master_add_node
---------------------------------
(1,1,localhost,57637,default,f)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
master_add_node
---------------------------------
(2,2,localhost,57638,default,f)
(1 row)
-- verify that a table can be created after the extension has been dropped and recreated

View File

@ -25,11 +25,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and
-- pg_dist_node entries
SELECT unnest(master_metadata_snapshot());
unnest
-------------------------------------------------------------------------------------------------------------------------------------------------------------
unnest
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
(3 rows)
-- Create a test table with constraints and SERIAL
@ -55,7 +55,7 @@ SELECT unnest(master_metadata_snapshot());
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -74,7 +74,7 @@ SELECT unnest(master_metadata_snapshot());
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
@ -95,7 +95,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -121,7 +121,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -140,7 +140,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)

View File

@ -67,15 +67,15 @@ DROP EXTENSION citus;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------
(1,1,localhost,57637,default)
master_add_node
---------------------------------
(1,1,localhost,57637,default,f)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------
(2,2,localhost,57638,default)
master_add_node
---------------------------------
(2,2,localhost,57638,default,f)
(1 row)
-- create a table with a SERIAL column