Rename method to IsCommitRecursive

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-01 23:13:33 +05:30
parent f03a0343bc
commit 01540d78d6
2 changed files with 10 additions and 19 deletions

View File

@ -33,7 +33,7 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change); Relation relation, ReorderBufferChange *change);
/* Helper methods */ /* Helper methods */
static bool ShouldCommitBeApplied(Relation sourceShardRelation); static bool IsCommitRecursive(Relation sourceShardRelation);
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation, static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple, HeapTuple tuple,
bool *shouldHandleUpdate); bool *shouldHandleUpdate);
@ -183,29 +183,28 @@ FindTargetRelationOid(Relation sourceShardRelation,
/* /*
* ShouldCommitBeApplied avoids recursive commit case when source shard and * IsCommitRecursive returns true when commit is recursive. When the source shard
* new split child shards are placed on the same node. When the source shard
* recives a commit(1), the WAL sender processes this commit message. This * recives a commit(1), the WAL sender processes this commit message. This
* commit is applied to a child shard which is placed on the same node as a * commit is applied to a child shard which is placed on the same node as a
* part of replication. This in turn creates one more commit(2). * part of replication. This in turn creates one more commit(2) which is recursive in nature.
* Commit 2 should be skipped as the source shard and destination for commit 2 * Commit 2 should be skipped as the source shard and destination for commit 2
* are same and the commit has already been applied. * are same and the commit has already been applied.
*/ */
bool bool
ShouldCommitBeApplied(Relation sourceShardRelation) IsCommitRecursive(Relation sourceShardRelation)
{ {
Oid sourceShardOid = sourceShardRelation->rd_id; Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = 0; i < shardSplitInfoArraySize; i++) for (int i = 0; i < shardSplitInfoArraySize; i++)
{ {
/* skip the commit when destination is equal to the source */ /* skip the commit when destination is equal to the source */
ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i]; ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i];
if (shardSplitInfo->splitChildShardOid == sourceShardOid) if (sourceShardOid == shardSplitInfo->splitChildShardOid)
{ {
return false; return true;
} }
} }
return true; return false;
} }
@ -229,13 +228,14 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
&shardSplitInfoArraySize); &shardSplitInfoArraySize);
} }
char *replicationSlotName = ctx->slot->data.name.data; /* avoid applying recursive commit */
if (!ShouldCommitBeApplied(relation)) if (IsCommitRecursive(relation))
{ {
return; return;
} }
Oid targetRelationOid = InvalidOid; Oid targetRelationOid = InvalidOid;
char *replicationSlotName = ctx->slot->data.name.data;
switch (change->action) switch (change->action)
{ {
case REORDER_BUFFER_CHANGE_INSERT: case REORDER_BUFFER_CHANGE_INSERT:

View File

@ -202,15 +202,6 @@ decode_replication_slot(char *slotName,
uint32_t *nodeId, uint32_t *nodeId,
dsm_handle *dsmHandle) dsm_handle *dsmHandle)
{ {
if (slotName == NULL ||
nodeId == NULL ||
dsmHandle == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid Out parameters")));
}
int index = 0; int index = 0;
char *strtokPosition = NULL; char *strtokPosition = NULL;
char *dupSlotName = pstrdup(slotName); char *dupSlotName = pstrdup(slotName);