From 4df8eca77f87f342f08c5963d9b7c100f8053fa2 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar <82865256+saawasek@users.noreply.github.com> Date: Fri, 26 Aug 2022 18:27:32 +0530 Subject: [PATCH] Add worker_split_shard_release_dsm udf to release dynamic shared memory (#6248) The code introduces worker_split_shard_release_dsm udf to release the dynamic shared memory segment allocated during non-blocking split workflow. --- .../distributed/operations/shard_split.c | 42 +++++++++++++++-- .../worker_split_shard_release_dsm_udf.c | 24 ++++++++++ ...worker_split_shard_replication_setup_udf.c | 2 +- .../shardsplit/shardsplit_shared_memory.c | 9 +++- .../distributed/sql/citus--11.0-4--11.1-1.sql | 1 + .../sql/downgrades/citus--11.1-1--11.0-4.sql | 2 + .../worker_split_shard_release_dsm/11.1-1.sql | 8 ++++ .../worker_split_shard_release_dsm/latest.sql | 8 ++++ src/test/regress/expected/multi_extension.out | 3 +- .../expected/split_shard_release_dsm.out | 45 +++++++++++++++++++ .../expected/upgrade_list_citus_objects.out | 3 +- src/test/regress/split_schedule | 1 + .../regress/sql/split_shard_release_dsm.sql | 28 ++++++++++++ 13 files changed, 169 insertions(+), 7 deletions(-) create mode 100644 src/backend/distributed/operations/worker_split_shard_release_dsm_udf.c create mode 100644 src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/11.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/latest.sql create mode 100644 src/test/regress/expected/split_shard_release_dsm.out create mode 100644 src/test/regress/sql/split_shard_release_dsm.sql diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index a8fb01655..f5fdf62ca 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -133,6 +133,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List 
*sourceColocatedShardIntervalList,
 												   List *shardGroupSplitIntervalListList,
 												   List *destinationWorkerNodesList);
+static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode);
 static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
 									ShardInterval *shardInterval);
 static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
@@ -1473,13 +1474,19 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		 */
 		DropDummyShards(mapOfDummyShardToPlacement);
 
-		/* 24) Close source connection */
+		/*
+		 * 24) Release shared memory allocated by worker_split_shard_replication_setup udf
+		 * at source node.
+		 */
+		ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
+
+		/* 25) Close source connection */
 		CloseConnection(sourceConnection);
 
-		/* 25) Close all subscriber connections */
+		/* 26) Close all subscriber connections */
 		CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
 
-		/* 26) Close connection of template replication slot */
+		/* 27) Close connection of template replication slot */
 		CloseConnection(sourceReplicationConnection);
 	}
 	PG_CATCH();
@@ -1494,6 +1501,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 
 		DropDummyShards(mapOfDummyShardToPlacement);
 
+		ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
+
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -1698,6 +1707,33 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 }
 
 
+/*
+ * ExecuteSplitShardReleaseSharedMemory releases the dynamic shared memory
+ * segment at the source node. worker_split_shard_replication_setup allocates
+ * it to store split information during the non-blocking split workflow, so it
+ * has to be released once the split completes (or fails).
+ */
+static void
+ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode)
+{
+	char *superUser = CitusExtensionOwnerName();
+	char *databaseName = get_database_name(MyDatabaseId);
+
+	MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
+		FORCE_NEW_CONNECTION,
+		sourceWorkerNode->workerName,
+		sourceWorkerNode->workerPort,
+		superUser,
+		databaseName);
+
+	ExecuteCriticalRemoteCommand(sourceConnection,
+								 "SELECT pg_catalog.worker_split_shard_release_dsm();");
+
+	/* this connection was opened only for the command above; do not leave it open */
+	CloseConnection(sourceConnection);
+}
+
+
 /*
  * CreateSplitShardReplicationSetupUDF creates and returns
  * parameterized 'worker_split_shard_replication_setup' UDF command.
diff --git a/src/backend/distributed/operations/worker_split_shard_release_dsm_udf.c b/src/backend/distributed/operations/worker_split_shard_release_dsm_udf.c
new file mode 100644
index 000000000..94ce40cdb
--- /dev/null
+++ b/src/backend/distributed/operations/worker_split_shard_release_dsm_udf.c
@@ -0,0 +1,30 @@
+/*-------------------------------------------------------------------------
+ *
+ * worker_split_shard_release_dsm_udf.c
+ *	  This file contains functions to release dynamic shared memory segment
+ *	  allocated during split workflow.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "distributed/shardinterval_utils.h"
+#include "distributed/shardsplit_shared_memory.h"
+
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(worker_split_shard_release_dsm);
+
+/*
+ * worker_split_shard_release_dsm is the UDF entry point that releases the
+ * dynamic shared memory segment allocated during the split workflow. It only
+ * calls ReleaseSharedMemoryOfShardSplitInfo, which returns without doing
+ * anything when the stored segment handle is DSM_HANDLE_INVALID.
+ */
+Datum
+worker_split_shard_release_dsm(PG_FUNCTION_ARGS)
+{
+	ReleaseSharedMemoryOfShardSplitInfo();
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
index 98d14c857..f4ffc6184 100644
--- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
+++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * worker_split_shard_replication_setup.c
+ * worker_split_shard_replication_setup_udf.c
  *	  This file contains functions to setup information about list of shards
  *	  that are being split.
  *
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index d9a2daad4..3e8745758 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -173,6 +173,11 @@ ReleaseSharedMemoryOfShardSplitInfo()
 	/* Get handle of dynamic shared memory segment*/
 	dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
 
+	if (dsmHandle == DSM_HANDLE_INVALID)
+	{
+		return;
+	}
+
 	/*
 	 * Unpin the dynamic shared memory segment. 'dsm_pin_segment' was
 	 * called previously by 'AllocateSharedMemoryForShardSplitInfo'.
@@ -266,8 +271,10 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
 	 * before the current function is called.
 	 * If this handle is still valid, it means cleanup of previous split shard
 	 * workflow failed.
Log a waring and continue the current shard split operation. + * Skip warning if new handle to be stored is invalid. We store invalid handle + * when shared memory is released by calling worker_split_shard_release_dsm. */ - if (smData->dsmHandle != DSM_HANDLE_INVALID) + if (smData->dsmHandle != DSM_HANDLE_INVALID && dsmHandle != DSM_HANDLE_INVALID) { ereport(WARNING, errmsg( diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 4e58bdd51..98469b614 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -73,6 +73,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/worker_split_shard_replication_setup/11.1-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql" #include "udfs/replicate_reference_tables/11.1-1.sql" +#include "udfs/worker_split_shard_release_dsm/11.1-1.sql" DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index 677234c46..d37451583 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -82,6 +82,8 @@ DROP FUNCTION pg_catalog.worker_split_shard_replication_setup( DROP TYPE pg_catalog.split_shard_info; DROP TYPE pg_catalog.replication_slot_info; +DROP FUNCTION pg_catalog.worker_split_shard_release_dsm(); + DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8); diff --git 
a/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/11.1-1.sql new file mode 100644 index 000000000..5df6554a1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$; +COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm() + IS 'Releases shared memory segment allocated by non-blocking split workflow'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/latest.sql new file mode 100644 index 000000000..5df6554a1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_shard_release_dsm/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$; +COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm() + IS 'Releases shared memory segment allocated by non-blocking split workflow'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 62d1bee95..aea663960 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1142,12 +1142,13 @@ SELECT * FROM multi_extension.print_extension_changes(); | function replicate_reference_tables(citus.shard_transfer_mode) void | function worker_copy_table_to_node(regclass,integer) void | function worker_split_copy(bigint,split_copy_info[]) void + | function worker_split_shard_release_dsm() void | function 
worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info | type replication_slot_info | type split_copy_info | type split_shard_info | view citus_locks -(34 rows) +(35 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/split_shard_release_dsm.out b/src/test/regress/expected/split_shard_release_dsm.out new file mode 100644 index 000000000..95cc210bb --- /dev/null +++ b/src/test/regress/expected/split_shard_release_dsm.out @@ -0,0 +1,45 @@ +-- Test Secneario +-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. +-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. +-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment +-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]); + count +--------------------------------------------------------------------- + 1 +(1 row) + +SET client_min_messages TO WARNING; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]); +WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. 
Continuing with the current split shard workflow. + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT pg_catalog.worker_split_shard_release_dsm(); + worker_split_shard_release_dsm +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]); + count +--------------------------------------------------------------------- + 1 +(1 row) + diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 303326b76..b51d6e15d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -229,6 +229,7 @@ ORDER BY 1; function worker_record_sequence_dependency(regclass,regclass,name) function worker_save_query_explain_analyze(text,jsonb) function worker_split_copy(bigint,split_copy_info[]) + function worker_split_shard_release_dsm() function worker_split_shard_replication_setup(split_shard_info[]) schema citus schema citus_internal @@ -267,5 +268,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(259 rows) +(260 rows) diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index eaa8eb799..62ba469bf 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -11,6 +11,7 @@ test: split_shard_replication_setup test: split_shard_replication_setup_remote_local test: split_shard_replication_setup_local test: split_shard_replication_colocated_setup +test: split_shard_release_dsm test: worker_split_copy_test test: worker_split_binary_copy_test test: worker_split_text_copy_test diff --git 
a/src/test/regress/sql/split_shard_release_dsm.sql b/src/test/regress/sql/split_shard_release_dsm.sql new file mode 100644 index 000000000..b8fe6bfb6 --- /dev/null +++ b/src/test/regress/sql/split_shard_release_dsm.sql @@ -0,0 +1,28 @@ +-- Test Secneario +-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. +-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. +-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment +-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]); + +SET client_min_messages TO WARNING; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]); + +SELECT pg_catalog.worker_split_shard_release_dsm(); +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ]);