release-11.2-emel
Emel Şimşek 2023-05-18 23:46:32 +03:00 committed by EmelSimsek
parent 4886473f35
commit 4fb2b80f3a
7 changed files with 88 additions and 10 deletions

View File

@ -1949,7 +1949,9 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf, appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)", "SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel)); quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
Oid superUserId = CitusExtensionOwner();
BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data,
prevJobIdx, prevJobId); prevJobIdx, prevJobId);
prevJobId[prevJobIdx] = task->taskid; prevJobId[prevJobIdx] = task->taskid;
prevJobIdx++; prevJobIdx++;
@ -2034,7 +2036,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
if (updateType == PLACEMENT_UPDATE_MOVE) if (updateType == PLACEMENT_UPDATE_MOVE)
{ {
appendStringInfo(placementUpdateCommand, appendStringInfo(placementUpdateCommand,
"SELECT citus_move_shard_placement(%ld,%u,%u,%s)", "SELECT pg_catalog.citus_move_shard_placement(%ld,%u,%u,%s)",
shardId, shardId,
sourceNode->nodeId, sourceNode->nodeId,
targetNode->nodeId, targetNode->nodeId,
@ -2043,7 +2045,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
else if (updateType == PLACEMENT_UPDATE_COPY) else if (updateType == PLACEMENT_UPDATE_COPY)
{ {
appendStringInfo(placementUpdateCommand, appendStringInfo(placementUpdateCommand,
"SELECT citus_copy_shard_placement(%ld,%u,%u,%s)", "SELECT pg_catalog.citus_copy_shard_placement(%ld,%u,%u,%s)",
shardId, shardId,
sourceNode->nodeId, sourceNode->nodeId,
targetNode->nodeId, targetNode->nodeId,

View File

@ -420,7 +420,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
"auto"; "auto";
appendStringInfo(queryString, appendStringInfo(queryString,
"SELECT citus_copy_shard_placement(" "SELECT pg_catalog.citus_copy_shard_placement("
UINT64_FORMAT ", %d, %d, " UINT64_FORMAT ", %d, %d, "
"transfer_mode := %s)", "transfer_mode := %s)",
sourceShardPlacement->shardId, sourceShardPlacement->shardId,

View File

@ -253,7 +253,7 @@ s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g
# shard_rebalancer output for flaky nodeIds # shard_rebalancer output for flaky nodeIds
s/issuing SELECT citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g s/issuing SELECT pg_catalog.citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
# node id in run_command_on_all_nodes warning # node id in run_command_on_all_nodes warning
s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g

View File

@ -304,6 +304,61 @@ SELECT public.wait_until_metadata_sync(30000);
(1 row) (1 row)
-- make sure a non-super user can rebalance when there are reference tables to replicate
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- add a new node to trigger replicate_reference_tables task
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET ROLE non_super_user_rebalance;
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
(1 row)
-- wait for success
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT state, details from citus_rebalance_status();
state | details
---------------------------------------------------------------------
finished | {"tasks": [], "task_state_counts": {"done": 2}}
(1 row)
RESET ROLE;
-- reset the the number of nodes by removing the previously added node
SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
?column?
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 1 orphaned resources
SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE; DROP SCHEMA background_rebalance CASCADE;
DROP USER non_super_user_rebalance; DROP USER non_super_user_rebalance;

View File

@ -221,7 +221,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -244,7 +244,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -267,7 +267,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -290,7 +290,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx

View File

@ -1,10 +1,10 @@
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser
test: multi_cluster_management test: multi_cluster_management
test: multi_test_catalog_views test: multi_test_catalog_views
test: worker_copy_table_to_node
test: shard_rebalancer_unit test: shard_rebalancer_unit
test: shard_rebalancer test: shard_rebalancer
test: background_rebalance test: background_rebalance
test: worker_copy_table_to_node
test: foreign_key_to_reference_shard_rebalance test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx test: multi_move_mx
test: shard_move_deferred_delete test: shard_move_deferred_delete

View File

@ -108,6 +108,27 @@ SELECT state, details from citus_rebalance_status();
SELECT 1 FROM citus_remove_node('localhost', :master_port); SELECT 1 FROM citus_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000); SELECT public.wait_until_metadata_sync(30000);
-- make sure a non-super user can rebalance when there are reference tables to replicate
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
-- add a new node to trigger replicate_reference_tables task
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SET ROLE non_super_user_rebalance;
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
-- wait for success
SELECT citus_rebalance_wait();
SELECT state, details from citus_rebalance_status();
RESET ROLE;
-- reset the the number of nodes by removing the previously added node
SELECT 1 FROM citus_drain_node('localhost', :worker_3_port);
CALL citus_cleanup_orphaned_resources();
SELECT 1 FROM citus_remove_node('localhost', :worker_3_port);
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE; DROP SCHEMA background_rebalance CASCADE;
DROP USER non_super_user_rebalance; DROP USER non_super_user_rebalance;