Transfer shard with nodeid (#6612)

DESCRIPTION: Introduce citus_copy_shard_placement UDF with node id
DESCRIPTION: Introduce citus_move_shard_placement UDF with node id
DESCRIPTION: Use new shard transfer functions with node id for rebalancing

New shard transfer functions to be used with nodeid instead of hostname
and port.
Use these functions in shard rebalancer.
pull/6606/head^2
Ahmet Gedemenli 2023-01-12 18:33:46 +03:00 committed by GitHub
commit a6ad4574f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 250 additions and 57 deletions

View File

@ -1953,12 +1953,10 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
resetStringInfo(&buf);
appendStringInfo(&buf,
"SELECT pg_catalog.citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)",
"SELECT pg_catalog.citus_move_shard_placement(%ld,%u,%u,%s)",
move->shardId,
quote_literal_cstr(move->sourceNode->workerName),
move->sourceNode->workerPort,
quote_literal_cstr(move->targetNode->workerName),
move->targetNode->workerPort,
move->sourceNode->nodeId,
move->targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
@ -2028,23 +2026,19 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
if (updateType == PLACEMENT_UPDATE_MOVE)
{
appendStringInfo(placementUpdateCommand,
"SELECT citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)",
"SELECT citus_move_shard_placement(%ld,%u,%u,%s)",
shardId,
quote_literal_cstr(sourceNode->workerName),
sourceNode->workerPort,
quote_literal_cstr(targetNode->workerName),
targetNode->workerPort,
sourceNode->nodeId,
targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));
}
else if (updateType == PLACEMENT_UPDATE_COPY)
{
appendStringInfo(placementUpdateCommand,
"SELECT citus_copy_shard_placement(%ld,%s,%u,%s,%u,%s)",
"SELECT citus_copy_shard_placement(%ld,%u,%u,%s)",
shardId,
quote_literal_cstr(sourceNode->workerName),
sourceNode->workerPort,
quote_literal_cstr(targetNode->workerName),
targetNode->workerPort,
sourceNode->nodeId,
targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));
}
else

View File

@ -131,8 +131,10 @@ static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNod
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_copy_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
double DesiredPercentFreeAfterMove = 10;
@ -169,6 +171,36 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
}
/*
* citus_copy_shard_placement_with_nodeid implements a user-facing UDF to copy a placement
* from a source node to a target node, including all co-located placements.
*/
Datum
citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
uint32 sourceNodeId = PG_GETARG_INT32(1);
uint32 targetNodeId = PG_GETARG_INT32(2);
Oid shardReplicationModeOid = PG_GETARG_OID(3);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
ReplicateColocatedShardPlacement(shardId,
sourceNode->workerName, sourceNode->workerPort,
targetNode->workerName, targetNode->workerPort,
shardReplicationMode);
PG_RETURN_VOID();
}
/*
* master_copy_shard_placement is a wrapper function for old UDF name.
*/
@ -232,7 +264,50 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
int32 targetNodePort = PG_GETARG_INT32(4);
Oid shardReplicationModeOid = PG_GETARG_OID(5);
citus_move_shard_placement_internal(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationModeOid);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement_with_nodeid does the same as citus_move_shard_placement,
* but accepts node ids as parameters, instead of hostname and port.
*/
Datum
citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
uint32 sourceNodeId = PG_GETARG_INT32(1);
uint32 targetNodeId = PG_GETARG_INT32(2);
Oid shardReplicationModeOid = PG_GETARG_OID(3);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
citus_move_shard_placement_internal(shardId, sourceNode->workerName,
sourceNode->workerPort, targetNode->workerName,
targetNode->workerPort,
shardReplicationModeOid);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement_internal is the internal function for shard moves.
*/
void
citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, Oid shardReplicationModeOid)
{
ListCell *colocatedTableCell = NULL;
ListCell *colocatedShardCell = NULL;
@ -291,7 +366,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
ereport(WARNING, (errmsg("shard is already present on node %s:%d",
targetNodeName, targetNodePort),
errdetail("Move may have already completed.")));
PG_RETURN_VOID();
return;
}
foreach(colocatedShardCell, colocatedShardList)
@ -432,7 +507,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
PLACEMENT_UPDATE_STATUS_COMPLETED);
FinalizeCurrentProgressMonitor();
PG_RETURN_VOID();
}

View File

@ -13,6 +13,8 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_task_wait/11.2-1.sql"
#include "udfs/citus_prepare_pg_upgrade/11.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/11.2-1.sql"
#include "udfs/citus_copy_shard_placement/11.2-1.sql"
#include "udfs/citus_move_shard_placement/11.2-1.sql"
-- drop orphaned shards after inserting records for them into pg_dist_cleanup
INSERT INTO pg_dist_cleanup

View File

@ -25,3 +25,6 @@ COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, in
DROP FUNCTION pg_catalog.citus_task_wait(bigint, pg_catalog.citus_task_status);
#include "../udfs/citus_prepare_pg_upgrade/11.1-1.sql"
#include "../udfs/citus_finish_pg_upgrade/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_copy_shard_placement(bigint, integer, integer, citus.shard_transfer_mode);
DROP FUNCTION pg_catalog.citus_move_shard_placement(bigint, integer, integer, citus.shard_transfer_mode);

View File

@ -0,0 +1,35 @@
-- citus_copy_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_copy_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'copy a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_copy_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement(shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'copy a shard from the source node to the destination node';

View File

@ -1,5 +1,21 @@
DROP FUNCTION pg_catalog.citus_copy_shard_placement;
-- citus_copy_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_copy_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'copy a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,

View File

@ -0,0 +1,35 @@
-- citus_move_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'move a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a shard from a the source node to the destination node';

View File

@ -1,4 +1,21 @@
-- citus_move_shard_placement, but with nodeid
CREATE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
transfer_mode citus.shard_transfer_mode)
IS 'move a shard from the source node to the destination node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,

View File

@ -421,13 +421,11 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
appendStringInfo(queryString,
"SELECT citus_copy_shard_placement("
UINT64_FORMAT ", %s, %d, %s, %d, "
UINT64_FORMAT ", %d, %d, "
"transfer_mode := %s)",
sourceShardPlacement->shardId,
quote_literal_cstr(sourceShardPlacement->nodeName),
sourceShardPlacement->nodePort,
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort,
sourceShardPlacement->nodeId,
workerNode->nodeId,
quote_literal_cstr(transferModeString));
return queryString;

View File

@ -12,6 +12,11 @@
#include "distributed/shard_rebalancer.h"
#include "nodes/pg_list.h"
extern void citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort,
Oid shardReplicationModeOid);
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
char *workerNodeName, uint32 workerNodePort);
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);

View File

@ -42,26 +42,28 @@ INSERT INTO data VALUES ('key-1', 'value-1');
INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- verify we error out if no healthy placement exists at source
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
:worker_1_node,
:worker_2_node,
transfer_mode := 'block_writes');
ERROR: could not find placement matching "localhost:xxxxx"
HINT: Confirm the placement still exists and try again.
-- verify we error out if source and destination are the same
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_2_port,
:worker_2_node,
:worker_2_node,
transfer_mode := 'block_writes');
ERROR: cannot copy shard to the same node
-- verify we warn if target already contains a healthy placement
SELECT citus_copy_shard_placement(
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
:worker_1_node,
:worker_2_node,
transfer_mode := 'block_writes');
WARNING: shard is already present on node localhost:xxxxx
DETAIL: Copy may have already completed.
@ -75,15 +77,15 @@ INSERT INTO ref_table SELECT 1, value FROM data;
ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port);
:worker_2_node,
:worker_1_node);
ERROR: cannot replicate shards with foreign keys
ALTER TABLE data DROP CONSTRAINT distfk;
-- replicate shard that contains key-1
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
:worker_2_node,
:worker_1_node,
transfer_mode := 'block_writes');
citus_copy_shard_placement
---------------------------------------------------------------------

View File

@ -1266,10 +1266,12 @@ SELECT * FROM multi_extension.print_extension_changes();
function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) |
function worker_append_table_to_shard(text,text,text,integer) void |
function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info |
| function citus_copy_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void
| function citus_get_node_clock() cluster_clock
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void
| function citus_task_wait(bigint,citus_task_status) void
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
@ -1295,7 +1297,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
(32 rows)
(34 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -399,14 +399,16 @@ SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='s
(1 row)
INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- make sure that both online and offline rebalance operations succeed
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
@ -414,13 +416,13 @@ SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localho
-- even on another schema
SET search_path TO public;
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------

View File

@ -221,7 +221,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433101,'localhost',57637,'localhost',57638,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(433101,16,18,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -244,7 +244,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433102,'localhost',57638,'localhost',57637,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(433102,18,16,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -267,7 +267,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433103,'localhost',57637,'localhost',57638,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(433103,16,18,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -290,7 +290,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433104,'localhost',57638,'localhost',57637,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(433104,18,16,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx

View File

@ -35,6 +35,7 @@ ORDER BY 1;
function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate()
function citus_coordinator_nodeid()
function citus_copy_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
function citus_create_restore_point(text)
function citus_disable_node(text,integer,boolean)
@ -86,6 +87,7 @@ ORDER BY 1;
function citus_jsonb_concatenate_final(jsonb)
function citus_local_disk_space_stats()
function citus_locks()
function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
function citus_node_capacity_1(integer)
function citus_nodeid_for_gpid(bigint)
@ -312,5 +314,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(304 rows)
(306 rows)

View File

@ -34,25 +34,28 @@ INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- verify we error out if no healthy placement exists at source
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
:worker_1_node,
:worker_2_node,
transfer_mode := 'block_writes');
-- verify we error out if source and destination are the same
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_2_port,
:worker_2_node,
:worker_2_node,
transfer_mode := 'block_writes');
-- verify we warn if target already contains a healthy placement
SELECT citus_copy_shard_placement(
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
:worker_1_node,
:worker_2_node,
transfer_mode := 'block_writes');
-- verify we error out if table has foreign key constraints
@ -61,16 +64,16 @@ INSERT INTO ref_table SELECT 1, value FROM data;
ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port);
:worker_2_node,
:worker_1_node);
ALTER TABLE data DROP CONSTRAINT distfk;
-- replicate shard that contains key-1
SELECT citus_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
:worker_2_node,
:worker_1_node,
transfer_mode := 'block_writes');
-- forcefully mark the old replica as inactive

View File

@ -249,14 +249,17 @@ CREATE INDEX ii10 ON multiple_unique_keys(a,b,c);
SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='sensors');
INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- make sure that both online and offline rebalance operations succeed
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
-- even on another schema
SET search_path TO public;
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical');
SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes');
SELECT public.wait_for_resource_cleanup();