Introduce citus_move_shard_placement UDF with nodeid

pull/6612/head
Ahmet Gedemenli 2023-01-11 16:10:28 +03:00
parent e19c545fbf
commit e5fef40c06
10 changed files with 121 additions and 14 deletions

View File

@ -134,6 +134,7 @@ PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_copy_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
double DesiredPercentFreeAfterMove = 10;
@ -263,7 +264,50 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
int32 targetNodePort = PG_GETARG_INT32(4);
Oid shardReplicationModeOid = PG_GETARG_OID(5);
citus_move_shard_placement_internal(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationModeOid);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement_with_nodeid does the same as citus_move_shard_placement,
* but accepts node ids as parameters, instead of hostname and port.
*/
Datum
citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
uint32 sourceNodeId = PG_GETARG_INT32(1);
uint32 targetNodeId = PG_GETARG_INT32(2);
Oid shardReplicationModeOid = PG_GETARG_OID(3);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
citus_move_shard_placement_internal(shardId, sourceNode->workerName,
sourceNode->workerPort, targetNode->workerName,
targetNode->workerPort,
shardReplicationModeOid);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement_internal is the internal function for shard moves.
*/
void
citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, Oid shardReplicationModeOid)
{
ListCell *colocatedTableCell = NULL;
ListCell *colocatedShardCell = NULL;
@ -322,7 +366,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
ereport(WARNING, (errmsg("shard is already present on node %s:%d",
targetNodeName, targetNodePort),
errdetail("Move may have already completed.")));
PG_RETURN_VOID();
return;
}
foreach(colocatedShardCell, colocatedShardList)
@ -463,7 +507,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
PLACEMENT_UPDATE_STATUS_COMPLETED);
FinalizeCurrentProgressMonitor();
PG_RETURN_VOID();
}

View File

@ -14,7 +14,7 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_prepare_pg_upgrade/11.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/11.2-1.sql"
#include "udfs/citus_copy_shard_placement/11.2-1.sql"
#include "udfs/citus_move_shard_placement_with_nodeid/11.2-1.sql"
#include "udfs/citus_move_shard_placement/11.2-1.sql"
-- drop orphaned shards after inserting records for them into pg_dist_cleanup
INSERT INTO pg_dist_cleanup

View File

@ -27,4 +27,4 @@ DROP FUNCTION pg_catalog.citus_task_wait(bigint, pg_catalog.citus_task_status);
#include "../udfs/citus_finish_pg_upgrade/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_copy_shard_placement(bigint, integer, integer, citus.shard_transfer_mode);
DROP FUNCTION pg_catalog.citus_move_shard_placement_with_nodeid;
DROP FUNCTION pg_catalog.citus_move_shard_placement(bigint, integer, integer, citus.shard_transfer_mode);

View File

@ -0,0 +1,35 @@
-- citus_move_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'move a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a shard from a the source node to the destination node';

View File

@ -1,4 +1,21 @@
-- citus_move_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'move a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,

View File

@ -12,6 +12,11 @@
#include "distributed/shard_rebalancer.h"
#include "nodes/pg_list.h"
extern void citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort,
Oid shardReplicationModeOid);
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
char *workerNodeName, uint32 workerNodePort);
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);

View File

@ -1271,6 +1271,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void
| function citus_task_wait(bigint,citus_task_status) void
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
@ -1296,7 +1297,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
(33 rows)
(34 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -399,14 +399,16 @@ SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='s
(1 row)
INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- make sure that both online and offline rebalance operations succeed
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
@ -414,13 +416,13 @@ SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localho
-- even on another schema
SET search_path TO public;
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------

View File

@ -87,6 +87,7 @@ ORDER BY 1;
function citus_jsonb_concatenate_final(jsonb)
function citus_local_disk_space_stats()
function citus_locks()
function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
function citus_node_capacity_1(integer)
function citus_nodeid_for_gpid(bigint)
@ -313,5 +314,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(305 rows)
(306 rows)

View File

@ -249,14 +249,17 @@ CREATE INDEX ii10 ON multiple_unique_keys(a,b,c);
SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='sensors');
INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- make sure that both online and offline rebalance operations succeed
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
-- even on another schema
SET search_path TO public;
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
SELECT public.wait_for_resource_cleanup();