diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index beab2be47..8290260d1 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -39,6 +39,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_utility.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_logical_replication.h" #include "distributed/multi_progress.h" #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_rebalance_strategy.h" @@ -60,6 +61,7 @@ #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" #include "common/hashfn.h" @@ -166,6 +168,7 @@ typedef struct ShardStatistics /* The shard its size in bytes. */ uint64 totalSize; + XLogRecPtr shardLSN; } ShardStatistics; /* @@ -175,6 +178,7 @@ typedef struct ShardStatistics typedef struct WorkerShardStatistics { WorkerHashKey worker; + XLogRecPtr workerLSN; /* * Statistics for each shard on this worker: @@ -246,6 +250,10 @@ static HTAB * GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, bool fromSource); static uint64 WorkerShardSize(HTAB *workerShardStatistics, char *workerName, int workerPort, uint64 shardId); +static XLogRecPtr WorkerShardLSN(HTAB *workerShardStatisticsHash, char *workerName, + int workerPort, uint64 shardId); +static XLogRecPtr WorkerLSN(HTAB *workerShardStatisticsHash, + char *workerName, int workerPort); static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort, uint64 shardId); static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); @@ -1232,6 +1240,11 @@ get_rebalance_progress(PG_FUNCTION_ARGS) uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, step->targetPort, shardId); + XLogRecPtr sourceLSN = WorkerLSN(shardStatistics, step->sourceName, + step->sourcePort); + XLogRecPtr targetLSN = WorkerShardLSN(shardStatistics, step->targetName, + step->targetPort, shardId); + uint64 shardSize = 0; ShardStatistics *shardSizesStat = hash_search(shardSizes, &shardId, HASH_FIND, NULL); @@ -1240,8 +1253,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) shardSize = shardSizesStat->totalSize; } - Datum values[12]; - bool nulls[12]; + Datum values[14]; + bool nulls[14]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -1259,6 +1272,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[10] = UInt64GetDatum(targetSize); values[11] = PointerGetDatum( cstring_to_text(PlacementUpdateTypeNames[step->updateType])); + values[12] = LSNGetDatum(sourceLSN); + values[13] = LSNGetDatum(targetLSN); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } @@ -1392,6 +1407,59 @@ WorkerShardSize(HTAB *workerShardStatisticsHash, char *workerName, int workerPor } +/* + * WorkerShardLSN returns the LSN of a shard on a worker, based on + * the workerShardStatisticsHash. If there is no LSN data in the + * statistics object, returns InvalidXLogRecPtr. + */ +static XLogRecPtr +WorkerShardLSN(HTAB *workerShardStatisticsHash, char *workerName, int workerPort, + uint64 shardId) +{ + WorkerHashKey workerKey = { 0 }; + strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH); + workerKey.port = workerPort; + + WorkerShardStatistics *workerStats = + hash_search(workerShardStatisticsHash, &workerKey, HASH_FIND, NULL); + if (!workerStats) + { + return InvalidXLogRecPtr; + } + + ShardStatistics *shardStats = + hash_search(workerStats->statistics, &shardId, HASH_FIND, NULL); + if (!shardStats) + { + return InvalidXLogRecPtr; + } + + return shardStats->shardLSN; +} + + +/* + * WorkerLSN returns the LSN of a worker, based on the workerShardStatisticsHash. + * If there is no LSN data in the statistics object, returns InvalidXLogRecPtr. + */ +static XLogRecPtr +WorkerLSN(HTAB *workerShardStatisticsHash, char *workerName, int workerPort) +{ + WorkerHashKey workerKey = { 0 }; + strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH); + workerKey.port = workerPort; + + WorkerShardStatistics *workerStats = + hash_search(workerShardStatisticsHash, &workerKey, HASH_FIND, NULL); + if (!workerStats) + { + return InvalidXLogRecPtr; + } + + return workerStats->workerLSN; +} + + /* * BuildWorkerShardStatisticsHash returns a shard id -> shard statistics hash containing * sizes of shards on the source node and destination node. @@ -1430,6 +1498,7 @@ BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps, int stepCoun WorkerShardStatistics *moveStat = hash_search(workerShardStatistics, &entry->worker, HASH_ENTER, NULL); moveStat->statistics = statistics; + moveStat->workerLSN = GetRemoteLogPosition(connection); } return workerShardStatistics; @@ -1481,15 +1550,17 @@ GetShardStatistics(MultiConnection *connection, HTAB *shardIds) appendStringInfoString(query, "))"); appendStringInfoString( query, - " SELECT shard_id, coalesce(pg_total_relation_size(tables.relid),0)" + " SELECT shard_id, coalesce(pg_total_relation_size(tables.relid),0), tables.lsn" /* for each shard in shardIds */ " FROM shard_names" /* check if its name can be found in pg_class, if so return size */ " LEFT JOIN" - " (SELECT c.oid AS relid, c.relname, n.nspname" - " FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace) tables" + " (SELECT c.oid AS relid, c.relname, n.nspname, ss.latest_end_lsn AS lsn" + " FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace " + " LEFT JOIN pg_subscription_rel sr ON sr.srrelid = c.oid " + " LEFT JOIN pg_stat_subscription ss ON sr.srsubid = ss.subid) tables" " ON tables.relname = shard_names.table_name AND" " tables.nspname = shard_names.schema_name "); @@ -1530,6 +1601,17 @@ GetShardStatistics(MultiConnection *connection, HTAB *shardIds) ShardStatistics *statistics = hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); statistics->totalSize = totalSize; + + if (PQgetisnull(result, rowIndex, 2)) + { + statistics->shardLSN = InvalidXLogRecPtr; + } + else + { + char *LSNString = PQgetvalue(result, rowIndex, 2); + Datum LSNDatum = DirectFunctionCall1(pg_lsn_in, CStringGetDatum(LSNString)); + statistics->shardLSN = DatumGetLSN(LSNDatum); + } } PQclear(result); diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index 8684ce196..d5bfb4184 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -1,3 +1 @@ --- citus--11.1-1--11.2-1 - --- bump version to 11.2-1 +#include "udfs/get_rebalance_progress/11.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql index eb8106a6a..b2e6c1e2c 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql @@ -1 +1 @@ --- citus--11.2-1--11.1-1 +#include "../udfs/get_rebalance_progress/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql new file mode 100644 index 000000000..66d4d4bec --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql @@ -0,0 +1,22 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint, + operation_type text, + source_lsn pg_lsn, + target_lsn pg_lsn + ) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql index 639f9078b..66d4d4bec 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -12,7 +12,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() progress bigint, source_shard_size bigint, target_shard_size bigint, - operation_type text + operation_type text, + source_lsn pg_lsn, + target_lsn pg_lsn ) AS 'MODULE_PATHNAME' LANGUAGE C STRICT; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 234b734b3..69d0a9f91 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -30,15 +30,18 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) step s2-unlock-1-start: @@ -75,10 +78,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -115,15 +121,18 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move +colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f (4 rows) step s3-unlock-2-start: @@ -160,10 +169,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -212,15 +224,18 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) step s7-release-lock: @@ -257,10 +272,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -301,15 +319,18 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) step s6-release-advisory-lock: @@ -351,10 +372,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -391,13 +415,16 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f (2 rows) step s2-unlock-1-start: @@ -426,10 +453,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -477,13 +507,16 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f (2 rows) step s7-release-lock: @@ -512,10 +545,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -553,13 +589,16 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy |t |t |f (2 rows) step s2-unlock-1-start: @@ -607,13 +646,16 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f (2 rows) step s6-release-advisory-lock: @@ -647,10 +689,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -691,13 +736,16 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f (2 rows) step s6-release-advisory-lock: @@ -751,14 +799,17 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f (3 rows) step s2-unlock-1-start: @@ -796,10 +847,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) @@ -851,14 +905,17 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f (3 rows) step s7-release-lock: @@ -896,10 +953,13 @@ step s7-get-progress: targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 8d92416c6..40d8ccfd9 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1199,9 +1199,11 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- -(0 rows) + function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn) +(2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index c7af9a94b..2a927ea18 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1044,7 +1044,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn --------------------------------------------------------------------- (0 rows) @@ -1058,7 +1058,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index 2e29da4a0..b9d10d047 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -172,7 +172,10 @@ step "s7-get-progress" targetport, ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, progress, - operation_type + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available FROM get_rebalance_progress(); }