diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c
index 35ff2b848..c4f16f6b2 100644
--- a/src/backend/distributed/operations/repair_shards.c
+++ b/src/backend/distributed/operations/repair_shards.c
@@ -95,6 +95,15 @@ static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
                                               char *sourceNodeName, uint32 sourceNodePort,
                                               char *targetNodeName, uint32 targetNodePort);
+static List * RecreateShardDDLCommandList(ShardInterval *shardInterval,
+                                          const char *sourceNodeName,
+                                          int32 sourceNodePort);
+static List * CopyShardContentsCommandList(ShardInterval *shardInterval,
+                                           const char *sourceNodeName,
+                                           int32 sourceNodePort);
+static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
+                                               const char *sourceNodeName,
+                                               int32 sourceNodePort);
 
 
 /* declarations for dynamic loading */
 
@@ -932,12 +941,38 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
     ShardInterval *shardInterval = NULL;
     foreach_ptr(shardInterval, shardIntervalList)
     {
-        bool includeDataCopy = !PartitionedTable(shardInterval->relationId);
-
-        List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName,
-                                                    sourceNodePort, includeDataCopy);
+        /*
+         * For each shard we first create the shard table in a separate
+         * transaction and then we copy the data and create the indexes in a
+         * second separate transaction. The reason we don't do both in a single
+         * transaction is so we can see the size of the new shard growing
+         * during the copy when we run get_rebalance_progress in another
+         * session. If we didn't split these two phases up, the table wouldn't
+         * be visible in the session that get_rebalance_progress uses, so it
+         * would always report the shard's size as 0.
+         */
+        List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
+                                                           sourceNodePort);
         char *tableOwner = TableOwner(shardInterval->relationId);
+        SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+                                                   tableOwner, ddlCommandList);
+        ddlCommandList = NIL;
+
+        /*
+         * Skip copying data for partitioned tables, because they contain no
+         * data themselves. Their partitions do contain data, but those are
+         * different colocated shards that will be copied separately.
+         */
+        if (!PartitionedTable(shardInterval->relationId))
+        {
+            ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
+                                                          sourceNodePort);
+        }
+        ddlCommandList = list_concat(
+            ddlCommandList,
+            PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
+                                             sourceNodePort));
 
         SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
                                                    tableOwner, ddlCommandList);
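The comment in the hunk above carries the key reasoning: a table created inside a still-open transaction is invisible to every other session, so committing the empty shard table first is what lets a concurrent session watch the copy make progress. As a minimal illustration (not part of the patch), a second session can now observe the move; get_rebalance_progress and the column names are taken from the isolation test output further down:

-- Run while rebalance_table_shards() executes in another session. Because
-- the empty shard table is committed before the data copy begins, the target
-- shard is visible here and target_shard_size grows during the copy instead
-- of staying 0.
SELECT table_name, shardid, source_shard_size, target_shard_size, progress
FROM get_rebalance_progress();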
@@ -1165,47 +1200,82 @@ SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName
 
 /*
  * CopyShardCommandList generates command list to copy the given shard placement
- * from the source node to the target node. Caller could optionally skip copying
- * the data by the flag includeDataCopy.
+ * from the source node to the target node. To do this it recreates the shard
+ * on the target, and then copies the data. The caller can optionally skip
+ * copying the data via the includeDataCopy flag.
  */
 List *
 CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                      int32 sourceNodePort, bool includeDataCopy)
+{
+    List *copyShardToNodeCommandsList = RecreateShardDDLCommandList(
+        shardInterval, sourceNodeName, sourceNodePort);
+    if (includeDataCopy)
+    {
+        copyShardToNodeCommandsList = list_concat(
+            copyShardToNodeCommandsList,
+            CopyShardContentsCommandList(shardInterval, sourceNodeName,
+                                         sourceNodePort));
+    }
+    return list_concat(copyShardToNodeCommandsList,
+                       PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
+                                                        sourceNodePort));
+}
+
+
+/*
+ * RecreateShardDDLCommandList generates a command list to recreate a shard,
+ * but without loading any data and without the post-load table creation
+ * commands.
+ */
+static List *
+RecreateShardDDLCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+                            int32 sourceNodePort)
 {
     int64 shardId = shardInterval->shardId;
-    char *shardName = ConstructQualifiedShardName(shardInterval);
-    List *copyShardToNodeCommandsList = NIL;
-    StringInfo copyShardDataCommand = makeStringInfo();
     Oid relationId = shardInterval->relationId;
 
     List *tableRecreationCommandList = RecreateTableDDLCommandList(relationId);
-    tableRecreationCommandList =
-        WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
+    return WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
+}
 
-    copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
-                                              tableRecreationCommandList);
 
-    if (includeDataCopy)
-    {
-        appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
-                         quote_literal_cstr(shardName), /* table to append */
-                         quote_literal_cstr(shardName), /* remote table name */
-                         quote_literal_cstr(sourceNodeName), /* remote host */
-                         sourceNodePort); /* remote port */
+/*
+ * CopyShardContentsCommandList generates a command list to copy the data of
+ * the given shard placement from the source node to the target node. This
+ * requires the shard table to have been created on the target node already
+ * (using RecreateShardDDLCommandList).
+ */
+static List *
+CopyShardContentsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+                             int32 sourceNodePort)
+{
+    char *shardName = ConstructQualifiedShardName(shardInterval);
+    StringInfo copyShardDataCommand = makeStringInfo();
+    appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
+                     quote_literal_cstr(shardName), /* table to append */
+                     quote_literal_cstr(shardName), /* remote table name */
+                     quote_literal_cstr(sourceNodeName), /* remote host */
+                     sourceNodePort); /* remote port */
 
-        copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
-                                              copyShardDataCommand->data);
-    }
+    return list_make1(copyShardDataCommand->data);
+}
+
+
+/*
+ * PostLoadShardCreationCommandList generates a command list to finalize the
+ * creation of a shard after the data has been loaded. This creates things
+ * such as the indexes on the table.
+ */
+static List *
+PostLoadShardCreationCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
+                                 int32 sourceNodePort)
+{
+    int64 shardId = shardInterval->shardId;
+    Oid relationId = shardInterval->relationId;
 
     bool includeReplicaIdentity = true;
     List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true,
                                                               includeReplicaIdentity);
-    indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId);
-
-    copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
-                                              indexCommandList);
-
-    return copyShardToNodeCommandsList;
+    return WorkerApplyShardDDLCommandList(indexCommandList, shardId);
 }
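To make the three-way split concrete, here is a hedged sketch of what the generated command lists might contain for one shard. The shard id, table, and node details are borrowed from the regress tests below; the worker_apply_shard_ddl_command and worker_append_table_to_shard UDF calls are inferred from the WorkerApplyShardDDLCommandList and WORKER_APPEND_TABLE_TO_SHARD names used above, and the quoted DDL is invented for illustration:

-- RecreateShardDDLCommandList: recreate the empty shard table; sent alone in
-- the first transaction by CopyShardTablesViaBlockWrites.
SELECT worker_apply_shard_ddl_command (1500001, 'CREATE TABLE public.colocated1 (test_id integer)');

-- CopyShardContentsCommandList: append the rows from the source placement.
SELECT worker_append_table_to_shard ('public.colocated1_1500001', 'public.colocated1_1500001', 'localhost', 57637);

-- PostLoadShardCreationCommandList: post-load DDL such as index creation;
-- sent together with the data copy in the second transaction.
SELECT worker_apply_shard_ddl_command (1500001, 'CREATE INDEX colocated1_idx ON public.colocated1 (test_id)');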
+ */ +static List * +PostLoadShardCreationCommandList(ShardInterval *shardInterval, const char *sourceNodeName, + int32 sourceNodePort) +{ + int64 shardId = shardInterval->shardId; + Oid relationId = shardInterval->relationId; bool includeReplicaIdentity = true; List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true, includeReplicaIdentity); - indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); - - copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, - indexCommandList); - - return copyShardToNodeCommandsList; + return WorkerApplyShardDDLCommandList(indexCommandList, shardId); } diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index a941d6d2b..32e508e0a 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -65,8 +65,8 @@ step s3-progress: table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress -colocated1 1500001 49152 localhost 57637 49152 localhost 57638 49152 2 -colocated2 1500005 376832 localhost 57637 376832 localhost 57638 376832 2 +colocated1 1500001 73728 localhost 57637 49152 localhost 57638 73728 2 +colocated2 1500005 401408 localhost 57637 376832 localhost 57638 401408 2 colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 1 colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 1 step s2-unlock-2: diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index ae2a60cb3..deb133c76 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1688,6 +1688,7 @@ SELECT * FROM public.table_placements_per_node; 57638 | tab2 | 1 (4 rows) +VACUUM FULL tab, tab2; ANALYZE tab, tab2; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, total_bytes diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index e91531018..66a36327e 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -926,6 +926,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; +VACUUM FULL tab, tab2; ANALYZE tab, tab2; \c - - - :worker_1_port