Addressing comments

- Changed name of functions
- Moved some functions from .h to .c file
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-05-25 15:31:22 +05:30
parent 75c6484e02
commit 59c3c93aa4
4 changed files with 72 additions and 66 deletions

View File

@ -49,15 +49,15 @@ static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int32 *maxValue,
int32 *nodeId);
static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId);
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId);
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount);
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount);
static void SetupHashMapForShardInfo();
/*
@ -130,12 +130,12 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
dsm_handle dsmHandle;
ShardSplitInfo *splitShardInfoSMArray =
GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
PopulateShardSplitInfoInSM(splitShardInfoSMArray,
ShardInfoHashMap,
dsmHandle,
shardSplitInfoCount);
ShardInfoHashMap,
dsmHandle,
shardSplitInfoCount);
return dsmHandle;
}
@ -315,10 +315,10 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
*/
ShardSplitInfo *
CreateShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId)
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId)
{
ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit);
CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
@ -425,9 +425,9 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
*/
void *
PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount)
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);

View File

@ -44,13 +44,13 @@ int shardSplitInfoArraySize = 0;
/* Plugin callback */
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
Relation relation, ReorderBufferChange *change);
/* Helper methods */
static bool ShouldCommitBeApplied(Relation sourceShardRelation);
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleUpdate);
HeapTuple tuple,
bool *shouldHandleUpdate);
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
@ -202,6 +202,8 @@ FindTargetRelationOid(Relation sourceShardRelation,
* part of replication. This in turn creates one more commit(2).
* Commit 2 should be skipped as the source shard and destination for commit 2
* are same and the commit has already been applied.
*
* TODO(saawasek): Add the information in Hashmap for performance reasons.
*/
bool
ShouldCommitBeApplied(Relation sourceShardRelation)
@ -231,7 +233,7 @@ ShouldCommitBeApplied(Relation sourceShardRelation)
*/
static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
Relation relation, ReorderBufferChange *change)
{
/*
* Get ShardSplitInfo array from Shared Memory if not already
@ -243,8 +245,7 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
shardSplitInfoArray =
GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data,
&arraySize);
shardSplitInfoArraySize = arraySize;
&shardSplitInfoArraySize);
}
char *replicationSlotName = ctx->slot->data.name.data;
@ -277,11 +278,14 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
default:
Assert(false);
}
if (targetRelationOid != InvalidOid)
/* Current replication slot is not responsible for handling the change */
if (targetRelationOid == InvalidOid)
{
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
return;
}
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}

View File

@ -18,12 +18,28 @@
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/citus_safe_lib.h"
/* Function declarations */
static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int
shardSplitInfoCount,
Size
shardSplitInfoSize,
dsm_handle *
dsmHandle);
static void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
dsmHandle,
dsm_segment **
attachedSegment);
static dsm_handle GetSMHandleFromSlotName(char *slotName);
/*
* GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory
* segment beloing to 'dsmHandle'. It pins the shared memory segment mapping till
* lifetime of the backend process accessing it.
*/
ShardSplitInfoSMHeader *
static ShardSplitInfoSMHeader *
GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle,
dsm_segment **attachedSegment)
{
@ -87,7 +103,7 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize)
* from the replication slot name. Replication slot name is encoded as
* "NODEID_SlotType_SharedMemoryHANDLE".
*/
dsm_handle
static dsm_handle
GetSMHandleFromSlotName(char *slotName)
{
if (slotName == NULL)
@ -105,27 +121,29 @@ GetSMHandleFromSlotName(char *slotName)
/*
* CreateShardSplitInfoSharedMemory is used to create a place to store
* information about the shard undergoing a split. The function creates dynamic
* shared memory segment consisting of a header regarding the processId and an
* array of "steps" which store ShardSplitInfo. The contents of this shared
* memory segment are consumed by WAL sender process during catch up phase of
* AllocateSharedMemoryForShardSplitInfo is used to create a place to store
* information about the shard undergoing a split. The function allocates dynamic
* shared memory segment consisting of a header which stores the id of process
* creating it and an array of "steps" which store ShardSplitInfo. The contents of
* this shared memory segment are consumed by WAL sender process during catch up phase of
* replication through logical decoding plugin.
*
* The shared memory segment exists till the catch up phase completes or the
* postmaster shutsdown.
*/
ShardSplitInfoSMHeader *
CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHandle)
static ShardSplitInfoSMHeader *
AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitInfoSize,
dsm_handle *dsmHandle)
{
if (stepSize <= 0 || stepCount <= 0)
if (shardSplitInfoCount <= 0 || shardSplitInfoSize <= 0)
{
ereport(ERROR,
(errmsg("number of steps and size of each step should be "
(errmsg("count and size of each step should be "
"positive values")));
}
Size totalSize = sizeof(ShardSplitInfoSMHeader) + stepSize * stepCount;
Size totalSize = sizeof(ShardSplitInfoSMHeader) + shardSplitInfoCount *
shardSplitInfoSize;
dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (dsmSegment == NULL)
@ -146,7 +164,7 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle, &dsmSegment);
shardSplitInfoSMHeader->stepCount = stepCount;
shardSplitInfoSMHeader->stepCount = shardSplitInfoCount;
shardSplitInfoSMHeader->processId = MyProcPid;
return shardSplitInfoSMHeader;
@ -154,7 +172,7 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa
/*
* GetSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory
* CreateSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory
* for storing shard split infomation. The function returns pointer the first element
* within this array.
*
@ -162,12 +180,12 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa
* dsmHandle - handle of the allocated shared memory segment
*/
ShardSplitInfo *
GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle)
CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle)
{
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
CreateShardSplitInfoSharedMemory(shardSplitInfoCount,
sizeof(ShardSplitInfo),
dsmHandle);
AllocateSharedMemoryForShardSplitInfo(shardSplitInfoCount,
sizeof(ShardSplitInfo),
dsmHandle);
ShardSplitInfo *shardSplitInfoSMArray =
(ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader);
@ -181,7 +199,7 @@ GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle)
* right after the header, so this function is trivial. The main purpose of
* this function is to make the intent clear to readers of the code.
*/
void *
static void *
ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
{
return shardSplitInfoSMHeader + 1;

View File

@ -30,27 +30,11 @@ typedef struct ShardSplitInfoSMHeader
/* Functions for creating and accessing shared memory segments */
extern ShardSplitInfoSMHeader * CreateShardSplitInfoSharedMemory(int stepCount,
Size stepSize,
dsm_handle *dsmHandle);
extern ShardSplitInfo * GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
dsm_handle *dsmHandle);
extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
dsm_handle *dsmHandle);
extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize);
extern dsm_handle GetSMHandleFromSlotName(char *slotName);
/*
* ShardSplitInfoSMSteps returns a pointer to the array of shard split info
* steps that are stored in shared memory.
*/
extern void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
dsmHandle,
dsm_segment **
attachedSegment);
/* Functions related to encoding-decoding for replication slot name */
char * encode_replication_slot(uint64_t nodeId,