From f9a5be59b9e73de042472d51bd2f7ba14c790a06 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?=
Date: Thu, 18 May 2023 23:46:32 +0300
Subject: [PATCH] Run replicate_reference_tables background task as superuser.
 (#6930)

DESCRIPTION: Fixes a bug in the background shard rebalancer where the
replicate reference tables task fails if the current user is not a
superuser.

This change is to be backported to earlier releases. We should fix the
permissions for replicate_reference_tables on the main branch so that it
can be run by non-superuser roles.

Fixes #6925.
Fixes #6926.
---
 .../distributed/operations/shard_rebalancer.c |  9 ++-
 .../distributed/utils/reference_table_utils.c |  2 +-
 src/test/regress/bin/normalize.sed            |  2 +-
 src/test/regress/citus_tests/run_test.py      |  8 +++
 .../regress/expected/background_rebalance.out | 55 +++++++++++++++++++
 .../regress/expected/shard_rebalancer.out     |  8 +--
 src/test/regress/operations_schedule          |  2 +-
 src/test/regress/sql/background_rebalance.sql | 21 +++++++
 8 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 082e0a8b5..0bb27934d 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -2174,7 +2174,10 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
                          quote_literal_cstr(shardTranferModeLabel));
 
         int32 nodesInvolved[] = { 0 };
-        BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
+
+        /* replicate_reference_tables permissions require superuser */
+        Oid superUserId = CitusExtensionOwner();
+        BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0,
                                                       NULL, 0, nodesInvolved);
         replicateRefTablesTaskId = task->taskid;
     }
@@ -2277,7 +2280,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
     if (updateType == PLACEMENT_UPDATE_MOVE)
     {
         appendStringInfo(placementUpdateCommand,
-                         "SELECT citus_move_shard_placement(%ld,%u,%u,%s)",
+                         "SELECT pg_catalog.citus_move_shard_placement(%ld,%u,%u,%s)",
                          shardId,
                          sourceNode->nodeId,
                          targetNode->nodeId,
@@ -2286,7 +2289,7 @@
     else if (updateType == PLACEMENT_UPDATE_COPY)
     {
         appendStringInfo(placementUpdateCommand,
-                         "SELECT citus_copy_shard_placement(%ld,%u,%u,%s)",
+                         "SELECT pg_catalog.citus_copy_shard_placement(%ld,%u,%u,%s)",
                          shardId,
                          sourceNode->nodeId,
                          targetNode->nodeId,
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 687ce02a7..314044ab5 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -420,7 +420,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
         "auto";
 
     appendStringInfo(queryString,
-                     "SELECT citus_copy_shard_placement("
+                     "SELECT pg_catalog.citus_copy_shard_placement("
                      UINT64_FORMAT ", %d, %d, "
                      "transfer_mode := %s)",
                      sourceShardPlacement->shardId,
diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed
index 731ae659a..56e40ac51 100644
--- a/src/test/regress/bin/normalize.sed
+++ b/src/test/regress/bin/normalize.sed
@@ -257,7 +257,7 @@ s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
 s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g
 
 # shard_rebalancer output for flaky nodeIds
-s/issuing SELECT citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
+s/issuing SELECT pg_catalog.citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
 
 # node id in run_command_on_all_nodes warning
 s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g
diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py
index 6ccdc40c9..3249c98a7 100755
--- a/src/test/regress/citus_tests/run_test.py
+++ b/src/test/regress/citus_tests/run_test.py
@@ -115,6 +115,14 @@ DEPS = {
             "multi_mx_function_table_reference",
         ],
     ),
+    "background_rebalance": TestDeps(
+        None,
+        [
+            "multi_test_helpers",
+            "multi_cluster_management",
+        ],
+        worker_count=3,
+    ),
     "background_rebalance_parallel": TestDeps(
         None,
         [
diff --git a/src/test/regress/expected/background_rebalance.out b/src/test/regress/expected/background_rebalance.out
index e4495ccf9..1b8dfdd08 100644
--- a/src/test/regress/expected/background_rebalance.out
+++ b/src/test/regress/expected/background_rebalance.out
@@ -310,6 +310,61 @@ SELECT public.wait_until_metadata_sync(30000);
 
 (1 row)
 
+-- make sure a non-super user can rebalance when there are reference tables to replicate
+CREATE TABLE ref_table(a int primary key);
+SELECT create_reference_table('ref_table');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- add a new node to trigger replicate_reference_tables task
+SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+SET ROLE non_super_user_rebalance;
+SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
+NOTICE:  Scheduled 1 moves as job xxx
+DETAIL:  Rebalance scheduled as background job
+HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+-- wait for success
+SELECT citus_rebalance_wait();
+ citus_rebalance_wait
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT state, details from citus_rebalance_status();
+  state   |                     details
+---------------------------------------------------------------------
+ finished | {"tasks": [], "task_state_counts": {"done": 2}}
+(1 row)
+
+RESET ROLE;
+-- reset the number of nodes by removing the previously added node
+SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
+NOTICE:  Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+CALL citus_cleanup_orphaned_resources();
+NOTICE:  cleaned up 1 orphaned resources
+SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA background_rebalance CASCADE;
 DROP USER non_super_user_rebalance;
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index 23f1f7373..0ae2193e5 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -222,7 +222,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -245,7 +245,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -268,7 +268,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -291,7 +291,7 @@ NOTICE:  issuing SET LOCAL citus.shard_count TO '4';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing SET LOCAL citus.shard_replication_factor TO '2';
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE:  issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
+NOTICE:  issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing COMMIT
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule
index f5e77c835..6dbc303c2 100644
--- a/src/test/regress/operations_schedule
+++ b/src/test/regress/operations_schedule
@@ -1,10 +1,10 @@
 test: multi_test_helpers multi_test_helpers_superuser
 test: multi_cluster_management
 test: multi_test_catalog_views
+test: worker_copy_table_to_node
 test: shard_rebalancer_unit
 test: shard_rebalancer
 test: background_rebalance
-test: worker_copy_table_to_node
 test: background_rebalance_parallel
 test: foreign_key_to_reference_shard_rebalance
 test: multi_move_mx
diff --git a/src/test/regress/sql/background_rebalance.sql b/src/test/regress/sql/background_rebalance.sql
index 59b296576..86a592d64 100644
--- a/src/test/regress/sql/background_rebalance.sql
+++ b/src/test/regress/sql/background_rebalance.sql
@@ -110,6 +110,27 @@ SELECT public.wait_for_resource_cleanup();
 SELECT 1 FROM citus_remove_node('localhost', :master_port);
 SELECT public.wait_until_metadata_sync(30000);
 
+-- make sure a non-super user can rebalance when there are reference tables to replicate
+CREATE TABLE ref_table(a int primary key);
+SELECT create_reference_table('ref_table');
+
+-- add a new node to trigger replicate_reference_tables task
+SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
+
+SET ROLE non_super_user_rebalance;
+SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
+
+-- wait for success
+SELECT citus_rebalance_wait();
+SELECT state, details from citus_rebalance_status();
+
+RESET ROLE;
+
+-- reset the number of nodes by removing the previously added node
+SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
+CALL citus_cleanup_orphaned_resources();
+SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA background_rebalance CASCADE;
 DROP USER non_super_user_rebalance;
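
For manual verification, the minimal SQL sketch below mirrors the new regression test rather than adding anything beyond the patch. It assumes a cluster where the non_super_user_rebalance role and the extra worker (:worker_3_port) are already set up, as done earlier in background_rebalance.sql.

-- A reference table plus a newly added node leaves the reference table
-- under-replicated, so the next rebalance schedules a
-- replicate_reference_tables background task.
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);

-- Before this fix the background task was scheduled with the current
-- (non-superuser) role and failed; it is now scheduled as the Citus
-- extension owner, so the rebalance finishes for a non-superuser.
SET ROLE non_super_user_rebalance;
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
SELECT citus_rebalance_wait();
SELECT state, details FROM citus_rebalance_status();
RESET ROLE;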