Accept invalidation messages before accessing the metadata cache

This commit is crucial to prevent stale metadata reads from the
cache. Without this commit, some of the operations may use stale
metadata which could end up with various bugs such as crashes,
inconsistent/lost data etc.

As an example, consider that a COPY operation is blocked on shard
metadata lock. Another concurrent session updates the metadata and
invalidates the cache. However, since Citus doesn't accept invalidations,
COPY continues with the stale metadata once it acquires the lock.

With this commit, we make sure that invalidation messages are accepted
just before accessing the metadata cache and preventing any operation to
use stale metadata.
pull/1406/head
Onder Kalaci 2017-05-10 12:15:16 +03:00
parent 94151c9aef
commit 88ddde79f2
1 changed files with 26 additions and 5 deletions

View File

@ -385,14 +385,23 @@ LookupShardCacheEntry(int64 shardId)
recheck = true; recheck = true;
} }
else if (!shardEntry->tableEntry->isValid) else
{ {
/* /*
* The cache entry might not be valid right now. Reload cache entry * We might have some concurrent metadata changes. In order to get the changes,
* and recheck (as the offset might have changed). * we first need to accept the cache invalidation messages.
*/ */
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); AcceptInvalidationMessages();
recheck = true;
if (!shardEntry->tableEntry->isValid)
{
/*
* The cache entry might not be valid right now. Reload cache entry
* and recheck (as the offset might have changed).
*/
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId);
recheck = true;
}
} }
/* /*
@ -472,6 +481,12 @@ LookupDistTableCacheEntry(Oid relationId)
/* return valid matches */ /* return valid matches */
if (foundInCache) if (foundInCache)
{ {
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (cacheEntry->isValid) if (cacheEntry->isValid)
{ {
return cacheEntry; return cacheEntry;
@ -1927,6 +1942,12 @@ InitializeDistTableCache(void)
HTAB * HTAB *
GetWorkerNodeHash(void) GetWorkerNodeHash(void)
{ {
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (!workerNodeHashValid) if (!workerNodeHashValid)
{ {
InitializeWorkerNodeCache(); InitializeWorkerNodeCache();