From 8f477d18f18e97aabc996a516663bc45a85bb9e1 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 11 Oct 2016 12:18:38 +0300 Subject: [PATCH 1/3] Add pg_dist_local_group Metadata Table This change adds the pg_dist_local_group metadata table, which indicates the group id of the current node. It is expected that this table contains one and only one row, which only contains the group id of the node as an integer. --- src/backend/distributed/Makefile | 4 +++- src/backend/distributed/citus--6.0-4--6.0-5.sql | 2 -- src/backend/distributed/citus--6.0-6--6.0-7.sql | 2 +- src/backend/distributed/citus--6.0-8--6.0-9.sql | 9 +++++++++ src/backend/distributed/citus.control | 2 +- src/test/regress/expected/multi_extension.out | 1 + src/test/regress/sql/multi_extension.sql | 1 + 7 files changed, 16 insertions(+), 5 deletions(-) create mode 100644 src/backend/distributed/citus--6.0-8--6.0-9.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index e0cc59453..7a3c54bb3 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -74,6 +74,8 @@ $(EXTENSION)--6.0-7.sql: $(EXTENSION)--6.0-6.sql $(EXTENSION)--6.0-6--6.0-7.sql cat $^ > $@ $(EXTENSION)--6.0-8.sql: $(EXTENSION)--6.0-7.sql $(EXTENSION)--6.0-7--6.0-8.sql cat $^ > $@ +$(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-4--6.0-5.sql b/src/backend/distributed/citus--6.0-4--6.0-5.sql index 08ee51a78..1b72ede65 100644 --- a/src/backend/distributed/citus--6.0-4--6.0-5.sql +++ b/src/backend/distributed/citus--6.0-4--6.0-5.sql @@ -1,5 +1,3 @@ -/* citus--5.2-1--5.2-2.sql */ - /* * Replace oid column in pg_dist_shard_placement with an sequence column. */ diff --git a/src/backend/distributed/citus--6.0-6--6.0-7.sql b/src/backend/distributed/citus--6.0-6--6.0-7.sql index 9fe34e250..20cc432c2 100644 --- a/src/backend/distributed/citus--6.0-6--6.0-7.sql +++ b/src/backend/distributed/citus--6.0-6--6.0-7.sql @@ -1,4 +1,4 @@ -/* citus--6.0-5--6.0-6.sql */ +/* citus--6.0-6--6.0-7.sql */ CREATE FUNCTION pg_catalog.get_colocated_table_array(regclass) RETURNS regclass[] diff --git a/src/backend/distributed/citus--6.0-8--6.0-9.sql b/src/backend/distributed/citus--6.0-8--6.0-9.sql new file mode 100644 index 000000000..057966d28 --- /dev/null +++ b/src/backend/distributed/citus--6.0-8--6.0-9.sql @@ -0,0 +1,9 @@ +CREATE TABLE citus.pg_dist_local_group( + groupid int NOT NULL PRIMARY KEY) +; + +/* insert the default value for being the coordinator node */ +INSERT INTO citus.pg_dist_local_group VALUES (0); + +ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.pg_dist_local_group TO public; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index fbf54e7f4..6dcf270ae 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-8' +default_version = '6.0-9' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index f3010ba42..0ebe5275f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -34,6 +34,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-5'; ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; +ALTER EXTENSION citus UPDATE TO '6.0-9'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 8803ab777..19c81e27c 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -39,6 +39,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-5'; ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; +ALTER EXTENSION citus UPDATE TO '6.0-9'; -- drop extension an re-create in newest version DROP EXTENSION citus; From c7bf2021fa36728b5fb5ef0465ccaca21844c8d8 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 11 Oct 2016 14:48:47 +0300 Subject: [PATCH 2/3] Add metadata infrastructure for pg_dist_local_group table --- .../distributed/utils/metadata_cache.c | 126 ++++++++++++++++++ src/include/distributed/metadata_cache.h | 2 + src/include/distributed/pg_dist_local_group.h | 37 +++++ 3 files changed, 165 insertions(+) create mode 100644 src/include/distributed/pg_dist_local_group.h diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 83d5ecd00..18c17dc6f 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -26,6 +26,7 @@ #include "distributed/colocation_utils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" @@ -52,6 +53,7 @@ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid; +static Oid distLocalGroupRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -69,6 +71,11 @@ static HTAB *DistTableCacheHash = NULL; static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; +static bool invalidationRegistered = false; + +/* default value is -1, for schema node it's 0 and for worker nodes > 0 */ +static int LocalGroupId = -1; + /* built first time through in InitializePartitionCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; @@ -93,6 +100,7 @@ static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static List * DistTableOidList(void); +static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, @@ -107,6 +115,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid); PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); +PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); /* @@ -668,6 +677,16 @@ DistNodeRelationId(void) } +/* return oid of pg_dist_local_group relation */ +Oid +DistLocalGroupIdRelationId(void) +{ + CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId); + + return distLocalGroupRelationId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -993,6 +1012,29 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * master_dist_local_group_cache_invalidate is a trigger function that performs + * relcache invalidations when the contents of pg_dist_local_group are changed + * on the SQL level. + * + * NB: We decided there is little point in checking permissions here, there + * are much easier ways to waste CPU than causing cache invalidations. + */ +Datum +master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) +{ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("must be called as trigger"))); + } + + CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); + + PG_RETURN_DATUM(PointerGetDatum(NULL)); +} + + /* initialize the infrastructure for the metadata cache */ static void InitializeDistTableCache(void) @@ -1149,6 +1191,74 @@ InitializeWorkerNodeCache(void) } +/* + * GetLocalGroupId returns the group identifier of the local node. The function assumes + * that pg_dist_local_node_group has exactly one row and has at least one column. + * Otherwise, the function errors out. + */ +int +GetLocalGroupId(void) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + Oid groupId = InvalidOid; + Relation pgDistLocalGroupId = NULL; + + /* + * Already set the group id, no need to read the heap again. + */ + if (LocalGroupId != -1) + { + return LocalGroupId; + } + + pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock); + + scanDescriptor = systable_beginscan(pgDistLocalGroupId, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + tupleDescriptor = RelationGetDescr(pgDistLocalGroupId); + + heapTuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(heapTuple)) + { + bool isNull = false; + Datum groupIdDatum = heap_getattr(heapTuple, + Anum_pg_dist_local_groupid, + tupleDescriptor, &isNull); + + groupId = DatumGetUInt32(groupIdDatum); + } + else + { + elog(ERROR, "could not find any entries in pg_dist_local_group"); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistLocalGroupId, AccessShareLock); + + /* prevent multiple invalidation registrations */ + if (!invalidationRegistered) + { + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, + (Datum) 0); + + invalidationRegistered = true; + } + + /* set the local cache variable */ + LocalGroupId = groupId; + + return groupId; +} + + /* * WorkerNodeHashCode computes the hash code for a worker node from the node's * host name and port number. Nodes that only differ by their rack locations @@ -1270,6 +1380,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) extensionLoaded = false; distShardRelationId = InvalidOid; distShardPlacementRelationId = InvalidOid; + distLocalGroupRelationId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1344,6 +1455,21 @@ InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) } +/* + * InvalidateLocalGroupIdRelationCacheCallback sets the LocalGroupId to + * the default value. + */ +static void +InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId) +{ + /* when invalidation happens simply set the LocalGroupId to the default value */ + if (relationId == InvalidOid || relationId == distLocalGroupRelationId) + { + LocalGroupId = -1; + } +} + + /* * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * and returns that or, if no matching entry was found, NULL. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 2985c2b71..7a32854f1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -56,6 +56,7 @@ extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); +extern int GetLocalGroupId(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateNodeCache(void); @@ -69,6 +70,7 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); extern Oid DistNodeRelationId(void); +extern Oid DistLocalGroupIdRelationId(void); /* index oids */ extern Oid DistPartitionLogicalRelidIndexId(void); diff --git a/src/include/distributed/pg_dist_local_group.h b/src/include/distributed/pg_dist_local_group.h new file mode 100644 index 000000000..5d0f8f47a --- /dev/null +++ b/src/include/distributed/pg_dist_local_group.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_local_group.h + * definition of the relation that holds the local group id (pg_dist_local_group). + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_LOCAL_GROUP_H +#define PG_DIST_LOCAL_GROUP_H + +/* ---------------- + * pg_dist_local_group definition. + * ---------------- + */ +typedef struct FormData_pg_dist_local_group +{ + int groupid; +} FormData_pg_dist_local_group; + +/* ---------------- + * FormData_pg_dist_local_group corresponds to a pointer to a tuple with + * the format of pg_dist_local_group relation. + * ---------------- + */ +typedef FormData_pg_dist_local_group *Form_pg_dist_local_group; + +/* ---------------- + * compiler constants for pg_dist_local_group + * ---------------- + */ +#define Natts_pg_dist_local_group 1 +#define Anum_pg_dist_local_groupid 1 + +#endif /* PG_DIST_LOCAL_GROUP_H */ From f3ede37c9f5dc9ee9d331ece51b9a704a92ee282 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Fri, 14 Oct 2016 12:01:31 +0300 Subject: [PATCH 3/3] Add hasmetadata column to pg_dist_node --- .../distributed/citus--6.0-8--6.0-9.sql | 2 ++ .../distributed/metadata/metadata_sync.c | 8 +++-- .../distributed/utils/metadata_cache.c | 1 + src/backend/distributed/utils/node_metadata.c | 24 ++++++++++----- src/include/distributed/pg_dist_node.h | 4 ++- src/include/distributed/worker_manager.h | 1 + .../expected/multi_cluster_management.out | 30 +++++++++---------- .../regress/expected/multi_drop_extension.out | 12 ++++---- .../expected/multi_metadata_snapshot.out | 16 +++++----- src/test/regress/expected/multi_table_ddl.out | 12 ++++---- 10 files changed, 64 insertions(+), 46 deletions(-) diff --git a/src/backend/distributed/citus--6.0-8--6.0-9.sql b/src/backend/distributed/citus--6.0-8--6.0-9.sql index 057966d28..78139bc77 100644 --- a/src/backend/distributed/citus--6.0-8--6.0-9.sql +++ b/src/backend/distributed/citus--6.0-8--6.0-9.sql @@ -7,3 +7,5 @@ INSERT INTO citus.pg_dist_local_group VALUES (0); ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.pg_dist_local_group TO public; + +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN hasmetadata bool NOT NULL DEFAULT false; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6b2d75f3e..5980b6425 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -172,21 +172,23 @@ NodeListInsertCommand(List *workerNodeList) /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node " - "(nodeid, groupid, nodename, nodeport, noderack) " + "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) " "VALUES "); /* iterate over the worker nodes, add the values */ foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE"; appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, '%s')", + "(%d, %d, %s, %d, '%s', %s)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, - workerNode->workerRack); + workerNode->workerRack, + hasMetadaString); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 18c17dc6f..a2b62bb34 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1164,6 +1164,7 @@ InitializeWorkerNodeCache(void) workerNode->groupId = currentNode->groupId; workerNode->nodeId = currentNode->nodeId; strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); + workerNode->hasMetadata = currentNode->hasMetadata; if (handleFound) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 78fd3df83..9bdf0b592 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -43,13 +43,13 @@ int GroupSize = 1; /* local function forward declarations */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack); + char *nodeRack, bool hasMetadata); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, - char *nodeRack); + char *nodeRack, bool hasMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -71,8 +71,10 @@ master_add_node(PG_FUNCTION_ARGS) char *nodeNameString = text_to_cstring(nodeName); int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; + bool hasMetadata = false; - Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack); + Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata); PG_RETURN_CSTRING(returnData); } @@ -122,7 +124,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack); + workerNode->workerRack, false); } PG_RETURN_BOOL(true); @@ -202,7 +204,8 @@ ReadWorkerNodes() * new node is inserted into the local pg_dist_node. */ static Datum -AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) +AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, + bool hasMetadata) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -245,7 +248,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); - InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack); + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); heap_close(pgDistNode, AccessExclusiveLock); @@ -280,6 +283,7 @@ GenerateNodeTuple(WorkerNode *workerNode) values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); @@ -400,7 +404,8 @@ GetNextNodeId() * given values into that system catalog. */ static void -InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack) +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, + bool hasMetadata) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -417,6 +422,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -614,6 +620,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH); strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; + workerNode->hasMetadata = false; workerNodeList = lappend(workerNodeList, workerNode); } @@ -651,6 +658,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack, tupleDescriptor, &isNull); + Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata, + tupleDescriptor, &isNull); Assert(!HeapTupleHasNulls(heapTuple)); @@ -660,6 +669,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) workerNode->groupId = DatumGetUInt32(groupId); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); + workerNode->hasMetadata = DatumGetBool(hasMetadata); return workerNode; } diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index 0276dcef7..c5db5129c 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_node #ifdef CATALOG_VARLEN text nodename; int nodeport; + bool hasmetadata; #endif } FormData_pg_dist_node; @@ -36,12 +37,13 @@ typedef FormData_pg_dist_node *Form_pg_dist_node; * compiler constants for pg_dist_node * ---------------- */ -#define Natts_pg_dist_node 5 +#define Natts_pg_dist_node 6 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodeport 4 #define Anum_pg_dist_node_noderack 5 +#define Anum_pg_dist_node_hasmetadata 6 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index b10ae5f4f..7f832d3b3 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -42,6 +42,7 @@ typedef struct WorkerNode char workerName[WORKER_LENGTH]; /* node's name */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ char workerRack[WORKER_LENGTH]; /* node's network location */ + bool hasMetadata; /* node gets metadata changes */ } WorkerNode; diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 84e753c2d..a5e3fd5c9 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -3,15 +3,15 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; -- Tests functions related to cluster membership -- add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- get the active nodes @@ -24,9 +24,9 @@ SELECT master_get_active_worker_nodes(); -- try to add the node again when it is activated SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) -- get the active nodes @@ -53,9 +53,9 @@ SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (3,3,localhost,57638,default) + master_add_node +--------------------------------- + (3,3,localhost,57638,default,f) (1 row) CREATE TABLE cluster_management_test (col_1 text, col_2 int); @@ -125,9 +125,9 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (4,4,localhost,57638,default) + master_add_node +--------------------------------- + (4,4,localhost,57638,default,f) (1 row) UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 9fd10d0d6..645a7a935 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -21,15 +21,15 @@ RESET client_min_messages; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- verify that a table can be created after the extension has been dropped and recreated diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index 01d23a236..c33daeadf 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -25,11 +25,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; -- Show that, with no MX tables, metadata snapshot contains only the delete commands and -- pg_dist_node entries SELECT unnest(master_metadata_snapshot()); - unnest -------------------------------------------------------------------------------------------------------------------------------------------------------------- + unnest +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) (3 rows) -- Create a test table with constraints and SERIAL @@ -55,7 +55,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) @@ -74,7 +74,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) @@ -95,7 +95,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -121,7 +121,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) @@ -140,7 +140,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 0052ad7ae..b4d2c3704 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -67,15 +67,15 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node -------------------------------- - (1,1,localhost,57637,default) + master_add_node +--------------------------------- + (1,1,localhost,57637,default,f) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node -------------------------------- - (2,2,localhost,57638,default) + master_add_node +--------------------------------- + (2,2,localhost,57638,default,f) (1 row) -- create a table with a SERIAL column