When moving a shard to a new node ensure there is enough space (#4929)

* When moving a shard to a new node ensure there is enough space

* Add WaitForMiliseconds time utility

* Add more tests and increase readability

* Remove the retry loop and use a single UDF for disk stats

* Address review

* address review

Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
pull/4966/head
SaitTalhaNisanci 2021-05-06 17:28:02 +03:00 committed by GitHub
parent ff4098724a
commit 6b1904d37a
16 changed files with 523 additions and 51 deletions
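In short: before moving a shard, the coordinator now estimates the size of the colocated shard group on the source node, asks the target node for its disk statistics through the new citus_local_disk_space_stats() UDF, and refuses the move unless the configured free-space margin would still hold afterwards. A rough usage sketch (the shard id, host names and ports below are placeholders, not values from this change):

SELECT citus_move_shard_placement(102008, 'worker-1', 5432, 'worker-2', 5432);
-- fails with "not enough empty space on node if the shard is moved ..." and
-- HINT: consider lowering citus.desired_percent_disk_available_after_move
-- whenever the target node would drop below the configured margin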

View File

@ -10,6 +10,8 @@
*-------------------------------------------------------------------------
*/
#include <sys/statvfs.h>

#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
@ -65,6 +67,7 @@
#include "utils/rel.h"
#include "utils/syscache.h"

#define DISK_SPACE_FIELDS 2

/* Local functions forward declarations */
static uint64 * AllocateUint64(uint64 value);
@ -98,12 +101,137 @@ static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
char *quotedShardName);
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_local_disk_space_stats);
PG_FUNCTION_INFO_V1(citus_table_size);
PG_FUNCTION_INFO_V1(citus_total_relation_size);
PG_FUNCTION_INFO_V1(citus_relation_size);
PG_FUNCTION_INFO_V1(citus_shard_sizes);
/*
* CreateDiskSpaceTuple creates a tuple that is used as the return value
* for citus_local_disk_space_stats.
*/
static HeapTuple
CreateDiskSpaceTuple(TupleDesc tupleDescriptor, uint64 availableBytes, uint64 totalBytes)
{
Datum values[DISK_SPACE_FIELDS];
bool isNulls[DISK_SPACE_FIELDS];
/* form heap tuple for remote disk space statistics */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[0] = UInt64GetDatum(availableBytes);
values[1] = UInt64GetDatum(totalBytes);
HeapTuple diskSpaceTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
return diskSpaceTuple;
}
/*
* citus_local_disk_space_stats returns total disk space and available disk
* space for the disk that contains PGDATA.
*/
Datum
citus_local_disk_space_stats(PG_FUNCTION_ARGS)
{
uint64 availableBytes = 0;
uint64 totalBytes = 0;
if (!GetLocalDiskSpaceStats(&availableBytes, &totalBytes))
{
ereport(WARNING, (errmsg("could not get disk space")));
}
TupleDesc tupleDescriptor = NULL;
TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL,
&tupleDescriptor);
if (resultTypeClass != TYPEFUNC_COMPOSITE)
{
ereport(ERROR, (errmsg("return type must be a row type")));
}
HeapTuple diskSpaceTuple = CreateDiskSpaceTuple(tupleDescriptor, availableBytes,
totalBytes);
PG_RETURN_DATUM(HeapTupleGetDatum(diskSpaceTuple));
}
/*
* GetLocalDiskSpaceStats returns total and available disk space for the disk containing
* PGDATA (not considering tablespaces, quota).
*/
static bool
GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes)
{
struct statvfs buffer;
if (statvfs(DataDir, &buffer) != 0)
{
return false;
}
/*
* f_bfree: number of free blocks
* f_frsize: fragment size, same as f_bsize usually
* f_blocks: Size of fs in f_frsize units
*/
*availableBytes = buffer.f_bfree * buffer.f_frsize;
*totalBytes = buffer.f_blocks * buffer.f_frsize;
return true;
}
/*
* GetNodeDiskSpaceStatsForConnection fetches the disk space statistics for the node
* that is on the given connection, or returns false if unsuccessful.
*/
bool
GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, uint64 *availableBytes,
uint64 *totalBytes)
{
PGresult *result = NULL;
char *sizeQuery = "SELECT available_disk_size, total_disk_size "
"FROM pg_catalog.citus_local_disk_space_stats()";
int queryResult = ExecuteOptionalRemoteCommand(connection, sizeQuery, &result);
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
{
ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot get the disk space statistics for node %s:%d",
connection->hostname, connection->port)));
PQclear(result);
ForgetResults(connection);
return false;
}
char *availableBytesString = PQgetvalue(result, 0, 0);
char *totalBytesString = PQgetvalue(result, 0, 1);
*availableBytes = SafeStringToUint64(availableBytesString);
*totalBytes = SafeStringToUint64(totalBytesString);
PQclear(result);
ForgetResults(connection);
return true;
}
/*
* citus_shard_sizes returns all shard names and their sizes.
*/
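The remote side of GetNodeDiskSpaceStatsForConnection is plain SQL, so the same numbers can be checked by hand on any node; the query below is the string literal used in that function, and both columns are bigint byte counts derived from statvfs() on the PGDATA filesystem:

SELECT available_disk_size, total_disk_size
FROM pg_catalog.citus_local_disk_space_stats();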

View File

@ -15,6 +15,7 @@
#include "miscadmin.h"

#include <string.h>
#include <sys/statvfs.h>

#include "access/htup_details.h"
#include "catalog/pg_class.h"
@ -27,11 +28,13 @@
#include "distributed/listutils.h"
#include "distributed/shard_cleaner.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/repair_shards.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -83,6 +86,12 @@ static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort);
static void CheckSpaceConstraints(MultiConnection *connection,
uint64 colocationSizeInBytes);
static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
char *sourceNodeName, uint32 sourceNodePort,
char *targetNodeName, uint32
targetNodePort);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
@ -90,9 +99,11 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(master_move_shard_placement);

bool DeferShardDeleteOnMove = false;
double DesiredPercentFreeAfterMove = 10;
bool CheckAvailableSpaceBeforeMove = true;
/*
* citus_copy_shard_placement implements a user-facing UDF to repair data from
@ -158,6 +169,98 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
/*
* ShardListSizeInBytes returns the size in bytes of a set of shard tables.
*/
uint64
ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
workerNodePort)
{
uint32 connectionFlag = 0;
/* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true;
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
TOTAL_RELATION_SIZE,
optimizePartitionCalculations);
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
&result);
if (queryResult != RESPONSE_OKAY)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot get the size because of a connection error")));
}
List *sizeList = ReadFirstColumnAsText(result);
if (list_length(sizeList) != 1)
{
ereport(ERROR, (errmsg(
"received wrong number of rows from worker, expected 1 received %d",
list_length(sizeList))));
}
StringInfo totalSizeStringInfo = (StringInfo) linitial(sizeList);
char *totalSizeString = totalSizeStringInfo->data;
uint64 totalSize = SafeStringToUint64(totalSizeString);
PQclear(result);
ForgetResults(connection);
return totalSize;
}
/*
* CheckSpaceConstraints checks there is enough space to place the colocation
* on the node that the connection is connected to.
*/
static void
CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
{
uint64 diskAvailableInBytes = 0;
uint64 diskSizeInBytes = 0;
bool success =
GetNodeDiskSpaceStatsForConnection(connection, &diskAvailableInBytes,
&diskSizeInBytes);
if (!success)
{
ereport(ERROR, (errmsg("Could not fetch disk stats for node: %s-%d",
connection->hostname, connection->port)));
}
uint64 diskAvailableInBytesAfterShardMove = 0;
if (diskAvailableInBytes < colocationSizeInBytes)
{
/*
* even though the space will be less than "0", we set it to 0 for convenience.
*/
diskAvailableInBytes = 0;
}
else
{
diskAvailableInBytesAfterShardMove = diskAvailableInBytes - colocationSizeInBytes;
}
uint64 desiredNewDiskAvailableInBytes = diskSizeInBytes *
(DesiredPercentFreeAfterMove / 100);
if (diskAvailableInBytesAfterShardMove < desiredNewDiskAvailableInBytes)
{
ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, "
"actual available space after move will be %ld bytes, "
"desired available space after move is %ld bytes,"
"estimated size increase on node after move is %ld bytes.",
diskAvailableInBytesAfterShardMove,
desiredNewDiskAvailableInBytes, colocationSizeInBytes),
errhint(
"consider lowering citus.desired_percent_disk_available_after_move.")));
}
}
/*
* citus_move_shard_placement moves given shard (and its co-located shards) from one
* node to the other node. To accomplish this it entirely recreates the table structure
@ -247,6 +350,9 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
"unsupported")));
}
EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
BlockWritesToShardList(colocatedShardList);

/*
@ -287,7 +393,32 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
/*
* EnsureEnoughDiskSpaceForShardMove checks that there is enough space for
* shard moves of the given colocated shard list from source node to target node.
* It tries to clean up old shard placements to ensure there is enough space.
*/
static void
EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
char *sourceNodeName, uint32 sourceNodePort,
char *targetNodeName, uint32 targetNodePort)
{
if (!CheckAvailableSpaceBeforeMove)
{
return;
}
uint64 colocationSizeInBytes = ShardListSizeInBytes(colocatedShardList,
sourceNodeName,
sourceNodePort);
uint32 connectionFlag = 0;
MultiConnection *connection = GetNodeConnection(connectionFlag, targetNodeName,
targetNodePort);
CheckSpaceConstraints(connection, colocationSizeInBytes);
}
/*
* master_move_shard_placement is a wrapper around citus_move_shard_placement.
*/
Datum
master_move_shard_placement(PG_FUNCTION_ARGS)
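To make the math in CheckSpaceConstraints concrete with the fake statistics the regression test below injects (total_disk_size 8500, available_disk_size 20, an 8192-byte colocated shard group, and the default 10 percent margin), the same check as a standalone query:

SELECT greatest(20 - 8192, 0) AS available_after_move,
       8500 * 10 / 100 AS desired_available,
       greatest(20 - 8192, 0) >= 8500 * 10 / 100 AS move_allowed;
-- 0 bytes left, 850 bytes desired, move_allowed = false: the same numbers as the
-- "not enough empty space" error in the expected output further down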

View File

@ -23,8 +23,6 @@
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_defer_delete_shards);

static bool TryDropShard(GroupShardPlacement *placement);
/* /*
@ -55,13 +53,15 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
/*
* TryDropMarkedShards is a wrapper around DropMarkedShards that catches
* any errors to make it safe to use in the maintenance daemon.
*
* If dropping any of the shards failed this function returns -1, otherwise it
* returns the number of dropped shards.
*/
int
TryDropMarkedShards(bool waitForCleanupLock)
{
int droppedShardCount = 0;
MemoryContext savedContext = CurrentMemoryContext;

PG_TRY();
{
droppedShardCount = DropMarkedShards(waitForCleanupLock);
@ -95,9 +95,10 @@ TryDropMarkedShards(bool waitForCleanupLock)
* This is to ensure that this function is not being run concurrently.
* Otherwise really bad race conditions are possible, such as removing all
* placements of a shard. waitForCleanupLock indicates if this function should
* wait for this lock or error out.
*
*/
int
DropMarkedShards(bool waitForCleanupLock)
{
int removedShardCount = 0;
@ -105,7 +106,7 @@ DropMarkedShards(bool waitForCleanupLock)
if (!IsCoordinator())
{
return 0;
}

if (waitForCleanupLock)

View File

@ -40,6 +40,7 @@
#include "distributed/pg_dist_rebalance_strategy.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/repair_shards.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/tuplestore.h"
@ -444,14 +445,6 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
uint64 shardId = PG_GETARG_INT64(0);
bool missingOk = false;
ShardPlacement *shardPlacement = ActiveShardPlacement(shardId, missingOk);

MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CostByDiscSizeContext",
@ -459,47 +452,20 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
MemoryContext oldContext = MemoryContextSwitchTo(localContext);

ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);

uint64 colocationSizeInBytes = ShardListSizeInBytes(colocatedShardList,
shardPlacement->nodeName,
shardPlacement->nodePort);

MemoryContextSwitchTo(oldContext);
MemoryContextReset(localContext);

if (colocationSizeInBytes <= 0)
{
PG_RETURN_FLOAT4(1);
}

PG_RETURN_FLOAT4(colocationSizeInBytes);
}

View File

@ -60,6 +60,7 @@
#include "distributed/recursive_planning.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/repair_shards.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shared_connection_stats.h"
@ -935,9 +936,34 @@ RegisterCitusConfigVariables(void)
&ExplainAnalyzeSortMethod,
EXPLAIN_ANALYZE_SORT_BY_TIME, explain_analyze_sort_method_options,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.check_available_space_before_move",
gettext_noop("When enabled will check free disk space before a shard move"),
gettext_noop(
"Free disk space will be checked when this setting is enabled before each shard move."),
&CheckAvailableSpaceBeforeMove,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomRealVariable(
"citus.desired_percent_disk_available_after_move",
gettext_noop(
"Sets how many percentage of free disk space should be after a shard move"),
gettext_noop(
"This setting controls how much free space should be available after a shard move."
"If the free disk space will be lower than this parameter, then shard move will result in"
"an error."),
&DesiredPercentFreeAfterMove,
10.0, 0.0, 100.0,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."),

View File

@ -6,3 +6,4 @@
#include "udfs/worker_partitioned_relation_size/10.1-1.sql"
#include "udfs/worker_partitioned_table_size/10.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.1-1.sql"
#include "udfs/citus_local_disk_space_stats/10.1-1.sql"

View File

@ -19,5 +19,6 @@ COMMENT ON FUNCTION create_distributed_table(table_name regclass,
DROP FUNCTION pg_catalog.worker_partitioned_relation_total_size(regclass);
DROP FUNCTION pg_catalog.worker_partitioned_relation_size(regclass);
DROP FUNCTION pg_catalog.worker_partitioned_table_size(regclass);
DROP FUNCTION pg_catalog.citus_local_disk_space_stats();
#include "../udfs/citus_finish_pg_upgrade/10.0-1.sql"

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_local_disk_space_stats(
OUT available_disk_size bigint,
OUT total_disk_size bigint)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_local_disk_space_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_local_disk_space_stats()
IS 'returns statistics on available disk space on the local node';

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_local_disk_space_stats(
OUT available_disk_size bigint,
OUT total_disk_size bigint)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_local_disk_space_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_local_disk_space_stats()
IS 'returns statistics on available disk space on the local node';

View File

@ -22,6 +22,7 @@
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/relay_utility.h"
#include "utils/acl.h"
#include "utils/relcache.h"
@ -287,5 +288,7 @@ extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool
useDistributedTransaction, bool
useShardMinMaxQuery);
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes,
uint64 *totalBytes);

#endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,15 @@
/*-------------------------------------------------------------------------
*
* repair_shards.h
* Code used to move shards around.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "nodes/pg_list.h"
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
char *workerNodeName, uint32 workerNodePort);

View File

@ -14,7 +14,10 @@
/* GUC to configure deferred shard deletion */
extern int DeferShardDeleteInterval;
extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove;

extern int TryDropMarkedShards(bool waitForCleanupLock);
extern int DropMarkedShards(bool waitForCleanupLock);

#endif /*CITUS_SHARD_CLEANER_H */

View File

@ -564,11 +564,12 @@ SELECT * FROM print_extension_changes();
---------------------------------------------------------------------
function citus_internal.columnar_ensure_objects_exist() |
function create_distributed_table(regclass,text,citus.distribution_type,text) |
| function citus_local_disk_space_stats()
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
| function worker_partitioned_relation_size(regclass)
| function worker_partitioned_relation_total_size(regclass)
| function worker_partitioned_table_size(regclass)
(7 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -24,6 +24,15 @@ $cmd$);
(localhost,57638,t,0)
(2 rows)
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000001';
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,1)
(2 rows)
-- move shard
SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_move_shard_placement
@ -107,5 +116,105 @@ SELECT pg_reload_conf();
t
(1 row)
-- move shard
SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- we expect shard xxxxx to be on both workers now
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
SELECT run_command_on_workers($cmd$
-- override the function for testing purpose
create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)
as $BODY$
begin
select 20 into available_disk_size;
select 8500 into total_disk_size;
end
$BODY$ language plpgsql;
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE FUNCTION")
(localhost,57638,t,"CREATE FUNCTION")
(2 rows)
SELECT citus_shard_cost_by_disk_size(20000001);
citus_shard_cost_by_disk_size
---------------------------------------------------------------------
8192
(1 row)
-- When there's not enough space the move should fail
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 0 bytes, desired available space after move is 850 bytes,estimated size increase on node after move is 8192 bytes.
HINT: consider lowering citus.desired_percent_disk_available_after_move.
BEGIN;
-- when we disable the setting, the move should not give "not enough space" error
set citus.check_available_space_before_move to false;
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- we expect shard xxxxx to be on both of the workers
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
SELECT run_command_on_workers($cmd$
-- override the function for testing purpose
create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)
as $BODY$
begin
select 8300 into available_disk_size;
select 8500 into total_disk_size;
end
$BODY$ language plpgsql;
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE FUNCTION")
(localhost,57638,t,"CREATE FUNCTION")
(2 rows)
-- When there would not be enough free space left after the move, the move should fail
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 108 bytes, desired available space after move is 850 bytes,estimated size increase on node after move is 8192 bytes.
HINT: consider lowering citus.desired_percent_disk_available_after_move.
-- Restore the original function
SELECT run_command_on_workers($cmd$
CREATE OR REPLACE FUNCTION pg_catalog.citus_local_disk_space_stats(
OUT available_disk_size bigint,
OUT total_disk_size bigint)
RETURNS record
LANGUAGE C STRICT
AS 'citus', $$citus_local_disk_space_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_local_disk_space_stats()
IS 'returns statistics on available disk space on the local node';
$cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE FUNCTION")
(localhost,57638,t,"CREATE FUNCTION")
(2 rows)
DROP SCHEMA shard_move_deferred_delete CASCADE;
NOTICE: drop cascades to table t1

View File

@ -69,6 +69,7 @@ ORDER BY 1;
function citus_json_concatenate_final(json)
function citus_jsonb_concatenate(jsonb,jsonb)
function citus_jsonb_concatenate_final(jsonb)
function citus_local_disk_space_stats()
function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
function citus_node_capacity_1(integer)
function citus_prepare_pg_upgrade()
@ -245,5 +246,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(230 rows)

View File

@ -19,6 +19,10 @@ SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000001';
$cmd$);
-- move shard
SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
@ -58,4 +62,70 @@ $cmd$);
ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
SELECT pg_reload_conf();
-- move shard
SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- we expect shard 0 to be on both workers now
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
SELECT run_command_on_workers($cmd$
-- override the function for testing purpose
create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)
as $BODY$
begin
select 20 into available_disk_size;
select 8500 into total_disk_size;
end
$BODY$ language plpgsql;
$cmd$);
SELECT citus_shard_cost_by_disk_size(20000001);
-- When there's not enough space the move should fail
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
BEGIN;
-- when we disable the setting, the move should not give "not enough space" error
set citus.check_available_space_before_move to false;
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ROLLBACK;
-- we expect shard 0 to be on both of the workers
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
SELECT run_command_on_workers($cmd$
-- override the function for testing purpose
create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)
as $BODY$
begin
select 8300 into available_disk_size;
select 8500 into total_disk_size;
end
$BODY$ language plpgsql;
$cmd$);
-- When there would not be enough free space left after the move, the move should fail
SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- Restore the original function
SELECT run_command_on_workers($cmd$
CREATE OR REPLACE FUNCTION pg_catalog.citus_local_disk_space_stats(
OUT available_disk_size bigint,
OUT total_disk_size bigint)
RETURNS record
LANGUAGE C STRICT
AS 'citus', $$citus_local_disk_space_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_local_disk_space_stats()
IS 'returns statistics on available disk space on the local node';
$cmd$);
DROP SCHEMA shard_move_deferred_delete CASCADE;