Fix command counter increment bug

Fixes citusdata/citus#714

On `InsertShardRow`, we previously called `CommandCounterIncrement()` before
`CitusInvalidateRelcacheByRelid(relationId);`. This might prevent to skip
invalidation of the distributed table in the next access within the same session.
pull/825/head
Eren Basak 2016-09-30 12:32:54 +03:00
parent 7e7b0f3491
commit ac3a4eee21
2 changed files with 13 additions and 9 deletions

View File

@ -16,6 +16,7 @@
#include "access/htup.h" #include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/nbtree.h" #include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
@ -299,6 +300,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
RecordDistributedRelationDependencies(distributedRelationId, distributionKey); RecordDistributedRelationDependencies(distributedRelationId, distributionKey);
CommandCounterIncrement();
heap_close(pgDistPartition, NoLock); heap_close(pgDistPartition, NoLock);
relation_close(distributedRelation, NoLock); relation_close(distributedRelation, NoLock);

View File

@ -357,11 +357,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
simple_heap_insert(pgDistShard, heapTuple); simple_heap_insert(pgDistShard, heapTuple);
CatalogUpdateIndexes(pgDistShard, heapTuple); CatalogUpdateIndexes(pgDistShard, heapTuple);
CommandCounterIncrement();
/* close relation and invalidate previous cache entry */ /* invalidate previous cache entry and close relation */
heap_close(pgDistShard, RowExclusiveLock);
CitusInvalidateRelcacheByRelid(relationId); CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
} }
@ -397,9 +398,8 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
simple_heap_insert(pgDistShardPlacement, heapTuple); simple_heap_insert(pgDistShardPlacement, heapTuple);
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple); CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
CommandCounterIncrement();
/* close relation */ CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock); heap_close(pgDistShardPlacement, RowExclusiveLock);
} }
@ -440,13 +440,14 @@ DeleteShardRow(uint64 shardId)
distributedRelationId = pgDistShardForm->logicalrelid; distributedRelationId = pgDistShardForm->logicalrelid;
simple_heap_delete(pgDistShard, &heapTuple->t_self); simple_heap_delete(pgDistShard, &heapTuple->t_self);
CommandCounterIncrement();
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgDistShard, RowExclusiveLock);
/* invalidate previous cache entry */ /* invalidate previous cache entry */
CitusInvalidateRelcacheByRelid(distributedRelationId); CitusInvalidateRelcacheByRelid(distributedRelationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
} }
@ -500,9 +501,9 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
} }
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self); simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
CommandCounterIncrement();
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock); heap_close(pgDistShardPlacement, RowExclusiveLock);
} }