Address review comments (partially done)

- Parameterize test case methods
- Remove code for handling 'UPDATE'
- Change replication slot encoding/decoding logic
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-05-24 20:02:00 +05:30
parent 7a61bf1082
commit 75c6484e02
5 changed files with 36 additions and 136 deletions

View File

@@ -48,27 +48,28 @@ static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int32 *minValue,
int32 *maxValue,
int32 *nodeId);
static ShardSplitInfo * GetShardSplitInfo(uint64 sourceShardIdToSplit,
static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId);
static void AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo);
static void * CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount);
static HTAB * SetupHashMapForShardInfo();
static void SetupHashMapForShardInfo();
/*
* split_shard_replication_setup UDF creates in-memory data structures
* to store the meta information about the new shards and their placements
* required during the catch up phase of logical replication.
* to store the meta information about the shard undergoing split and new split
* children along with their placements required during the catch up phase
* of logical replication.
* This meta information is stored in a shared memory segment and accessed
* by logical decoding plugin.
*
* Split information is given by user as an Array in the below format
* [{sourceShardId, childShardID, minValue, maxValue, Destination NodeId}]
* [{sourceShardId, childShardId, minValue, maxValue, Destination NodeId}]
*
* sourceShardId - id of the shard that is undergoing a split
* childShardId - id of shard that stores a specific range of values
@@ -83,7 +84,7 @@ static HTAB * SetupHashMapForShardInfo();
* Multiple shards can be placed on the same destination node. Source and
* destination nodes can be the same too.
*
* There is a 1-1 mapping between a node and a replication slot as one replication
* There is a 1-1 mapping between a target node and a replication slot as one replication
* slot takes care of replicating changes for one node.
* The 'logical_decoding_plugin' consumes this information and routes the tuple
* from the source shard to the appropriate destination shard that falls in the
@@ -94,7 +95,6 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
{
ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0];
int insideCount = ARR_DIMS(shardInfoArrayObject)[1];
/* SetupMap */
SetupHashMapForShardInfo();
@@ -117,14 +117,14 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
&maxValue,
&nodeId);
ShardSplitInfo *shardSplitInfo = GetShardSplitInfo(
ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo(
sourceShardId,
desShardId,
minValue,
maxValue,
nodeId);
AddShardSplitInfoEntryForNode(shardSplitInfo);
AddShardSplitInfoEntryForNodeInMap(shardSplitInfo);
shardSplitInfoCount++;
}
@@ -132,7 +132,7 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
ShardSplitInfo *splitShardInfoSMArray =
GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
CopyShardSplitInfoToSM(splitShardInfoSMArray,
PopulateShardSplitInfoInSM(splitShardInfoSMArray,
ShardInfoHashMap,
dsmHandle,
shardSplitInfoCount);
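For orientation, each row of the input array described above is turned into one ShardSplitInfo; for example, splitting shard 1 into shards 2 and 3 could be expressed as [{1, 2, -2147483648, -1, 18}, {1, 3, 0, 2147483647, 19}]. Below is a minimal sketch (not from this commit) of what such a struct could hold, with hypothetical field names inferred from CreateShardSplitInfo()'s parameters rather than the actual Citus definition:

/* Hypothetical shape of ShardSplitInfo; field names are assumptions for
 * illustration, not the definition used by this commit. */
#include "postgres.h"

typedef struct ShardSplitInfoSketch
{
	Oid sourceShardOid;       /* relation OID of the shard undergoing split */
	Oid splitChildShardOid;   /* relation OID of the new split child */
	int32 shardMinValue;      /* inclusive lower bound of the child's hash range */
	int32 shardMaxValue;      /* inclusive upper bound of the child's hash range */
	uint64 nodeId;            /* destination node that hosts the child shard */
} ShardSplitInfoSketch;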
@@ -147,7 +147,7 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
* is 'nodeId' and value is a list of ShardSplitInfo that are placed on
* this particular node.
*/
static HTAB *
static void
SetupHashMapForShardInfo()
{
HASHCTL info;
@@ -159,7 +159,6 @@ SetupHashMapForShardInfo()
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
return ShardInfoHashMap;
}
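As a rough, self-contained illustration of the map described above (key: destination nodeId, value: list of ShardSplitInfo placed on that node), here is a sketch that is not the commit's code; the entry struct is hypothetical and HASH_BLOBS is used instead of the custom hash function the commit registers via HASH_FUNCTION:

#include "postgres.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"

/* Hypothetical hash entry, mirroring the entry->key / entry->shardSplitInfoList
 * accesses made later when the map is flattened into shared memory. */
typedef struct NodeShardMappingEntrySketch
{
	uint64 key;                   /* destination node id */
	List *shardSplitInfoList;     /* ShardSplitInfo entries placed on that node */
} NodeShardMappingEntrySketch;

static HTAB *ShardInfoHashMapSketch = NULL;

static void
SetupHashMapForShardInfoSketch(void)
{
	HASHCTL info;
	memset(&info, 0, sizeof(HASHCTL));
	info.keysize = sizeof(uint64);
	info.entrysize = sizeof(NodeShardMappingEntrySketch);
	info.hcxt = CurrentMemoryContext;

	/* HASH_BLOBS hashes the fixed-size key as raw bytes */
	int hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	ShardInfoHashMapSketch = hash_create("ShardInfoMap (sketch)", 128,
										 &info, hashFlags);
}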
@@ -173,7 +172,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int32 *nodeId)
{
Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject);
int16 elemtypeLength = 0;
int elemtypeLength = 0;
bool elemtypeByValue = false;
char elemtypeAlignment = 0;
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue,
@@ -304,7 +303,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
/*
* GetShardSplitInfo function constructs ShardSplitInfo data structure
* CreateShardSplitInfo function constructs ShardSplitInfo data structure
* with appropriate OIDs for the source and destination relations.
*
* sourceShardIdToSplit - Existing shardId which has a valid entry in cache and catalogue
@@ -315,7 +314,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
* However we can use shard ID and construct qualified shardName.
*/
ShardSplitInfo *
GetShardSplitInfo(uint64 sourceShardIdToSplit,
CreateShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
@@ -386,11 +385,11 @@ GetShardSplitInfo(uint64 sourceShardIdToSplit,
/*
* AddShardSplitInfoEntryForNode function add's ShardSplitInfo entry
* AddShardSplitInfoEntryForNodeInMap function adds a ShardSplitInfo entry
* to the hash map. The key is nodeId on which the new shard is to be placed.
*/
void
AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
{
uint64_t keyNodeId = shardSplitInfo->nodeId;
bool found = false;
@@ -412,7 +411,7 @@ AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
/*
* CopyShardSplitInfoToSM function copies information from the hash map
* PopulateShardSplitInfoInSM function copies information from the hash map
* into shared memory segment. This information is consumed by the WAL sender
* process during logical replication.
*
@@ -425,7 +424,7 @@ AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
* dsmHandle - Shared memory segment handle
*/
void *
CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount)
@@ -439,9 +438,7 @@ CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
{
uint64_t nodeId = entry->key;
char *derivedSlotName =
encode_replication_slot(nodeId,
SLOT_HANDLING_INSERT_AND_DELETE,
dsmHandle);
encode_replication_slot(nodeId, dsmHandle);
List *shardSplitInfoList = entry->shardSplitInfoList;
ListCell *listCell = NULL;
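To make the flattening step concrete, here is a hedged sketch (not the commit's code) of how the per-node lists might be copied into the shared-memory array, reusing the hypothetical NodeShardMappingEntrySketch from the earlier sketch and assuming the includes and types of this file; the slotName field on ShardSplitInfo is also an assumption:

/* Sketch of the copy loop: every entry copied into shared memory is tagged
 * with the slot name ("<nodeId>_<dsmHandle>") that will replay its changes. */
static void
PopulateShardSplitInfoInSMSketch(ShardSplitInfo *shardSplitInfoArray,
								 HTAB *shardInfoHashMap,
								 dsm_handle dsmHandle)
{
	HASH_SEQ_STATUS status;
	hash_seq_init(&status, shardInfoHashMap);

	NodeShardMappingEntrySketch *entry = NULL;
	int index = 0;
	while ((entry = (NodeShardMappingEntrySketch *) hash_seq_search(&status)) != NULL)
	{
		/* one replication slot per destination node */
		char *derivedSlotName = encode_replication_slot(entry->key, dsmHandle);

		ListCell *listCell = NULL;
		foreach(listCell, entry->shardSplitInfoList)
		{
			ShardSplitInfo *shardSplitInfo = (ShardSplitInfo *) lfirst(listCell);

			shardSplitInfoArray[index] = *shardSplitInfo;

			/* remember which slot replays this entry (hypothetical field) */
			strlcpy(shardSplitInfoArray[index].slotName, derivedSlotName,
					NAMEDATALEN);
			index++;
		}
	}
}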

View File

@@ -7,7 +7,6 @@
# src/backend/replication/pgoutput
#
#-------------------------------------------------------------------------
citus_subdir = src/backend/distributed/shardsplit
citus_top_builddir = ../../../..
safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
SUBDIRS = . safeclib

View File

@@ -43,13 +43,12 @@ int shardSplitInfoArraySize = 0;
/* Plugin callback */
static void split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* Helper methods */
static bool ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change);
static bool ShouldCommitBeApplied(Relation sourceShardRelation);
static int GetHashValueForIncomingTuple(Relation sourceShardRelation,
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleUpdate);
@@ -73,7 +72,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
pgoutputChangeCB = cb->change_cb;
cb->change_cb = split_change;
cb->change_cb = split_change_cb;
}
@@ -83,7 +82,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
* handled as the incoming committed change would belong to a relation
* that is not undergoing split.
*/
static int
static int32_t
GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleChange)
@@ -139,7 +138,7 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
int hashedValue = DatumGetInt32(hashedValueDatum);
int32_t hashedValue = DatumGetInt32(hashedValueDatum);
*shouldHandleChange = true;
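The hashed value computed above is what decides the destination: the change belongs to the split child whose [minValue, maxValue] range covers it. A hedged sketch of that lookup over the shared-memory array follows (shardSplitInfoArraySize is the global shown at the top of this file; shardSplitInfoArray and the field names are assumptions):

/* Sketch: pick the destination child shard for a change on the given
 * source shard, based on the hashed distribution column value. */
static Oid
FindDestinationShardOidSketch(Oid sourceShardOid, int32 hashedValue)
{
	for (int index = 0; index < shardSplitInfoArraySize; index++)
	{
		ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[index];

		if (shardSplitInfo->sourceShardOid == sourceShardOid &&
			hashedValue >= shardSplitInfo->shardMinValue &&
			hashedValue <= shardSplitInfo->shardMaxValue)
		{
			return shardSplitInfo->splitChildShardOid;
		}
	}

	/* relation is not undergoing split, or the value falls outside all ranges */
	return InvalidOid;
}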
@@ -226,42 +225,12 @@ ShouldCommitBeApplied(Relation sourceShardRelation)
}
bool
ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change)
{
if (slotName == NULL)
{
ereport(ERROR, errmsg("Invalid null replication slot name."));
}
uint64_t nodeId = 0;
uint32_t slotType = 0;
dsm_handle dsmHandle;
/* change this to enum */
decode_replication_slot(slotName, &nodeId, &slotType, &dsmHandle);
if (slotType != SLOT_HANDLING_INSERT_AND_DELETE &&
slotType != SLOT_HANDLING_DELETE_OF_UPDATE)
{
ereport(ERROR, errmsg("Invalid replication slot type."));
}
if (slotType == SLOT_HANDLING_DELETE_OF_UPDATE &&
change->action != REORDER_BUFFER_CHANGE_UPDATE)
{
return false;
}
return true;
}
/*
* split_change function emits the incoming tuple change
* to the appropriate destination shard.
*/
static void
split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*
@@ -279,21 +248,11 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
char *replicationSlotName = ctx->slot->data.name.data;
bool shouldHandleChanges = false;
if (!ShouldSlotHandleChange(replicationSlotName, change))
{
return;
}
if (!ShouldCommitBeApplied(relation))
{
return;
}
uint64_t nodeId = 0;
uint32 slotType = 0;
dsm_handle dsmHandle = 0;
decode_replication_slot(replicationSlotName, &nodeId, &slotType, &dsmHandle);
Oid targetRelationOid = InvalidOid;
switch (change->action)
{
@@ -305,36 +264,6 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
switch (slotType)
{
case SLOT_HANDLING_INSERT_AND_DELETE:
{
HeapTuple newTuple = &(change->data.tp.newtuple->tuple);
Oid destinationForInsert = FindTargetRelationOid(relation, newTuple,
replicationSlotName);
targetRelationOid = destinationForInsert;
change->action = REORDER_BUFFER_CHANGE_INSERT;
break;
}
case SLOT_HANDLING_DELETE_OF_UPDATE:
{
char *modifiedSlotName = encode_replication_slot(nodeId, 0,
dsmHandle);
HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
Oid destinationForDelete = FindTargetRelationOid(relation, oldTuple,
modifiedSlotName);
targetRelationOid = destinationForDelete;
change->action = REORDER_BUFFER_CHANGE_DELETE;
break;
}
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
@@ -343,8 +272,12 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
}
}
/* Only INSERT/DELETE are visible in the replication path of split shard */
default:
Assert(false);
}
if (targetRelationOid != InvalidOid)
{
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
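The displayed hunk cuts off here; presumably the resolved relation is then handed to the wrapped pgoutput callback saved in _PG_output_plugin_init, roughly along these lines (a sketch of the tail, not the commit's exact code):

if (targetRelationOid != InvalidOid)
{
	/* emit the change as if it happened on the destination child shard */
	Relation targetRelation = RelationIdGetRelation(targetRelationOid);
	pgoutputChangeCB(ctx, txn, targetRelation, change);
	RelationClose(targetRelation);
}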

View File

@@ -97,9 +97,8 @@ GetSMHandleFromSlotName(char *slotName)
}
uint64_t nodeId = 0;
uint32_t type = 0;
dsm_handle handle = 0;
decode_replication_slot(slotName, &nodeId, &type, &handle);
decode_replication_slot(slotName, &nodeId, &handle);
return handle;
}
@@ -192,27 +191,25 @@ ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
/*
* encode_replication_slot returns an encoded replication slot name
* in the following format.
* Slot Name = NodeId_ReplicationSlotType_SharedMemoryHandle
* Slot Name = NodeId_SharedMemoryHandle
*/
char *
encode_replication_slot(uint64_t nodeId,
uint32 slotType,
dsm_handle dsmHandle)
{
StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "%ld_%u_%u", nodeId, slotType, dsmHandle);
appendStringInfo(slotName, "%ld_%u", nodeId, dsmHandle);
return slotName->data;
}
/*
* decode_replication_slot decodes the replication slot name
* into node id, slotType, shared memory handle.
* into node id and shared memory handle.
*/
void
decode_replication_slot(char *slotName,
uint64_t *nodeId,
uint32_t *slotType,
dsm_handle *dsmHandle)
{
if (slotName == NULL)
@@ -233,10 +230,6 @@ decode_replication_slot(char *slotName,
*nodeId = SafeStringToUint64(slotNameString);
}
else if (index == 1)
{
*slotType = strtoul(slotNameString, NULL, 10);
}
else if (index == 2)
{
*dsmHandle = strtoul(slotNameString, NULL, 10);
}
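With the slot type gone, a slot name now carries just two tokens, and because there is one slot per destination node, two destination nodes yield two such names. Below is a standalone round-trip sketch of the new "<nodeId>_<dsmHandle>" scheme, using plain C formatting rather than the StringInfo/strtok-based code in the commit; the example values are made up:

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

int
main(void)
{
	uint64_t nodeId = 18;          /* destination node id (example value) */
	uint32_t dsmHandle = 82380;    /* shared memory segment handle (example value) */

	/* encode: "<nodeId>_<dsmHandle>" -> "18_82380" */
	char slotName[64];
	snprintf(slotName, sizeof(slotName), "%" PRIu64 "_%" PRIu32, nodeId, dsmHandle);

	/* decode: split the name back into its two components */
	uint64_t decodedNodeId = 0;
	uint32_t decodedHandle = 0;
	if (sscanf(slotName, "%" SCNu64 "_%" SCNu32, &decodedNodeId, &decodedHandle) == 2)
	{
		printf("slot %s -> node %" PRIu64 ", dsm handle %" PRIu32 "\n",
			   slotName, decodedNodeId, decodedHandle);
	}

	return 0;
}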

View File

@@ -52,33 +52,11 @@ extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handl
dsm_segment **
attachedSegment);
/*
* An UPDATE of the partition key is realized as a 'DELETE' on the
* old shard and an 'INSERT' on the new shard, so a committed UPDATE has to be
* segregated into two replication messages. A WAL sender belonging to a
* replication slot can send only one message, hence to handle UPDATE we
* have to create one extra replication slot per node that handles the deletion
* part of an UPDATE.
*
* SLOT_HANDLING_INSERT_AND_DELETE - Responsible for handling INSERT and DELETE
* operations.
* SLOT_HANDLING_DELETE_OF_UPDATE - Responsible for only handling the DELETE on the old shard
* for an UPDATE. It is a no-op for INSERT and DELETE
* operations.
*/
enum ReplicationSlotType
{
SLOT_HANDLING_INSERT_AND_DELETE,
SLOT_HANDLING_DELETE_OF_UPDATE
};
/* Functions related to encoding-decoding for replication slot name */
char * encode_replication_slot(uint64_t nodeId,
uint32 slotType,
dsm_handle dsmHandle);
void decode_replication_slot(char *slotName,
uint64_t *nodeId,
uint32_t *slotType,
dsm_handle *dsmHandle);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */