From 2aa67421a78a0404b49f7591e110ad822065d5e1 Mon Sep 17 00:00:00 2001
From: Jelte Fennema
Date: Fri, 23 Jul 2021 16:37:00 +0200
Subject: [PATCH] Fix showing target shard size in the rebalance progress
 monitor (#5136)

The progress monitor wouldn't actually update the size of the shard on
the target node when using "block_writes" as the `shard_transfer_mode`.
The reason for this is that the CREATE TABLE part of the shard creation
would only be committed once all data was moved as well. This caused our
size calculation to always return 0, since the table did not exist yet
in the session that the progress monitor used.

This is fixed by first committing creation of the table, and only then
starting the actual data copy.

The test output changes slightly. Apparently splitting this up into two
transactions instead of one increases the table size after the copy by
about 40kB. The additional size used doesn't increase when the amount of
data in the table is larger (it stays ~40kB per shard). So this small
change in test output is not considered an actual problem.
---
 .../distributed/operations/repair_shards.c     | 128 ++++++++++++++----
 .../isolation_shard_rebalancer_progress.out    |   4 +-
 src/test/regress/expected/shard_rebalancer.out |   1 +
 src/test/regress/sql/shard_rebalancer.sql      |   1 +
 4 files changed, 103 insertions(+), 31 deletions(-)

diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c
index 35ff2b848..c4f16f6b2 100644
--- a/src/backend/distributed/operations/repair_shards.c
+++ b/src/backend/distributed/operations/repair_shards.c
@@ -95,6 +95,15 @@ static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
 											  char *sourceNodeName, uint32 sourceNodePort,
 											  char *targetNodeName, uint32 targetNodePort);
+static List * RecreateShardDDLCommandList(ShardInterval *shardInterval,
+										  const char *sourceNodeName,
+										  int32 sourceNodePort);
+static List * CopyShardContentsCommandList(ShardInterval *shardInterval,
+										   const char *sourceNodeName,
+										   int32 sourceNodePort);
+static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
+											   const char *sourceNodeName,
+											   int32 sourceNodePort);
 
 
 /* declarations for dynamic loading */
@@ -932,12 +941,38 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 	ShardInterval *shardInterval = NULL;
 	foreach_ptr(shardInterval, shardIntervalList)
 	{
-		bool includeDataCopy = !PartitionedTable(shardInterval->relationId);
-
-		List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName,
-													sourceNodePort, includeDataCopy);
+		/*
+		 * For each shard we first create the shard table in a separate
+		 * transaction and then we copy the data and create the indexes in a
+		 * second separate transaction. The reason we don't do both in a single
+		 * transaction is so we can see the size of the new shard growing
+		 * during the copy when we run get_rebalance_progress in another
+		 * session. If we didn't split these two phases up, then the table
+		 * wouldn't be visible in the session that get_rebalance_progress uses.
+		 * So get_rebalance_progress would always report its size as 0.
+		 */
+		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
+															sourceNodePort);
 		char *tableOwner = TableOwner(shardInterval->relationId);
+		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+												   tableOwner, ddlCommandList);
+		ddlCommandList = NIL;
+
+		/*
+		 * Skip copying data for partitioned tables, because they contain no
+		 * data themselves. Their partitions do contain data, but those are
+		 * different colocated shards that will be copied separately.
+		 */
+		if (!PartitionedTable(shardInterval->relationId))
+		{
+			ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
+														  sourceNodePort);
+		}
+		ddlCommandList = list_concat(
+			ddlCommandList,
+			PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
+											 sourceNodePort));
 
 		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
 												   tableOwner, ddlCommandList);
@@ -1165,47 +1200,82 @@ SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName
 
 /*
  * CopyShardCommandList generates command list to copy the given shard placement
- * from the source node to the target node. Caller could optionally skip copying
- * the data by the flag includeDataCopy.
+ * from the source node to the target node. To do this it recreates the shard
+ * on the target and then copies the data. The caller can optionally skip
+ * copying the data via the includeDataCopy flag.
  */
 List *
 CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
 					 int32 sourceNodePort, bool includeDataCopy)
+{
+	List *copyShardToNodeCommandsList = RecreateShardDDLCommandList(
+		shardInterval, sourceNodeName, sourceNodePort);
+	if (includeDataCopy)
+	{
+		copyShardToNodeCommandsList = list_concat(
+			copyShardToNodeCommandsList,
+			CopyShardContentsCommandList(shardInterval, sourceNodeName,
+										 sourceNodePort));
+	}
+	return list_concat(copyShardToNodeCommandsList,
+					   PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
+														sourceNodePort));
+}
+
+
+/*
+ * RecreateShardDDLCommandList generates a command list to recreate a shard,
+ * but without loading any data and without the post-load table creation commands.
+ */
+static List *
+RecreateShardDDLCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+							int32 sourceNodePort)
 {
 	int64 shardId = shardInterval->shardId;
-	char *shardName = ConstructQualifiedShardName(shardInterval);
-	List *copyShardToNodeCommandsList = NIL;
-	StringInfo copyShardDataCommand = makeStringInfo();
 	Oid relationId = shardInterval->relationId;
 
 	List *tableRecreationCommandList = RecreateTableDDLCommandList(relationId);
-	tableRecreationCommandList =
-		WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
+	return WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
+}
 
-	copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
-											  tableRecreationCommandList);
 
-	if (includeDataCopy)
-	{
-		appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
-						 quote_literal_cstr(shardName), /* table to append */
-						 quote_literal_cstr(shardName), /* remote table name */
-						 quote_literal_cstr(sourceNodeName), /* remote host */
-						 sourceNodePort); /* remote port */
+/*
+ * CopyShardContentsCommandList generates a command list to copy the data of the
+ * given shard placement from the source node to the target node. It requires
+ * the shard table to have already been created on the target node (using
+ * RecreateShardDDLCommandList).
+ */
+static List *
+CopyShardContentsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+							 int32 sourceNodePort)
+{
+	char *shardName = ConstructQualifiedShardName(shardInterval);
+	StringInfo copyShardDataCommand = makeStringInfo();
+	appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
+					 quote_literal_cstr(shardName), /* table to append */
+					 quote_literal_cstr(shardName), /* remote table name */
+					 quote_literal_cstr(sourceNodeName), /* remote host */
+					 sourceNodePort); /* remote port */
 
-		copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
-											  copyShardDataCommand->data);
-	}
+	return list_make1(copyShardDataCommand->data);
+}
+
+/*
+ * PostLoadShardCreationCommandList generates a command list to finalize the
+ * creation of a shard after the data has been loaded. This creates stuff like
+ * the indexes on the table.
+ */
+static List *
+PostLoadShardCreationCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+								 int32 sourceNodePort)
+{
+	int64 shardId = shardInterval->shardId;
+	Oid relationId = shardInterval->relationId;
 
 	bool includeReplicaIdentity = true;
 	List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true,
 															   includeReplicaIdentity);
-	indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId);
-
-	copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
-											  indexCommandList);
-
-	return copyShardToNodeCommandsList;
+	return WorkerApplyShardDDLCommandList(indexCommandList, shardId);
 }
diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out
index a941d6d2b..32e508e0a 100644
--- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out
+++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out
@@ -65,8 +65,8 @@ step s3-progress:
 
 table_name     shardid        shard_size     sourcename     sourceport     source_shard_sizetargetname     targetport     target_shard_sizeprogress
 
-colocated1     1500001        49152          localhost      57637          49152          localhost      57638          49152          2
-colocated2     1500005        376832         localhost      57637          376832         localhost      57638          376832         2
+colocated1     1500001        73728          localhost      57637          49152          localhost      57638          73728          2
+colocated2     1500005        401408         localhost      57637          376832         localhost      57638          401408         2
 colocated1     1500002        196608         localhost      57637          196608         localhost      57638          0              1
 colocated2     1500006        8192           localhost      57637          8192           localhost      57638          0              1
 step s2-unlock-2:
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index ae2a60cb3..deb133c76 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -1688,6 +1688,7 @@ SELECT * FROM public.table_placements_per_node;
     57638 | tab2         |     1
 (4 rows)
 
+VACUUM FULL tab, tab2;
 ANALYZE tab, tab2;
 \c - - - :worker_1_port
 SELECT table_schema, table_name, row_estimate, total_bytes
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index e91531018..66a36327e 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -926,6 +926,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
 CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
+VACUUM FULL tab, tab2;
 ANALYZE tab, tab2;
 \c - - - :worker_1_port
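
As an illustration of the behaviour this patch enables, here is a minimal sketch of watching the progress monitor during a "block_writes" move. It assumes a distributed table named tab (as in the regression test above) and a second concurrent psql session; the exact column list returned by get_rebalance_progress may differ per Citus version:

    -- Session 1: move shards using the block_writes transfer mode.
    SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode := 'block_writes');

    -- Session 2, while the move is running: after this fix, target_shard_size
    -- grows while the data is being copied, instead of staying 0.
    SELECT table_name, shardid, source_shard_size, target_shard_size, progress
    FROM get_rebalance_progress();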