diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 874bda931..5cbe6130d 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -28,8 +28,8 @@ #include "catalog/pg_class.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" -#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 74c9f39d5..d03d7a407 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -55,6 +55,8 @@ static uint64 * AllocateUint64(uint64 value); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, + HeapTuple heapTuple); /* exports for SQL callable functions */ @@ -240,6 +242,24 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval) } +/* + * CopyShardPlacement copies the values of the source placement into the + * target placement. + */ +void +CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement) +{ + /* first copy all by-value fields */ + memcpy(destPlacement, srcPlacement, sizeof(ShardPlacement)); + + /* and then the fields pointing to external values */ + if (srcPlacement->nodeName) + { + destPlacement->nodeName = pstrdup(srcPlacement->nodeName); + } +} + + /* * ShardLength finds shard placements for the given shardId, extracts the length * of a finalized shard, and returns the shard's length. This function errors @@ -361,13 +381,17 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk) /* - * ShardPlacementList finds shard placements for the given shardId from system - * catalogs, converts these placements to their in-memory representation, and - * returns the converted shard placements in a new list. + * BuildShardPlacementList finds shard placements for the given shardId from + * system catalogs, converts these placements to their in-memory + * representation, and returns the converted shard placements in a new list. + * + * This probably only should be called from metadata_cache.c. Resides here + * because it shares code with other routines in this file. */ List * -ShardPlacementList(uint64 shardId) +BuildShardPlacementList(ShardInterval *shardInterval) { + int64 shardId = shardInterval->shardId; List *shardPlacementList = NIL; Relation pgShardPlacement = NULL; SysScanDesc scanDescriptor = NULL; @@ -399,13 +423,6 @@ ShardPlacementList(uint64 shardId) systable_endscan(scanDescriptor); heap_close(pgShardPlacement, AccessShareLock); - /* if no shard placements are found, warn the user */ - if (shardPlacementList == NIL) - { - ereport(WARNING, (errmsg("could not find any shard placements for shardId " - UINT64_FORMAT, shardId))); - } - return shardPlacementList; } @@ -415,7 +432,7 @@ ShardPlacementList(uint64 shardId) * and converts this tuple to in-memory struct. The function assumes the * caller already has locks on the tuple, and doesn't perform any locking. */ -ShardPlacement * +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) { ShardPlacement *shardPlacement = NULL; diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 55b5329c1..98623fc5d 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -50,6 +50,33 @@ #include "utils/typcache.h" +/* + * ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. + * To avoid duplicating data and invalidation logic between this cache and the + * DistTableCache, this only points into the DistTableCacheEntry of the + * shard's distributed table. + */ +typedef struct ShardCacheEntry +{ + /* hash key, needs to be first */ + int64 shardId; + + /* + * Cache entry for the distributed table a shard belongs to, possibly not + * valid. + */ + DistTableCacheEntry *tableEntry; + + /* + * Offset in tableEntry->sortedShardIntervalArray, only valid if + * tableEntry->isValid. We don't store pointers to the individual shard + * placements because that'd make invalidation a bit more complicated, and + * because there's simply no need. + */ + int shardIndex; +} ShardCacheEntry; + + /* state which should be cleared upon DROP EXTENSION */ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; @@ -74,6 +101,9 @@ static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +/* Hash table for informations about each shard */ +static HTAB *DistShardCacheHash = NULL; + /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; @@ -89,7 +119,10 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ +static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); +static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); +static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod); static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray, @@ -109,6 +142,7 @@ static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); +static Oid LookupShardRelation(int64 shardId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Oid *intervalTypeId, int32 *intervalTypeMod); static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, @@ -179,58 +213,150 @@ DistributedTableList(void) /* - * LoadShardInterval reads shard metadata for given shardId from pg_dist_shard, - * and converts min/max values in these metadata to their properly typed datum - * representations. The function then allocates a structure that stores the read - * and converted values, and returns this structure. + * LoadShardInterval returns the, cached, metadata about a shard. + * + * The return value is a copy of the cached ShardInterval struct and may + * therefore be modified and/or freed. */ ShardInterval * LoadShardInterval(uint64 shardId) { - ShardInterval *shardInterval; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - HeapTuple heapTuple = NULL; - Form_pg_dist_shard shardForm = NULL; - DistTableCacheEntry *partitionEntry; - Oid intervalTypeId = InvalidOid; - int32 intervalTypeMod = -1; + ShardInterval *shardInterval = NULL; + ShardInterval *sourceShardInterval = NULL; + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; - Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); + shardEntry = LookupShardCacheEntry(shardId); - ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + tableEntry = shardEntry->tableEntry; - scanDescriptor = systable_beginscan(pgDistShard, - DistShardShardidIndexId(), true, - NULL, scanKeyCount, scanKey); + Assert(tableEntry->isDistributedTable); - heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); - } + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); - shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); - partitionEntry = DistributedTableCacheEntry(shardForm->logicalrelid); + sourceShardInterval = tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; - GetPartitionTypeInputInfo(partitionEntry->partitionKeyString, - partitionEntry->partitionMethod, &intervalTypeId, - &intervalTypeMod); - - shardInterval = TupleToShardInterval(heapTuple, tupleDescriptor, intervalTypeId, - intervalTypeMod); - - systable_endscan(scanDescriptor); - heap_close(pgDistShard, AccessShareLock); + /* copy value to return */ + shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval)); + CopyShardInterval(sourceShardInterval, shardInterval); return shardInterval; } +/* + * ShardPlacementList returns the list of placements for the given shard from + * the cache. + * + * The returned list is deep copied from the cache and thus can be modified + * and pfree()d freely. + */ +List * +ShardPlacementList(uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + List *placementList = NIL; + int i = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (i = 0; i < numberOfPlacements; i++) + { + /* copy placement into target context */ + ShardPlacement *placement = CitusMakeNode(ShardPlacement); + CopyShardPlacement(&placementArray[i], placement); + + placementList = lappend(placementList, placement); + } + + /* if no shard placements are found, warn the user */ + if (numberOfPlacements == 0) + { + ereport(WARNING, (errmsg("could not find any shard placements for shardId " + UINT64_FORMAT, shardId))); + } + + return placementList; +} + + +/* + * LookupShardCacheEntry returns the cache entry belonging to a shard, or + * errors out if that shard is unknown. + */ +static ShardCacheEntry * +LookupShardCacheEntry(int64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + bool foundInCache = false; + bool recheck = false; + + /* probably not reachable */ + if (DistShardCacheHash == NULL) + { + InitializeDistTableCache(); + } + + /* lookup cache entry */ + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + /* + * A possible reason for not finding an entry in the cache is that the + * distributed table's cache entry hasn't been accessed. Thus look up + * the distributed table, and build the cache entry. Afterwards we + * know that the shard has to be in the cache if it exists. If the + * shard does *not* exist LookupShardRelation() will error out. + */ + Oid relationId = LookupShardRelation(shardId); + + /* trigger building the cache for the shard id */ + LookupDistTableCacheEntry(relationId); + + recheck = true; + } + else if (!shardEntry->tableEntry->isValid) + { + /* + * The cache entry might not be valid right now. Reload cache entry + * and recheck (as the offset might have changed). + */ + LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); + recheck = true; + } + + /* + * If we (re-)loaded the table cache, re-search the shard cache - the + * shard index might have changed. If we still can't find the entry, it + * can't exist. + */ + if (recheck) + { + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + } + + return shardEntry; +} + + /* * DistributedTableCacheEntry looks up a pg_dist_partition entry for a * relation. @@ -276,99 +402,172 @@ LookupDistTableCacheEntry(Oid relationId) { DistTableCacheEntry *cacheEntry = NULL; bool foundInCache = false; - HeapTuple distPartitionTuple = NULL; - char *partitionKeyString = NULL; - char partitionMethod = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - char replicationModel = 0; - List *distShardTupleList = NIL; - int shardIntervalArrayLength = 0; - ShardInterval **shardIntervalArray = NULL; - ShardInterval **sortedShardIntervalArray = NULL; - FmgrInfo *shardIntervalCompareFunction = NULL; - FmgrInfo *hashFunction = NULL; - bool hasUninitializedShardInterval = false; - bool hasUniformHashDistribution = false; void *hashKey = (void *) &relationId; - Relation pgDistPartition = NULL; if (DistTableCacheHash == NULL) { InitializeDistTableCache(); } - cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); + cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache); /* return valid matches */ - if ((cacheEntry != NULL) && (cacheEntry->isValid)) + if (foundInCache) { - return cacheEntry; - } + if (cacheEntry->isValid) + { + return cacheEntry; + } - /* free the content of old, invalid, entries */ - if (cacheEntry != NULL) - { + /* free the content of old, invalid, entries */ ResetDistTableCacheEntry(cacheEntry); } + /* zero out entry, but not the key part */ + memset(((char *) cacheEntry) + sizeof(Oid), 0, + sizeof(DistTableCacheEntry) - sizeof(Oid)); + + /* actually fill out entry */ + BuildDistTableCacheEntry(cacheEntry); + + /* and finally mark as valid */ + cacheEntry->isValid = true; + + return cacheEntry; +} + + +/* + * BuildDistTableCacheEntry is a helper routine for + * LookupDistTableCacheEntry() for building the cache contents. + */ +static void +BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry) +{ + HeapTuple distPartitionTuple = NULL; + Relation pgDistPartition = NULL; + Form_pg_dist_partition partitionForm = NULL; + Datum partitionKeyDatum = 0; + Datum replicationModelDatum = 0; + MemoryContext oldContext = NULL; + TupleDesc tupleDescriptor = NULL; + bool isNull = false; + bool partitionKeyIsNull = false; + pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); - distPartitionTuple = LookupDistPartitionTuple(pgDistPartition, relationId); - if (distPartitionTuple != NULL) + distPartitionTuple = + LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId); + + /* not a distributed table, done */ + if (distPartitionTuple == NULL) { - Form_pg_dist_partition partitionForm = - (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple); - Datum partitionKeyDatum = 0; - Datum replicationModelDatum = 0; - MemoryContext oldContext = NULL; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - bool isNull = false; - bool partitionKeyIsNull = false; + cacheEntry->isDistributedTable = false; + heap_close(pgDistPartition, NoLock); + return; + } - partitionKeyDatum = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_partkey, - tupleDescriptor, - &partitionKeyIsNull); + cacheEntry->isDistributedTable = true; - colocationId = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_colocationid, tupleDescriptor, - &isNull); - if (isNull) - { - colocationId = INVALID_COLOCATION_ID; - } + tupleDescriptor = RelationGetDescr(pgDistPartition); + partitionForm = (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple); - replicationModelDatum = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_repmodel, - tupleDescriptor, - &isNull); + cacheEntry->partitionMethod = partitionForm->partmethod; - if (isNull) - { - /* - * repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column - * doesn't exist - */ - replicationModelDatum = CharGetDatum('c'); - } + partitionKeyDatum = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_partkey, + tupleDescriptor, + &partitionKeyIsNull); + /* note that for reference tables partitionKeyisNull is true */ + if (!partitionKeyIsNull) + { oldContext = MemoryContextSwitchTo(CacheMemoryContext); - partitionMethod = partitionForm->partmethod; - replicationModel = DatumGetChar(replicationModelDatum); - - /* note that for reference tables isNull becomes true */ - if (!partitionKeyIsNull) - { - partitionKeyString = TextDatumGetCString(partitionKeyDatum); - } - + cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum); MemoryContextSwitchTo(oldContext); + } + else + { + cacheEntry->partitionKeyString = NULL; + } - heap_freetuple(distPartitionTuple); + cacheEntry->colocationId = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_colocationid, + tupleDescriptor, + &isNull); + if (isNull) + { + cacheEntry->colocationId = INVALID_COLOCATION_ID; + } + + replicationModelDatum = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_repmodel, + tupleDescriptor, + &isNull); + if (isNull) + { + /* + * repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column + * doesn't exist + */ + cacheEntry->replicationModel = 'c'; + } + else + { + cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); + } + + heap_freetuple(distPartitionTuple); + + BuildCachedShardList(cacheEntry); + + /* we only need hash functions for hash distributed tables */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + { + TypeCacheEntry *typeEntry = NULL; + Node *partitionNode = stringToNode(cacheEntry->partitionKeyString); + Var *partitionColumn = (Var *) partitionNode; + FmgrInfo *hashFunction = NULL; + + Assert(IsA(partitionNode, Var)); + typeEntry = lookup_type_cache(partitionColumn->vartype, + TYPECACHE_HASH_PROC_FINFO); + + hashFunction = MemoryContextAllocZero(CacheMemoryContext, + sizeof(FmgrInfo)); + + fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext); + + cacheEntry->hashFunction = hashFunction; + + /* check the shard distribution for hash partitioned tables */ + cacheEntry->hasUniformHashDistribution = + HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray, + cacheEntry->shardIntervalArrayLength); + } + else + { + cacheEntry->hashFunction = NULL; } heap_close(pgDistPartition, NoLock); +} - distShardTupleList = LookupDistShardTuples(relationId); + +/* + * BuildCachedShardList() is a helper routine for BuildDistTableCacheEntry() + * building up the list of shards in a distributed relation. + */ +static void +BuildCachedShardList(DistTableCacheEntry *cacheEntry) +{ + ShardInterval **shardIntervalArray = NULL; + ShardInterval **sortedShardIntervalArray = NULL; + FmgrInfo *shardIntervalCompareFunction = NULL; + List *distShardTupleList = NIL; + int shardIntervalArrayLength = 0; + int shardIndex = 0; + + distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); shardIntervalArrayLength = list_length(distShardTupleList); if (shardIntervalArrayLength > 0) { @@ -379,13 +578,24 @@ LookupDistTableCacheEntry(Oid relationId) Oid intervalTypeId = InvalidOid; int32 intervalTypeMod = -1; - GetPartitionTypeInputInfo(partitionKeyString, partitionMethod, &intervalTypeId, + GetPartitionTypeInputInfo(cacheEntry->partitionKeyString, + cacheEntry->partitionMethod, + &intervalTypeId, &intervalTypeMod); shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext, shardIntervalArrayLength * sizeof(ShardInterval *)); + cacheEntry->arrayOfPlacementArrays = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(ShardPlacement *)); + cacheEntry->arrayOfPlacementArrayLengths = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(int)); + foreach(distShardTupleCell, distShardTupleList) { HeapTuple shardTuple = lfirst(distShardTupleCell); @@ -411,7 +621,7 @@ LookupDistTableCacheEntry(Oid relationId) } /* decide and allocate interval comparison function */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { shardIntervalCompareFunction = NULL; } @@ -422,16 +632,17 @@ LookupDistTableCacheEntry(Oid relationId) /* allocate the comparison function in the cache context */ oldContext = MemoryContextSwitchTo(CacheMemoryContext); - shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray, - partitionMethod); + shardIntervalCompareFunction = + ShardIntervalCompareFunction(shardIntervalArray, + cacheEntry->partitionMethod); MemoryContextSwitchTo(oldContext); } /* reference tables has a single shard which is not initialized */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { - hasUninitializedShardInterval = true; + cacheEntry->hasUninitializedShardInterval = true; /* * Note that during create_reference_table() call, @@ -439,7 +650,7 @@ LookupDistTableCacheEntry(Oid relationId) */ if (shardIntervalArrayLength > 1) { - char *relationName = get_rel_name(relationId); + char *relationName = get_rel_name(cacheEntry->relationId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("reference table \"%s\" has more than 1 shard", @@ -457,60 +668,55 @@ LookupDistTableCacheEntry(Oid relationId) shardIntervalCompareFunction); /* check if there exists any shard intervals with no min/max values */ - hasUninitializedShardInterval = + cacheEntry->hasUninitializedShardInterval = HasUninitializedShardInterval(sortedShardIntervalArray, shardIntervalArrayLength); } - /* we only need hash functions for hash distributed tables */ - if (partitionMethod == DISTRIBUTE_BY_HASH) + + /* maintain shardId->(table,ShardInterval) cache */ + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) { - TypeCacheEntry *typeEntry = NULL; - Node *partitionNode = stringToNode(partitionKeyString); - Var *partitionColumn = (Var *) partitionNode; - Assert(IsA(partitionNode, Var)); - typeEntry = lookup_type_cache(partitionColumn->vartype, - TYPECACHE_HASH_PROC_FINFO); + ShardCacheEntry *shardEntry = NULL; + ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; + bool foundInCache = false; + List *placementList = NIL; + MemoryContext oldContext = NULL; + ListCell *placementCell = NULL; + ShardPlacement *placementArray = NULL; + int placementOffset = 0; + int numberOfPlacements = 0; - hashFunction = MemoryContextAllocZero(CacheMemoryContext, - sizeof(FmgrInfo)); + shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, + &foundInCache); + Assert(!foundInCache); + shardEntry->shardIndex = shardIndex; + shardEntry->tableEntry = cacheEntry; - fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext); + /* build list of shard placements */ + placementList = BuildShardPlacementList(shardInterval); + numberOfPlacements = list_length(placementList); - /* check the shard distribution for hash partitioned tables */ - hasUniformHashDistribution = - HasUniformHashDistribution(sortedShardIntervalArray, - shardIntervalArrayLength); + /* and copy that list into the cache entry */ + oldContext = MemoryContextSwitchTo(CacheMemoryContext); + placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement)); + foreach(placementCell, placementList) + { + ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell); + + CopyShardPlacement(srcPlacement, &placementArray[placementOffset]); + + placementOffset++; + } + MemoryContextSwitchTo(oldContext); + + cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; + cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; } - cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL); - - /* zero out entry, but not the key part */ - memset(((char *) cacheEntry) + sizeof(Oid), 0, - sizeof(DistTableCacheEntry) - sizeof(Oid)); - - if (distPartitionTuple == NULL) - { - cacheEntry->isValid = true; - cacheEntry->isDistributedTable = false; - } - else - { - cacheEntry->isValid = true; - cacheEntry->isDistributedTable = true; - cacheEntry->partitionKeyString = partitionKeyString; - cacheEntry->partitionMethod = partitionMethod; - cacheEntry->colocationId = colocationId; - cacheEntry->replicationModel = replicationModel; - cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; - cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; - cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; - cacheEntry->hashFunction = hashFunction; - cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval; - cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution; - } - - return cacheEntry; + cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; + cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; + cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; } @@ -1249,7 +1455,7 @@ InitializeDistTableCache(void) DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; - /* initialize the hash table */ + /* initialize the per-table hash table */ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(Oid); info.entrysize = sizeof(DistTableCacheEntry); @@ -1258,6 +1464,15 @@ InitializeDistTableCache(void) hash_create("Distributed Relation Cache", 32, &info, HASH_ELEM | HASH_FUNCTION); + /* initialize the per-shard hash table */ + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardCacheEntry); + info.hash = tag_hash; + DistShardCacheHash = + hash_create("Shard Cache", 32 * 64, &info, + HASH_ELEM | HASH_FUNCTION); + /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, (Datum) 0); @@ -1475,57 +1690,99 @@ WorkerNodeHashCode(const void *key, Size keySize) void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) { + int shardIndex = 0; + if (cacheEntry->partitionKeyString != NULL) { pfree(cacheEntry->partitionKeyString); cacheEntry->partitionKeyString = NULL; } - if (cacheEntry->shardIntervalArrayLength > 0) + if (cacheEntry->shardIntervalCompareFunction != NULL) { - int i = 0; + pfree(cacheEntry->shardIntervalCompareFunction); + cacheEntry->shardIntervalCompareFunction = NULL; + } - for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) + if (cacheEntry->hashFunction) + { + pfree(cacheEntry->hashFunction); + cacheEntry->hashFunction = NULL; + } + + if (cacheEntry->shardIntervalArrayLength == 0) + { + return; + } + + for (shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength; + shardIndex++) + { + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; + ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex]; + int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex]; + bool valueByVal = shardInterval->valueByVal; + bool foundInCache = false; + int placementIndex = 0; + + /* delete the shard's placements */ + for (placementIndex = 0; + placementIndex < numberOfPlacements; + placementIndex++) { - ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; - bool valueByVal = shardInterval->valueByVal; + ShardPlacement *placement = &placementArray[placementIndex]; - if (!valueByVal) + if (placement->nodeName) { - if (shardInterval->minValueExists) - { - pfree(DatumGetPointer(shardInterval->minValue)); - } - - if (shardInterval->maxValueExists) - { - pfree(DatumGetPointer(shardInterval->maxValue)); - } + pfree(placement->nodeName); } - pfree(shardInterval); + /* placement itself is deleted as part of the array */ + } + pfree(placementArray); + + /* delete per-shard cache-entry */ + hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, + &foundInCache); + Assert(foundInCache); + + /* delete data pointed to by ShardInterval */ + if (!valueByVal) + { + if (shardInterval->minValueExists) + { + pfree(DatumGetPointer(shardInterval->minValue)); + } + + if (shardInterval->maxValueExists) + { + pfree(DatumGetPointer(shardInterval->maxValue)); + } } + /* and finally the ShardInterval itself */ + pfree(shardInterval); + } + + if (cacheEntry->sortedShardIntervalArray) + { pfree(cacheEntry->sortedShardIntervalArray); cacheEntry->sortedShardIntervalArray = NULL; - cacheEntry->shardIntervalArrayLength = 0; - - cacheEntry->hasUninitializedShardInterval = false; - cacheEntry->hasUniformHashDistribution = false; - - if (cacheEntry->shardIntervalCompareFunction != NULL) - { - pfree(cacheEntry->shardIntervalCompareFunction); - cacheEntry->shardIntervalCompareFunction = NULL; - } - - /* we only allocated hash function for hash distributed tables */ - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) - { - pfree(cacheEntry->hashFunction); - cacheEntry->hashFunction = NULL; - } } + if (cacheEntry->arrayOfPlacementArrayLengths) + { + pfree(cacheEntry->arrayOfPlacementArrayLengths); + cacheEntry->arrayOfPlacementArrayLengths = NULL; + } + if (cacheEntry->arrayOfPlacementArrays) + { + pfree(cacheEntry->arrayOfPlacementArrays); + cacheEntry->arrayOfPlacementArrays = NULL; + } + + cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->hasUninitializedShardInterval = false; + cacheEntry->hasUniformHashDistribution = false; } @@ -1741,6 +1998,46 @@ LookupDistShardTuples(Oid relationId) } +/* + * LookupShardRelation returns the logical relation oid a shard belongs to. + * + * Errors out if the shardId does not exist. + */ +static Oid +LookupShardRelation(int64 shardId) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + HeapTuple heapTuple = NULL; + Form_pg_dist_shard shardForm = NULL; + Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); + Oid relationId = InvalidOid; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + + scanDescriptor = systable_beginscan(pgDistShard, + DistShardShardidIndexId(), true, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + + shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); + relationId = shardForm->logicalrelid; + + systable_endscan(scanDescriptor); + heap_close(pgDistShard, NoLock); + + return relationId; +} + + /* * GetPartitionTypeInputInfo populates output parameters with the interval type * identifier and modifier for the specified partition key/method combination. diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index a6e6c9070..696016f78 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -62,13 +62,13 @@ extern List * LoadShardIntervalList(Oid relationId); extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); +extern void CopyShardPlacement(ShardPlacement *srcPlacement, + ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId); extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort); extern List * FinalizedShardPlacementList(uint64 shardId); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); -extern List * ShardPlacementList(uint64 shardId); -extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, - HeapTuple heapTuple); +extern List * BuildShardPlacementList(ShardInterval *shardInterval); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 38fbb9cb1..314002acc 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -49,6 +49,10 @@ typedef struct FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ + + /* pg_dist_shard_placement metadata */ + ShardPlacement **arrayOfPlacementArrays; + int *arrayOfPlacementArrayLengths; } DistTableCacheEntry; @@ -58,6 +62,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); +extern List * ShardPlacementList(uint64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);