From 4fb2b80f3a8303fbf8af2999aee4c1e2f458e6b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?=
Date: Thu, 18 May 2023 23:46:32 +0300
Subject: [PATCH] Backport: run reference table replication as superuser
 during rebalance

When a non-superuser runs citus_rebalance_start() and reference tables
need to be replicated to a new node, the replicate_reference_tables()
background task used to be scheduled as the calling user and could fail
for lack of privileges. Schedule it as the Citus extension owner
instead, and schema-qualify the shard transfer UDFs issued by the
rebalancer (citus_move_shard_placement, citus_copy_shard_placement)
with pg_catalog so they resolve independently of the caller's
search_path.
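A minimal end-to-end check, mirroring the regression test added below
(the non_super_user_rebalance role and :worker_3_port come from the
background_rebalance test setup and are assumed to exist):

    -- as superuser: create a reference table and add a node, so the
    -- rebalancer has to schedule a replicate_reference_tables() task
    CREATE TABLE ref_table(a int primary key);
    SELECT create_reference_table('ref_table');
    SELECT 1 FROM citus_add_node('localhost', :worker_3_port);

    -- as a non-superuser: this used to fail at the reference table
    -- replication step and now succeeds
    SET ROLE non_super_user_rebalance;
    SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
    SELECT citus_rebalance_wait();
    RESET ROLE;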
---
 .../distributed/operations/shard_rebalancer.c |  8 ++-
 .../distributed/utils/reference_table_utils.c |  2 +-
 src/test/regress/bin/normalize.sed            |  2 +-
 .../regress/expected/background_rebalance.out | 55 +++++++++++++++++++
 .../regress/expected/shard_rebalancer.out     |  8 +--
 src/test/regress/operations_schedule          |  2 +-
 src/test/regress/sql/background_rebalance.sql | 21 +++++++
 7 files changed, 88 insertions(+), 10 deletions(-)

diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index d24936925..27223a70b 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -1949,7 +1949,9 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
 	appendStringInfo(&buf,
 					 "SELECT pg_catalog.replicate_reference_tables(%s)",
 					 quote_literal_cstr(shardTranferModeLabel));
-	BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
+
+	Oid superUserId = CitusExtensionOwner();
+	BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data,
 												  prevJobIdx, prevJobId);
 	prevJobId[prevJobIdx] = task->taskid;
 	prevJobIdx++;
@@ -2034,7 +2036,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 	if (updateType == PLACEMENT_UPDATE_MOVE)
 	{
 		appendStringInfo(placementUpdateCommand,
-						 "SELECT citus_move_shard_placement(%ld,%u,%u,%s)",
+						 "SELECT pg_catalog.citus_move_shard_placement(%ld,%u,%u,%s)",
 						 shardId,
 						 sourceNode->nodeId,
 						 targetNode->nodeId,
@@ -2043,7 +2045,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 	else if (updateType == PLACEMENT_UPDATE_COPY)
 	{
 		appendStringInfo(placementUpdateCommand,
-						 "SELECT citus_copy_shard_placement(%ld,%u,%u,%s)",
+						 "SELECT pg_catalog.citus_copy_shard_placement(%ld,%u,%u,%s)",
 						 shardId,
 						 sourceNode->nodeId,
 						 targetNode->nodeId,
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 0b085c67a..f7af1823d 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -420,7 +420,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
 		"auto";
 
 	appendStringInfo(queryString,
-					 "SELECT citus_copy_shard_placement("
+					 "SELECT pg_catalog.citus_copy_shard_placement("
 					 UINT64_FORMAT ", %d, %d, "
 					 "transfer_mode := %s)",
 					 sourceShardPlacement->shardId,
diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed
index df343a077..a4f6acbd0 100644
--- a/src/test/regress/bin/normalize.sed
+++ b/src/test/regress/bin/normalize.sed
@@ -253,7 +253,7 @@ s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
 s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g
 
 # shard_rebalancer output for flaky nodeIds
-s/issuing SELECT citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
+s/issuing SELECT pg_catalog.citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
 
 # node id in run_command_on_all_nodes warning
 s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g
diff --git a/src/test/regress/expected/background_rebalance.out b/src/test/regress/expected/background_rebalance.out
index c82078d6f..c10b43f78 100644
--- a/src/test/regress/expected/background_rebalance.out
+++ b/src/test/regress/expected/background_rebalance.out
@@ -304,6 +304,61 @@ SELECT public.wait_until_metadata_sync(30000);
 
 (1 row)
 
+-- make sure a non-super user can rebalance when there are reference tables to replicate
+CREATE TABLE ref_table(a int primary key);
+SELECT create_reference_table('ref_table');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- add a new node to trigger replicate_reference_tables task
+SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+SET ROLE non_super_user_rebalance;
+SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
+NOTICE:  Scheduled 1 moves as job xxx
+DETAIL:  Rebalance scheduled as background job
+HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+-- wait for success
+SELECT citus_rebalance_wait();
+ citus_rebalance_wait
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT state, details from citus_rebalance_status();
+  state   |                     details
+---------------------------------------------------------------------
+ finished | {"tasks": [], "task_state_counts": {"done": 2}}
+(1 row)
+
+RESET ROLE;
+-- reset the number of nodes by removing the previously added node
+SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
+NOTICE:  Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+CALL citus_cleanup_orphaned_resources();
+NOTICE:  cleaned up 1 orphaned resources
+SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA background_rebalance CASCADE;
 DROP USER non_super_user_rebalance;
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index 9eec2cee3..8b3b99c7a 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -221,7 +221,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -244,7 +244,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -267,7 +267,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -290,7 +290,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule
index 15afd9e18..7e8a5b445 100644
--- a/src/test/regress/operations_schedule
+++ b/src/test/regress/operations_schedule
@@ -1,10 +1,10 @@
 test: multi_test_helpers multi_test_helpers_superuser
 test: multi_cluster_management
 test: multi_test_catalog_views
+test: worker_copy_table_to_node
 test: shard_rebalancer_unit
 test: shard_rebalancer
 test: background_rebalance
-test: worker_copy_table_to_node
 test: foreign_key_to_reference_shard_rebalance
 test: multi_move_mx
 test: shard_move_deferred_delete
diff --git a/src/test/regress/sql/background_rebalance.sql b/src/test/regress/sql/background_rebalance.sql
index 4d105655b..eb38deee9 100644
--- a/src/test/regress/sql/background_rebalance.sql
+++ b/src/test/regress/sql/background_rebalance.sql
@@ -108,6 +108,27 @@ SELECT state, details from citus_rebalance_status();
 SELECT 1 FROM citus_remove_node('localhost', :master_port);
 SELECT public.wait_until_metadata_sync(30000);
 
+-- make sure a non-super user can rebalance when there are reference tables to replicate
+CREATE TABLE ref_table(a int primary key);
+SELECT create_reference_table('ref_table');
+
+-- add a new node to trigger replicate_reference_tables task
+SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
+
+SET ROLE non_super_user_rebalance;
+SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
+
+-- wait for success
+SELECT citus_rebalance_wait();
+SELECT state, details from citus_rebalance_status();
+
+RESET ROLE;
+
+-- reset the number of nodes by removing the previously added node
+SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
+CALL citus_cleanup_orphaned_resources();
+SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA background_rebalance CASCADE;
 DROP USER non_super_user_rebalance;