Add worker_split_shard_release_dsm udf to release dynamic shared memory (#6248)

This change introduces the worker_split_shard_release_dsm UDF, which releases the dynamic shared memory segment allocated during the non-blocking split workflow.
pull/6261/head
Sameer Awasekar 2022-08-26 18:27:32 +05:30 committed by GitHub
parent 77dd49fcf8
commit 4df8eca77f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 169 additions and 7 deletions

View File

@ -133,6 +133,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList, List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList); List *destinationWorkerNodesList);
static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode);
static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId, static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
ShardInterval *shardInterval); ShardInterval *shardInterval);
static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
@ -1473,13 +1474,19 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/ */
DropDummyShards(mapOfDummyShardToPlacement); DropDummyShards(mapOfDummyShardToPlacement);
/* 24) Close source connection */ /*
* 24) Release shared memory allocated by worker_split_shard_replication_setup udf
* at source node.
*/
ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
/* 25) Close source connection */
CloseConnection(sourceConnection); CloseConnection(sourceConnection);
/* 25) Close all subscriber connections */ /* 26) Close all subscriber connections */
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash); CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
/* 26) Close connection of template replication slot */ /* 27) Close connection of template replication slot */
CloseConnection(sourceReplicationConnection); CloseConnection(sourceReplicationConnection);
} }
PG_CATCH(); PG_CATCH();
@ -1494,6 +1501,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DropDummyShards(mapOfDummyShardToPlacement); DropDummyShards(mapOfDummyShardToPlacement);
ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
@ -1698,6 +1707,33 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
} }
/*
 * ExecuteSplitShardReleaseSharedMemory releases the dynamic shared memory
 * segment held at the source node.
 *
 * As a part of the non-blocking split workflow,
 * worker_split_shard_replication_setup allocates shared memory to store split
 * information. That memory has to be released once the split completes (or
 * fails). This function does so by running worker_split_shard_release_dsm on
 * the node identified by sourceWorkerNode->workerName and
 * sourceWorkerNode->workerPort, connecting as the Citus extension owner to
 * the current database.
 *
 * The command is sent over a dedicated connection (FORCE_NEW_CONNECTION),
 * which is closed again before returning so that it does not linger after the
 * split.
 *
 * NOTE(review): the command is run via ExecuteCriticalRemoteCommand, which
 * presumably raises an ERROR on failure. This function is also called from a
 * PG_CATCH block, so confirm that an error here replacing the original one is
 * acceptable.
 */
static void
ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode)
{
	const char *releaseSharedMemoryCommand =
		"SELECT pg_catalog.worker_split_shard_release_dsm();";

	char *superUser = CitusExtensionOwnerName();
	char *databaseName = get_database_name(MyDatabaseId);

	int connectionFlag = FORCE_NEW_CONNECTION;
	MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
		connectionFlag,
		sourceWorkerNode->workerName,
		sourceWorkerNode->workerPort,
		superUser,
		databaseName);

	ExecuteCriticalRemoteCommand(sourceConnection, releaseSharedMemoryCommand);

	/* this connection was opened only for the release command; do not leak it */
	CloseConnection(sourceConnection);
}
/* /*
* CreateSplitShardReplicationSetupUDF creates and returns * CreateSplitShardReplicationSetupUDF creates and returns
* parameterized 'worker_split_shard_replication_setup' UDF command. * parameterized 'worker_split_shard_replication_setup' UDF command.

View File

@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* worker_split_shard_release_dsm.c
* This file contains functions to release dynamic shared memory segment
* allocated during split workflow.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(worker_split_shard_release_dsm);
/*
 * worker_split_shard_release_dsm is the SQL-callable entry point that releases
 * the dynamic shared memory segment allocated during the split workflow.
 *
 * It reads no arguments and returns void; all of the work is delegated to
 * ReleaseSharedMemoryOfShardSplitInfo().
 */
Datum
worker_split_shard_release_dsm(PG_FUNCTION_ARGS)
{
ReleaseSharedMemoryOfShardSplitInfo();
PG_RETURN_VOID();
}

View File

@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* worker_split_shard_replication_setup.c * worker_split_shard_replication_setup_udf.c
* This file contains functions to setup information about list of shards * This file contains functions to setup information about list of shards
* that are being split. * that are being split.
* *

View File

@ -173,6 +173,11 @@ ReleaseSharedMemoryOfShardSplitInfo()
/* Get handle of dynamic shared memory segment*/ /* Get handle of dynamic shared memory segment*/
dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle(); dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
if (dsmHandle == DSM_HANDLE_INVALID)
{
return;
}
/* /*
* Unpin the dynamic shared memory segment. 'dsm_pin_segment' was * Unpin the dynamic shared memory segment. 'dsm_pin_segment' was
* called previously by 'AllocateSharedMemoryForShardSplitInfo'. * called previously by 'AllocateSharedMemoryForShardSplitInfo'.
@ -266,8 +271,10 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
* before the current function is called. * before the current function is called.
* If this handle is still valid, it means cleanup of previous split shard * If this handle is still valid, it means cleanup of previous split shard
* workflow failed. Log a warning and continue the current shard split operation. * workflow failed. Log a warning and continue the current shard split operation.
* Skip warning if new handle to be stored is invalid. We store invalid handle
* when shared memory is released by calling worker_split_shard_release_dsm.
*/ */
if (smData->dsmHandle != DSM_HANDLE_INVALID) if (smData->dsmHandle != DSM_HANDLE_INVALID && dsmHandle != DSM_HANDLE_INVALID)
{ {
ereport(WARNING, ereport(WARNING,
errmsg( errmsg(

View File

@ -73,6 +73,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql" #include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
#include "udfs/replicate_reference_tables/11.1-1.sql" #include "udfs/replicate_reference_tables/11.1-1.sql"
#include "udfs/worker_split_shard_release_dsm/11.1-1.sql"
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"

View File

@ -82,6 +82,8 @@ DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(
DROP TYPE pg_catalog.split_shard_info; DROP TYPE pg_catalog.split_shard_info;
DROP TYPE pg_catalog.replication_slot_info; DROP TYPE pg_catalog.replication_slot_info;
DROP FUNCTION pg_catalog.worker_split_shard_release_dsm();
DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
OUT global_pid int8); OUT global_pid int8);

View File

@ -0,0 +1,8 @@
-- worker_split_shard_release_dsm takes no arguments and returns void.
-- It is implemented in C by the worker_split_shard_release_dsm symbol of the
-- extension module (MODULE_PATHNAME).
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm()
IS 'Releases shared memory segment allocated by non-blocking split workflow';
-- Revoke all privileges on the function from PUBLIC.
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC;

View File

@ -0,0 +1,8 @@
-- worker_split_shard_release_dsm takes no arguments and returns void.
-- It is implemented in C by the worker_split_shard_release_dsm symbol of the
-- extension module (MODULE_PATHNAME).
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm()
IS 'Releases shared memory segment allocated by non-blocking split workflow';
-- Revoke all privileges on the function from PUBLIC.
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC;

View File

@ -1142,12 +1142,13 @@ SELECT * FROM multi_extension.print_extension_changes();
| function replicate_reference_tables(citus.shard_transfer_mode) void | function replicate_reference_tables(citus.shard_transfer_mode) void
| function worker_copy_table_to_node(regclass,integer) void | function worker_copy_table_to_node(regclass,integer) void
| function worker_split_copy(bigint,split_copy_info[]) void | function worker_split_copy(bigint,split_copy_info[]) void
| function worker_split_shard_release_dsm() void
| function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info
| type replication_slot_info | type replication_slot_info
| type split_copy_info | type split_copy_info
| type split_shard_info | type split_shard_info
| view citus_locks | view citus_locks
(34 rows) (35 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -0,0 +1,45 @@
-- Test Secneario
-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup.
-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released.
-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment
-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup.
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);
count
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count
---------------------------------------------------------------------
1
(1 row)
SELECT pg_catalog.worker_split_shard_release_dsm();
worker_split_shard_release_dsm
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);
count
---------------------------------------------------------------------
1
(1 row)

View File

@ -229,6 +229,7 @@ ORDER BY 1;
function worker_record_sequence_dependency(regclass,regclass,name) function worker_record_sequence_dependency(regclass,regclass,name)
function worker_save_query_explain_analyze(text,jsonb) function worker_save_query_explain_analyze(text,jsonb)
function worker_split_copy(bigint,split_copy_info[]) function worker_split_copy(bigint,split_copy_info[])
function worker_split_shard_release_dsm()
function worker_split_shard_replication_setup(split_shard_info[]) function worker_split_shard_replication_setup(split_shard_info[])
schema citus schema citus
schema citus_internal schema citus_internal
@ -267,5 +268,5 @@ ORDER BY 1;
view citus_stat_statements view citus_stat_statements
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(259 rows) (260 rows)

View File

@ -11,6 +11,7 @@ test: split_shard_replication_setup
test: split_shard_replication_setup_remote_local test: split_shard_replication_setup_remote_local
test: split_shard_replication_setup_local test: split_shard_replication_setup_local
test: split_shard_replication_colocated_setup test: split_shard_replication_colocated_setup
test: split_shard_release_dsm
test: worker_split_copy_test test: worker_split_copy_test
test: worker_split_binary_copy_test test: worker_split_binary_copy_test
test: worker_split_text_copy_test test: worker_split_text_copy_test

View File

@ -0,0 +1,28 @@
-- Test Secneario
-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup.
-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released.
-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment
-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup.
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);
SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);
SELECT pg_catalog.worker_split_shard_release_dsm();
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);