Address review comments

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-03 15:08:50 +05:30
parent 672d198019
commit 5b82fd2ea3
4 changed files with 20 additions and 23 deletions

View File

@ -87,8 +87,8 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
* *
* Usage Semantics: * Usage Semantics:
* This UDF returns a shared memory handle where the information is stored. This shared memory * This UDF returns a shared memory handle where the information is stored. This shared memory
* handle is used by caller to encode replication slot name as "NodeId_SharedMemoryHandle_TableOnwerId" for every * handle is used by caller to encode replication slot name as "citus_split_nodeId_sharedMemoryHandle_tableOnwerId"
* distinct target node. The same encoded slot name is stored in one of the fields of the * for every distinct table owner. The same encoded slot name is stored in one of the fields of the
* in-memory data structure(ShardSplitInfo). * in-memory data structure(ShardSplitInfo).
* *
* There is a 1-1 mapping between a table owner id and a replication slot. One replication * There is a 1-1 mapping between a table owner id and a replication slot. One replication

View File

@ -180,7 +180,8 @@ ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
/* /*
* encode_replication_slot returns an encoded replication slot name * encode_replication_slot returns an encoded replication slot name
* in the following format. * in the following format.
* Slot Name = NodeId_SharedMemoryHandle_TableOwnerOid * Slot Name = citus_split_nodeId_sharedMemoryHandle_tableOwnerOid
* Max supported length of replication slot name is 64 bytes.
*/ */
char * char *
encode_replication_slot(uint32_t nodeId, encode_replication_slot(uint32_t nodeId,
@ -188,7 +189,7 @@ encode_replication_slot(uint32_t nodeId,
uint32_t tableOwnerId) uint32_t tableOwnerId)
{ {
StringInfo slotName = makeStringInfo(); StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "%u_%u_%u", nodeId, dsmHandle, tableOwnerId); appendStringInfo(slotName, "citus_split_%u_%u_%u", nodeId, dsmHandle, tableOwnerId);
return slotName->data; return slotName->data;
} }
@ -208,14 +209,14 @@ decode_replication_slot(char *slotName,
char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition); char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition);
while (slotNameString != NULL) while (slotNameString != NULL)
{ {
/* first part of the slot name is NodeId */ /* third part of the slot name is NodeId */
if (index == 0) if (index == 2)
{ {
*nodeId = strtoul(slotNameString, NULL, 10); *nodeId = strtoul(slotNameString, NULL, 10);
} }
/* second part of the name is memory handle */ /* fourth part of the name is memory handle */
else if (index == 1) else if (index == 3)
{ {
*dsmHandle = strtoul(slotNameString, NULL, 10); *dsmHandle = strtoul(slotNameString, NULL, 10);
} }
@ -227,10 +228,10 @@ decode_replication_slot(char *slotName,
} }
/* /*
* Replication slot name is encoded as NodeId_SharedMemoryHandle_TableOwnerOid. * Replication slot name is encoded as citus_split_nodeId_sharedMemoryHandle_tableOwnerOid.
* Hence the number of tokens would be strictly three considering "_" as delimiter. * Hence the number of tokens would be strictly five considering "_" as delimiter.
*/ */
if (index != 3) if (index != 5)
{ {
ereport(ERROR, ereport(ERROR,
(errmsg("Invalid Replication Slot name encoding: %s", slotName))); (errmsg("Invalid Replication Slot name encoding: %s", slotName)));

View File

@ -21,12 +21,12 @@ DECLARE
begin begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2); SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName; SELECT FORMAT('citus_split_%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot -- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName; SELECT FORMAT('citus_split_%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if; end if;
@ -69,11 +69,11 @@ begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2); SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
SELECT relowner into tableOwnerOne from pg_class where relname='table_first'; SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne; SELECT FORMAT('citus_split_%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second'; SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo; SELECT FORMAT('citus_split_%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
@ -104,7 +104,6 @@ DECLARE
begin begin
SELECT name into replicationSlotName from slotName_table where id = 1; SELECT name into replicationSlotName from slotName_table where id = 1;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
RAISE NOTICE 'sameer %', replicationSlotName;
return replicationSlotName; return replicationSlotName;
end end
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -117,7 +116,6 @@ DECLARE
begin begin
SELECT name into replicationSlotName from slotName_table where id = 2; SELECT name into replicationSlotName from slotName_table where id = 2;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
RAISE NOTICE 'sameer %', replicationSlotName;
return replicationSlotName; return replicationSlotName;
end end
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;

View File

@ -23,12 +23,12 @@ DECLARE
begin begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2); SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName; SELECT FORMAT('citus_split_%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot -- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName; SELECT FORMAT('citus_split_%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if; end if;
@ -73,11 +73,11 @@ begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2); SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
SELECT relowner into tableOwnerOne from pg_class where relname='table_first'; SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne; SELECT FORMAT('citus_split_%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second'; SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo; SELECT FORMAT('citus_split_%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split'); SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
@ -110,7 +110,6 @@ DECLARE
begin begin
SELECT name into replicationSlotName from slotName_table where id = 1; SELECT name into replicationSlotName from slotName_table where id = 1;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
RAISE NOTICE 'sameer %', replicationSlotName;
return replicationSlotName; return replicationSlotName;
end end
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -124,7 +123,6 @@ DECLARE
begin begin
SELECT name into replicationSlotName from slotName_table where id = 2; SELECT name into replicationSlotName from slotName_table where id = 2;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
RAISE NOTICE 'sameer %', replicationSlotName;
return replicationSlotName; return replicationSlotName;
end end
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;