Add hashmap, cleanup methods

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-29 12:13:51 +05:30
parent 213b15071b
commit b6662ff872
5 changed files with 178 additions and 231 deletions

View File

@ -11,6 +11,7 @@
#include "postgres.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/listutils.h"
#include "replication/logical.h"
/*
@ -23,18 +24,17 @@ PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
static ShardSplitInfoSMHeader *ShardSplitInfo_SMHeader = NULL;
static ShardSplitInfoForReplicationSlot *ShardSplitInfoForSlot = NULL;
static HTAB *SourceToDestinationShardMap = NULL;
/* Plugin callback */
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* Helper methods */
static bool IsCommitRecursive(Relation sourceShardRelation);
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleUpdate);
int partitionColumnIndex,
Oid distributedTableOid);
static Oid FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
@ -67,151 +67,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
}
/*
* GetHashValueForIncomingTuple returns the hash value of the partition
* column for the incoming tuple. It also checks whether the change should be
* handled at all, as the incoming committed change can belong to a relation
* that is not undergoing a split.
*/
static int32_t
GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleChange)
{
ShardSplitInfo *shardSplitInfo = NULL;
int partitionColumnIndex = -1;
Oid distributedTableOid = InvalidOid;
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
if (shardSplitInfo->sourceShardOid == sourceShardOid)
{
distributedTableOid = shardSplitInfo->distributedTableOid;
partitionColumnIndex = shardSplitInfo->partitionColumnIndex;
break;
}
}
/*
* The commit can belong to some other table that is not undergoing
* a split. Ignore such commits.
*/
if (partitionColumnIndex == -1 ||
distributedTableOid == InvalidOid)
{
/*
* TODO(saawasek): Change below warning to DEBUG once more test cases
* are added.
*/
ereport(WARNING, errmsg("Skipping Commit as "
"Relation: %s isn't splitting",
RelationGetRelationName(sourceShardRelation)));
*shouldHandleChange = false;
return 0;
}
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("null entry found for cacheEntry"));
}
TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
bool isNull = false;
Datum partitionColumnValue = heap_getattr(tuple,
partitionColumnIndex + 1,
relationTupleDes,
&isNull);
FmgrInfo *hashFunction = cacheEntry->hashFunction;
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
int32_t hashedValue = DatumGetInt32(hashedValueDatum);
*shouldHandleChange = true;
return hashedValue;
}
/*
* FindTargetRelationOid returns the destination relation Oid for the incoming
* tuple.
* sourceShardRelation - Relation on which a commit has happened.
* tuple - changed tuple.
* currentSlotName - Name of replication slot that is processing this update.
*/
static Oid
FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName)
{
Oid targetRelationOid = InvalidOid;
Oid sourceShardRelationOid = sourceShardRelation->rd_id;
bool shouldHandleUpdate = false;
int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
&shouldHandleUpdate);
if (shouldHandleUpdate == false)
{
return InvalidOid;
}
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
/*
* Each commit message is processed by all the configured replication slots.
* A replication slot is responsible for shard placements belonging to a unique
* table owner and nodeId combination. We check whether the current slot that is
* processing the commit should emit a target relation Oid.
*/
if (shardSplitInfo->sourceShardOid == sourceShardRelationOid &&
shardSplitInfo->shardMinValue <= hashValue &&
shardSplitInfo->shardMaxValue >= hashValue)
{
targetRelationOid = shardSplitInfo->splitChildShardOid;
break;
}
}
return targetRelationOid;
}
/*
* IsCommitRecursive returns true when the commit is recursive. When the source shard
* receives a commit (1), the WAL sender processes this commit message. This
* commit is applied to a child shard which is placed on the same node as a
* part of replication. This in turn creates one more commit (2), which is recursive in nature.
* Commit 2 should be skipped as the source shard and destination for commit 2
* are the same and the commit has already been applied.
*/
bool
IsCommitRecursive(Relation sourceShardRelation)
{
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
/* skip the commit when destination is equal to the source */
ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
if (sourceShardOid == shardSplitInfo->splitChildShardOid)
{
return true;
}
}
return false;
}
/*
* split_change function emits the incoming tuple change
* to the appropriate destination shard.
@ -226,19 +81,13 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
* Get ShardSplitInfoForSlot if not already initialized.
* Initialize SourceToDestinationShardMap if not already initialized.
* This gets initialized during the replication of the first message.
*/
if (ShardSplitInfoForSlot == NULL)
if (SourceToDestinationShardMap == NULL)
{
ShardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
SourceToDestinationShardMap = PopulateSourceToDestinationShardMapForSlot(
ctx->slot->data.name.data);
ShardSplitInfo_SMHeader = ShardSplitInfoForSlot->shardSplitInfoHeader;
}
if (IsCommitRecursive(relation))
{
return;
}
Oid targetRelationOid = InvalidOid;
@ -288,3 +137,95 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}
/*
* FindTargetRelationOid returns the destination relation Oid for the incoming
* tuple.
* sourceShardRelation - Relation on which a commit has happened.
* tuple - changed tuple.
* currentSlotName - Name of replication slot that is processing this update.
*/
static Oid
FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName)
{
Oid targetRelationOid = InvalidOid;
Oid sourceShardRelationOid = sourceShardRelation->rd_id;
/* Get the child shard list for the source (parent) shard from the hash map */
bool found = false;
SourceToDestinationShardMapEntry *entry =
(SourceToDestinationShardMapEntry *) hash_search(
SourceToDestinationShardMap, &sourceShardRelationOid, HASH_FIND, &found);
/*
* The source shard Oid might not exist in the hash map. This can happen
* in the following cases:
* 1) The commit belongs to some other table that is not undergoing a split.
* 2) The commit is recursive in nature. When the source shard
* receives a commit (a), the WAL sender processes this commit message. This
* commit is applied to a child shard which is placed on the same node as a
* part of replication. This in turn creates one more commit (b), which is recursive in nature.
* Commit 'b' should be skipped as the source shard and destination for commit 'b'
* are the same and the commit has already been applied.
*/
if (!found)
{
return InvalidOid;
}
ShardSplitInfo *shardSplitInfo =
(ShardSplitInfo *) lfirst(list_head(entry->shardSplitInfoList));
int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
shardSplitInfo->partitionColumnIndex,
shardSplitInfo->distributedTableOid);
shardSplitInfo = NULL;
foreach_ptr(shardSplitInfo, entry->shardSplitInfoList)
{
if (shardSplitInfo->shardMinValue <= hashValue &&
shardSplitInfo->shardMaxValue >= hashValue)
{
targetRelationOid = shardSplitInfo->splitChildShardOid;
break;
}
}
return targetRelationOid;
}
/*
* GetHashValueForIncomingTuple returns the hash value of the partition
* column for the incoming tuple.
*/
static int32_t
GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
int partitionColumnIndex,
Oid distributedTableOid)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg(
"Expected a valid Citus cache entry but found NULL"));
}
TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
bool isNull = false;
Datum partitionColumnValue = heap_getattr(tuple,
partitionColumnIndex + 1,
relationTupleDes,
&isNull);
FmgrInfo *hashFunction = cacheEntry->hashFunction;
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
return DatumGetInt32(hashedValueDatum);
}
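
For orientation, the per-change path through this output plugin after the commit is sketched below. This is a condensed, illustrative version assembled from the hunks above: the parts of split_change_cb that the diff elides (tuple extraction from the change record, error handling) are omitted, and the sketch's function name and the explicit changedTuple parameter are hypothetical simplifications, not the actual callback signature.

/*
 * Illustrative sketch only (not the verbatim callback): how the new hash map
 * drives change routing once this commit is applied.
 */
static void
split_change_sketch(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					Relation relation, ReorderBufferChange *change,
					HeapTuple changedTuple)
{
	/* Lazily build the source-shard -> child-shards map on the first decoded change. */
	if (SourceToDestinationShardMap == NULL)
	{
		SourceToDestinationShardMap =
			PopulateSourceToDestinationShardMapForSlot(ctx->slot->data.name.data);
	}

	/*
	 * A single hash probe (inside FindTargetRelationOid) replaces the old
	 * startIndex/endIndex scan; a miss also covers tables that are not being
	 * split and recursive commits on child shards.
	 */
	Oid targetRelationOid = FindTargetRelationOid(relation, changedTuple,
												  ctx->slot->data.name.data);
	if (targetRelationOid == InvalidOid)
	{
		return;
	}

	/* Re-route the change to the destination child shard via pgoutput. */
	Relation targetRelation = RelationIdGetRelation(targetRelationOid);
	pgoutputChangeCB(ctx, txn, targetRelation, change);
	RelationClose(targetRelation);
}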

View File

@ -17,6 +17,7 @@
#include "distributed/citus_safe_lib.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
#include "common/hashfn.h"
const char *SharedMemoryNameForHandleManagement =
"Shared memory handle for shard split";
@ -82,15 +83,8 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
* GetShardSplitInfoSMHeader returns pointer to the header of shared memory segment.
*/
ShardSplitInfoSMHeader *
GetShardSplitInfoSMHeader(char *slotName)
GetShardSplitInfoSMHeader()
{
if (slotName == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("Expected slot name but found NULL")));
}
dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
@ -168,6 +162,32 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand
}
/*
* ReleaseSharedMemoryOfShardSplitInfo releases (unpins) the dynamic shared memory segment
* allocated by 'worker_split_shard_replication_setup'. This shared memory segment was pinned
* to the Postmaster process and remains valid until the Postmaster shuts down or the segment
* is explicitly unpinned by calling 'dsm_unpin_segment'.
*/
void
ReleaseSharedMemoryOfShardSplitInfo()
{
/* Get the handle of the dynamic shared memory segment */
dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
/*
* Unpin the dynamic shared memory segment. 'dsm_pin_segment' was
* called previously by 'AllocateSharedMemoryForShardSplitInfo'.
*/
dsm_unpin_segment(dsmHandle);
/*
* As the dynamic shared memory segment is now unpinned, store an invalid handle
* in the static shared memory used for handle management.
*/
StoreShardSplitSharedMemoryHandle(DSM_HANDLE_INVALID);
}
/*
* encode_replication_slot returns an encoded replication slot name
* in the following format.
@ -244,8 +264,8 @@ ShardSplitShmemInit(void)
/*
* StoreShardSplitSharedMemoryHandle stores a handle of shared memory
* allocated and populated by 'worker_split_shard_replication_setup' UDF.
* This handle is stored in a different shared memory segment with name
* 'Shared memory handle for shard split'.
This handle is stored in a different, statically allocated shared memory
segment named 'Shared memory handle for shard split'.
*/
void
StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
@ -277,8 +297,8 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
{
ereport(WARNING,
errmsg(
"As a part of split shard workflow,unexpectedly found a valid"
" shared memory handle while storing a new one."));
"Previous split shard worflow was not successfully and could not complete the cleanup phase."
" Continuing with the current split shard workflow."));
}
/* Store the incoming handle */
@ -289,9 +309,9 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
/*
* GetShardSplitSharedMemoryHandle returns the shared memory handle stored
* by 'worker_split_shard_replication_setup' UDF. This handle
* is requested by wal sender processes during logical replication phase.
GetShardSplitSharedMemoryHandle returns the handle of the dynamic shared memory segment stored
by the 'worker_split_shard_replication_setup' UDF. This handle is requested by WAL sender processes
during the logical replication phase or during cleanup.
*/
dsm_handle
GetShardSplitSharedMemoryHandle(void)
@ -316,64 +336,52 @@ GetShardSplitSharedMemoryHandle(void)
/*
* PopulateShardSplitInfoForReplicationSlot function traverses 'ShardSplitInfo' array
* stored within shared memory segment. It returns the starting and ending index position
* of a given slot within this array. When the given replication slot processes a commit,
* traversal is only limited within this bound thus enhancing performance.
* PopulateSourceToDestinationShardMapForSlot populates the 'SourceToDestinationShardMap' hash map for a given slot.
* The key of the map is the Oid of a source shard undergoing a split, and the value is the list of its corresponding child shards.
* To populate the map, the function traverses the 'ShardSplitInfo' array stored within the shared memory segment.
*/
ShardSplitInfoForReplicationSlot *
PopulateShardSplitInfoForReplicationSlot(char *slotName)
HTAB *
PopulateSourceToDestinationShardMapForSlot(char *slotName)
{
ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(slotName);
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(SourceToDestinationShardMapEntry);
info.hash = uint32_hash;
info.hcxt = TopMemoryContext;
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
HTAB *sourceShardToDesShardMap = hash_create("SourceToDestinationShardMap", 128,
&info, hashFlags);
MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
ShardSplitInfoForReplicationSlot *infoForReplicationSlot =
(ShardSplitInfoForReplicationSlot *) palloc0(
sizeof(ShardSplitInfoForReplicationSlot));
infoForReplicationSlot->shardSplitInfoHeader = smHeader;
infoForReplicationSlot->startIndex = -1;
infoForReplicationSlot->endIndex = -1;
int splitInfoIndex = 0;
while (splitInfoIndex < smHeader->count)
ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader();
for (int index = 0; index < smHeader->count; index++)
{
if (strcmp(smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0)
if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0)
{
/* Found the starting index from where current slot information begins */
infoForReplicationSlot->startIndex = splitInfoIndex;
Oid sourceShardOid = smHeader->splitInfoArray[index].sourceShardOid;
bool found = false;
SourceToDestinationShardMapEntry *entry =
(SourceToDestinationShardMapEntry *) hash_search(
sourceShardToDesShardMap, &sourceShardOid, HASH_ENTER, &found);
/* Slide forward to get the ending index */
splitInfoIndex++;
while (splitInfoIndex < smHeader->count && strcmp(
smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0)
if (!found)
{
splitInfoIndex++;
entry->shardSplitInfoList = NIL;
entry->sourceShardKey = sourceShardOid;
}
/* Found ending index */
infoForReplicationSlot->endIndex = splitInfoIndex - 1;
ShardSplitInfo *shardSplitInfoForSlot = (ShardSplitInfo *) palloc0(
sizeof(ShardSplitInfo));
*shardSplitInfoForSlot = smHeader->splitInfoArray[index];
/*
* 'ShardSplitInfo' entries with the same slot name are stored contiguously in the shared memory segment.
* After the current 'index' position, we should not encounter any 'ShardSplitInfo' with the incoming slot name.
* If we do, there is shared memory corruption; it is worth asserting this assumption.
*/
break;
entry->shardSplitInfoList = lappend(entry->shardSplitInfoList,
(ShardSplitInfo *) shardSplitInfoForSlot);
}
splitInfoIndex++;
}
if (infoForReplicationSlot->startIndex == -1)
{
ereport(ERROR,
(errmsg("Unexpectedly could not find information "
"corresponding to replication slot name:%s in shared memory.",
slotName)));
}
MemoryContextSwitchTo(oldContext);
return infoForReplicationSlot;
return sourceShardToDesShardMap;
}
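
The "cleanup methods" half of this commit revolves around the pin/unpin lifecycle of the dynamic shared memory segment. Below is a minimal sketch of that lifecycle, assuming only the functions declared in this diff; the wrapper function itself is hypothetical and exists purely to string the steps together.

/*
 * Hypothetical lifecycle wrapper (illustration only): the individual calls
 * are the real APIs touched by this commit, but no such wrapper exists.
 */
static void
ShardSplitSharedMemoryLifecycleSketch(int shardSplitInfoCount,
									  uint32_t nodeId, uint32_t tableOwnerId)
{
	dsm_handle dsmHandle = DSM_HANDLE_INVALID;

	/* 1. Setup UDF: allocate and pin the segment, then publish its handle. */
	ShardSplitInfoSMHeader *header =
		CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
	StoreShardSplitSharedMemoryHandle(dsmHandle);

	/* ... the setup UDF fills header->splitInfoArray with ShardSplitInfo entries ... */
	(void) header;

	/* 2. WAL sender: build its per-slot source->children lookup map lazily. */
	char *slotName = encode_replication_slot(nodeId, tableOwnerId);
	HTAB *sourceToDestinationMap = PopulateSourceToDestinationShardMapForSlot(slotName);
	(void) sourceToDestinationMap;

	/* 3. Cleanup: unpin the segment and reset the stored handle to DSM_HANDLE_INVALID. */
	ReleaseSharedMemoryOfShardSplitInfo();
}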

View File

@ -29,17 +29,15 @@ typedef struct ShardSplitInfoSMHeader
* Shard split information is populated and stored in shared memory in the form of a one-dimensional
* array by 'worker_split_shard_replication_setup'. Information belonging to the same replication
* slot is grouped together and stored contiguously within this array.
* 'ShardSplitInfoForReplicationSlot' stores the starting and ending indices for a particular
* replication slot within shared memory segment.
* When a slot processes a commit, traversing only within this boundary of shared memory segment
* improves performance.
* 'SourceToDestinationShardMap' maps a parent (source) shard to the list of child (destination) shards that the
* corresponding replication slot should process for it. When a parent shard receives a change, the decoder can use this map
* to traverse only the list of child shards corresponding to the given parent.
*/
typedef struct ShardSplitInfoForReplicationSlot
typedef struct SourceToDestinationShardMapEntry
{
ShardSplitInfoSMHeader *shardSplitInfoHeader; /* shared memory segment header */
int startIndex; /* starting index for a given slot */
int endIndex; /* ending index for a given slot */
} ShardSplitInfoForReplicationSlot;
Oid sourceShardKey;
List *shardSplitInfoList;
} SourceToDestinationShardMapEntry;
typedef struct ShardSplitShmemData
{
@ -60,11 +58,11 @@ dsm_handle GetShardSplitSharedMemoryHandle(void);
extern ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int
shardSplitInfoCount,
dsm_handle *dsmHandle);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(char *slotName);
extern void ReleaseSharedMemoryOfShardSplitInfo(void);
extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSlot(
char *slotName);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);
extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName);
char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@ -70,7 +70,7 @@ SELECT worker_split_shard_replication_setup(ARRAY[
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
WARNING: Previous split shard workflow was not successful and could not complete the cleanup phase. Continuing with the current split shard workflow.
worker_split_shard_replication_setup
---------------------------------------------------------------------

View File

@ -12,7 +12,7 @@ SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
WARNING: Previous split shard workflow was not successful and could not complete the cleanup phase. Continuing with the current split shard workflow.
worker_split_shard_replication_setup
---------------------------------------------------------------------