diff --git a/src/backend/distributed/citus--6.0-8--6.0-9.sql b/src/backend/distributed/citus--6.0-8--6.0-9.sql index 057966d28..78139bc77 100644 --- a/src/backend/distributed/citus--6.0-8--6.0-9.sql +++ b/src/backend/distributed/citus--6.0-8--6.0-9.sql @@ -7,3 +7,5 @@ INSERT INTO citus.pg_dist_local_group VALUES (0); ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.pg_dist_local_group TO public; + +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN hasmetadata bool NOT NULL DEFAULT false; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6b2d75f3e..5980b6425 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -172,21 +172,23 @@ NodeListInsertCommand(List *workerNodeList) /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node " - "(nodeid, groupid, nodename, nodeport, noderack) " + "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) " "VALUES "); /* iterate over the worker nodes, add the values */ foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE"; appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, '%s')", + "(%d, %d, %s, %d, '%s', %s)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, - workerNode->workerRack); + workerNode->workerRack, + hasMetadaString); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 18c17dc6f..a2b62bb34 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1164,6 +1164,7 @@ InitializeWorkerNodeCache(void) workerNode->groupId = currentNode->groupId; workerNode->nodeId = currentNode->nodeId; strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); + workerNode->hasMetadata = currentNode->hasMetadata; if (handleFound) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 78fd3df83..9bdf0b592 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -43,13 +43,13 @@ int GroupSize = 1; /* local function forward declarations */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack); + char *nodeRack, bool hasMetadata); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, - char *nodeRack); + char *nodeRack, bool hasMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -71,8 +71,10 @@ master_add_node(PG_FUNCTION_ARGS) char *nodeNameString = text_to_cstring(nodeName); int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; + bool hasMetadata = false; - Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack); + Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata); PG_RETURN_CSTRING(returnData); } @@ -122,7 +124,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack); + workerNode->workerRack, false); } PG_RETURN_BOOL(true); @@ -202,7 +204,8 @@ ReadWorkerNodes() * new node is inserted into the local pg_dist_node. */ static Datum -AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) +AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, + bool hasMetadata) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -245,7 +248,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); - InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack); + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); heap_close(pgDistNode, AccessExclusiveLock); @@ -280,6 +283,7 @@ GenerateNodeTuple(WorkerNode *workerNode) values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); @@ -400,7 +404,8 @@ GetNextNodeId() * given values into that system catalog. */ static void -InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack) +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, + bool hasMetadata) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -417,6 +422,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -614,6 +620,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH); strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; + workerNode->hasMetadata = false; workerNodeList = lappend(workerNodeList, workerNode); } @@ -651,6 +658,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack, tupleDescriptor, &isNull); + Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata, + tupleDescriptor, &isNull); Assert(!HeapTupleHasNulls(heapTuple)); @@ -660,6 +669,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) workerNode->groupId = DatumGetUInt32(groupId); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); + workerNode->hasMetadata = DatumGetBool(hasMetadata); return workerNode; } diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index 0276dcef7..c5db5129c 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_node #ifdef CATALOG_VARLEN text nodename; int nodeport; + bool hasmetadata; #endif } FormData_pg_dist_node; @@ -36,12 +37,13 @@ typedef FormData_pg_dist_node *Form_pg_dist_node; * compiler constants for pg_dist_node * ---------------- */ -#define Natts_pg_dist_node 5 +#define Natts_pg_dist_node 6 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodeport 4 #define Anum_pg_dist_node_noderack 5 +#define Anum_pg_dist_node_hasmetadata 6 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index b10ae5f4f..7f832d3b3 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -42,6 +42,7 @@ typedef struct WorkerNode char workerName[WORKER_LENGTH]; /* node's name */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ char workerRack[WORKER_LENGTH]; /* node's network location */ + bool hasMetadata; /* node gets metadata changes */ } WorkerNode; diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 84e753c2d..a5e3fd5c9 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -3,15 +3,15 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; -- Tests functions related to cluster membership -- add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- get the active nodes @@ -24,9 +24,9 @@ SELECT master_get_active_worker_nodes(); -- try to add the node again when it is activated SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) -- get the active nodes @@ -53,9 +53,9 @@ SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (3,3,localhost,57638,default) + master_add_node +--------------------------------- + (3,3,localhost,57638,default,f) (1 row) CREATE TABLE cluster_management_test (col_1 text, col_2 int); @@ -125,9 +125,9 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (4,4,localhost,57638,default) + master_add_node +--------------------------------- + (4,4,localhost,57638,default,f) (1 row) UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 9fd10d0d6..645a7a935 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -21,15 +21,15 @@ RESET client_min_messages; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- verify that a table can be created after the extension has been dropped and recreated diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index 01d23a236..c33daeadf 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -25,11 +25,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; -- Show that, with no MX tables, metadata snapshot contains only the delete commands and -- pg_dist_node entries SELECT unnest(master_metadata_snapshot()); - unnest -------------------------------------------------------------------------------------------------------------------------------------------------------------- + unnest +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) (3 rows) -- Create a test table with constraints and SERIAL @@ -55,7 +55,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) @@ -74,7 +74,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) @@ -95,7 +95,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -121,7 +121,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -140,7 +140,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 0052ad7ae..b4d2c3704 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -67,15 +67,15 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- create a table with a SERIAL column