users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-29 17:03:03 +05:30
parent b6662ff872
commit ea9eb103e0
5 changed files with 10 additions and 17 deletions

View File

@ -3,8 +3,7 @@
* shardsplit_decoder.c * shardsplit_decoder.c
* Logical Replication output plugin * Logical Replication output plugin
* *
* IDENTIFICATION * Copyright (c) Citus Data, Inc.
* src/backend/distributed/shardsplit/shardsplit_decoder.c
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -80,6 +79,8 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
return; return;
} }
char *replicationSlotName = ctx->slot->data.name.data;
/* /*
* Initialize SourceToDestinationShardMap if not already initialized. * Initialize SourceToDestinationShardMap if not already initialized.
* This gets initialized during the replication of first message. * This gets initialized during the replication of first message.
@ -87,11 +88,10 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (SourceToDestinationShardMap == NULL) if (SourceToDestinationShardMap == NULL)
{ {
SourceToDestinationShardMap = PopulateSourceToDestinationShardMapForSlot( SourceToDestinationShardMap = PopulateSourceToDestinationShardMapForSlot(
ctx->slot->data.name.data); replicationSlotName, TopMemoryContext);
} }
Oid targetRelationOid = InvalidOid; Oid targetRelationOid = InvalidOid;
char *replicationSlotName = ctx->slot->data.name.data;
switch (change->action) switch (change->action)
{ {
case REORDER_BUFFER_CHANGE_INSERT: case REORDER_BUFFER_CHANGE_INSERT:

View File

@ -236,7 +236,7 @@ ShardSplitShmemInit(void)
if (!alreadyInitialized) if (!alreadyInitialized)
{ {
char *trancheName = "Split_Shard_Setup_Tranche"; char *trancheName = "Split Shard Setup Tranche";
NamedLWLockTranche *namedLockTranche = NamedLWLockTranche *namedLockTranche =
&smData->namedLockTranche; &smData->namedLockTranche;
@ -341,20 +341,20 @@ GetShardSplitSharedMemoryHandle(void)
* To populate the map, the function traverses 'ShardSplitInfo' array stored within shared memory segment. * To populate the map, the function traverses 'ShardSplitInfo' array stored within shared memory segment.
*/ */
HTAB * HTAB *
PopulateSourceToDestinationShardMapForSlot(char *slotName) PopulateSourceToDestinationShardMapForSlot(char *slotName, MemoryContext cxt)
{ {
HASHCTL info; HASHCTL info;
memset(&info, 0, sizeof(info)); memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid); info.keysize = sizeof(Oid);
info.entrysize = sizeof(SourceToDestinationShardMapEntry); info.entrysize = sizeof(SourceToDestinationShardMapEntry);
info.hash = uint32_hash; info.hash = uint32_hash;
info.hcxt = TopMemoryContext; info.hcxt = cxt;
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
HTAB *sourceShardToDesShardMap = hash_create("SourceToDestinationShardMap", 128, HTAB *sourceShardToDesShardMap = hash_create("SourceToDestinationShardMap", 128,
&info, hashFlags); &info, hashFlags);
MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); MemoryContext oldContext = MemoryContextSwitchTo(cxt);
ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(); ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader();
for (int index = 0; index < smHeader->count; index++) for (int index = 0; index < smHeader->count; index++)

View File

@ -1,7 +1,3 @@
DROP TYPE IF EXISTS citus.split_shard_info;
DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS ( CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint, source_shard_id bigint,
child_shard_id bigint, child_shard_id bigint,

View File

@ -1,7 +1,3 @@
DROP TYPE IF EXISTS citus.split_shard_info;
DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS ( CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint, source_shard_id bigint,
child_shard_id bigint, child_shard_id bigint,

View File

@ -62,7 +62,8 @@ extern void ReleaseSharedMemoryOfShardSplitInfo(void);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void); extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);
extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName); extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName, MemoryContext
cxt);
char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId); char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */ #endif /* SHARDSPLIT_SHARED_MEMORY_H */