From 88ddde79f2e3708070ed0651e7c2869f6ee61ec2 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 May 2017 12:15:16 +0300 Subject: [PATCH] Accept invalidation messages before accessing the metadata cache This commit is crucial to prevent stale metadata reads from the cache. Without this commit, some of the operations may use stale metadata, which could lead to various bugs such as crashes or inconsistent/lost data. As an example, consider that a COPY operation is blocked on a shard metadata lock. Another concurrent session updates the metadata and invalidates the cache. However, since Citus doesn't accept invalidations, COPY continues with the stale metadata once it acquires the lock. With this commit, we make sure that invalidation messages are accepted just before accessing the metadata cache, preventing any operation from using stale metadata. --- .../distributed/utils/metadata_cache.c | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 27e5ad22c..66a2af3da 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -385,14 +385,23 @@ LookupShardCacheEntry(int64 shardId) recheck = true; } - else if (!shardEntry->tableEntry->isValid) + else { /* - * The cache entry might not be valid right now. Reload cache entry - * and recheck (as the offset might have changed). + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages. */ - LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); - recheck = true; + AcceptInvalidationMessages(); + + if (!shardEntry->tableEntry->isValid) + { + /* + * The cache entry might not be valid right now. Reload cache entry + * and recheck (as the offset might have changed). 
+ */ + LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); + recheck = true; + } } /* @@ -472,6 +481,12 @@ LookupDistTableCacheEntry(Oid relationId) /* return valid matches */ if (foundInCache) { + /* + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages. + */ + AcceptInvalidationMessages(); + if (cacheEntry->isValid) { return cacheEntry; @@ -1927,6 +1942,12 @@ InitializeDistTableCache(void) HTAB * GetWorkerNodeHash(void) { + /* + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages. + */ + AcceptInvalidationMessages(); + if (!workerNodeHashValid) { InitializeWorkerNodeCache();