mirror of https://github.com/citusdata/citus.git
Use relcache invalidation much less
As our tests shows, many with lots of distributed partitioned tables these unncessary cache invalidations are triggering certain Postgres errors Such as ERROR: invalid memory alloc request size 1073741824tx_metadata_sync
parent
617cac4024
commit
873429c465
|
@ -3122,8 +3122,11 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
|
|||
replicationModel, distributionColumnVar);
|
||||
}
|
||||
|
||||
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
|
||||
colocationId, replicationModel, autoConverted);
|
||||
bool invalidateRelCache = false;
|
||||
InsertIntoPgDistPartitionInternal(relationId, distributionMethod,
|
||||
distributionColumnVar,
|
||||
colocationId, replicationModel, autoConverted,
|
||||
invalidateRelCache);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -3272,7 +3275,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
|
|||
shardMaxValue);
|
||||
}
|
||||
|
||||
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
|
||||
bool invalidateRelCache = false;
|
||||
InsertShardRowInternal(relationId, shardId, storageType, shardMinValue, shardMaxValue,
|
||||
invalidateRelCache);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -3477,7 +3482,9 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
|
|||
shardLength, groupId);
|
||||
}
|
||||
|
||||
InsertShardPlacementRow(shardId, placementId, shardState, shardLength, groupId);
|
||||
bool invalidateRelCache = false;
|
||||
InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
|
||||
groupId, invalidateRelCache);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -1745,6 +1745,20 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
|||
void
|
||||
InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue)
|
||||
{
|
||||
bool invalidateRelCache = true;
|
||||
InsertShardRowInternal(relationId, shardId, storageType,
|
||||
shardMinValue, shardMaxValue, invalidateRelCache);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertShardRowInternal is a helper function for InsertShardRow()
|
||||
* where callers can also control invalidateRelCache.
|
||||
*/
|
||||
void
|
||||
InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue, bool invalidateRelCache)
|
||||
{
|
||||
Datum values[Natts_pg_dist_shard];
|
||||
bool isNulls[Natts_pg_dist_shard];
|
||||
|
@ -1780,10 +1794,14 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
|||
|
||||
CatalogTupleInsert(pgDistShard, heapTuple);
|
||||
|
||||
if (invalidateRelCache)
|
||||
{
|
||||
/* invalidate previous cache entry and close relation */
|
||||
CitusInvalidateRelcacheByRelid(relationId);
|
||||
}
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistShard, NoLock);
|
||||
}
|
||||
|
||||
|
@ -1798,6 +1816,21 @@ uint64
|
|||
InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId)
|
||||
{
|
||||
bool invalidateRelCache = true;
|
||||
return InsertShardPlacementRowInternal(shardId, placementId, shardState, shardLength,
|
||||
groupId, invalidateRelCache);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertShardPlacementRowInternal is a helper function for InsertShardPlacementRow()
|
||||
* where callers can also control invalidateRelCache.
|
||||
*/
|
||||
uint64
|
||||
InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId, bool invalidateRelCache)
|
||||
{
|
||||
Datum values[Natts_pg_dist_placement];
|
||||
bool isNulls[Natts_pg_dist_placement];
|
||||
|
@ -1824,9 +1857,13 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
|||
|
||||
CatalogTupleInsert(pgDistPlacement, heapTuple);
|
||||
|
||||
if (invalidateRelCache)
|
||||
{
|
||||
CitusInvalidateRelcacheByShardId(shardId);
|
||||
}
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistPlacement, NoLock);
|
||||
|
||||
return placementId;
|
||||
|
@ -1840,6 +1877,23 @@ void
|
|||
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId,
|
||||
char replicationModel, bool autoConverted)
|
||||
{
|
||||
bool invalidateRelCache = true;
|
||||
InsertIntoPgDistPartitionInternal(relationId, distributionMethod, distributionColumn,
|
||||
colocationId, replicationModel,
|
||||
autoConverted, invalidateRelCache);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertIntoPgDistPartitionInternal is a helper function for InsertIntoPgDistPartition()
|
||||
* where callers can also control invalidateRelCache.
|
||||
*/
|
||||
void
|
||||
InsertIntoPgDistPartitionInternal(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId,
|
||||
char replicationModel, bool autoConverted,
|
||||
bool invalidateRelCache)
|
||||
{
|
||||
char *distributionColumnString = NULL;
|
||||
|
||||
|
@ -1881,11 +1935,15 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
|||
/* finally insert tuple, build index entries & register cache invalidation */
|
||||
CatalogTupleInsert(pgDistPartition, newTuple);
|
||||
|
||||
if (invalidateRelCache)
|
||||
{
|
||||
CitusInvalidateRelcacheByRelid(relationId);
|
||||
}
|
||||
|
||||
RecordDistributedRelationDependencies(relationId);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
table_close(pgDistPartition, NoLock);
|
||||
}
|
||||
|
||||
|
|
|
@ -311,13 +311,27 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
|||
/* Function declarations to modify shard and shard placement data */
|
||||
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue);
|
||||
extern void InsertShardRowInternal(Oid relationId, uint64 shardId, char storageType,
|
||||
text *shardMinValue, text *shardMaxValue,
|
||||
bool invalidateRelCache);
|
||||
extern void DeleteShardRow(uint64 shardId);
|
||||
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId);
|
||||
extern uint64 InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId, bool invalidateRelCache);
|
||||
extern uint64 InsertShardPlacementRowInternal(uint64 shardId, uint64 placementId,
|
||||
char shardState, uint64 shardLength,
|
||||
int32 groupId, bool invalidateRelCache);
|
||||
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId,
|
||||
char replicationModel, bool autoConverted);
|
||||
extern void InsertIntoPgDistPartitionInternal(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn,
|
||||
uint32 colocationId, char replicationModel,
|
||||
bool autoConverted,
|
||||
bool invalidateRelCache);
|
||||
extern void UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted);
|
||||
extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMethod,
|
||||
Var *distributionColumn, int colocationId);
|
||||
|
|
Loading…
Reference in New Issue