Merge pull request #689 from citusdata/fix-674-51

Don't access pg_dist_partition->partkey directly, use heap_getattr() - 5.1.
release-5.1
Andres Freund 2016-07-29 10:14:51 -07:00 committed by GitHub
commit bb83331ac5
2 changed files with 18 additions and 10 deletions

View File

@ -78,7 +78,7 @@ static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArr
static void InitializeDistTableCache(void); static void InitializeDistTableCache(void);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId); static List * LookupDistShardTuples(Oid relationId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Oid *intervalTypeId, int32 *intervalTypeMod); Oid *intervalTypeId, int32 *intervalTypeMod);
@ -227,6 +227,7 @@ LookupDistTableCacheEntry(Oid relationId)
bool hasUninitializedShardInterval = false; bool hasUninitializedShardInterval = false;
bool hasUniformHashDistribution = false; bool hasUniformHashDistribution = false;
void *hashKey = (void *) &relationId; void *hashKey = (void *) &relationId;
Relation pgDistPartition = NULL;
if (DistTableCacheHash == NULL) if (DistTableCacheHash == NULL)
{ {
@ -247,15 +248,23 @@ LookupDistTableCacheEntry(Oid relationId)
ResetDistTableCacheEntry(cacheEntry); ResetDistTableCacheEntry(cacheEntry);
} }
distPartitionTuple = LookupDistPartitionTuple(relationId); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
distPartitionTuple = LookupDistPartitionTuple(pgDistPartition, relationId);
if (distPartitionTuple != NULL) if (distPartitionTuple != NULL)
{ {
Form_pg_dist_partition partitionForm = Form_pg_dist_partition partitionForm =
(Form_pg_dist_partition) GETSTRUCT(distPartitionTuple); (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
Datum partitionKeyDatum = PointerGetDatum(&partitionForm->partkey); Datum partitionKeyDatum = 0;
MemoryContext oldContext = NULL;
bool isNull = false;
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext); partitionKeyDatum = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_partkey,
RelationGetDescr(pgDistPartition),
&isNull);
Assert(!isNull);
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
partitionKeyString = TextDatumGetCString(partitionKeyDatum); partitionKeyString = TextDatumGetCString(partitionKeyDatum);
partitionMethod = partitionForm->partmethod; partitionMethod = partitionForm->partmethod;
@ -264,6 +273,8 @@ LookupDistTableCacheEntry(Oid relationId)
heap_freetuple(distPartitionTuple); heap_freetuple(distPartitionTuple);
} }
heap_close(pgDistPartition, NoLock);
distShardTupleList = LookupDistShardTuples(relationId); distShardTupleList = LookupDistShardTuples(relationId);
shardIntervalArrayLength = list_length(distShardTupleList); shardIntervalArrayLength = list_length(distShardTupleList);
if (shardIntervalArrayLength > 0) if (shardIntervalArrayLength > 0)
@ -1019,16 +1030,13 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
* and returns that or, if no matching entry was found, NULL. * and returns that or, if no matching entry was found, NULL.
*/ */
static HeapTuple static HeapTuple
LookupDistPartitionTuple(Oid relationId) LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId)
{ {
Relation pgDistPartition = NULL;
HeapTuple distPartitionTuple = NULL; HeapTuple distPartitionTuple = NULL;
HeapTuple currentPartitionTuple = NULL; HeapTuple currentPartitionTuple = NULL;
SysScanDesc scanDescriptor; SysScanDesc scanDescriptor;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
/* copy scankey to local copy, it will be modified during the scan */ /* copy scankey to local copy, it will be modified during the scan */
memcpy(scanKey, DistPartitionScanKey, sizeof(DistPartitionScanKey)); memcpy(scanKey, DistPartitionScanKey, sizeof(DistPartitionScanKey));
@ -1049,8 +1057,6 @@ LookupDistPartitionTuple(Oid relationId)
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgDistPartition, NoLock);
return distPartitionTuple; return distPartitionTuple;
} }

View File

@ -23,7 +23,9 @@ typedef struct FormData_pg_dist_partition
{ {
Oid logicalrelid; /* logical relation id; references pg_class oid */ Oid logicalrelid; /* logical relation id; references pg_class oid */
char partmethod; /* partition method; see codes below */ char partmethod; /* partition method; see codes below */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text partkey; /* partition key expression */ text partkey; /* partition key expression */
#endif
} FormData_pg_dist_partition; } FormData_pg_dist_partition;
/* ---------------- /* ----------------