From f13b14062105b08d8e2891ae781f37461c039696 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 13 Sep 2022 10:59:52 +0200 Subject: [PATCH] Show citus_copy_shard_placement progress in get_rebalance_progress (#6322) DESCRIPTION: Show citus_copy_shard_placement progress in get_rebalance_progress When rebalancing to a new node that does not have reference tables yet, the rebalancer will first copy the reference tables to that node. Depending on the size of the reference tables, this might take a long time. However, there's no indication of what's happening at this stage of the rebalance. This PR improves this situation by also showing the progress of any citus_copy_shard_placement calls when calling get_rebalance_progress. --- .../distributed/operations/shard_rebalancer.c | 12 +- .../distributed/operations/shard_transfer.c | 15 + .../distributed/sql/citus--11.0-4--11.1-1.sql | 1 + .../sql/downgrades/citus--11.1-1--11.0-4.sql | 1 + .../udfs/get_rebalance_progress/11.1-1.sql | 20 ++ .../udfs/get_rebalance_progress/latest.sql | 4 +- src/include/distributed/shard_rebalancer.h | 1 + .../isolation_shard_rebalancer_progress.out | 284 +++++++++++++----- src/test/regress/expected/multi_extension.out | 116 +++---- .../regress/expected/shard_rebalancer.out | 4 +- .../isolation_shard_rebalancer_progress.spec | 22 +- 11 files changed, 341 insertions(+), 139 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 60962deac..beab2be47 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -269,6 +269,11 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +static const char *PlacementUpdateTypeNames[] = { + [PLACEMENT_UPDATE_INVALID_FIRST] = "unknown", 
[PLACEMENT_UPDATE_MOVE] = "move", + [PLACEMENT_UPDATE_COPY] = "copy", +}; #ifdef USE_ASSERT_CHECKING @@ -801,6 +806,7 @@ SetupRebalanceMonitor(List *placementUpdateList, event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; + event->updateType = colocatedUpdate->updateType; pg_atomic_init_u64(&event->progress, initialProgressState); eventIndex++; @@ -1234,8 +1240,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) shardSize = shardSizesStat->totalSize; } - Datum values[11]; - bool nulls[11]; + Datum values[12]; + bool nulls[12]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -1251,6 +1257,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[8] = UInt64GetDatum(pg_atomic_read_u64(&step->progress)); values[9] = UInt64GetDatum(sourceSize); values[10] = UInt64GetDatum(targetSize); + values[11] = PointerGetDatum( + cstring_to_text(PlacementUpdateTypeNames[step->updateType])); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 356e3dd6a..cdb61731f 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -996,6 +996,20 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, return; } + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + + Oid relationId = RelationIdForShard(shardId); + PlacementUpdateEvent *placementUpdateEvent = palloc0( + sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = PLACEMENT_UPDATE_COPY; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), 
relationId, + REBALANCE_PROGRESS_MOVING); + + /* * At this point of the shard replication, we don't need to block the writes to * shards when logical replication is used. @@ -1064,6 +1078,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, SendCommandToWorkersWithMetadata(placementCommand); } } + FinalizeCurrentProgressMonitor(); } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 00c28f22c..363ddad3c 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -173,3 +173,4 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC; #include "udfs/citus_rebalance_start/11.1-1.sql" #include "udfs/citus_rebalance_stop/11.1-1.sql" #include "udfs/citus_rebalance_wait/11.1-1.sql" +#include "udfs/get_rebalance_progress/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index 2a7462e0d..54ebcf4f3 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -121,3 +121,4 @@ DROP TABLE pg_catalog.pg_dist_background_job; DROP TYPE pg_catalog.citus_job_status; DROP FUNCTION pg_catalog.citus_copy_shard_placement; #include "../udfs/citus_copy_shard_placement/10.0-1.sql" +#include "../udfs/get_rebalance_progress/10.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql new file mode 100644 index 000000000..639f9078b --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.1-1.sql @@ -0,0 +1,20 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid 
bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint, + operation_type text + ) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql index 7df399ac1..639f9078b 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -11,7 +11,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() targetport int, progress bigint, source_shard_size bigint, - target_shard_size bigint) + target_shard_size bigint, + operation_type text + ) AS 'MODULE_PATHNAME' LANGUAGE C STRICT; COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 36c38ffff..3abeb80fb 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -106,6 +106,7 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; + PlacementUpdateType updateType; pg_atomic_uint64 progress; } PlacementUpdateEventProgress; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 731f72c14..3ad592a50 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -28,15 +28,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); 
-table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s2-unlock-1-start: @@ -71,10 +72,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -109,15 +111,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 73728|localhost 
| 57637| 0|localhost | 57638| 73728| 2 -colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 +colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2|move +colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1|move (4 rows) step s3-unlock-2-start: @@ -152,10 +155,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -202,15 +206,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 
1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s7-release-lock: @@ -245,10 +250,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -287,15 +293,16 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|move +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0|move +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0|move (4 rows) step s6-release-advisory-lock: @@ -335,10 +342,11 @@ step s7-get-progress: 
targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -384,13 +392,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) step s7-release-lock: @@ -417,10 +426,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -455,13 +465,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); 
-table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move (2 rows) step s2-unlock-1-start: @@ -488,10 +499,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -537,13 +549,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) 
step s7-release-lock: @@ -570,10 +583,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -581,6 +595,57 @@ step enable-deferred-drop: ALTER SYSTEM RESET citus.defer_drop_after_shard_move; +starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s2-lock-1-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 1; + DELETE FROM separate WHERE test_id = 1; + +step s1-shard-copy-c1-block-writes: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress, + operation_type + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|copy +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|copy +(2 rows) + +step s2-unlock-1-start: + ROLLBACK; + +step s1-shard-copy-c1-block-writes: <... 
completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + + starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop master_set_node_property --------------------------------------------------------------------- @@ -611,13 +676,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|move (2 rows) step s6-release-advisory-lock: @@ -649,10 +715,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -698,13 +765,14 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); 
-table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move (2 rows) step s7-release-lock: @@ -731,10 +799,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -742,6 +811,65 @@ step enable-deferred-drop: ALTER SYSTEM RESET citus.defer_drop_after_shard_move; +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL 
client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress, + operation_type + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1|copy +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1|copy +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + + starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop master_set_node_property --------------------------------------------------------------------- @@ -773,14 +901,15 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1 
+colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1|move +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1|move (3 rows) step s2-unlock-1-start: @@ -816,10 +945,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) @@ -869,14 +999,15 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- -colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 -colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1 +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1|move +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1|move +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1|move (3 rows) step s7-release-lock: @@ -912,10 +1043,11 @@ step s7-get-progress: targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); 
-table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5f37f7a32..4ae526935 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1111,64 +1111,66 @@ ERROR: extension "citus" already exists -- Snapshot of state at 11.1-1 ALTER EXTENSION citus UPDATE TO '11.1-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- - access method columnar | - function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | - function alter_columnar_table_set(regclass,integer,integer,name,integer) void | - function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) void | - function citus_internal.columnar_ensure_am_depends_catalog() void | - function citus_internal.downgrade_columnar_storage(regclass) void | - function citus_internal.upgrade_columnar_storage(regclass) void | - function columnar.columnar_handler(internal) table_am_handler | - function isolate_tenant_to_new_shard(regclass,"any",text) bigint | - function replicate_reference_tables() void | - function worker_cleanup_job_schema_cache() void | - function worker_create_schema(bigint,text) void | - function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void | - function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer) void | - function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray) void | - function 
worker_merge_files_into_table(bigint,integer,text[],text[]) void | - function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | - function worker_repartition_cleanup(bigint) void | - schema columnar | - sequence columnar.storageid_seq | - table columnar.chunk | - table columnar.chunk_group | - table columnar.options | - table columnar.stripe | - | function citus_cleanup_orphaned_resources() - | function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) void - | function citus_internal_delete_partition_metadata(regclass) void - | function citus_job_cancel(bigint) void - | function citus_job_wait(bigint,citus_job_status) void - | function citus_locks() SETOF record - | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint - | function citus_rebalance_stop() void - | function citus_rebalance_wait() void - | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void - | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void - | function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint - | function replicate_reference_tables(citus.shard_transfer_mode) void - | function worker_copy_table_to_node(regclass,integer) void - | function worker_split_copy(bigint,text,split_copy_info[]) void - | function worker_split_shard_release_dsm() void - | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info - | sequence pg_dist_background_job_job_id_seq - | sequence pg_dist_background_task_task_id_seq - | sequence pg_dist_cleanup_recordid_seq - | sequence pg_dist_operationid_seq - | table pg_dist_background_job - | table pg_dist_background_task - | table pg_dist_background_task_depend - | table pg_dist_cleanup - | type citus_job_status - | type citus_task_status - | type replication_slot_info - | type split_copy_info - | type split_shard_info - 
| view citus_locks -(55 rows) + access method columnar | + function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | + function alter_columnar_table_set(regclass,integer,integer,name,integer) void | + function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) void | + function citus_internal.columnar_ensure_am_depends_catalog() void | + function citus_internal.downgrade_columnar_storage(regclass) void | + function citus_internal.upgrade_columnar_storage(regclass) void | + function columnar.columnar_handler(internal) table_am_handler | + function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) | + function isolate_tenant_to_new_shard(regclass,"any",text) bigint | + function replicate_reference_tables() void | + function worker_cleanup_job_schema_cache() void | + function worker_create_schema(bigint,text) void | + function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void | + function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer) void | + function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray) void | + function worker_merge_files_into_table(bigint,integer,text[],text[]) void | + function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | + function worker_repartition_cleanup(bigint) void | + schema columnar | + sequence columnar.storageid_seq | + table columnar.chunk | + table columnar.chunk_group | + table columnar.options | + table columnar.stripe | + | function citus_cleanup_orphaned_resources() + | function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) void + | function citus_internal_delete_partition_metadata(regclass) void + | function citus_job_cancel(bigint) void + 
| function citus_job_wait(bigint,citus_job_status) void + | function citus_locks() SETOF record + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint + | function citus_rebalance_stop() void + | function citus_rebalance_wait() void + | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) + | function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint + | function replicate_reference_tables(citus.shard_transfer_mode) void + | function worker_copy_table_to_node(regclass,integer) void + | function worker_split_copy(bigint,text,split_copy_info[]) void + | function worker_split_shard_release_dsm() void + | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info + | sequence pg_dist_background_job_job_id_seq + | sequence pg_dist_background_task_task_id_seq + | sequence pg_dist_cleanup_recordid_seq + | sequence pg_dist_operationid_seq + | table pg_dist_background_job + | table pg_dist_background_task + | table pg_dist_background_task_depend + | table pg_dist_cleanup + | type citus_job_status + | type citus_task_status + | type replication_slot_info + | type split_copy_info + | type split_shard_info + | view citus_locks +(57 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 4f7fad246..c7af9a94b 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ 
b/src/test/regress/expected/shard_rebalancer.out @@ -1044,7 +1044,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type --------------------------------------------------------------------- (0 rows) @@ -1058,7 +1058,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index c9bb3b641..572163f7c 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -58,12 +58,26 @@ step "s1-shard-move-c1-block-writes" SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } +step "s1-shard-copy-c1-block-writes" +{ + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, 
transfer_mode:='block_writes'); +} + step "s1-shard-move-c1-online" { BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); } +step "s1-shard-copy-c1-online" +{ + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); +} + step "s1-commit" { COMMIT; @@ -156,7 +170,8 @@ step "s7-get-progress" targetname, targetport, target_shard_size, - progress + progress, + operation_type FROM get_rebalance_progress(); } @@ -188,10 +203,15 @@ permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-relea permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +// blocking shard copy +permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" + // online shard move permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +// online shard copy +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" // parallel blocking shard move permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"