mirror of https://github.com/citusdata/citus.git
Use relcache invalidation much less
As our tests shows, many with lots of distributed partitioned tables these unncessary cache invalidations are triggering certain Postgres errors Such as ERROR: invalid memory alloc request size 1073741824 Instead, just invalidate once per table while syncing the metadata for pg_dist_partition.metadata_sync_final
parent
0e92244bfe
commit
bf03fae6d1
|
@ -3272,7 +3272,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
|
|||
shardMaxValue);
|
||||
}
|
||||
|
||||
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
|
||||
bool invalidateRelCache = false;
|
||||
InsertShardRowInternal(relationId, shardId, storageType, shardMinValue, shardMaxValue,
|
||||
invalidateRelCache);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -3477,7 +3479,9 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
|
|||
shardLength, groupId);
|
||||
}
|
||||
|
||||
InsertShardPlacementRow(shardId, placementId, shardState, shardLength, groupId);
|
||||
bool invalidateRelCache = false;
|
||||
InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
|
||||
groupId, invalidateRelCache);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -1745,6 +1745,20 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
|||
void
|
||||
InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue)
|
||||
{
|
||||
bool invalidateRelCache = true;
|
||||
InsertShardRowInternal(relationId, shardId, storageType,
|
||||
shardMinValue, shardMaxValue, invalidateRelCache);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertShardRowInternal is a helper function for InsertShardRow()
|
||||
* where callers can also control invalidateRelCache.
|
||||
*/
|
||||
void
|
||||
InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue, bool invalidateRelCache)
|
||||
{
|
||||
Datum values[Natts_pg_dist_shard];
|
||||
bool isNulls[Natts_pg_dist_shard];
|
||||
|
@ -1780,10 +1794,14 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
|||
|
||||
CatalogTupleInsert(pgDistShard, heapTuple);
|
||||
|
||||
if (invalidateRelCache)
|
||||
{
|
||||
/* invalidate previous cache entry and close relation */
|
||||
CitusInvalidateRelcacheByRelid(relationId);
|
||||
}
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistShard, NoLock);
|
||||
}
|
||||
|
||||
|
@ -1798,6 +1816,21 @@ uint64
|
|||
InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId)
|
||||
{
|
||||
bool invalidateRelCache = true;
|
||||
return InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
|
||||
groupId, invalidateRelCache);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertShardPlacementRowInternal is a helper function for InsertShardPlacementRow()
|
||||
* where callers can also control invalidateRelCache.
|
||||
*/
|
||||
uint64
|
||||
InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId, bool invalidateRelCache)
|
||||
{
|
||||
Datum values[Natts_pg_dist_placement];
|
||||
bool isNulls[Natts_pg_dist_placement];
|
||||
|
@ -1824,9 +1857,13 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
|||
|
||||
CatalogTupleInsert(pgDistPlacement, heapTuple);
|
||||
|
||||
if (invalidateRelCache)
|
||||
{
|
||||
CitusInvalidateRelcacheByShardId(shardId);
|
||||
}
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistPlacement, NoLock);
|
||||
|
||||
return placementId;
|
||||
|
@ -1886,6 +1923,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
|||
RecordDistributedRelationDependencies(relationId);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistPartition, NoLock);
|
||||
}
|
||||
|
||||
|
|
|
@ -311,10 +311,16 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
|||
/* Function declarations to modify shard and shard placement data */
|
||||
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue);
|
||||
extern void InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue,
|
||||
bool invalidateRelCache);
|
||||
extern void DeleteShardRow(uint64 shardId);
|
||||
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId);
|
||||
extern uint64 InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId, bool invalidateRelCache);
|
||||
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId,
|
||||
char replicationModel, bool autoConverted);
|
||||
|
|
Loading…
Reference in New Issue