mirror of https://github.com/citusdata/citus.git
Keep track of cached entries in case of interruption. (#2433)
* Keep track of cached entries in case of interruption. Previously we set DistTableCacheEntry->sortedShardIntervalArray and DistTableCacheEntry->shardIntervalArrayLength after we entered all related shard entries into DistShardCacheHash. The drawback was that if populating DistShardCacheHash was interrupted, ResetDistTableCacheEntry() didn't see the shard hash entries created, so was unable to clean them up. This patch fixes that by setting sortedShardIntervalArray earlier, and incrementing shardIntervalArrayLength as we enter shards into the cache.pull/2436/head
parent
a9f183a284
commit
431ac80563
|
@ -860,12 +860,22 @@ LookupDistTableCacheEntry(Oid relationId)
|
|||
memset(((char *) cacheEntry) + sizeof(Oid), 0,
|
||||
sizeof(DistTableCacheEntry) - sizeof(Oid));
|
||||
|
||||
/*
|
||||
* We disable interrupts while creating the cache entry because loading
|
||||
* shard metadata can take a while, and if statement_timeout is too low,
|
||||
* this will get canceled on each call and we won't be able to run any
|
||||
* queries on the table.
|
||||
*/
|
||||
HOLD_INTERRUPTS();
|
||||
|
||||
/* actually fill out entry */
|
||||
BuildDistTableCacheEntry(cacheEntry);
|
||||
|
||||
/* and finally mark as valid */
|
||||
cacheEntry->isValid = true;
|
||||
|
||||
RESUME_INTERRUPTS();
|
||||
|
||||
return cacheEntry;
|
||||
}
|
||||
|
||||
|
@ -1170,6 +1180,13 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We set these here, so ResetDistTableCacheEntry() can see what has been
|
||||
* entered into DistShardCacheHash even if the following loop is interrupted
|
||||
* by throwing errors, etc.
|
||||
*/
|
||||
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
|
||||
cacheEntry->shardIntervalArrayLength = 0;
|
||||
|
||||
/* maintain shardId->(table,ShardInterval) cache */
|
||||
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
|
||||
|
@ -1194,6 +1211,13 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
errhint("Reconnect and try again.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We should increment this only after we are sure this hasn't already
|
||||
* been assigned to any other relations. ResetDistTableCacheEntry()
|
||||
* depends on this.
|
||||
*/
|
||||
cacheEntry->shardIntervalArrayLength++;
|
||||
|
||||
shardEntry->shardIndex = shardIndex;
|
||||
shardEntry->tableEntry = cacheEntry;
|
||||
|
||||
|
@ -1222,8 +1246,6 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
shardInterval->shardIndex = shardIndex;
|
||||
}
|
||||
|
||||
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
|
||||
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
|
||||
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
|
||||
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
|
||||
}
|
||||
|
@ -2934,7 +2956,10 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
|||
bool foundInCache = false;
|
||||
|
||||
/* delete the shard's placements */
|
||||
pfree(placementArray);
|
||||
if (placementArray != NULL)
|
||||
{
|
||||
pfree(placementArray);
|
||||
}
|
||||
|
||||
/* delete per-shard cache-entry */
|
||||
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
|
||||
|
|
Loading…
Reference in New Issue