Add UDF description

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-20 22:05:24 +05:30
parent a23beeb43f
commit 23237e50a2
4 changed files with 42 additions and 37 deletions

@@ -62,49 +62,43 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
 /*
  * worker_split_shard_replication_setup UDF creates in-memory data structures
  * to store the meta information about the shard undergoing split and new split
- * children along with their placements required during the catch up phase
- * of logical replication.
+ * children along with their placements. This info is required during the catch up
+ * phase of logical replication.
  * This meta information is stored in a shared memory segment and accessed
  * by logical decoding plugin.
  *
- * Split information is given by user as an Array of source shards undergoing splits
- * in the below format.
- * Array[Array[sourceShardId, childShardId, minValue, maxValue, Destination NodeId]]
+ * Split information is given by user as an Array of custom data type 'citus.split_shard_info'.
+ * (worker_split_shard_replication_setup(citus.split_shard_info[]))
 *
- * sourceShardId - id of the shard that is undergoing a split
- * childShardId - id of shard that stores a specific range of values
- *                belonging to sourceShardId(parent)
- * minValue - lower bound(inclusive) of hash value which childShard stores
+ * Fields of custom data type 'citus.split_shard_info':
+ * source_shard_id - id of the shard that is undergoing a split
 *
- * maxValue - upper bound(inclusive) of hash value which childShard stores
+ * child_shard_id - id of shard that stores a specific range of values
+ *                  belonging to sourceShardId(parent)
 *
- * NodeId - Node where the childShardId is located
+ * shard_min_value - lower bound(inclusive) of hash value which childShard stores.
 *
- * The function parses the data and builds routing map per destination node id.
- * Multiple shards can be placed on the same destiation node. Source and
- * destinations nodes can be same too.
+ * shard_max_value - upper bound(inclusive) of hash value which childShard stores
 *
- * Usage Semantics:
- * This UDF returns a shared memory handle where the information is stored. This shared memory
- * handle is used by caller to encode replication slot name as "citus_split_nodeId_sharedMemoryHandle_tableOnwerId"
- * for every distinct table owner. The same encoded slot name is stored in one of the fields of the
- * in-memory data structure(ShardSplitInfo).
+ * node_id - Node where the childShardId is located
 *
- * There is a 1-1 mapping between a table owner id and a replication slot. One replication
+ * The function parses the data and builds routing map with key for each distinct
+ * <nodeId, tableOwner> pair. Multiple shards can be placed on the same destination node.
+ * Source and destination nodes can be same too.
+ *
+ * There is a 1-1 mapping between a table owner and a replication slot. One replication
 * slot takes care of replicating changes for all shards belonging to the same owner on a particular node.
 *
- * During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular
- * replication slot, will decode the shared memory handle from its slot name and will attach to the
- * shared memory. The plugin consumes the information from shared memory. It routes the tuple
- * from the source shard to the appropriate destination shard for which the respective slot is
- * responsible.
+ * During the replication phase, 'decoding_plugin_for_shard_split' will attach to the shared memory
+ * populated by current UDF. It routes the tuple from the source shard to the appropriate destination
+ * shard for which the respective slot is responsible.
 */
 Datum
 worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
 {
    if (PG_ARGISNULL(0))
    {
-       ereport(ERROR, (errmsg("targets can't be null")));
+       ereport(ERROR, (errmsg("split_shard_info array cannot be NULL")));
    }
    ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
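The rewritten comment above documents each field of 'citus.split_shard_info' that the UDF receives. As a hypothetical illustration only (not code from this commit), the sketch below unpacks one element of such an array with PostgreSQL's standard composite-type accessors; it assumes source_shard_id and child_shard_id are bigint, which this hunk does not show, and it omits NULL checks.

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "utils/builtins.h"

/*
 * Hypothetical helper: read the fields of one 'citus.split_shard_info'
 * composite value. Field names come from the comment above; bigint is an
 * assumption for the two shard id fields.
 */
static void
DescribeSplitShardInfo(Datum splitShardInfoDatum)
{
    HeapTupleHeader tuple = DatumGetHeapTupleHeader(splitShardInfoDatum);
    bool isNull = false;

    int64 sourceShardId = DatumGetInt64(
        GetAttributeByName(tuple, "source_shard_id", &isNull));
    int64 childShardId = DatumGetInt64(
        GetAttributeByName(tuple, "child_shard_id", &isNull));
    char *shardMinValue = text_to_cstring(
        DatumGetTextP(GetAttributeByName(tuple, "shard_min_value", &isNull)));
    char *shardMaxValue = text_to_cstring(
        DatumGetTextP(GetAttributeByName(tuple, "shard_max_value", &isNull)));
    int32 nodeId = DatumGetInt32(
        GetAttributeByName(tuple, "node_id", &isNull));

    /* log the parsed element, e.g. "child shard ... stores hash range ..." */
    elog(DEBUG1,
         "child shard " INT64_FORMAT " of shard " INT64_FORMAT
         " stores hash range [%s, %s] and lives on node %d",
         childShardId, sourceShardId, shardMinValue, shardMaxValue, nodeId);
}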

@@ -18,7 +18,7 @@
 #include "storage/ipc.h"
 #include "utils/memutils.h"
-const char *sharedMemoryNameForHandleManagement =
+const char *SharedMemoryNameForHandleManagement =
    "SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT";
 static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@@ -67,6 +67,13 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
    ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address(
        dsmSegment);
+   if (header == NULL)
+   {
+       ereport(ERROR,
+               (errmsg("Could not get shared memory segment header "
+                       "corresponding to handle for split workflow:%u", dsmHandle)));
+   }
    return header;
 }
@@ -110,13 +117,12 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
    if (shardSplitInfoCount <= 0 || shardSplitInfoSize <= 0)
    {
        ereport(ERROR,
-               (errmsg("count and size of each step should be "
+               (errmsg("shardSplitInfoCount and size of each step should be "
                        "positive values")));
    }
    Size totalSize = offsetof(ShardSplitInfoSMHeader, splitInfoArray) +
-                    shardSplitInfoCount *
-                    shardSplitInfoSize;
+                    (shardSplitInfoCount * shardSplitInfoSize);
    dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
    if (dsmSegment == NULL)
@@ -204,7 +210,7 @@ static void
 ShardSplitShmemInit(void)
 {
    bool alreadyInitialized = false;
-   ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+   ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                  sizeof(ShardSplitShmemData),
                                                  &alreadyInitialized);
@@ -238,12 +244,14 @@ ShardSplitShmemInit(void)
 /*
  * StoreSharedMemoryHandle stores a handle of shared memory
  * allocated and populated by 'worker_split_shard_replication_setup' UDF.
+ * This handle is stored in a different shared memory segment with name
+ * 'SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT'.
 */
 void
 StoreSharedMemoryHandle(dsm_handle dsmHandle)
 {
    bool found = false;
-   ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+   ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                  sizeof(ShardSplitShmemData),
                                                  &found);
    if (!found)
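The added comment describes a two-level lookup: a small, fixed-size shared memory struct, found by the well-known name 'SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT', holds the dsm_handle of the variable-size segment the UDF populated. Below is a minimal sketch of the retrieval side under stated assumptions: the 'dsmHandle' field name inside ShardSplitShmemData is assumed (it is not shown in this diff), the Citus types come from this file's header, and error handling is elided.

#include "postgres.h"

#include "storage/dsm.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"

/*
 * Minimal sketch of the lookup described above; 'dsmHandle' is an assumed
 * field name and dsm_attach() failure handling is omitted.
 */
static ShardSplitInfoSMHeader *
LookupShardSplitInfoSketch(void)
{
    bool found = false;
    ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                  sizeof(ShardSplitShmemData),
                                                  &found);

    /* read the stored handle under the same lock the writer uses */
    LWLockAcquire(&smData->lock, LW_SHARED);
    dsm_handle handle = smData->dsmHandle;  /* assumed field name */
    LWLockRelease(&smData->lock);

    /* attach to the segment the UDF populated and return its header */
    dsm_segment *segment = dsm_attach(handle);
    return (ShardSplitInfoSMHeader *) dsm_segment_address(segment);
}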
@@ -253,6 +261,10 @@ StoreSharedMemoryHandle(dsm_handle dsmHandle)
            "Shared memory for handle management should have been initialized during boot"));
    }
+   /*
+    * We only support non-concurrent split. However, it is fine to take a
+    * lock and store the handle in case concurrent splits are introduced in future.
+    */
    LWLockAcquire(&smData->lock, LW_EXCLUSIVE);
    /*
@@ -285,7 +297,7 @@ dsm_handle
 GetSharedMemoryHandle(void)
 {
    bool found = false;
-   ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+   ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                  sizeof(ShardSplitShmemData),
                                                  &found);
    if (!found)
@@ -331,7 +343,7 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName)
        /* Found the starting index from where current slot information begins */
        infoForReplicationSlot->startIndex = index;
-       /* Slide forward to get the end index */
+       /* Slide forward to get the ending index */
        index++;
        while (index < smHeader->count && strcmp(
                   smHeader->splitInfoArray[index].slotName, slotName) == 0)
@@ -339,14 +351,15 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName)
            index++;
        }
+       /* Found ending index */
        infoForReplicationSlot->endIndex = index - 1;
        /*
         * 'ShardSplitInfo' with same slot name are stored contiguously in shared memory segment.
         * After the current 'index' position, we should not encounter any 'ShardSplitInfo' with incoming slot name.
         * If this happens, there is shared memory corruption. Its worth to go ahead and assert for this assumption.
+        * TODO: Traverse further and assert
         */
+       break;
    }
    index++;
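The unchanged comment in this hunk states the invariant the scan relies on: entries sharing a slot name sit contiguously in the segment, so a single forward pass yields the [startIndex, endIndex] range for a slot. Below is a simplified, self-contained sketch of that scan with hypothetical types; it mirrors the loop above in spirit only and is not the function in this file.

#include <stdbool.h>
#include <string.h>

/* Hypothetical, simplified stand-in for the shared memory entries. */
typedef struct ExampleSplitInfo
{
    char slotName[64];
} ExampleSplitInfo;

/*
 * Find the contiguous [start, end] index range of entries whose slotName
 * matches. Returns false when the slot name does not occur.
 */
static bool
FindSlotRange(const ExampleSplitInfo *entries, int count, const char *slotName,
              int *start, int *end)
{
    for (int index = 0; index < count; index++)
    {
        if (strcmp(entries[index].slotName, slotName) != 0)
        {
            continue;
        }

        *start = index;

        /* slide forward while the slot name still matches */
        while (index + 1 < count &&
               strcmp(entries[index + 1].slotName, slotName) == 0)
        {
            index++;
        }

        *end = index;
        return true;
    }

    return false;
}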

@@ -9,7 +9,6 @@ CREATE TYPE citus.split_shard_info AS (
     shard_max_value text,
     node_id integer);
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
 RETURNS void

@@ -9,7 +9,6 @@ CREATE TYPE citus.split_shard_info AS (
     shard_max_value text,
     node_id integer);
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
 RETURNS void