diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 60962deac..beab2be47 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -269,6 +269,11 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +static const char *PlacementUpdateTypeNames[] = { + [PLACEMENT_UPDATE_INVALID_FIRST] = "unknown", + [PLACEMENT_UPDATE_MOVE] = "move", + [PLACEMENT_UPDATE_COPY] = "copy", +}; #ifdef USE_ASSERT_CHECKING @@ -801,6 +806,7 @@ SetupRebalanceMonitor(List *placementUpdateList, event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; + event->updateType = colocatedUpdate->updateType; pg_atomic_init_u64(&event->progress, initialProgressState); eventIndex++; @@ -1234,8 +1240,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) shardSize = shardSizesStat->totalSize; } - Datum values[11]; - bool nulls[11]; + Datum values[12]; + bool nulls[12]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -1251,6 +1257,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[8] = UInt64GetDatum(pg_atomic_read_u64(&step->progress)); values[9] = UInt64GetDatum(sourceSize); values[10] = UInt64GetDatum(targetSize); + values[11] = PointerGetDatum( + cstring_to_text(PlacementUpdateTypeNames[step->updateType])); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 356e3dd6a..cdb61731f 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -996,6 +996,20 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, return; } + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + + Oid relationId = RelationIdForShard(shardId); + PlacementUpdateEvent *placementUpdateEvent = palloc0( + sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = PLACEMENT_UPDATE_COPY; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, + REBALANCE_PROGRESS_MOVING); + + /* * At this point of the shard replication, we don't need to block the writes to * shards when logical replication is used. @@ -1064,6 +1078,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, SendCommandToWorkersWithMetadata(placementCommand); } } + FinalizeCurrentProgressMonitor(); } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 00c28f22c..363ddad3c 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -173,3 +173,4 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC; #include "udfs/citus_rebalance_start/11.1-1.sql" #include "udfs/citus_rebalance_stop/11.1-1.sql" #include "udfs/citus_rebalance_wait/11.1-1.sql" +#include "udfs/get_rebalance_progress/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index 2a7462e0d..54ebcf4f3 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -121,3 +121,4 @@ DROP TABLE pg_catalog.pg_dist_background_job; DROP TYPE pg_catalog.citus_job_status; DROP FUNCTION pg_catalog.citus_copy_shard_placement; #include "../udfs/citus_copy_shard_placement/10.0-1.sql" +#include "../udfs/get_rebalance_progress/10.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql new file mode 100644 index 000000000..639f9078b --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql @@ -0,0 +1,20 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint, + operation_type text + ) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql index 7df399ac1..639f9078b 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -11,7 +11,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() targetport int, progress bigint, source_shard_size bigint, - target_shard_size bigint) + target_shard_size bigint, + operation_type text + ) AS 'MODULE_PATHNAME' LANGUAGE C STRICT; COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 36c38ffff..3abeb80fb 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -106,6 +106,7 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; + PlacementUpdateType updateType; pg_atomic_uint64 progress; } PlacementUpdateEventProgress; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 731f72c14..3ad592a50 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -28,15 +28,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s2-unlock-1-start: @@ -71,10 +72,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -109,15 +111,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2 -colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 +colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2|move +colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1|move (4 rows) step s3-unlock-2-start: @@ -152,10 +155,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -202,15 +206,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s7-release-lock: @@ -245,10 +250,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -287,15 +293,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s6-release-advisory-lock: @@ -335,10 +342,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -384,13 +392,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) step s7-release-lock: @@ -417,10 +426,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -455,13 +465,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move (2 rows) step s2-unlock-1-start: @@ -488,10 +499,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -537,13 +549,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) step s7-release-lock: @@ -570,10 +583,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -581,6 +595,57 @@ step enable-deferred-drop: ALTER SYSTEM RESET citus.defer_drop_after_shard_move; +starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s2-lock-1-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 1; + DELETE FROM separate WHERE test_id = 1; + +step s1-shard-copy-c1-block-writes: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress, + operation_type + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|copy +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|copy +(2 rows) + +step s2-unlock-1-start: + ROLLBACK; + +step s1-shard-copy-c1-block-writes: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + + starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop master_set_node_property --------------------------------------------------------------------- @@ -611,13 +676,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|move (2 rows) step s6-release-advisory-lock: @@ -649,10 +715,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -698,13 +765,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) step s7-release-lock: @@ -731,10 +799,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -742,6 +811,65 @@ step enable-deferred-drop: ALTER SYSTEM RESET citus.defer_drop_after_shard_move; +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress, + operation_type + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|copy +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|copy +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + + starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop master_set_node_property --------------------------------------------------------------------- @@ -773,14 +901,15 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1|move (3 rows) step s2-unlock-1-start: @@ -816,10 +945,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -869,14 +999,15 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1|move (3 rows) step s7-release-lock: @@ -912,10 +1043,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5f37f7a32..4ae526935 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1111,64 +1111,66 @@ ERROR: extension "citus" already exists -- Snapshot of state at 11.1-1 ALTER EXTENSION citus UPDATE TO '11.1-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- - access method columnar | - function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | - function alter_columnar_table_set(regclass,integer,integer,name,integer) void | - function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) void | - function citus_internal.columnar_ensure_am_depends_catalog() void | - function citus_internal.downgrade_columnar_storage(regclass) void | - function citus_internal.upgrade_columnar_storage(regclass) void | - function columnar.columnar_handler(internal) table_am_handler | - function isolate_tenant_to_new_shard(regclass,"any",text) bigint | - function replicate_reference_tables() void | - function worker_cleanup_job_schema_cache() void | - function worker_create_schema(bigint,text) void | - function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void | - function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer) void | - function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray) void | - function worker_merge_files_into_table(bigint,integer,text[],text[]) void | - function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | - function worker_repartition_cleanup(bigint) void | - schema columnar | - sequence columnar.storageid_seq | - table columnar.chunk | - table columnar.chunk_group | - table columnar.options | - table columnar.stripe | - | function citus_cleanup_orphaned_resources() - | function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) void - | function citus_internal_delete_partition_metadata(regclass) void - | function citus_job_cancel(bigint) void - | function citus_job_wait(bigint,citus_job_status) void - | function citus_locks() SETOF record - | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint - | function citus_rebalance_stop() void - | function citus_rebalance_wait() void - | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void - | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void - | function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint - | function replicate_reference_tables(citus.shard_transfer_mode) void - | function worker_copy_table_to_node(regclass,integer) void - | function worker_split_copy(bigint,text,split_copy_info[]) void - | function worker_split_shard_release_dsm() void - | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info - | sequence pg_dist_background_job_job_id_seq - | sequence pg_dist_background_task_task_id_seq - | sequence pg_dist_cleanup_recordid_seq - | sequence pg_dist_operationid_seq - | table pg_dist_background_job - | table pg_dist_background_task - | table pg_dist_background_task_depend - | table pg_dist_cleanup - | type citus_job_status - | type citus_task_status - | type replication_slot_info - | type split_copy_info - | type split_shard_info - | view citus_locks -(55 rows) + access method columnar | + function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | + function alter_columnar_table_set(regclass,integer,integer,name,integer) void | + function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) void | + function citus_internal.columnar_ensure_am_depends_catalog() void | + function citus_internal.downgrade_columnar_storage(regclass) void | + function citus_internal.upgrade_columnar_storage(regclass) void | + function columnar.columnar_handler(internal) table_am_handler | + function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) | + function isolate_tenant_to_new_shard(regclass,"any",text) bigint | + function replicate_reference_tables() void | + function worker_cleanup_job_schema_cache() void | + function worker_create_schema(bigint,text) void | + function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void | + function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer) void | + function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray) void | + function worker_merge_files_into_table(bigint,integer,text[],text[]) void | + function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | + function worker_repartition_cleanup(bigint) void | + schema columnar | + sequence columnar.storageid_seq | + table columnar.chunk | + table columnar.chunk_group | + table columnar.options | + table columnar.stripe | + | function citus_cleanup_orphaned_resources() + | function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) void + | function citus_internal_delete_partition_metadata(regclass) void + | function citus_job_cancel(bigint) void + | function citus_job_wait(bigint,citus_job_status) void + | function citus_locks() SETOF record + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint + | function citus_rebalance_stop() void + | function citus_rebalance_wait() void + | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) + | function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint + | function replicate_reference_tables(citus.shard_transfer_mode) void + | function worker_copy_table_to_node(regclass,integer) void + | function worker_split_copy(bigint,text,split_copy_info[]) void + | function worker_split_shard_release_dsm() void + | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info + | sequence pg_dist_background_job_job_id_seq + | sequence pg_dist_background_task_task_id_seq + | sequence pg_dist_cleanup_recordid_seq + | sequence pg_dist_operationid_seq + | table pg_dist_background_job + | table pg_dist_background_task + | table pg_dist_background_task_depend + | table pg_dist_cleanup + | type citus_job_status + | type citus_task_status + | type replication_slot_info + | type split_copy_info + | type split_shard_info + | view citus_locks +(57 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 4f7fad246..c7af9a94b 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1044,7 +1044,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type --------------------------------------------------------------------- (0 rows) @@ -1058,7 +1058,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index c9bb3b641..572163f7c 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -58,12 +58,26 @@ step "s1-shard-move-c1-block-writes" SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } +step "s1-shard-copy-c1-block-writes" +{ + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); +} + step "s1-shard-move-c1-online" { BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); } +step "s1-shard-copy-c1-online" +{ + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); +} + step "s1-commit" { COMMIT; @@ -156,7 +170,8 @@ step "s7-get-progress" targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); } @@ -188,10 +203,15 @@ permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-relea permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +// blocking shard copy +permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" + // online shard move permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +// online shard copy +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" // parallel blocking shard move permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"