Fix command counter increment bug

Fixes citusdata/citus#714

On `InsertShardRow`, we previously called `CommandCounterIncrement()` before
`CitusInvalidateRelcacheByRelid(relationId);`. This might prevent to skip
invalidation of the distributed table in the next access within the same session.
release-5.2
Eren Basak 2016-09-30 12:32:54 +03:00 committed by Jason Petersen
parent 3df78ebd4a
commit a851aaf9f7
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
2 changed files with 13 additions and 9 deletions

View File

@ -16,6 +16,7 @@
#include "access/htup.h" #include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/nbtree.h" #include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
@ -252,6 +253,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
RecordDistributedRelationDependencies(distributedRelationId, distributionKey); RecordDistributedRelationDependencies(distributedRelationId, distributionKey);
CommandCounterIncrement();
heap_close(pgDistPartition, NoLock); heap_close(pgDistPartition, NoLock);
relation_close(distributedRelation, NoLock); relation_close(distributedRelation, NoLock);

View File

@ -424,11 +424,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
simple_heap_insert(pgDistShard, heapTuple); simple_heap_insert(pgDistShard, heapTuple);
CatalogUpdateIndexes(pgDistShard, heapTuple); CatalogUpdateIndexes(pgDistShard, heapTuple);
CommandCounterIncrement();
/* close relation and invalidate previous cache entry */ /* invalidate previous cache entry and close relation */
heap_close(pgDistShard, RowExclusiveLock);
CitusInvalidateRelcacheByRelid(relationId); CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
} }
@ -464,9 +465,8 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
simple_heap_insert(pgDistShardPlacement, heapTuple); simple_heap_insert(pgDistShardPlacement, heapTuple);
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple); CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
CommandCounterIncrement();
/* close relation */ CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock); heap_close(pgDistShardPlacement, RowExclusiveLock);
} }
@ -507,13 +507,14 @@ DeleteShardRow(uint64 shardId)
distributedRelationId = pgDistShardForm->logicalrelid; distributedRelationId = pgDistShardForm->logicalrelid;
simple_heap_delete(pgDistShard, &heapTuple->t_self); simple_heap_delete(pgDistShard, &heapTuple->t_self);
CommandCounterIncrement();
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgDistShard, RowExclusiveLock);
/* invalidate previous cache entry */ /* invalidate previous cache entry */
CitusInvalidateRelcacheByRelid(distributedRelationId); CitusInvalidateRelcacheByRelid(distributedRelationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
} }
@ -567,9 +568,9 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
} }
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self); simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
CommandCounterIncrement();
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock); heap_close(pgDistShardPlacement, RowExclusiveLock);
} }