Merge pull request #5024 from citusdata/cleanup-old-shards-before-rebalance

pull/5020/head
Jelte Fennema 2021-06-04 14:37:22 +02:00 committed by GitHub
commit c113cb3198
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 407 additions and 407 deletions

View File

@ -1062,13 +1062,30 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
if (targetPlacement != NULL) if (targetPlacement != NULL)
{ {
if (targetPlacement->shardState == SHARD_STATE_TO_DELETE) if (targetPlacement->shardState == SHARD_STATE_TO_DELETE)
{
/*
* Trigger deletion of orphaned shards and hope that this removes
* the shard.
*/
DropOrphanedShardsInSeparateTransaction();
shardPlacementList = ShardPlacementList(shardId);
targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort);
/*
* If it still doesn't remove the shard, then we error.
*/
if (targetPlacement != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg( errmsg(
"shard " INT64_FORMAT " already exists in the target node", "shard " INT64_FORMAT
" still exists on the target node as an orphaned shard",
shardId), shardId),
errdetail( errdetail(
"The existing shard is marked for deletion, but could not be deleted because there are still active queries on it"))); "The existing shard is orphaned, but could not be deleted because there are still active queries on it")));
}
} }
else else
{ {

View File

@ -12,60 +12,109 @@
#include "postgres.h" #include "postgres.h"
#include "access/xact.h"
#include "postmaster/postmaster.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shard_cleaner.h" #include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
/* declarations for dynamic loading */ /* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_defer_delete_shards); PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
static bool TryDropShard(GroupShardPlacement *placement); static bool TryDropShard(GroupShardPlacement *placement);
static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
/* /*
* master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that * citus_cleanup_orphaned_shards implements a user-facing UDF to delete
* are still haning around in the system. These shards are orphaned by previous actions * orphaned shards that are still haning around in the system. These shards are
* that were not directly able to delete the placements eg. shard moving or dropping of a * orphaned by previous actions that were not directly able to delete the
* distributed table while one of the data nodes was not online. * placements eg. shard moving or dropping of a distributed table while one of
* the data nodes was not online.
* *
* This function iterates through placements where shardstate is SHARD_STATE_TO_DELETE * This function iterates through placements where shardstate is
* (shardstate = 4), drops the corresponding tables from the node and removes the * SHARD_STATE_TO_DELETE (shardstate = 4), drops the corresponding tables from
* placement information from the catalog. * the node and removes the placement information from the catalog.
* *
* The function takes no arguments and runs cluster wide * The function takes no arguments and runs cluster wide. It cannot be run in a
* transaction, because holding the locks it takes for a long time is not good.
* While the locks are held, it is impossible for the background daemon to
* clean up orphaned shards.
*/ */
Datum Datum
master_defer_delete_shards(PG_FUNCTION_ARGS) citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards");
bool waitForLocks = true;
int droppedShardCount = DropOrphanedShards(waitForLocks);
if (droppedShardCount > 0)
{
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
}
PG_RETURN_VOID();
}
/*
* isolation_cleanup_orphaned_shards implements a test UDF that's the same as
* citus_cleanup_orphaned_shards. The only difference is that this command can
* be run in transactions; this is needed by the isolation tests, which call it
* inside transaction blocks.
*/
Datum
isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
{ {
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
bool waitForLocks = true; bool waitForLocks = true;
int droppedShardCount = DropMarkedShards(waitForLocks); int droppedShardCount = DropOrphanedShards(waitForLocks);
if (droppedShardCount > 0)
{
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
}
PG_RETURN_INT32(droppedShardCount); PG_RETURN_VOID();
} }
/* /*
* TryDropMarkedShards is a wrapper around DropMarkedShards that catches * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by
* connecting to localhost. This is done, so that the locks that
* DropOrphanedShards takes are only held for a short time.
*/
void
DropOrphanedShardsInSeparateTransaction(void)
{
/*
 * The cleanup is not executed in the current transaction. Instead the
 * CALL statement is handed to ExecuteCriticalCommandInSeparateTransaction,
 * so the cleanup runs as its own top-level command.
 * citus_cleanup_orphaned_shards itself rejects being run inside a
 * transaction block.
 *
 * NOTE(review): the "Critical" helper presumably raises an ERROR when the
 * command fails, which would abort the caller as well -- confirm against
 * ExecuteCriticalRemoteCommand.
 */
ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()");
}
/*
* TryDropOrphanedShards is a wrapper around DropOrphanedShards that catches
* any errors to make it safe to use in the maintenance daemon. * any errors to make it safe to use in the maintenance daemon.
* *
* If dropping any of the shards failed this function returns -1, otherwise it * If dropping any of the shards failed this function returns -1, otherwise it
* returns the number of dropped shards. * returns the number of dropped shards.
*/ */
int int
TryDropMarkedShards(bool waitForLocks) TryDropOrphanedShards(bool waitForLocks)
{ {
int droppedShardCount = 0; int droppedShardCount = 0;
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
PG_TRY(); PG_TRY();
{ {
droppedShardCount = DropMarkedShards(waitForLocks); droppedShardCount = DropOrphanedShards(waitForLocks);
} }
PG_CATCH(); PG_CATCH();
{ {
@ -84,7 +133,7 @@ TryDropMarkedShards(bool waitForLocks)
/* /*
* DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before. * DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
* *
* It does so by trying to take an exclusive lock on the shard and its * It does so by trying to take an exclusive lock on the shard and its
* colocated placements before removing. If the lock cannot be obtained it * colocated placements before removing. If the lock cannot be obtained it
@ -103,7 +152,7 @@ TryDropMarkedShards(bool waitForLocks)
* *
*/ */
int int
DropMarkedShards(bool waitForLocks) DropOrphanedShards(bool waitForLocks)
{ {
int removedShardCount = 0; int removedShardCount = 0;
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
@ -159,7 +208,7 @@ DropMarkedShards(bool waitForLocks)
if (failedShardDropCount > 0) if (failedShardDropCount > 0)
{ {
ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d", ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d",
failedShardDropCount, list_length(shardPlacementList)))); failedShardDropCount, list_length(shardPlacementList))));
} }

View File

@ -43,6 +43,7 @@
#include "distributed/repair_shards.h" #include "distributed/repair_shards.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h" #include "distributed/shard_rebalancer.h"
#include "distributed/shard_cleaner.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "funcapi.h" #include "funcapi.h"
@ -700,6 +701,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
"unsupported"))); "unsupported")));
} }
DropOrphanedShardsInSeparateTransaction();
foreach(placementUpdateCell, placementUpdateList) foreach(placementUpdateCell, placementUpdateList)
{ {
PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell); PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell);
@ -910,17 +913,15 @@ citus_drain_node(PG_FUNCTION_ARGS)
}; };
char *nodeName = text_to_cstring(nodeNameText); char *nodeName = text_to_cstring(nodeNameText);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
/* /*
* This is done in a separate session. This way it's not undone if the * This is done in a separate session. This way it's not undone if the
* draining fails midway through. * draining fails midway through.
*/ */
ExecuteCriticalRemoteCommand(connection, psprintf( ExecuteCriticalCommandInSeparateTransaction(psprintf(
"SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)",
quote_literal_cstr(nodeName), nodePort)); quote_literal_cstr(nodeName),
nodePort));
RebalanceTableShards(&options, shardTransferModeOid); RebalanceTableShards(&options, shardTransferModeOid);
@ -1692,20 +1693,32 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
REBALANCE_PROGRESS_MOVING); REBALANCE_PROGRESS_MOVING);
ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
/* /*
* In case of failure, we throw an error such that rebalance_table_shards * In case of failure, we throw an error such that rebalance_table_shards
* fails early. * fails early.
*/ */
ExecuteCriticalRemoteCommand(connection, placementUpdateCommand->data); ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data);
UpdateColocatedShardPlacementProgress(shardId, UpdateColocatedShardPlacementProgress(shardId,
sourceNode->workerName, sourceNode->workerName,
sourceNode->workerPort, sourceNode->workerPort,
REBALANCE_PROGRESS_MOVED); REBALANCE_PROGRESS_MOVED);
}
/*
* ExecuteCriticalCommandInSeparateTransaction runs a command in a separate
* transaction that is committed right away. This is useful for things that you
* don't want to roll back when the current transaction is rolled back.
*/
void
ExecuteCriticalCommandInSeparateTransaction(char *command)
{
/*
 * NOTE(review): FORCE_NEW_CONNECTION presumably requests a fresh connection
 * instead of reusing one already tied to the current transaction -- confirm
 * in the connection management code.
 */
int connectionFlag = FORCE_NEW_CONNECTION;
/* the connection targets LocalHostName and PostPortNumber, not a worker node */
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
/* the command is sent over that separate connection, not run in this backend */
ExecuteCriticalRemoteCommand(connection, command);
/* the connection is used for this one command only and closed afterwards */
CloseConnection(connection); CloseConnection(connection);
} }

View File

@ -645,7 +645,9 @@ RegisterCitusConfigVariables(void)
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.defer_drop_after_shard_move", "citus.defer_drop_after_shard_move",
gettext_noop("When enabled a shard move will mark old shards for deletion"), gettext_noop("When enabled a shard move will mark the original shards "
"for deletion after a successful move, instead of deleting "
"them right away."),
gettext_noop("The deletion of a shard can sometimes run into a conflict with a " gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
"long running transactions on a the shard during the drop phase of " "long running transactions on a the shard during the drop phase of "
"the shard move. This causes some moves to be rolled back after " "the shard move. This causes some moves to be rolled back after "

View File

@ -47,3 +47,5 @@ WHERE repmodel = 'c'
DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy; DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy;
DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql"

View File

@ -84,3 +84,4 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger
BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy
FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards();

View File

@ -0,0 +1,5 @@
-- citus_cleanup_orphaned_shards is created as a procedure, so it is invoked
-- with CALL rather than SELECT. It takes no arguments and is implemented in C
-- by the symbol of the same name in the citus library. The C implementation
-- rejects being run inside a transaction block.
CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
LANGUAGE C
AS 'citus', $$citus_cleanup_orphaned_shards$$;
COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
IS 'cleanup orphaned shards';

View File

@ -0,0 +1,5 @@
-- citus_cleanup_orphaned_shards is created as a procedure, so it is invoked
-- with CALL rather than SELECT. It takes no arguments and is implemented in C
-- by the symbol of the same name in the citus library. The C implementation
-- rejects being run inside a transaction block.
CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
LANGUAGE C
AS 'citus', $$citus_cleanup_orphaned_shards$$;
COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
IS 'cleanup orphaned shards';

View File

@ -74,13 +74,13 @@ typedef struct RebalancePlanContext
} RebalancePlacementContext; } RebalancePlacementContext;
/* /*
* run_try_drop_marked_shards is a wrapper to run TryDropMarkedShards. * run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards.
*/ */
Datum Datum
run_try_drop_marked_shards(PG_FUNCTION_ARGS) run_try_drop_marked_shards(PG_FUNCTION_ARGS)
{ {
bool waitForLocks = false; bool waitForLocks = false;
TryDropMarkedShards(waitForLocks); TryDropOrphanedShards(waitForLocks);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -645,7 +645,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
lastShardCleanTime = GetCurrentTimestamp(); lastShardCleanTime = GetCurrentTimestamp();
bool waitForLocks = false; bool waitForLocks = false;
numberOfDroppedShards = TryDropMarkedShards(waitForLocks); numberOfDroppedShards = TryDropOrphanedShards(waitForLocks);
} }
CommitTransactionCommand(); CommitTransactionCommand();

View File

@ -17,7 +17,8 @@ extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove; extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove; extern bool CheckAvailableSpaceBeforeMove;
extern int TryDropMarkedShards(bool waitForLocks); extern int TryDropOrphanedShards(bool waitForLocks);
extern int DropMarkedShards(bool waitForLocks); extern int DropOrphanedShards(bool waitForLocks);
extern void DropOrphanedShardsInSeparateTransaction(void);
#endif /*CITUS_SHARD_CLEANER_H */ #endif /*CITUS_SHARD_CLEANER_H */

View File

@ -190,6 +190,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme
RebalancePlanFunctions *rebalancePlanFunctions); RebalancePlanFunctions *rebalancePlanFunctions);
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
int shardReplicationFactor); int shardReplicationFactor);
extern void ExecuteCriticalCommandInSeparateTransaction(char *command);
#endif /* SHARD_REBALANCER_H */ #endif /* SHARD_REBALANCER_H */

View File

@ -61,12 +61,8 @@ SELECT count(*) FROM referencing_table2;
101 101
(1 row) (1 row)
SELECT 1 FROM public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
?column? NOTICE: cleaned up 2 orphaned shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
name | relid | refd_relid name | relid | refd_relid
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -108,12 +104,8 @@ SELECT count(*) FROM referencing_table2;
101 101
(1 row) (1 row)
SELECT 1 FROM public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
?column? NOTICE: cleaned up 2 orphaned shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
name | relid | refd_relid name | relid | refd_relid
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -10,23 +10,19 @@ step s1-move-placement:
master_move_shard_placement master_move_shard_placement
s1: NOTICE: cleaned up 1 orphaned shards
step s1-drop-marked-shards: step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards(); SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards
1
step s2-drop-marked-shards: step s2-drop-marked-shards:
SET client_min_messages to DEBUG1; SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards(); CALL isolation_cleanup_orphaned_shards();
<waiting ...> <waiting ...>
step s1-commit: step s1-commit:
COMMIT; COMMIT;
step s2-drop-marked-shards: <... completed> step s2-drop-marked-shards: <... completed>
master_defer_delete_shards
0
starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit
step s1-begin: step s1-begin:
@ -40,17 +36,13 @@ master_move_shard_placement
step s2-drop-marked-shards: step s2-drop-marked-shards:
SET client_min_messages to DEBUG1; SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards(); CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards s1: NOTICE: cleaned up 1 orphaned shards
0
step s1-drop-marked-shards: step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards(); SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards
1
step s1-commit: step s1-commit:
COMMIT; COMMIT;
@ -82,14 +74,63 @@ run_commands_on_session_level_connection_to_node
step s1-drop-marked-shards: step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards(); SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
<waiting ...> <waiting ...>
s1: WARNING: canceling statement due to lock timeout s1: WARNING: canceling statement due to lock timeout
s1: WARNING: Failed to drop 1 old shards out of 1
step s1-drop-marked-shards: <... completed> step s1-drop-marked-shards: <... completed>
master_defer_delete_shards s1: WARNING: Failed to drop 1 orphaned shards out of 1
step s1-commit:
COMMIT;
0 step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-commit s1-begin s1-move-placement-back s1-commit s2-stop-connection
step s1-begin:
BEGIN;
step s1-move-placement:
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_move_shard_placement
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s2-lock-table-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
run_commands_on_session_level_connection_to_node
run_commands_on_session_level_connection_to_node
step s1-commit:
COMMIT;
step s1-begin:
BEGIN;
step s1-move-placement-back:
SET client_min_messages to NOTICE;
SHOW log_error_verbosity;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637);
log_error_verbosity
verbose
ERROR: shard xxxxx still exists on the target node as an orphaned shard
step s1-commit: step s1-commit:
COMMIT; COMMIT;

View File

@ -573,6 +573,7 @@ SELECT * FROM print_extension_changes();
function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) |
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) |
| function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void
| function citus_cleanup_orphaned_shards()
| function citus_local_disk_space_stats() record | function citus_local_disk_space_stats() record
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint)
@ -580,7 +581,7 @@ SELECT * FROM print_extension_changes();
| function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_size(regclass) bigint
| function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint
| function worker_partitioned_table_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint
(14 rows) (15 rows)
DROP TABLE prev_objects, extension_diff; DROP TABLE prev_objects, extension_diff;
-- show running version -- show running version

View File

@ -1,9 +1,3 @@
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT

View File

@ -50,13 +50,14 @@ $cmd$);
(localhost,57638,t,1) (localhost,57638,t,1)
(2 rows) (2 rows)
-- Make sure this cannot be run in a transaction
BEGIN;
CALL citus_cleanup_orphaned_shards();
ERROR: citus_cleanup_orphaned_shards cannot run inside a transaction block
COMMIT;
-- execute delayed removal -- execute delayed removal
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards NOTICE: cleaned up 1 orphaned shards
---------------------------------------------------------------------
1
(1 row)
-- we expect the shard to be on only the second worker -- we expect the shard to be on only the second worker
SELECT run_command_on_workers($cmd$ SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
@ -133,10 +134,14 @@ $cmd$);
(localhost,57638,t,1) (localhost,57638,t,1)
(2 rows) (2 rows)
-- we expect to get an error since the old placement is still there -- master_move_shard_placement automatically cleans up orphaned shards if
-- needed.
SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: shard xxxxx already exists in the target node master_move_shard_placement
DETAIL: The existing shard is marked for deletion, but could not be deleted because there are still active queries on it ---------------------------------------------------------------------
(1 row)
SELECT run_command_on_workers($cmd$ SELECT run_command_on_workers($cmd$
-- override the function for testing purpose -- override the function for testing purpose
create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint) create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)

View File

@ -31,24 +31,14 @@ SELECT rebalance_table_shards('dist_table_test');
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT rebalance_table_shards(); SELECT rebalance_table_shards();
rebalance_table_shards rebalance_table_shards
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- test that calling rebalance_table_shards without specifying relation -- test that calling rebalance_table_shards without specifying relation
-- wouldn't move shard of the citus local table. -- wouldn't move shard of the citus local table.
CREATE TABLE citus_local_table(a int, b int); CREATE TABLE citus_local_table(a int, b int);
@ -65,12 +55,7 @@ SELECT rebalance_table_shards();
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- show that citus local table shard is still on the coordinator -- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename tablename
@ -101,12 +86,7 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
SELECT master_drain_node('localhost', :master_port); SELECT master_drain_node('localhost', :master_port);
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error> ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
ALTER SYSTEM RESET citus.local_hostname; ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
pg_reload_conf pg_reload_conf
@ -126,12 +106,7 @@ SELECT master_drain_node('localhost', :master_port);
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- show that citus local table shard is still on the coordinator -- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename tablename
@ -181,7 +156,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error> ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
ALTER SYSTEM RESET citus.local_hostname; ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
@ -579,7 +553,7 @@ AS $$
pg_dist_shard_placement src USING (shardid), pg_dist_shard_placement src USING (shardid),
(SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
$$; $$;
CALL create_unbalanced_shards('rebalance_test_table'); CALL create_unbalanced_shards('rebalance_test_table');
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
@ -624,12 +598,7 @@ FROM (
WHERE logicalrelid = 'rebalance_test_table'::regclass WHERE logicalrelid = 'rebalance_test_table'::regclass
) T; ) T;
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error> ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
ALTER SYSTEM RESET citus.local_hostname; ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
pg_reload_conf pg_reload_conf
@ -658,12 +627,7 @@ FROM (
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -715,12 +679,7 @@ SELECT * FROM table_placements_per_node;
57638 | rebalance_test_table | 5 57638 | rebalance_test_table | 5
(2 rows) (2 rows)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT rebalance_table_shards('rebalance_test_table', SELECT rebalance_table_shards('rebalance_test_table',
threshold := 0, max_shard_moves := 1, threshold := 0, max_shard_moves := 1,
shard_transfer_mode:='block_writes'); shard_transfer_mode:='block_writes');
@ -729,12 +688,7 @@ SELECT rebalance_table_shards('rebalance_test_table',
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -749,12 +703,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -769,12 +718,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0);
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -790,12 +734,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -969,12 +908,7 @@ SELECT COUNT(*) FROM imbalanced_table;
-- Try force_logical -- Try force_logical
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
ERROR: the force_logical transfer mode is currently unsupported ERROR: the force_logical transfer mode is currently unsupported
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- Test rebalance operation -- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
rebalance_table_shards rebalance_table_shards
@ -982,12 +916,7 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
-- Confirm rebalance -- Confirm rebalance
-- Shard counts in each node after rebalance -- Shard counts in each node after rebalance
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
@ -1024,12 +953,7 @@ FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port; WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a non-existing node is not supported ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('localhost', 10000); HINT: Add the target node via SELECT citus_add_node('localhost', 10000);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- Try to move shards to a node where shards are not allowed -- Try to move shards to a node where shards are not allowed
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
master_set_node_property master_set_node_property
@ -1073,12 +997,7 @@ WHERE nodeport = :worker_2_port;
(2 rows) (2 rows)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
SELECT create_distributed_table('colocated_rebalance_test2', 'id'); SELECT create_distributed_table('colocated_rebalance_test2', 'id');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1106,12 +1025,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- Confirm that nothing changed -- Confirm that nothing changed
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
@ -1153,19 +1067,64 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
-- Check that we can call this function without a crash -- Check that we can call this function without a crash
SELECT * FROM get_rebalance_progress(); SELECT * FROM get_rebalance_progress();
sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
-- Confirm that the nodes are now there -- Confirm that the shards are now there
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
(4 rows)
CALL citus_cleanup_orphaned_shards();
select * from pg_dist_placement;
placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
135 | 123023 | 1 | 0 | 14
138 | 123024 | 1 | 0 | 14
141 | 123027 | 1 | 0 | 14
142 | 123028 | 1 | 0 | 14
143 | 123021 | 1 | 0 | 16
144 | 123025 | 1 | 0 | 16
145 | 123022 | 1 | 0 | 16
146 | 123026 | 1 | 0 | 16
(8 rows)
-- Move all shards to worker1 again
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
master_move_shard_placement
---------------------------------------------------------------------
(2 rows)
-- Confirm that the shards are now all on worker1
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
(2 rows)
-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
-- should do that automatically.
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- Confirm that the shards are now moved
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1216,12 +1175,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1244,12 +1198,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold :
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1271,12 +1220,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1293,12 +1237,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold :
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1335,12 +1274,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1362,12 +1296,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1404,12 +1333,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1431,12 +1355,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1462,12 +1381,7 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
shouldhaveshards shouldhaveshards
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1495,12 +1409,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1607,12 +1516,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes')
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1627,12 +1531,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards NOTICE: cleaned up 1 orphaned shards
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1648,12 +1548,7 @@ DETAIL: Using threshold of 0.01
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1759,12 +1654,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards NOTICE: cleaned up 4 orphaned shards
---------------------------------------------------------------------
4
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1859,12 +1750,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards NOTICE: cleaned up 3 orphaned shards
---------------------------------------------------------------------
3
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1888,12 +1775,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes')
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1943,12 +1825,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards NOTICE: cleaned up 4 orphaned shards
---------------------------------------------------------------------
4
(1 row)
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count nodeport | logicalrelid | count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1973,20 +1851,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_
ERROR: could not find rebalance strategy with name non_existing ERROR: could not find rebalance strategy with name non_existing
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing ERROR: could not find rebalance strategy with name non_existing
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing ERROR: could not find rebalance strategy with name non_existing
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT citus_set_default_rebalance_strategy('non_existing'); SELECT citus_set_default_rebalance_strategy('non_existing');
ERROR: strategy with specified name does not exist ERROR: strategy with specified name does not exist
UPDATE pg_dist_rebalance_strategy SET default_strategy=false; UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
@ -1994,20 +1862,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT * FROM rebalance_table_shards('tab'); SELECT * FROM rebalance_table_shards('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
SELECT * FROM master_drain_node('localhost', :worker_2_port); SELECT * FROM master_drain_node('localhost', :worker_2_port);
ERROR: no rebalance_strategy was provided, but there is also no default strategy set ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
CREATE OR REPLACE FUNCTION shard_cost_no_arguments() CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
@ -2283,12 +2141,7 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
3
(1 row)
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -2350,12 +2203,7 @@ SELECT rebalance_table_shards();
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
DROP TABLE t1, r1, r2; DROP TABLE t1, r1, r2;
-- verify there are no distributed tables before we perform the following tests. Preceding -- verify there are no distributed tables before we perform the following tests. Preceding
-- test suites should clean up their distributed tables. -- test suites should clean up their distributed tables.
@ -2404,12 +2252,7 @@ SELECT rebalance_table_shards();
(1 row) (1 row)
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
-- verify the reference table is on all nodes after the rebalance -- verify the reference table is on all nodes after the rebalance
SELECT count(*) SELECT count(*)
FROM pg_dist_shard FROM pg_dist_shard

View File

@ -38,6 +38,7 @@ ORDER BY 1;
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name) function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer) function citus_blocking_pids(integer)
function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate() function citus_conninfo_cache_invalidate()
function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
function citus_create_restore_point(text) function citus_create_restore_point(text)
@ -245,5 +246,5 @@ ORDER BY 1;
view citus_worker_stat_activity view citus_worker_stat_activity
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(229 rows) (230 rows)

View File

@ -26,12 +26,11 @@ setup
SELECT citus_internal.replace_isolation_tester_func(); SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE OR REPLACE FUNCTION master_defer_delete_shards() CREATE OR REPLACE PROCEDURE isolation_cleanup_orphaned_shards()
RETURNS int LANGUAGE C
LANGUAGE C STRICT AS 'citus', $$isolation_cleanup_orphaned_shards$$;
AS 'citus', $$master_defer_delete_shards$$; COMMENT ON PROCEDURE isolation_cleanup_orphaned_shards()
COMMENT ON FUNCTION master_defer_delete_shards() IS 'cleanup orphaned shards';
IS 'remove orphaned shards';
SET citus.next_shard_id to 120000; SET citus.next_shard_id to 120000;
SET citus.shard_count TO 8; SET citus.shard_count TO 8;
@ -63,6 +62,13 @@ step "s1-move-placement"
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
} }
step "s1-move-placement-back"
{
SET client_min_messages to NOTICE;
SHOW log_error_verbosity;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637);
}
step "s1-move-placement-without-deferred" { step "s1-move-placement-without-deferred" {
SET citus.defer_drop_after_shard_move TO OFF; SET citus.defer_drop_after_shard_move TO OFF;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
@ -71,7 +77,8 @@ step "s1-move-placement-without-deferred" {
step "s1-drop-marked-shards" step "s1-drop-marked-shards"
{ {
SELECT public.master_defer_delete_shards(); SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
} }
step "s1-lock-pg-dist-placement" { step "s1-lock-pg-dist-placement" {
@ -116,7 +123,7 @@ step "s2-select" {
step "s2-drop-marked-shards" step "s2-drop-marked-shards"
{ {
SET client_min_messages to DEBUG1; SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards(); CALL isolation_cleanup_orphaned_shards();
} }
step "s2-commit" { step "s2-commit" {
@ -127,6 +134,8 @@ step "s2-commit" {
permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection" permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"
// make sure we give a clear error when we try to replace an orphaned shard that is still in use
permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-commit" "s1-begin" "s1-move-placement-back" "s1-commit" "s2-stop-connection"
// make sure we error if we cannot get the lock on pg_dist_placement // make sure we error if we cannot get the lock on pg_dist_placement
permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit" permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-select" "s1-move-placement-without-deferred" "s2-commit" "s1-commit" permutation "s1-begin" "s2-begin" "s2-select" "s1-move-placement-without-deferred" "s2-commit" "s1-commit"

View File

@ -45,14 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local
SELECT count(*) FROM referencing_table2; SELECT count(*) FROM referencing_table2;
SELECT 1 FROM public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT count(*) FROM referencing_table2; SELECT count(*) FROM referencing_table2;
SELECT 1 FROM public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
-- create a function to show the -- create a function to show the

View File

@ -1,10 +1,3 @@
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT

View File

@ -31,8 +31,13 @@ SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$); $cmd$);
-- Make sure this cannot be run in a transaction
BEGIN;
CALL citus_cleanup_orphaned_shards();
COMMIT;
-- execute delayed removal -- execute delayed removal
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- we expect the shard to be on only the second worker -- we expect the shard to be on only the second worker
SELECT run_command_on_workers($cmd$ SELECT run_command_on_workers($cmd$
@ -70,7 +75,8 @@ SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$); $cmd$);
-- we expect to get an error since the old placement is still there -- master_move_shard_placement automatically cleans up orphaned shards if
-- needed.
SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);

View File

@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
-- should just be noops even if we add the coordinator to the pg_dist_node -- should just be noops even if we add the coordinator to the pg_dist_node
SELECT rebalance_table_shards('dist_table_test'); SELECT rebalance_table_shards('dist_table_test');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards(); SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- test that calling rebalance_table_shards without specifying relation -- test that calling rebalance_table_shards without specifying relation
@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table');
INSERT INTO citus_local_table VALUES (1, 2); INSERT INTO citus_local_table VALUES (1, 2);
SELECT rebalance_table_shards(); SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator -- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@ -38,14 +38,14 @@ SELECT pg_reload_conf();
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SELECT master_drain_node('localhost', :master_port); SELECT master_drain_node('localhost', :master_port);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname; ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SELECT master_drain_node('localhost', :master_port); SELECT master_drain_node('localhost', :master_port);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator -- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@ -396,7 +396,7 @@ AS $$
pg_dist_shard_placement src USING (shardid), pg_dist_shard_placement src USING (shardid),
(SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
$$; $$;
CALL create_unbalanced_shards('rebalance_test_table'); CALL create_unbalanced_shards('rebalance_test_table');
@ -428,7 +428,7 @@ FROM (
FROM pg_dist_shard FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass WHERE logicalrelid = 'rebalance_test_table'::regclass
) T; ) T;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname; ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
@ -445,7 +445,7 @@ FROM (
FROM pg_dist_shard FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass WHERE logicalrelid = 'rebalance_test_table'::regclass
) T; ) T;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table',
RESET ROLE; RESET ROLE;
-- Confirm no moves took place at all during these errors -- Confirm no moves took place at all during these errors
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards('rebalance_test_table', SELECT rebalance_table_shards('rebalance_test_table',
threshold := 0, max_shard_moves := 1, threshold := 0, max_shard_moves := 1,
shard_transfer_mode:='block_writes'); shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
-- Check that threshold=1 doesn't move any shards -- Check that threshold=1 doesn't move any shards
SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
-- Move the remaining shards using threshold=0 -- Move the remaining shards using threshold=0
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
@ -507,7 +507,7 @@ SELECT * FROM table_placements_per_node;
-- any effects. -- any effects.
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes'); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node; SELECT * FROM table_placements_per_node;
@ -602,11 +602,11 @@ SELECT COUNT(*) FROM imbalanced_table;
-- Try force_logical -- Try force_logical
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- Test rebalance operation -- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- Confirm rebalance -- Confirm rebalance
-- Shard counts in each node after rebalance -- Shard counts in each node after rebalance
@ -633,7 +633,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id');
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes')
FROM pg_dist_shard_placement FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port; WHERE nodeport = :worker_2_port;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- Try to move shards to a node where shards are not allowed -- Try to move shards to a node where shards are not allowed
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
@ -660,7 +660,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port;
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port; WHERE nodeport = :worker_2_port;
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT create_distributed_table('colocated_rebalance_test2', 'id'); SELECT create_distributed_table('colocated_rebalance_test2', 'id');
@ -671,7 +671,7 @@ SELECT * FROM public.table_placements_per_node;
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true); SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true);
-- Running with drain_only shouldn't do anything -- Running with drain_only shouldn't do anything
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- Confirm that nothing changed -- Confirm that nothing changed
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
@ -684,11 +684,30 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala
SELECT * FROM get_rebalance_progress(); SELECT * FROM get_rebalance_progress();
-- Actually do the rebalance -- Actually do the rebalance
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- Check that we can call this function without a crash -- Check that we can call this function without a crash
SELECT * FROM get_rebalance_progress(); SELECT * FROM get_rebalance_progress();
-- Confirm that the nodes are now there -- Confirm that the shards are now there
SELECT * FROM public.table_placements_per_node;
CALL citus_cleanup_orphaned_shards();
select * from pg_dist_placement;
-- Move all shards to worker1 again
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
-- Confirm that the shards are now all on worker1
SELECT * FROM public.table_placements_per_node;
-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
-- should do that automatically.
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-- Confirm that the shards are now moved
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
@ -702,22 +721,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0); SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0); SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0);
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Put shards back -- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- testing behaviour when setting shouldhaveshards to false and rebalancing all -- testing behaviour when setting shouldhaveshards to false and rebalancing all
@ -725,13 +744,13 @@ SELECT * FROM public.table_placements_per_node;
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Put shards back -- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- testing behaviour when setting shouldhaveshards to false and rebalancing all -- testing behaviour when setting shouldhaveshards to false and rebalancing all
@ -739,13 +758,13 @@ SELECT * FROM public.table_placements_per_node;
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Put shards back -- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Make it a data node again -- Make it a data node again
@ -753,14 +772,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
-- testing behaviour of master_drain_node -- testing behaviour of master_drain_node
SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes'); SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Put shards back -- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
@ -829,15 +848,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0);
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
-- Check that sizes of colocated tables are added together for rebalances -- Check that sizes of colocated tables are added together for rebalances
@ -888,7 +907,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
-- supports improvement_threshold -- supports improvement_threshold
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
ANALYZE tab, tab2; ANALYZE tab, tab2;
@ -945,13 +964,13 @@ SELECT citus_add_rebalance_strategy(
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int) CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
@ -972,7 +991,7 @@ SELECT citus_add_rebalance_strategy(
SELECT citus_set_default_rebalance_strategy('only_worker_1'); SELECT citus_set_default_rebalance_strategy('only_worker_1');
SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node; SELECT * FROM public.table_placements_per_node;
SELECT citus_set_default_rebalance_strategy('by_shard_count'); SELECT citus_set_default_rebalance_strategy('by_shard_count');
@ -981,18 +1000,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
-- Check all the error handling cases -- Check all the error handling cases
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing'); SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT citus_set_default_rebalance_strategy('non_existing'); SELECT citus_set_default_rebalance_strategy('non_existing');
UPDATE pg_dist_rebalance_strategy SET default_strategy=false; UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab'); SELECT * FROM rebalance_table_shards('tab');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port); SELECT * FROM master_drain_node('localhost', :worker_2_port);
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
CREATE OR REPLACE FUNCTION shard_cost_no_arguments() CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
@ -1222,7 +1241,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
@ -1255,7 +1274,7 @@ INSERT INTO r2 VALUES (1,2), (3,4);
SELECT 1 from master_add_node('localhost', :worker_2_port); SELECT 1 from master_add_node('localhost', :worker_2_port);
SELECT rebalance_table_shards(); SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
DROP TABLE t1, r1, r2; DROP TABLE t1, r1, r2;
@ -1282,7 +1301,7 @@ WHERE logicalrelid = 'r1'::regclass;
-- rebalance with _only_ a reference table, this should trigger the copy -- rebalance with _only_ a reference table, this should trigger the copy
SELECT rebalance_table_shards(); SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards(); CALL citus_cleanup_orphaned_shards();
-- verify the reference table is on all nodes after the rebalance -- verify the reference table is on all nodes after the rebalance
SELECT count(*) SELECT count(*)