From c7bf2021fa36728b5fb5ef0465ccaca21844c8d8 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 11 Oct 2016 14:48:47 +0300 Subject: [PATCH] Add metadata infrastructure for pg_dist_local_group table --- .../distributed/utils/metadata_cache.c | 126 ++++++++++++++++++ src/include/distributed/metadata_cache.h | 2 + src/include/distributed/pg_dist_local_group.h | 37 +++++ 3 files changed, 165 insertions(+) create mode 100644 src/include/distributed/pg_dist_local_group.h diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 83d5ecd00..18c17dc6f 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -26,6 +26,7 @@ #include "distributed/colocation_utils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" @@ -52,6 +53,7 @@ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid; +static Oid distLocalGroupRelationId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -69,6 +71,11 @@ static HTAB *DistTableCacheHash = NULL; static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; +static bool invalidationRegistered = false; + +/* default value is -1, for schema node it's 0 and for worker nodes > 0 */ +static int LocalGroupId = -1; + /* built first time through in InitializePartitionCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; @@ -93,6 +100,7 @@ static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static List * DistTableOidList(void); +static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, @@ -107,6 +115,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid); PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); +PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); /* @@ -668,6 +677,16 @@ DistNodeRelationId(void) } +/* return oid of pg_dist_local_group relation */ +Oid +DistLocalGroupIdRelationId(void) +{ + CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId); + + return distLocalGroupRelationId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -993,6 +1012,29 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * master_dist_local_group_cache_invalidate is a trigger function that performs + * relcache invalidations when the contents of pg_dist_local_group are changed + * on the SQL level. + * + * NB: We decided there is little point in checking permissions here, there + * are much easier ways to waste CPU than causing cache invalidations. + */ +Datum +master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) +{ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("must be called as trigger"))); + } + + CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); + + PG_RETURN_DATUM(PointerGetDatum(NULL)); +} + + /* initialize the infrastructure for the metadata cache */ static void InitializeDistTableCache(void) @@ -1149,6 +1191,74 @@ InitializeWorkerNodeCache(void) } +/* + * GetLocalGroupId returns the group identifier of the local node. The function assumes + * that pg_dist_local_node_group has exactly one row and has at least one column. + * Otherwise, the function errors out. + */ +int +GetLocalGroupId(void) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + Oid groupId = InvalidOid; + Relation pgDistLocalGroupId = NULL; + + /* + * Already set the group id, no need to read the heap again. + */ + if (LocalGroupId != -1) + { + return LocalGroupId; + } + + pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock); + + scanDescriptor = systable_beginscan(pgDistLocalGroupId, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + tupleDescriptor = RelationGetDescr(pgDistLocalGroupId); + + heapTuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(heapTuple)) + { + bool isNull = false; + Datum groupIdDatum = heap_getattr(heapTuple, + Anum_pg_dist_local_groupid, + tupleDescriptor, &isNull); + + groupId = DatumGetUInt32(groupIdDatum); + } + else + { + elog(ERROR, "could not find any entries in pg_dist_local_group"); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistLocalGroupId, AccessShareLock); + + /* prevent multiple invalidation registrations */ + if (!invalidationRegistered) + { + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, + (Datum) 0); + + invalidationRegistered = true; + } + + /* set the local cache variable */ + LocalGroupId = groupId; + + return groupId; +} + + /* * WorkerNodeHashCode computes the hash code for a worker node from the node's * host name and port number. Nodes that only differ by their rack locations @@ -1270,6 +1380,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) extensionLoaded = false; distShardRelationId = InvalidOid; distShardPlacementRelationId = InvalidOid; + distLocalGroupRelationId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1344,6 +1455,21 @@ InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) } +/* + * InvalidateLocalGroupIdRelationCacheCallback sets the LocalGroupId to + * the default value. + */ +static void +InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId) +{ + /* when invalidation happens simply set the LocalGroupId to the default value */ + if (relationId == InvalidOid || relationId == distLocalGroupRelationId) + { + LocalGroupId = -1; + } +} + + /* * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * and returns that or, if no matching entry was found, NULL. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 2985c2b71..7a32854f1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -56,6 +56,7 @@ extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); +extern int GetLocalGroupId(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateNodeCache(void); @@ -69,6 +70,7 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); extern Oid DistNodeRelationId(void); +extern Oid DistLocalGroupIdRelationId(void); /* index oids */ extern Oid DistPartitionLogicalRelidIndexId(void); diff --git a/src/include/distributed/pg_dist_local_group.h b/src/include/distributed/pg_dist_local_group.h new file mode 100644 index 000000000..5d0f8f47a --- /dev/null +++ b/src/include/distributed/pg_dist_local_group.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_local_group.h + * definition of the relation that holds the local group id (pg_dist_local_group). + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_LOCAL_GROUP_H +#define PG_DIST_LOCAL_GROUP_H + +/* ---------------- + * pg_dist_local_group definition. + * ---------------- + */ +typedef struct FormData_pg_dist_local_group +{ + int groupid; +} FormData_pg_dist_local_group; + +/* ---------------- + * FormData_pg_dist_local_group corresponds to a pointer to a tuple with + * the format of pg_dist_local_group relation. + * ---------------- + */ +typedef FormData_pg_dist_local_group *Form_pg_dist_local_group; + +/* ---------------- + * compiler constants for pg_dist_local_group + * ---------------- + */ +#define Natts_pg_dist_local_group 1 +#define Anum_pg_dist_local_groupid 1 + +#endif /* PG_DIST_LOCAL_GROUP_H */