Add metadata infrastructure for pg_dist_local_group table

pull/865/head
Eren Basak 2016-10-11 14:48:47 +03:00
parent 8f477d18f1
commit c7bf2021fa
3 changed files with 165 additions and 0 deletions

View File

@ -26,6 +26,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/pg_dist_local_group.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
@ -52,6 +53,7 @@ static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid; static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid;
@ -69,6 +71,11 @@ static HTAB *DistTableCacheHash = NULL;
static HTAB *WorkerNodeHash = NULL; static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false; static bool workerNodeHashValid = false;
static bool invalidationRegistered = false;
/* default value is -1, for schema node it's 0 and for worker nodes > 0 */
static int LocalGroupId = -1;
/* built first time through in InitializePartitionCache */ /* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistPartitionScanKey[1];
static ScanKeyData DistShardScanKey[1]; static ScanKeyData DistShardScanKey[1];
@ -93,6 +100,7 @@ static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static List * DistTableOidList(void); static List * DistTableOidList(void);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId); static List * LookupDistShardTuples(Oid relationId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
@ -107,6 +115,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
/* /*
@ -668,6 +677,16 @@ DistNodeRelationId(void)
} }
/* return oid of pg_dist_local_group relation */
Oid
DistLocalGroupIdRelationId(void)
{
CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId);
return distLocalGroupRelationId;
}
/* return oid of pg_dist_partition relation */ /* return oid of pg_dist_partition relation */
Oid Oid
DistPartitionRelationId(void) DistPartitionRelationId(void)
@ -993,6 +1012,29 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
} }
/*
* master_dist_local_group_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_local_group are changed
* on the SQL level.
*
* NB: We decided there is little point in checking permissions here, there
* are much easier ways to waste CPU than causing cache invalidations.
*/
Datum
master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
{
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
}
/* initialize the infrastructure for the metadata cache */ /* initialize the infrastructure for the metadata cache */
static void static void
InitializeDistTableCache(void) InitializeDistTableCache(void)
@ -1149,6 +1191,74 @@ InitializeWorkerNodeCache(void)
} }
/*
* GetLocalGroupId returns the group identifier of the local node. The function assumes
* that pg_dist_local_node_group has exactly one row and has at least one column.
* Otherwise, the function errors out.
*/
int
GetLocalGroupId(void)
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
Oid groupId = InvalidOid;
Relation pgDistLocalGroupId = NULL;
/*
* Already set the group id, no need to read the heap again.
*/
if (LocalGroupId != -1)
{
return LocalGroupId;
}
pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock);
scanDescriptor = systable_beginscan(pgDistLocalGroupId,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
tupleDescriptor = RelationGetDescr(pgDistLocalGroupId);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum groupIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_local_groupid,
tupleDescriptor, &isNull);
groupId = DatumGetUInt32(groupIdDatum);
}
else
{
elog(ERROR, "could not find any entries in pg_dist_local_group");
}
systable_endscan(scanDescriptor);
heap_close(pgDistLocalGroupId, AccessShareLock);
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
(Datum) 0);
invalidationRegistered = true;
}
/* set the local cache variable */
LocalGroupId = groupId;
return groupId;
}
/* /*
* WorkerNodeHashCode computes the hash code for a worker node from the node's * WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations * host name and port number. Nodes that only differ by their rack locations
@ -1270,6 +1380,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
extensionLoaded = false; extensionLoaded = false;
distShardRelationId = InvalidOid; distShardRelationId = InvalidOid;
distShardPlacementRelationId = InvalidOid; distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distPartitionRelationId = InvalidOid; distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid;
@ -1344,6 +1455,21 @@ InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
} }
/*
* InvalidateLocalGroupIdRelationCacheCallback sets the LocalGroupId to
* the default value.
*/
static void
InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId)
{
/* when invalidation happens simply set the LocalGroupId to the default value */
if (relationId == InvalidOid || relationId == distLocalGroupRelationId)
{
LocalGroupId = -1;
}
}
/* /*
* LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry
* and returns that or, if no matching entry was found, NULL. * and returns that or, if no matching entry was found, NULL.

View File

@ -56,6 +56,7 @@ extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void); extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateNodeCache(void); extern void CitusInvalidateNodeCache(void);
@ -69,6 +70,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void); extern Oid DistShardPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
/* index oids */ /* index oids */
extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void);

View File

@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* pg_dist_local_group.h
* definition of the relation that holds the local group id (pg_dist_local_group).
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_LOCAL_GROUP_H
#define PG_DIST_LOCAL_GROUP_H
/* ----------------
* pg_dist_local_group definition.
* ----------------
*/
typedef struct FormData_pg_dist_local_group
{
int groupid;
} FormData_pg_dist_local_group;
/* ----------------
* FormData_pg_dist_local_group corresponds to a pointer to a tuple with
* the format of pg_dist_local_group relation.
* ----------------
*/
typedef FormData_pg_dist_local_group *Form_pg_dist_local_group;
/* ----------------
* compiler constants for pg_dist_local_group
* ----------------
*/
#define Natts_pg_dist_local_group 1
#define Anum_pg_dist_local_groupid 1
#endif /* PG_DIST_LOCAL_GROUP_H */