Use relcache invalidation much less

As our tests shows, many with lots of distributed partitioned tables

these unncessary cache invalidations are triggering certain Postgres errors

Such as ERROR:  invalid memory alloc request size 1073741824

Instead, just invalidate once per table while syncing the metadata for pg_dist_partition.
metadata_sync_partially_non_tx
Onder Kalaci 2022-11-28 20:02:56 +01:00
parent d4394b2e2d
commit 2f10073811
3 changed files with 53 additions and 5 deletions

View File

@ -3272,7 +3272,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue); shardMaxValue);
} }
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue); bool invalidateRelCache = false;
InsertShardRowInternal(relationId, shardId, storageType, shardMinValue, shardMaxValue,
invalidateRelCache);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -3477,7 +3479,9 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
shardLength, groupId); shardLength, groupId);
} }
InsertShardPlacementRow(shardId, placementId, shardState, shardLength, groupId); bool invalidateRelCache = false;
InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
groupId, invalidateRelCache);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -1745,6 +1745,20 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
void void
InsertShardRow(Oid relationId, uint64 shardId, char storageType, InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue) text *shardMinValue, text *shardMaxValue)
{
bool invalidateRelCache = true;
InsertShardRowInternal(relationId, shardId, storageType,
shardMinValue, shardMaxValue, invalidateRelCache);
}
/*
* InsertShardRowInternal is a helper function for InsertShardRow()
* where callers can also control invalidateRelCache.
*/
void
InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue, bool invalidateRelCache)
{ {
Datum values[Natts_pg_dist_shard]; Datum values[Natts_pg_dist_shard];
bool isNulls[Natts_pg_dist_shard]; bool isNulls[Natts_pg_dist_shard];
@ -1780,10 +1794,14 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
CatalogTupleInsert(pgDistShard, heapTuple); CatalogTupleInsert(pgDistShard, heapTuple);
if (invalidateRelCache)
{
/* invalidate previous cache entry and close relation */ /* invalidate previous cache entry and close relation */
CitusInvalidateRelcacheByRelid(relationId); CitusInvalidateRelcacheByRelid(relationId);
}
CommandCounterIncrement(); CommandCounterIncrement();
table_close(pgDistShard, NoLock); table_close(pgDistShard, NoLock);
} }
@ -1798,6 +1816,21 @@ uint64
InsertShardPlacementRow(uint64 shardId, uint64 placementId, InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
int32 groupId) int32 groupId)
{
bool invalidateRelCache = true;
return InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
groupId, invalidateRelCache);
}
/*
* InsertShardPlacementRowInternal is a helper function for InsertShardPlacementRow()
* where callers can also control invalidateRelCache.
*/
uint64
InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
int32 groupId, bool invalidateRelCache)
{ {
Datum values[Natts_pg_dist_placement]; Datum values[Natts_pg_dist_placement];
bool isNulls[Natts_pg_dist_placement]; bool isNulls[Natts_pg_dist_placement];
@ -1824,9 +1857,13 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
CatalogTupleInsert(pgDistPlacement, heapTuple); CatalogTupleInsert(pgDistPlacement, heapTuple);
if (invalidateRelCache)
{
CitusInvalidateRelcacheByShardId(shardId); CitusInvalidateRelcacheByShardId(shardId);
}
CommandCounterIncrement(); CommandCounterIncrement();
table_close(pgDistPlacement, NoLock); table_close(pgDistPlacement, NoLock);
return placementId; return placementId;
@ -1886,6 +1923,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
RecordDistributedRelationDependencies(relationId); RecordDistributedRelationDependencies(relationId);
CommandCounterIncrement(); CommandCounterIncrement();
table_close(pgDistPartition, NoLock); table_close(pgDistPartition, NoLock);
} }

View File

@ -311,10 +311,16 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue); text *shardMinValue, text *shardMaxValue);
extern void InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue,
bool invalidateRelCache);
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
int32 groupId); int32 groupId);
extern uint64 InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
int32 groupId, bool invalidateRelCache);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId, Var *distributionColumn, uint32 colocationId,
char replicationModel, bool autoConverted); char replicationModel, bool autoConverted);