Merge pull request #865 from citusdata/add_pg_dist_local_group_table

Add pg_dist_local_group Metadata Table
pull/775/merge
Eren Başak 2016-10-17 12:02:54 +03:00 committed by GitHub
commit 64c8972c19
18 changed files with 245 additions and 51 deletions

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \ 5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -74,6 +74,8 @@ $(EXTENSION)--6.0-7.sql: $(EXTENSION)--6.0-6.sql $(EXTENSION)--6.0-6--6.0-7.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.0-8.sql: $(EXTENSION)--6.0-7.sql $(EXTENSION)--6.0-7--6.0-8.sql $(EXTENSION)--6.0-8.sql: $(EXTENSION)--6.0-7.sql $(EXTENSION)--6.0-7--6.0-8.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -1,5 +1,3 @@
/* citus--5.2-1--5.2-2.sql */
/* /*
* Replace oid column in pg_dist_shard_placement with an sequence column. * Replace oid column in pg_dist_shard_placement with an sequence column.
*/ */

View File

@ -1,4 +1,4 @@
/* citus--6.0-5--6.0-6.sql */ /* citus--6.0-6--6.0-7.sql */
CREATE FUNCTION pg_catalog.get_colocated_table_array(regclass) CREATE FUNCTION pg_catalog.get_colocated_table_array(regclass)
RETURNS regclass[] RETURNS regclass[]

View File

@ -0,0 +1,11 @@
CREATE TABLE citus.pg_dist_local_group(
groupid int NOT NULL PRIMARY KEY)
;
/* insert the default value for being the coordinator node */
INSERT INTO citus.pg_dist_local_group VALUES (0);
ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_local_group TO public;
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN hasmetadata bool NOT NULL DEFAULT false;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '6.0-8' default_version = '6.0-9'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -172,21 +172,23 @@ NodeListInsertCommand(List *workerNodeList)
/* generate the query without any values yet */ /* generate the query without any values yet */
appendStringInfo(nodeListInsertCommand, appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node " "INSERT INTO pg_dist_node "
"(nodeid, groupid, nodename, nodeport, noderack) " "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) "
"VALUES "); "VALUES ");
/* iterate over the worker nodes, add the values */ /* iterate over the worker nodes, add the values */
foreach(workerNodeCell, workerNodeList) foreach(workerNodeCell, workerNodeList)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE";
appendStringInfo(nodeListInsertCommand, appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, '%s')", "(%d, %d, %s, %d, '%s', %s)",
workerNode->nodeId, workerNode->nodeId,
workerNode->groupId, workerNode->groupId,
quote_literal_cstr(workerNode->workerName), quote_literal_cstr(workerNode->workerName),
workerNode->workerPort, workerNode->workerPort,
workerNode->workerRack); workerNode->workerRack,
hasMetadaString);
processedWorkerNodeCount++; processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount) if (processedWorkerNodeCount != workerCount)

View File

@ -26,6 +26,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/pg_dist_local_group.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
@ -52,6 +53,7 @@ static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid; static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid;
@ -69,6 +71,11 @@ static HTAB *DistTableCacheHash = NULL;
static HTAB *WorkerNodeHash = NULL; static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false; static bool workerNodeHashValid = false;
static bool invalidationRegistered = false;
/* default value is -1, for schema node it's 0 and for worker nodes > 0 */
static int LocalGroupId = -1;
/* built first time through in InitializePartitionCache */ /* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistPartitionScanKey[1];
static ScanKeyData DistShardScanKey[1]; static ScanKeyData DistShardScanKey[1];
@ -93,6 +100,7 @@ static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static List * DistTableOidList(void); static List * DistTableOidList(void);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId); static List * LookupDistShardTuples(Oid relationId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
@ -107,6 +115,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
/* /*
@ -668,6 +677,16 @@ DistNodeRelationId(void)
} }
/* return oid of pg_dist_local_group relation */
Oid
DistLocalGroupIdRelationId(void)
{
CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId);
return distLocalGroupRelationId;
}
/* return oid of pg_dist_partition relation */ /* return oid of pg_dist_partition relation */
Oid Oid
DistPartitionRelationId(void) DistPartitionRelationId(void)
@ -993,6 +1012,29 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
} }
/*
* master_dist_local_group_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_local_group are changed
* on the SQL level.
*
* NB: We decided there is little point in checking permissions here, there
* are much easier ways to waste CPU than causing cache invalidations.
*/
Datum
master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
{
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
}
/* initialize the infrastructure for the metadata cache */ /* initialize the infrastructure for the metadata cache */
static void static void
InitializeDistTableCache(void) InitializeDistTableCache(void)
@ -1122,6 +1164,7 @@ InitializeWorkerNodeCache(void)
workerNode->groupId = currentNode->groupId; workerNode->groupId = currentNode->groupId;
workerNode->nodeId = currentNode->nodeId; workerNode->nodeId = currentNode->nodeId;
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
workerNode->hasMetadata = currentNode->hasMetadata;
if (handleFound) if (handleFound)
{ {
@ -1149,6 +1192,74 @@ InitializeWorkerNodeCache(void)
} }
/*
* GetLocalGroupId returns the group identifier of the local node. The function assumes
* that pg_dist_local_node_group has exactly one row and has at least one column.
* Otherwise, the function errors out.
*/
int
GetLocalGroupId(void)
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
Oid groupId = InvalidOid;
Relation pgDistLocalGroupId = NULL;
/*
* Already set the group id, no need to read the heap again.
*/
if (LocalGroupId != -1)
{
return LocalGroupId;
}
pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock);
scanDescriptor = systable_beginscan(pgDistLocalGroupId,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
tupleDescriptor = RelationGetDescr(pgDistLocalGroupId);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum groupIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_local_groupid,
tupleDescriptor, &isNull);
groupId = DatumGetUInt32(groupIdDatum);
}
else
{
elog(ERROR, "could not find any entries in pg_dist_local_group");
}
systable_endscan(scanDescriptor);
heap_close(pgDistLocalGroupId, AccessShareLock);
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
(Datum) 0);
invalidationRegistered = true;
}
/* set the local cache variable */
LocalGroupId = groupId;
return groupId;
}
/* /*
* WorkerNodeHashCode computes the hash code for a worker node from the node's * WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations * host name and port number. Nodes that only differ by their rack locations
@ -1270,6 +1381,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
extensionLoaded = false; extensionLoaded = false;
distShardRelationId = InvalidOid; distShardRelationId = InvalidOid;
distShardPlacementRelationId = InvalidOid; distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distPartitionRelationId = InvalidOid; distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid;
@ -1344,6 +1456,21 @@ InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
} }
/*
* InvalidateLocalGroupIdRelationCacheCallback sets the LocalGroupId to
* the default value.
*/
static void
InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId)
{
/* when invalidation happens simply set the LocalGroupId to the default value */
if (relationId == InvalidOid || relationId == distLocalGroupRelationId)
{
LocalGroupId = -1;
}
}
/* /*
* LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry
* and returns that or, if no matching entry was found, NULL. * and returns that or, if no matching entry was found, NULL.

View File

@ -43,13 +43,13 @@ int GroupSize = 1;
/* local function forward declarations */ /* local function forward declarations */
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
char *nodeRack); char *nodeRack, bool hasMetadata);
static Datum GenerateNodeTuple(WorkerNode *workerNode); static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void); static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void); static uint32 GetMaxGroupId(void);
static int GetNextNodeId(void); static int GetNextNodeId(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
char *nodeRack); char *nodeRack, bool hasMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void); static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -71,8 +71,10 @@ master_add_node(PG_FUNCTION_ARGS)
char *nodeNameString = text_to_cstring(nodeName); char *nodeNameString = text_to_cstring(nodeName);
int32 groupId = 0; int32 groupId = 0;
char *nodeRack = WORKER_DEFAULT_RACK; char *nodeRack = WORKER_DEFAULT_RACK;
bool hasMetadata = false;
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack); Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata);
PG_RETURN_CSTRING(returnData); PG_RETURN_CSTRING(returnData);
} }
@ -122,7 +124,7 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack); workerNode->workerRack, false);
} }
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
@ -202,7 +204,8 @@ ReadWorkerNodes()
* new node is inserted into the local pg_dist_node. * new node is inserted into the local pg_dist_node.
*/ */
static Datum static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack) AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
bool hasMetadata)
{ {
Relation pgDistNode = NULL; Relation pgDistNode = NULL;
int nextNodeIdInt = 0; int nextNodeIdInt = 0;
@ -245,7 +248,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack)
/* generate the new node id from the sequence */ /* generate the new node id from the sequence */
nextNodeIdInt = GetNextNodeId(); nextNodeIdInt = GetNextNodeId();
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack); InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata);
heap_close(pgDistNode, AccessExclusiveLock); heap_close(pgDistNode, AccessExclusiveLock);
@ -280,6 +283,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata);
/* open shard relation and insert new tuple */ /* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
@ -400,7 +404,8 @@ GetNextNodeId()
* given values into that system catalog. * given values into that system catalog.
*/ */
static void static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack) InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
bool hasMetadata)
{ {
Relation pgDistNode = NULL; Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
@ -417,6 +422,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
/* open shard relation and insert new tuple */ /* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
@ -614,6 +620,7 @@ ParseWorkerNodeFileAndRename()
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH); strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort; workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNodeList = lappend(workerNodeList, workerNode); workerNodeList = lappend(workerNodeList, workerNode);
} }
@ -651,6 +658,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
tupleDescriptor, &isNull); tupleDescriptor, &isNull);
Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack, Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack,
tupleDescriptor, &isNull); tupleDescriptor, &isNull);
Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata,
tupleDescriptor, &isNull);
Assert(!HeapTupleHasNulls(heapTuple)); Assert(!HeapTupleHasNulls(heapTuple));
@ -660,6 +669,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
workerNode->groupId = DatumGetUInt32(groupId); workerNode->groupId = DatumGetUInt32(groupId);
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
workerNode->hasMetadata = DatumGetBool(hasMetadata);
return workerNode; return workerNode;
} }

View File

@ -56,6 +56,7 @@ extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void); extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateNodeCache(void); extern void CitusInvalidateNodeCache(void);
@ -69,6 +70,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void); extern Oid DistShardPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
/* index oids */ /* index oids */
extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void);

View File

@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* pg_dist_local_group.h
* definition of the relation that holds the local group id (pg_dist_local_group).
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_LOCAL_GROUP_H
#define PG_DIST_LOCAL_GROUP_H
/* ----------------
* pg_dist_local_group definition.
* ----------------
*/
typedef struct FormData_pg_dist_local_group
{
int groupid;
} FormData_pg_dist_local_group;
/* ----------------
* FormData_pg_dist_local_group corresponds to a pointer to a tuple with
* the format of pg_dist_local_group relation.
* ----------------
*/
typedef FormData_pg_dist_local_group *Form_pg_dist_local_group;
/* ----------------
* compiler constants for pg_dist_local_group
* ----------------
*/
#define Natts_pg_dist_local_group 1
#define Anum_pg_dist_local_groupid 1
#endif /* PG_DIST_LOCAL_GROUP_H */

View File

@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_node
#ifdef CATALOG_VARLEN #ifdef CATALOG_VARLEN
text nodename; text nodename;
int nodeport; int nodeport;
bool hasmetadata;
#endif #endif
} FormData_pg_dist_node; } FormData_pg_dist_node;
@ -36,12 +37,13 @@ typedef FormData_pg_dist_node *Form_pg_dist_node;
* compiler constants for pg_dist_node * compiler constants for pg_dist_node
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_node 5 #define Natts_pg_dist_node 6
#define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodename 3
#define Anum_pg_dist_node_nodeport 4 #define Anum_pg_dist_node_nodeport 4
#define Anum_pg_dist_node_noderack 5 #define Anum_pg_dist_node_noderack 5
#define Anum_pg_dist_node_hasmetadata 6
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"

View File

@ -42,6 +42,7 @@ typedef struct WorkerNode
char workerName[WORKER_LENGTH]; /* node's name */ char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
char workerRack[WORKER_LENGTH]; /* node's network location */ char workerRack[WORKER_LENGTH]; /* node's network location */
bool hasMetadata; /* node gets metadata changes */
} WorkerNode; } WorkerNode;

View File

@ -3,15 +3,15 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership -- Tests functions related to cluster membership
-- add the nodes to the cluster -- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(1,1,localhost,57637,default) (1,1,localhost,57637,default,f)
(1 row) (1 row)
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(2,2,localhost,57638,default) (2,2,localhost,57638,default,f)
(1 row) (1 row)
-- get the active nodes -- get the active nodes
@ -24,9 +24,9 @@ SELECT master_get_active_worker_nodes();
-- try to add the node again when it is activated -- try to add the node again when it is activated
SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(1,1,localhost,57637,default) (1,1,localhost,57637,default,f)
(1 row) (1 row)
-- get the active nodes -- get the active nodes
@ -53,9 +53,9 @@ SELECT master_get_active_worker_nodes();
-- add some shard placements to the cluster -- add some shard placements to the cluster
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(3,3,localhost,57638,default) (3,3,localhost,57638,default,f)
(1 row) (1 row)
CREATE TABLE cluster_management_test (col_1 text, col_2 int); CREATE TABLE cluster_management_test (col_1 text, col_2 int);
@ -125,9 +125,9 @@ SELECT master_get_active_worker_nodes();
-- clean-up -- clean-up
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(4,4,localhost,57638,default) (4,4,localhost,57638,default,f)
(1 row) (1 row)
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;

View File

@ -21,15 +21,15 @@ RESET client_min_messages;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- re-add the nodes to the cluster -- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(1,1,localhost,57637,default) (1,1,localhost,57637,default,f)
(1 row) (1 row)
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(2,2,localhost,57638,default) (2,2,localhost,57638,default,f)
(1 row) (1 row)
-- verify that a table can be created after the extension has been dropped and recreated -- verify that a table can be created after the extension has been dropped and recreated

View File

@ -34,6 +34,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -25,11 +25,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and -- Show that, with no MX tables, metadata snapshot contains only the delete commands and
-- pg_dist_node entries -- pg_dist_node entries
SELECT unnest(master_metadata_snapshot()); SELECT unnest(master_metadata_snapshot());
unnest unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
(3 rows) (3 rows)
-- Create a test table with constraints and SERIAL -- Create a test table with constraints and SERIAL
@ -55,7 +55,7 @@ SELECT unnest(master_metadata_snapshot());
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -74,7 +74,7 @@ SELECT unnest(master_metadata_snapshot());
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
@ -95,7 +95,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -121,7 +121,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
@ -140,7 +140,7 @@ SELECT unnest(master_metadata_snapshot());
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default') INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)

View File

@ -67,15 +67,15 @@ DROP EXTENSION citus;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- re-add the nodes to the cluster -- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(1,1,localhost,57637,default) (1,1,localhost,57637,default,f)
(1 row) (1 row)
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
------------------------------- ---------------------------------
(2,2,localhost,57638,default) (2,2,localhost,57638,default,f)
(1 row) (1 row)
-- create a table with a SERIAL column -- create a table with a SERIAL column

View File

@ -39,6 +39,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;