diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 2687eaa6b..dd9eb4a1f 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1953,12 +1953,10 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo resetStringInfo(&buf); appendStringInfo(&buf, - "SELECT pg_catalog.citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)", + "SELECT pg_catalog.citus_move_shard_placement(%ld,%u,%u,%s)", move->shardId, - quote_literal_cstr(move->sourceNode->workerName), - move->sourceNode->workerPort, - quote_literal_cstr(move->targetNode->workerName), - move->targetNode->workerPort, + move->sourceNode->nodeId, + move->targetNode->nodeId, quote_literal_cstr(shardTranferModeLabel)); BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, @@ -2028,23 +2026,19 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, if (updateType == PLACEMENT_UPDATE_MOVE) { appendStringInfo(placementUpdateCommand, - "SELECT citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)", + "SELECT citus_move_shard_placement(%ld,%u,%u,%s)", shardId, - quote_literal_cstr(sourceNode->workerName), - sourceNode->workerPort, - quote_literal_cstr(targetNode->workerName), - targetNode->workerPort, + sourceNode->nodeId, + targetNode->nodeId, quote_literal_cstr(shardTranferModeLabel)); } else if (updateType == PLACEMENT_UPDATE_COPY) { appendStringInfo(placementUpdateCommand, - "SELECT citus_copy_shard_placement(%ld,%s,%u,%s,%u,%s)", + "SELECT citus_copy_shard_placement(%ld,%u,%u,%s)", shardId, - quote_literal_cstr(sourceNode->workerName), - sourceNode->workerPort, - quote_literal_cstr(targetNode->workerName), - targetNode->workerPort, + sourceNode->nodeId, + targetNode->nodeId, quote_literal_cstr(shardTranferModeLabel)); } else diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index add72c6db..ece5afe0a 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -131,8 +131,10 @@ static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNod /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_copy_shard_placement); +PG_FUNCTION_INFO_V1(citus_copy_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); +PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); double DesiredPercentFreeAfterMove = 10; @@ -169,6 +171,36 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) } +/* + * citus_copy_shard_placement_with_nodeid implements a user-facing UDF to copy a placement + * from a source node to a target node, including all co-located placements. + */ +Datum +citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + Oid shardReplicationModeOid = PG_GETARG_OID(3); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + ReplicateColocatedShardPlacement(shardId, + sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode); + + PG_RETURN_VOID(); +} + + /* * master_copy_shard_placement is a wrapper function for old UDF name. */ @@ -232,7 +264,50 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) int32 targetNodePort = PG_GETARG_INT32(4); Oid shardReplicationModeOid = PG_GETARG_OID(5); + citus_move_shard_placement_internal(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + shardReplicationModeOid); + PG_RETURN_VOID(); +} + + +/* + * citus_move_shard_placement_with_nodeid does the same as citus_move_shard_placement, + * but accepts node ids as parameters, instead of hostname and port. + */ +Datum +citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + Oid shardReplicationModeOid = PG_GETARG_OID(3); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + citus_move_shard_placement_internal(shardId, sourceNode->workerName, + sourceNode->workerPort, targetNode->workerName, + targetNode->workerPort, + shardReplicationModeOid); + + PG_RETURN_VOID(); +} + + +/* + * citus_move_shard_placement_internal is the internal function for shard moves. + */ +void +citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort, Oid shardReplicationModeOid) +{ ListCell *colocatedTableCell = NULL; ListCell *colocatedShardCell = NULL; @@ -291,7 +366,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) ereport(WARNING, (errmsg("shard is already present on node %s:%d", targetNodeName, targetNodePort), errdetail("Move may have already completed."))); - PG_RETURN_VOID(); + return; } foreach(colocatedShardCell, colocatedShardList) @@ -432,7 +507,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) PLACEMENT_UPDATE_STATUS_COMPLETED); FinalizeCurrentProgressMonitor(); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index 05c2a8ba6..2e3c73539 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -13,6 +13,8 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) #include "udfs/citus_task_wait/11.2-1.sql" #include "udfs/citus_prepare_pg_upgrade/11.2-1.sql" #include "udfs/citus_finish_pg_upgrade/11.2-1.sql" +#include "udfs/citus_copy_shard_placement/11.2-1.sql" +#include "udfs/citus_move_shard_placement/11.2-1.sql" -- drop orphaned shards after inserting records for them into pg_dist_cleanup INSERT INTO pg_dist_cleanup diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql index 166beb7c6..65e57152c 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql @@ -25,3 +25,6 @@ COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, in DROP FUNCTION pg_catalog.citus_task_wait(bigint, pg_catalog.citus_task_status); #include "../udfs/citus_prepare_pg_upgrade/11.1-1.sql" #include "../udfs/citus_finish_pg_upgrade/11.1-1.sql" + +DROP FUNCTION pg_catalog.citus_copy_shard_placement(bigint, integer, integer, citus.shard_transfer_mode); +DROP FUNCTION pg_catalog.citus_move_shard_placement(bigint, integer, integer, citus.shard_transfer_mode); diff --git a/src/backend/distributed/sql/udfs/citus_copy_shard_placement/11.2-1.sql b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/11.2-1.sql new file mode 100644 index 000000000..597381b67 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/11.2-1.sql @@ -0,0 +1,35 @@ +-- citus_copy_shard_placement, but with nodeid +CREATE FUNCTION pg_catalog.citus_copy_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_copy_shard_placement_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode) +IS 'copy a shard from the source node to the destination node'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_shard_placement( + shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_copy_shard_placement$$; + +COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement(shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer, + shard_transfer_mode citus.shard_transfer_mode) +IS 'copy a shard from the source node to the destination node'; diff --git a/src/backend/distributed/sql/udfs/citus_copy_shard_placement/latest.sql b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/latest.sql index 3a8b7d53f..597381b67 100644 --- a/src/backend/distributed/sql/udfs/citus_copy_shard_placement/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/latest.sql @@ -1,5 +1,21 @@ -DROP FUNCTION pg_catalog.citus_copy_shard_placement; +-- citus_copy_shard_placement, but with nodeid CREATE FUNCTION pg_catalog.citus_copy_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_copy_shard_placement_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_copy_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode) +IS 'copy a shard from the source node to the destination node'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_shard_placement( shard_id bigint, source_node_name text, source_node_port integer, diff --git a/src/backend/distributed/sql/udfs/citus_move_shard_placement/11.2-1.sql b/src/backend/distributed/sql/udfs/citus_move_shard_placement/11.2-1.sql new file mode 100644 index 000000000..cbd905be6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_move_shard_placement/11.2-1.sql @@ -0,0 +1,35 @@ +-- citus_move_shard_placement, but with nodeid +CREATE FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode) +IS 'move a shard from the source node to the destination node'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer, + shard_transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_move_shard_placement$$; + +COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer, + shard_transfer_mode citus.shard_transfer_mode) +IS 'move a shard from a the source node to the destination node'; diff --git a/src/backend/distributed/sql/udfs/citus_move_shard_placement/latest.sql b/src/backend/distributed/sql/udfs/citus_move_shard_placement/latest.sql index d063d1b82..cbd905be6 100644 --- a/src/backend/distributed/sql/udfs/citus_move_shard_placement/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_move_shard_placement/latest.sql @@ -1,4 +1,21 @@ +-- citus_move_shard_placement, but with nodeid CREATE FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_move_shard_placement_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_move_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + transfer_mode citus.shard_transfer_mode) +IS 'move a shard from the source node to the destination node'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement( shard_id bigint, source_node_name text, source_node_port integer, diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index d279e8c2e..0b085c67a 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -421,13 +421,11 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement, appendStringInfo(queryString, "SELECT citus_copy_shard_placement(" - UINT64_FORMAT ", %s, %d, %s, %d, " + UINT64_FORMAT ", %d, %d, " "transfer_mode := %s)", sourceShardPlacement->shardId, - quote_literal_cstr(sourceShardPlacement->nodeName), - sourceShardPlacement->nodePort, - quote_literal_cstr(workerNode->workerName), - workerNode->workerPort, + sourceShardPlacement->nodeId, + workerNode->nodeId, quote_literal_cstr(transferModeString)); return queryString; diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index 00a31af29..ff2eb2809 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -12,6 +12,11 @@ #include "distributed/shard_rebalancer.h" #include "nodes/pg_list.h" +extern void citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, + char *targetNodeName, + int32 targetNodePort, + Oid shardReplicationModeOid); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId); diff --git a/src/test/regress/expected/citus_copy_shard_placement.out b/src/test/regress/expected/citus_copy_shard_placement.out index ce2cfca10..f6ea55c71 100644 --- a/src/test/regress/expected/citus_copy_shard_placement.out +++ b/src/test/regress/expected/citus_copy_shard_placement.out @@ -42,26 +42,28 @@ INSERT INTO data VALUES ('key-1', 'value-1'); INSERT INTO data VALUES ('key-2', 'value-2'); INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- verify we error out if no healthy placement exists at source SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, + :worker_1_node, + :worker_2_node, transfer_mode := 'block_writes'); ERROR: could not find placement matching "localhost:xxxxx" HINT: Confirm the placement still exists and try again. -- verify we error out if source and destination are the same SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_2_port, + :worker_2_node, + :worker_2_node, transfer_mode := 'block_writes'); ERROR: cannot copy shard to the same node -- verify we warn if target already contains a healthy placement SELECT citus_copy_shard_placement( (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, + :worker_1_node, + :worker_2_node, transfer_mode := 'block_writes'); WARNING: shard is already present on node localhost:xxxxx DETAIL: Copy may have already completed. @@ -75,15 +77,15 @@ INSERT INTO ref_table SELECT 1, value FROM data; ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL; SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_1_port); + :worker_2_node, + :worker_1_node); ERROR: cannot replicate shards with foreign keys ALTER TABLE data DROP CONSTRAINT distfk; -- replicate shard that contains key-1 SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_1_port, + :worker_2_node, + :worker_1_node, transfer_mode := 'block_writes'); citus_copy_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index bce90883c..b84d6d43a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1266,10 +1266,12 @@ SELECT * FROM multi_extension.print_extension_changes(); function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | function worker_append_table_to_shard(text,text,text,integer) void | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info | + | function citus_copy_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void | function citus_get_node_clock() cluster_clock | function citus_get_transaction_clock() cluster_clock | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void | function citus_is_clock_after(cluster_clock,cluster_clock) boolean + | function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void | function citus_task_wait(bigint,citus_task_status) void | function cluster_clock_cmp(cluster_clock,cluster_clock) integer | function cluster_clock_eq(cluster_clock,cluster_clock) boolean @@ -1295,7 +1297,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | operator family cluster_clock_ops for access method btree | sequence pg_dist_clock_logical_seq | type cluster_clock -(32 rows) +(34 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_move_constraints.out b/src/test/regress/expected/shard_move_constraints.out index 0862a4b4f..931e55644 100644 --- a/src/test/regress/expected/shard_move_constraints.out +++ b/src/test/regress/expected/shard_move_constraints.out @@ -399,14 +399,16 @@ SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='s (1 row) INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- make sure that both online and offline rebalance operations succeed -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); +SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical'); citus_move_shard_placement --------------------------------------------------------------------- (1 row) -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes'); citus_move_shard_placement --------------------------------------------------------------------- @@ -414,13 +416,13 @@ SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localho -- even on another schema SET search_path TO public; -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); +SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical'); citus_move_shard_placement --------------------------------------------------------------------- (1 row) -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes'); citus_move_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 84ba28402..413943ed9 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -221,7 +221,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_copy_shard_placement(433101,'localhost',57637,'localhost',57638,'block_writes') +NOTICE: issuing SELECT citus_copy_shard_placement(433101,16,18,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -244,7 +244,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_copy_shard_placement(433102,'localhost',57638,'localhost',57637,'block_writes') +NOTICE: issuing SELECT citus_copy_shard_placement(433102,18,16,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -267,7 +267,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_copy_shard_placement(433103,'localhost',57637,'localhost',57638,'block_writes') +NOTICE: issuing SELECT citus_copy_shard_placement(433103,16,18,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -290,7 +290,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_copy_shard_placement(433104,'localhost',57638,'localhost',57637,'block_writes') +NOTICE: issuing SELECT citus_copy_shard_placement(433104,18,16,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index f10a2a89a..541212bd3 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -35,6 +35,7 @@ ORDER BY 1; function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() function citus_coordinator_nodeid() + function citus_copy_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) function citus_create_restore_point(text) function citus_disable_node(text,integer,boolean) @@ -86,6 +87,7 @@ ORDER BY 1; function citus_jsonb_concatenate_final(jsonb) function citus_local_disk_space_stats() function citus_locks() + function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) function citus_node_capacity_1(integer) function citus_nodeid_for_gpid(bigint) @@ -312,5 +314,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(304 rows) +(306 rows) diff --git a/src/test/regress/sql/citus_copy_shard_placement.sql b/src/test/regress/sql/citus_copy_shard_placement.sql index eba654675..c6e365fe2 100644 --- a/src/test/regress/sql/citus_copy_shard_placement.sql +++ b/src/test/regress/sql/citus_copy_shard_placement.sql @@ -34,25 +34,28 @@ INSERT INTO data VALUES ('key-2', 'value-2'); INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + -- verify we error out if no healthy placement exists at source SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, + :worker_1_node, + :worker_2_node, transfer_mode := 'block_writes'); -- verify we error out if source and destination are the same SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_2_port, + :worker_2_node, + :worker_2_node, transfer_mode := 'block_writes'); -- verify we warn if target already contains a healthy placement SELECT citus_copy_shard_placement( (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, + :worker_1_node, + :worker_2_node, transfer_mode := 'block_writes'); -- verify we error out if table has foreign key constraints @@ -61,16 +64,16 @@ INSERT INTO ref_table SELECT 1, value FROM data; ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL; SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_1_port); + :worker_2_node, + :worker_1_node); ALTER TABLE data DROP CONSTRAINT distfk; -- replicate shard that contains key-1 SELECT citus_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), - 'localhost', :worker_2_port, - 'localhost', :worker_1_port, + :worker_2_node, + :worker_1_node, transfer_mode := 'block_writes'); -- forcefully mark the old replica as inactive diff --git a/src/test/regress/sql/shard_move_constraints.sql b/src/test/regress/sql/shard_move_constraints.sql index e64962d5b..30fddada9 100644 --- a/src/test/regress/sql/shard_move_constraints.sql +++ b/src/test/regress/sql/shard_move_constraints.sql @@ -249,14 +249,17 @@ CREATE INDEX ii10 ON multiple_unique_keys(a,b,c); SELECT create_distributed_table('multiple_unique_keys', 'key', colocate_with:='sensors'); INSERT INTO multiple_unique_keys SELECT i,i,i,i,i,i,i,i,i FROM generate_series(0,1000)i; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + -- make sure that both online and offline rebalance operations succeed -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical'); +SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes'); -- even on another schema SET search_path TO public; -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); -SELECT citus_move_shard_placement(8970000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT citus_move_shard_placement(8970000, :worker_2_node, :worker_1_node, 'force_logical'); +SELECT citus_move_shard_placement(8970000, :worker_1_node, :worker_2_node, 'block_writes'); SELECT public.wait_for_resource_cleanup();