diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c
index 1613b4de6..f2841ce95 100644
--- a/src/backend/distributed/operations/split_shard_replication_setup.c
+++ b/src/backend/distributed/operations/split_shard_replication_setup.c
@@ -62,49 +62,43 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
 /*
  * worker_split_shard_replication_setup UDF creates in-memory data structures
  * to store the meta information about the shard undergoing split and new split
- * children along with their placements required during the catch up phase
- * of logical replication.
+ * children along with their placements. This info is required during the catch-up
+ * phase of logical replication.
  * This meta information is stored in a shared memory segment and accessed
  * by logical decoding plugin.
  *
- * Split information is given by user as an Array of source shards undergoing splits
- * in the below format.
- * Array[Array[sourceShardId, childShardId, minValue, maxValue, Destination NodeId]]
+ * Split information is given by the user as an array of the custom data type 'citus.split_shard_info'.
+ * (worker_split_shard_replication_setup(citus.split_shard_info[]))
  *
- * sourceShardId - id of the shard that is undergoing a split
- * childShardId  - id of shard that stores a specific range of values
- *                 belonging to sourceShardId(parent)
- * minValue      - lower bound(inclusive) of hash value which childShard stores
+ * Fields of the custom data type 'citus.split_shard_info':
+ * source_shard_id - id of the shard that is undergoing a split
  *
- * maxValue      - upper bound(inclusive) of hash value which childShard stores
+ * child_shard_id - id of the shard that stores a specific range of values
+ *                  belonging to source_shard_id (parent)
  *
- * NodeId        - Node where the childShardId is located
+ * shard_min_value - lower bound (inclusive) of the hash value range which the child shard stores
  *
- * The function parses the data and builds routing map per destination node id.
- * Multiple shards can be placed on the same destiation node. Source and
- * destinations nodes can be same too.
+ * shard_max_value - upper bound (inclusive) of the hash value range which the child shard stores
  *
- * Usage Semantics:
- * This UDF returns a shared memory handle where the information is stored. This shared memory
- * handle is used by caller to encode replication slot name as "citus_split_nodeId_sharedMemoryHandle_tableOnwerId"
- * for every distinct table owner. The same encoded slot name is stored in one of the fields of the
- * in-memory data structure(ShardSplitInfo).
+ * node_id - node where the child shard is located
  *
- * There is a 1-1 mapping between a table owner id and a replication slot. One replication
+ * The function parses the data and builds a routing map with a key for each distinct
+ * <nodeId, tableOwner> pair. Multiple shards can be placed on the same destination node.
+ * Source and destination nodes can be the same too.
+ *
+ * There is a 1-1 mapping between a table owner and a replication slot. One replication
  * slot takes care of replicating changes for all shards belonging to the same owner on a particular node.
  *
- * During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular
- * replication slot, will decode the shared memory handle from its slot name and will attach to the
- * shared memory. The plugin consumes the information from shared memory. It routes the tuple
- * from the source shard to the appropriate destination shard for which the respective slot is
- * responsible.
+ * During the replication phase, 'decoding_plugin_for_shard_split' will attach to the shared memory
+ * populated by the current UDF. It routes the tuple from the source shard to the appropriate destination
+ * shard for which the respective slot is responsible.
  */
 Datum
 worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
 {
     if (PG_ARGISNULL(0))
     {
-        ereport(ERROR, (errmsg("targets can't be null")));
+        ereport(ERROR, (errmsg("split_shard_info array cannot be NULL")));
     }
 
     ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
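For illustration, a hypothetical invocation consistent with the field list above and the SQL type definitions at the end of this diff. The bigint type of source_shard_id/child_shard_id is an assumption (the SQL hunks below only show the tail of the type, shard_max_value text and node_id integer), and all concrete ids, bounds, and the node id 18 are made-up example values:

    -- Split shard 1 into shards 2 and 3 at hash value 0, placing
    -- both children on node 18 (all values hypothetical).
    SELECT pg_catalog.worker_split_shard_replication_setup(
        ARRAY[
            ROW(1, 2, '-2147483648', '-1', 18)::citus.split_shard_info,
            ROW(1, 3, '0', '2147483647', 18)::citus.split_shard_info
        ]);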
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index 87168781e..58cca6cd1 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -18,7 +18,7 @@
 #include "storage/ipc.h"
 #include "utils/memutils.h"
 
-const char *sharedMemoryNameForHandleManagement =
+const char *SharedMemoryNameForHandleManagement =
     "SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT";
 
 static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@@ -67,6 +67,13 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
     ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address(
         dsmSegment);
 
+    if (header == NULL)
+    {
+        ereport(ERROR,
+                (errmsg("could not get shared memory segment header "
+                        "corresponding to handle for split workflow: %u", dsmHandle)));
+    }
+
     return header;
 }
 
@@ -110,13 +117,12 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
     if (shardSplitInfoCount <= 0 || shardSplitInfoSize <= 0)
     {
         ereport(ERROR,
-                (errmsg("count and size of each step should be "
+                (errmsg("shardSplitInfoCount and the size of each element should be "
                         "positive values")));
     }
 
     Size totalSize = offsetof(ShardSplitInfoSMHeader, splitInfoArray) +
-                     shardSplitInfoCount *
-                     shardSplitInfoSize;
+                     (shardSplitInfoCount * shardSplitInfoSize);
     dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
 
     if (dsmSegment == NULL)
@@ -204,7 +210,7 @@ static void
 ShardSplitShmemInit(void)
 {
     bool alreadyInitialized = false;
-    ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+    ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                   sizeof(ShardSplitShmemData),
                                                   &alreadyInitialized);
 
@@ -238,12 +244,14 @@ ShardSplitShmemInit(void)
 /*
  * StoreSharedMemoryHandle stores a handle of shared memory
  * allocated and populated by 'worker_split_shard_replication_setup' UDF.
+ * This handle is stored in a different shared memory segment with the name
+ * 'SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT'.
  */
 void
 StoreSharedMemoryHandle(dsm_handle dsmHandle)
 {
     bool found = false;
-    ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+    ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                   sizeof(ShardSplitShmemData),
                                                   &found);
     if (!found)
@@ -253,6 +261,10 @@ StoreSharedMemoryHandle(dsm_handle dsmHandle)
                  "Shared memory for handle management should have been initialized during boot"));
     }
 
+    /*
+     * We only support non-concurrent splits. However, it is fine to take a
+     * lock and store the handle in case concurrent splits are introduced in the future.
+     */
     LWLockAcquire(&smData->lock, LW_EXCLUSIVE);
 
     /*
@@ -285,7 +297,7 @@ dsm_handle
 GetSharedMemoryHandle(void)
 {
     bool found = false;
-    ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
+    ShardSplitShmemData *smData = ShmemInitStruct(SharedMemoryNameForHandleManagement,
                                                   sizeof(ShardSplitShmemData),
                                                   &found);
     if (!found)
@@ -331,7 +343,7 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName)
             /* Found the starting index from where current slot information begins */
             infoForReplicationSlot->startIndex = index;
 
-            /* Slide forward to get the end index */
+            /* Slide forward to get the ending index */
             index++;
             while (index < smHeader->count && strcmp(
                        smHeader->splitInfoArray[index].slotName, slotName) == 0)
             {
                 index++;
             }
 
+            /* Found the ending index */
             infoForReplicationSlot->endIndex = index - 1;
 
             /*
              * 'ShardSplitInfo' with same slot name are stored contiguously in shared memory segment.
              * After the current 'index' position, we should not encounter any 'ShardSplitInfo' with incoming slot name.
              * If this happens, there is shared memory corruption. Its worth to go ahead and assert for this assumption.
-             * TODO: Traverse further and assert
              */
+            break;
         }
 
         index++;
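As an aside on the sizing arithmetic that the parenthesization change in AllocateSharedMemoryForShardSplitInfo touches: below is a minimal standalone sketch of the header-plus-flexible-array-member layout it computes. The stand-in ShardSplitInfo and ShardSplitInfoSMHeader structs here are illustrative; the real Citus structs carry more fields.

    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-ins; the real Citus structs carry more fields. */
    typedef struct ShardSplitInfo
    {
        uint64_t sourceShardId;
        uint64_t childShardId;
        char slotName[64];
    } ShardSplitInfo;

    typedef struct ShardSplitInfoSMHeader
    {
        int count;                       /* number of elements in splitInfoArray */
        ShardSplitInfo splitInfoArray[]; /* flexible array member */
    } ShardSplitInfoSMHeader;

    int
    main(void)
    {
        int shardSplitInfoCount = 4;

        /*
         * Same arithmetic as AllocateSharedMemoryForShardSplitInfo: the byte
         * offset where the flexible array begins, plus one fixed-size slot
         * per entry. offsetof() is the right base because the array starts
         * at that offset regardless of any trailing padding that sizeof()
         * would report for the header struct.
         */
        size_t totalSize = offsetof(ShardSplitInfoSMHeader, splitInfoArray) +
                           (shardSplitInfoCount * sizeof(ShardSplitInfo));

        printf("segment size for %d entries: %zu bytes\n",
               shardSplitInfoCount, totalSize);
        return 0;
    }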
diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
index 94f625308..4a3b4a223 100644
--- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
+++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
@@ -9,7 +9,6 @@ CREATE TYPE citus.split_shard_info AS (
     shard_max_value text,
     node_id integer);
 
-
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
 RETURNS void
diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
index 94f625308..4a3b4a223 100644
--- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
+++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
@@ -9,7 +9,6 @@ CREATE TYPE citus.split_shard_info AS (
     shard_max_value text,
     node_id integer);
 
-
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
 RETURNS void
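Returning to the PopulateShardSplitInfoForReplicationSlot change in the second file: the newly added break relies on entries with the same slot name being stored contiguously, so the first matching run is the only one. A standalone sketch of that contiguous-run scan, using a hypothetical SplitEntry stand-in for the shared memory entries:

    #include <stdio.h>
    #include <string.h>

    /* Hypothetical flattened stand-in for the shared memory entries. */
    typedef struct SplitEntry
    {
        const char *slotName;
    } SplitEntry;

    /*
     * Find the [startIndex, endIndex] range of entries carrying slotName.
     * Entries with the same slot name are assumed to be contiguous, which
     * is why the scan can stop after the first run it finds.
     * Returns 0 on success, -1 if the slot name is absent.
     */
    static int
    FindSlotRange(SplitEntry *entries, int count, const char *slotName,
                  int *startIndex, int *endIndex)
    {
        for (int index = 0; index < count; index++)
        {
            if (strcmp(entries[index].slotName, slotName) != 0)
            {
                continue;
            }

            *startIndex = index;

            /* Slide forward while the run of matching slot names continues. */
            while (index + 1 < count &&
                   strcmp(entries[index + 1].slotName, slotName) == 0)
            {
                index++;
            }

            *endIndex = index;

            /* Contiguity means no later entry can match; stop scanning. */
            return 0;
        }

        return -1;
    }

    int
    main(void)
    {
        SplitEntry entries[] = { { "slot_a" }, { "slot_b" }, { "slot_b" }, { "slot_c" } };
        int start = 0;
        int end = 0;

        if (FindSlotRange(entries, 4, "slot_b", &start, &end) == 0)
        {
            printf("slot_b occupies entries [%d, %d]\n", start, end); /* prints [1, 2] */
        }
        return 0;
    }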