diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index b25da1ebd..9fd4290ba 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -32,6 +32,7 @@ #include "catalog/pg_type.h" #include "commands/extension.h" #include "commands/sequence.h" +#include "distributed/background_jobs.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/citus_nodes.h" @@ -57,7 +58,9 @@ #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/remote_commands.h" +#include "distributed/shard_rebalancer.h" #include "distributed/tuplestore.h" +#include "distributed/utils/array_type.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" @@ -777,7 +780,6 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, { partitionedShardNames = lappend(partitionedShardNames, quotedShardName); } - /* for non-partitioned tables, we will use Postgres' size functions */ else { @@ -2816,7 +2818,8 @@ CreateBackgroundJob(const char *jobType, const char *description) */ BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount, - int64 dependingTaskIds[]) + int64 dependingTaskIds[], int nodesInvolvedCount, int32 + nodesInvolved[]) { BackgroundTask *task = NULL; @@ -2890,6 +2893,11 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum(""); nulls[Anum_pg_dist_background_task_message - 1] = false; + values[Anum_pg_dist_background_task_nodes_involved - 1] = + IntArrayToDatum(nodesInvolvedCount, nodesInvolved); + nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == + 0); + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), values, nulls); CatalogTupleInsert(pgDistBackgroundTask, newTuple); @@ -3201,6 +3209,13 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]); } + if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1]) + { + ArrayType *nodesInvolvedArrayObject = + DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]); + task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject); + } + return task; } @@ -3333,7 +3348,8 @@ GetRunnableBackgroundTask(void) while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); - if (BackgroundTaskReadyToRun(task)) + if (BackgroundTaskReadyToRun(task) && + IncrementParallelTaskCountForNodesInvolved(task)) { /* found task, close table and return */ break; diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 5d30ff8be..b5ec9b7ba 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -190,13 +190,32 @@ typedef struct WorkerShardStatistics HTAB *statistics; } WorkerShardStatistics; -/* ShardMoveDependencyHashEntry contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */ +/* + * ShardMoveDependencyHashEntry contains the taskId which any new shard + * move task within the 
corresponding colocation group + * must take a dependency on + */ typedef struct ShardMoveDependencyInfo { int64 key; int64 taskId; } ShardMoveDependencyInfo; +/* + * ShardMoveSourceNodeHashEntry keeps track of the source nodes + * of the moves. + */ +typedef struct ShardMoveSourceNodeHashEntry +{ + /* this is the key */ + int32 node_id; + List *taskIds; +} ShardMoveSourceNodeHashEntry; + +/* + * ShardMoveDependencies keeps track of all needed dependencies + * between shard moves. + */ typedef struct ShardMoveDependencies { HTAB *colocationDependencies; @@ -274,6 +293,15 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); +static int64 GetColocationId(PlacementUpdateEvent *move); +static ShardMoveDependencies InitializeShardMoveDependencies(); +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 + colocationId, + ShardMoveDependencies shardMoveDependencies, + int *nDepends); +static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, + int64 taskId, + ShardMoveDependencies shardMoveDependencies); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(rebalance_table_shards); @@ -1930,8 +1958,7 @@ GetColocationId(PlacementUpdateEvent *move) * InitializeShardMoveDependencies function creates the hash maps that we use to track * the latest moves so that subsequent moves with the same properties must take a dependency * on them. There are two hash maps. One is for tracking the latest move scheduled in a - * given colocation group and the other one is for tracking the latest move which involves - * a given node either as its source node or its target node. + * given colocation group and the other one is for tracking source nodes of all moves. */ static ShardMoveDependencies InitializeShardMoveDependencies() @@ -1941,18 +1968,17 @@ InitializeShardMoveDependencies() ShardMoveDependencyInfo, "colocationDependencyHashMap", 6); - shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64, - ShardMoveDependencyInfo, + shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, + ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", 6); - return shardMoveDependencies; } /* * GenerateTaskMoveDependencyList creates and returns a List of taskIds that - * the move must take a dependency on. + * the move must take a dependency on, given the shard move dependencies as input. */ static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, @@ -1972,27 +1998,24 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); } - /* Check if there exists a move scheduled earlier whose source or target node - * overlaps with the current move's source node. */ - shardMoveDependencyInfo = hash_search( - shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, + /* + * Check if there exists moves scheduled earlier whose source node + * overlaps with the current move's target node. + * The earlier/first move might make space for the later/second move. + * So we could run out of disk space (or at least overload the node) + * if we move the second shard to it before the first one is moved away.  
+ */ + ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( + shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND, &found); if (found) { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); - } - - /* Check if there exists a move scheduled earlier whose source or target node - * overlaps with the current move's target node. */ - shardMoveDependencyInfo = hash_search( - shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER, - &found); - - - if (found) - { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + int64 *taskId = NULL; + foreach_ptr(taskId, shardMoveSourceNodeHashEntry->taskIds) + { + hash_search(dependsList, taskId, HASH_ENTER, NULL); + } } *nDepends = hash_get_num_entries(dependsList); @@ -2030,15 +2053,20 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); shardMoveDependencyInfo->taskId = taskId; - shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, - &move->sourceNode->nodeId, HASH_ENTER, NULL); + bool found; + ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( + shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, + &found); - shardMoveDependencyInfo->taskId = taskId; + if (!found) + { + shardMoveSourceNodeHashEntry->taskIds = NIL; + } - shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, - &move->targetNode->nodeId, HASH_ENTER, NULL); - - shardMoveDependencyInfo->taskId = taskId; + int64 *newTaskId = palloc0(sizeof(int64)); + *newTaskId = taskId; + shardMoveSourceNodeHashEntry->taskIds = lappend( + shardMoveSourceNodeHashEntry->taskIds, newTaskId); } @@ -2135,8 +2163,10 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo appendStringInfo(&buf, "SELECT pg_catalog.replicate_reference_tables(%s)", quote_literal_cstr(shardTranferModeLabel)); + + int32 nodesInvolved[] = { 0 }; BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0, - NULL); + NULL, 0, nodesInvolved); replicateRefTablesTaskId = task->taskid; } @@ -2170,9 +2200,14 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo dependsArray[0] = replicateRefTablesTaskId; } + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = move->sourceNode->nodeId; + nodesInvolved[1] = move->targetNode->nodeId; + BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, nDepends, - dependsArray); + dependsArray, 2, + nodesInvolved); UpdateShardMoveDependencies(move, colocationId, task->taskid, shardMoveDependencies); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 76e0ae9b9..e7fedd1d8 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1793,6 +1793,18 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_background_task_executors_per_node", + gettext_noop( + "Sets the maximum number of parallel background task executor workers " + "for scheduled background tasks that involve a particular node"), + NULL, + &MaxBackgroundTaskExecutorsPerNode, + 1, 1, 128, + PGC_SIGHUP, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_cached_connection_lifetime", gettext_noop("Sets the maximum lifetime of cached 
connections to other nodes."), diff --git a/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql index a0826d22f..9b45cc0fe 100644 --- a/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql @@ -14,3 +14,7 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_ #include "udfs/citus_stat_tenants_local_reset/11.3-1.sql" #include "udfs/citus_stat_tenants_reset/11.3-1.sql" + +-- we introduce nodes_involved, which will be used internally to +-- limit the number of parallel tasks running per node +ALTER TABLE pg_catalog.pg_dist_background_task ADD COLUMN nodes_involved int[] DEFAULT NULL; diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql index 501d39785..0550354e1 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql @@ -28,3 +28,5 @@ DROP FUNCTION pg_catalog.citus_stat_tenants(boolean); DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset(); DROP FUNCTION pg_catalog.citus_stat_tenants_reset(); + +ALTER TABLE pg_catalog.pg_dist_background_task DROP COLUMN nodes_involved; diff --git a/src/backend/distributed/utils/array_type.c b/src/backend/distributed/utils/array_type.c index 348b25b4a..70c7dde14 100644 --- a/src/backend/distributed/utils/array_type.c +++ b/src/backend/distributed/utils/array_type.c @@ -140,3 +140,34 @@ TextArrayTypeToIntegerList(ArrayType *arrayObject) return list; } + + +/* + * IntArrayToDatum + * + * Convert an integer array to the datum int array format + * (currently used for nodes_involved in pg_dist_background_task) + * + * Returns the array in the form of a Datum, or PointerGetDatum(NULL) + * if the int_array is empty. + */ +Datum +IntArrayToDatum(uint32 int_array_size, int int_array[]) +{ + if (int_array_size == 0) + { + return PointerGetDatum(NULL); + } + + ArrayBuildState *astate = NULL; + for (int i = 0; i < int_array_size; i++) + { + Datum dvalue = Int32GetDatum(int_array[i]); + bool disnull = false; + Oid element_type = INT4OID; + astate = accumArrayResult(astate, dvalue, disnull, element_type, + CurrentMemoryContext); + } + + return makeArrayResult(astate, CurrentMemoryContext); +} diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index d7a5a31bd..789732d21 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -63,6 +63,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_utility.h" #include "distributed/shard_cleaner.h" +#include "distributed/shard_rebalancer.h" #include "distributed/resource_lock.h" /* Table-of-contents constants for our dynamic shared memory segment. 
*/ @@ -115,12 +116,17 @@ static bool MonitorGotTerminationOrCancellationRequest(); static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS); +static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); /* flags set by signal handlers */ static volatile sig_atomic_t GotSigterm = false; static volatile sig_atomic_t GotSigint = false; static volatile sig_atomic_t GotSighup = false; +/* keeping track of parallel background tasks per node */ +HTAB *ParallelTasksPerNode = NULL; +int MaxBackgroundTaskExecutorsPerNode = 1; + PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); PG_FUNCTION_INFO_V1(citus_task_wait); @@ -211,7 +217,7 @@ citus_job_wait(PG_FUNCTION_ARGS) * assume any terminal state as its desired status. The function returns if the * desired_state was reached. * - * The current implementation is a polling implementation with an interval of 1 second. + * The current implementation is a polling implementation with an interval of 0.1 seconds. * Ideally we would have some synchronization between the background tasks queue monitor * and any backend calling this function to receive a signal when the task changes state. */ @@ -857,6 +863,7 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) UpdateBackgroundTask(task); UpdateDependingTasks(task); UpdateBackgroundJob(task->jobid); + DecrementParallelTaskCountForNodesInvolved(task); /* we are sure that at least one task did not block on current iteration */ queueMonitorExecutionContext->allTasksWouldBlock = false; @@ -868,6 +875,77 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) } +/* + * IncrementParallelTaskCountForNodesInvolved + * Checks whether we have reached the limit of parallel tasks per node + * per each of the nodes involved with the task + * If at least one limit is reached, it returns false. + * If limits aren't reached, it increments the parallel task count + * for each of the nodes involved with the task, and returns true. + */ +bool +IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task) +{ + if (task->nodesInvolved) + { + int node; + + /* first check whether we have reached the limit for any of the nodes */ + foreach_int(node, task->nodesInvolved) + { + bool found; + ParallelTasksPerNodeEntry *hashEntry = hash_search( + ParallelTasksPerNode, &(node), HASH_ENTER, &found); + if (!found) + { + hashEntry->counter = 0; + } + else if (hashEntry->counter >= MaxBackgroundTaskExecutorsPerNode) + { + /* at least one node's limit is reached */ + return false; + } + } + + /* then, increment the parallel task count per each node */ + foreach_int(node, task->nodesInvolved) + { + ParallelTasksPerNodeEntry *hashEntry = hash_search( + ParallelTasksPerNode, &(node), HASH_FIND, NULL); + Assert(hashEntry); + hashEntry->counter += 1; + } + } + + return true; +} + + +/* + * DecrementParallelTaskCountForNodesInvolved + * Decrements the parallel task count for each of the nodes involved + * with the task. + * We call this function after the task has gone through Running state + * and then has ended. 
+ */ +static void +DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task) +{ + if (task->nodesInvolved) + { + int node; + foreach_int(node, task->nodesInvolved) + { + ParallelTasksPerNodeEntry *hashEntry = hash_search(ParallelTasksPerNode, + &(node), + HASH_FIND, NULL); + + hashEntry->counter -= 1; + } + } +} + + /* * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. */ @@ -1023,7 +1101,7 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle SIGINT to properly cancel active task executors */ pqsignal(SIGINT, QueueMonitorSigIntHandler); - /* handle SIGHUP to update MaxBackgroundTaskExecutors */ + /* handle SIGHUP to update MaxBackgroundTaskExecutors and MaxBackgroundTaskExecutorsPerNode */ pqsignal(SIGHUP, QueueMonitorSigHupHandler); /* ready to handle signals */ @@ -1167,10 +1245,15 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) { GotSighup = false; - /* update max_background_task_executors if changed */ + /* update max_background_task_executors and max_background_task_executors_per_node if changed */ ProcessConfigFile(PGC_SIGHUP); } + if (ParallelTasksPerNode == NULL) + { + ParallelTasksPerNode = CreateSimpleHash(int32, ParallelTasksPerNodeEntry); + } + /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ if (!MonitorGotTerminationOrCancellationRequest()) { diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index 3a14b6207..35745c014 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -85,6 +85,21 @@ typedef struct TaskExecutionContext } TaskExecutionContext; +/* + * ParallelTasksPerNodeEntry is the struct used + * to track the number of concurrent background tasks that + * involve a particular node (the key to the entry) + */ +typedef struct ParallelTasksPerNodeEntry +{ + /* Used as hash key. 
*/ + int32 node_id; + + /* number of concurrent background tasks that involve node node_id */ + uint32 counter; +} ParallelTasksPerNodeEntry; + + extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner); extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); @@ -95,5 +110,6 @@ extern Datum citus_job_wait(PG_FUNCTION_ARGS); extern Datum citus_task_wait(PG_FUNCTION_ARGS); extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus); extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus); +extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task); #endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index f7b2038ee..e27f3df22 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -252,6 +252,7 @@ typedef struct BackgroundTask int32 *retry_count; TimestampTz *not_before; char *message; + List *nodesInvolved; /* extra space to store values for nullable value types above */ struct @@ -388,7 +389,9 @@ extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut); extern int64 CreateBackgroundJob(const char *jobType, const char *description); extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount, - int64 dependingTaskIds[]); + int64 dependingTaskIds[], + int nodesInvolvedCount, + int32 nodesInvolved[]); extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId); diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h index b6d132e59..9e6673a64 100644 --- a/src/include/distributed/pg_dist_background_task.h +++ b/src/include/distributed/pg_dist_background_task.h @@ -15,7 +15,7 @@ * compiler constants for pg_dist_background_task * ---------------- */ -#define Natts_pg_dist_background_task 9 +#define Natts_pg_dist_background_task 10 #define Anum_pg_dist_background_task_job_id 1 #define Anum_pg_dist_background_task_task_id 2 #define Anum_pg_dist_background_task_owner 3 @@ -25,5 +25,6 @@ #define Anum_pg_dist_background_task_retry_count 7 #define Anum_pg_dist_background_task_not_before 8 #define Anum_pg_dist_background_task_message 9 +#define Anum_pg_dist_background_task_nodes_involved 10 #endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 90f73e2f3..705196ad4 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -190,6 +190,7 @@ extern char *VariablesToBePassedToNewConnections; extern int MaxRebalancerLoggedIgnoredMoves; extern bool RunningUnderIsolationTest; extern bool PropagateSessionSettingsForLoopbackConnection; +extern int MaxBackgroundTaskExecutorsPerNode; /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/utils/array_type.h b/src/include/distributed/utils/array_type.h index 4599b8a9f..43826076e 100644 --- a/src/include/distributed/utils/array_type.h +++ b/src/include/distributed/utils/array_type.h @@ -22,5 +22,6 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); extern List * IntegerArrayTypeToList(ArrayType *arrayObject); 
extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject); +extern Datum IntArrayToDatum(uint32 int_array_size, int int_array[]); #endif /* CITUS_ARRAY_TYPE_H */ diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index 862beb57e..9c43fab9b 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -1,12 +1,22 @@ -/* - Test to check if the background tasks scheduled by the background rebalancer - has the correct dependencies. -*/ +-- +-- BACKGROUND_REBALANCE_PARALLEL +-- +-- Test to check if the background tasks scheduled by the background rebalancer +-- have the correct dependencies +-- +-- Test to verify that we do not allow parallel rebalancer moves involving a +-- particular node (either as source or target) more than +-- citus.max_background_task_executors_per_node, and that we can change the GUC on +-- the fly, and that will affect the ongoing balance as it should +-- +-- Test to verify that there's a hard dependency when a specific node is first being +-- used as a source for a move, and then later as a target. +-- CREATE SCHEMA background_rebalance_parallel; SET search_path TO background_rebalance_parallel; SET citus.next_shard_id TO 85674000; SET citus.shard_replication_factor TO 1; -SET client_min_messages TO WARNING; +SET client_min_messages TO ERROR; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; @@ -45,52 +55,52 @@ SELECT pg_reload_conf(); t (1 row) -/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg1 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); create_distributed_table --------------------------------------------------------------------- (1 row) -/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg2 (b int primary key); -SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); create_distributed_table --------------------------------------------------------------------- (1 row) -/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */ +-- Colocation 
group 3: create two tables table1_colg3, table2_colg3 and in a colocation group CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg3 (b int primary key); -SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); create_distributed_table --------------------------------------------------------------------- (1 row) -/* Add two new node so that we can rebalance */ +-- Add two new nodes so that we can rebalance SELECT 1 FROM citus_add_node('localhost', :worker_3_port); ?column? --------------------------------------------------------------------- @@ -132,10 +142,11 @@ SELECT citus_rebalance_wait(); (1 row) -/*Check that a move is dependent on - 1. any other move scheduled earlier in its colocation group. - 2. any other move scheduled earlier whose source node or target - node overlaps with the current moves nodes. */ +-- PART 1 +-- Test to check if the background tasks scheduled by the background rebalancer +-- have the correct dependencies +-- Check that a move is dependent on +-- any other move scheduled earlier in its colocation group. SELECT S.shardid, P.colocationid FROM pg_dist_shard S, pg_dist_partition P WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; @@ -175,16 +186,12 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, task_id | command | depends_on | command --------------------------------------------------------------------- 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') - 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') - 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') - 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') - 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') -(7 rows) +(3 rows) -/* Check that if there is a reference table that needs to be synched to a node, - any move without a dependency must depend on the move task for reference table. */ +-- Check that if there is a reference table that needs to be synched to a node, +-- any move without a dependency must depend on the move task for reference table. SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); ?column? 
--------------------------------------------------------------------- @@ -203,8 +210,8 @@ SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true) 1 (1 row) -/* Drain worker_3 so that we can move only one colocation group to worker_3 - to create an unbalance that would cause parallel rebalancing. */ +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); ?column? --------------------------------------------------------------------- @@ -225,7 +232,7 @@ SELECT create_reference_table('ref_table'); (1 row) -/* Move all the shards of Colocation group 3 to worker_3.*/ +-- Move all the shards of Colocation group 3 to worker_3. SELECT master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') FROM @@ -243,7 +250,7 @@ ORDER BY (4 rows) CALL citus_cleanup_orphaned_resources(); -/* Activate and new nodes so that we can rebalance. */ +-- Activate and new nodes so that we can rebalance. SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); ?column? --------------------------------------------------------------------- @@ -323,18 +330,34 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') 1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') - 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') + 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') 1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') (6 rows) +-- PART 2 +-- Test to verify that we do not allow parallel rebalancer moves involving a +-- particular node (either as source or target) +-- more than citus.max_background_task_executors_per_node +-- and that we can change the GUC on the fly +-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query +-- output i.e. 
to avoid flakiness +-- First let's restart the scenario DROP SCHEMA background_rebalance_parallel CASCADE; TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) +select citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + select citus_remove_node('localhost', :worker_3_port); citus_remove_node --------------------------------------------------------------------- @@ -359,6 +382,474 @@ select citus_remove_node('localhost', :worker_6_port); (1 row) +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +-- Create 8 tables in 4 colocation groups, and populate them +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table2_colg2 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg4 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table2_colg4 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i; +-- Add nodes so that we can rebalance +SELECT citus_add_node('localhost', 
:worker_2_port); + citus_add_node +--------------------------------------------------------------------- + 56 +(1 row) + +SELECT citus_add_node('localhost', :worker_3_port); + citus_add_node +--------------------------------------------------------------------- + 57 +(1 row) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset +-- see dependent tasks to understand which tasks remain runnable because of +-- citus.max_background_task_executors_per_node +-- and which tasks are actually blocked from colocation group dependencies +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') + 1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') + 1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') + 1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') +(4 rows) + +-- default citus.max_background_task_executors_per_node is 1 +-- show that first exactly one task per node is running +-- among the tasks that are not blocked +SELECT citus_task_wait(1013, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 17779 | 1013 | running | {50,56} + 17779 | 1014 | blocked | {50,57} + 17779 | 1015 | runnable | {50,56} + 17779 | 1016 | blocked | {50,57} + 17779 | 1017 | runnable | {50,56} + 17779 | 1018 | blocked | {50,57} + 17779 | 1019 | runnable | {50,56} + 17779 | 1020 | blocked | {50,57} +(8 rows) + +-- increase citus.max_background_task_executors_per_node +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_task_wait(1015, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(1013, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that at most 2 tasks per node are running +-- among the tasks that are not blocked +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 17779 | 1013 | done | {50,56} + 17779 | 1014 | running | {50,57} + 17779 | 1015 | running | {50,56} + 17779 | 1016 | blocked | {50,57} + 17779 | 1017 | runnable | {50,56} + 17779 | 1018 | 
blocked | {50,57} + 17779 | 1019 | runnable | {50,56} + 17779 | 1020 | blocked | {50,57} +(8 rows) + +-- decrease to default (1) +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_task_wait(1015, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(1014, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(1016, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that exactly one task per node is running +-- among the tasks that are not blocked +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 17779 | 1013 | done | {50,56} + 17779 | 1014 | done | {50,57} + 17779 | 1015 | done | {50,56} + 17779 | 1016 | running | {50,57} + 17779 | 1017 | runnable | {50,56} + 17779 | 1018 | blocked | {50,57} + 17779 | 1019 | runnable | {50,56} + 17779 | 1020 | blocked | {50,57} +(8 rows) + +SELECT citus_rebalance_stop(); + citus_rebalance_stop +--------------------------------------------------------------------- + +(1 row) + +-- PART 3 +-- Test to verify that there's a hard dependency when A specific node is first being used as a +-- source for a move, and then later as a target. +-- First let's restart the scenario +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_1_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674051; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61; +-- add the first node +-- nodeid here is 61 +select citus_add_node('localhost', :worker_1_port); + citus_add_node +--------------------------------------------------------------------- + 61 +(1 row) + +-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none'); + 
create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg4 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg5 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i; +CREATE TABLE table1_colg6 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i; +-- add two other nodes +-- nodeid here is 62 +select citus_add_node('localhost', :worker_2_port); + citus_add_node +--------------------------------------------------------------------- + 62 +(1 row) + +-- nodeid here is 63 +select citus_add_node('localhost', :worker_3_port); + citus_add_node +--------------------------------------------------------------------- + 63 +(1 row) + +CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( + worker_node_list json[], + shard_placement_list json[], + threshold float4 DEFAULT 0, + max_shard_moves int DEFAULT 1000000, + drain_only bool DEFAULT false, + improvement_threshold float4 DEFAULT 0.5 +) +RETURNS json[] +AS 'citus' +LANGUAGE C STRICT VOLATILE; +-- we are simulating the following from shard_rebalancer_unit.sql +-- the following steps are all according to this scenario +-- where the third move should be dependent of the first two +-- because the third move's target is the source of the first two +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}', + '{"node_name": "hostname2", "disallowed_shards": "4"}', + '{"node_name": "hostname3", "disallowed_shards": "4"}' + ]::json[], + ARRAY['{"shardid":1, "nodename":"hostname1"}', + '{"shardid":2, "nodename":"hostname1"}', + '{"shardid":3, "nodename":"hostname2"}', + '{"shardid":4, "nodename":"hostname2"}', + '{"shardid":5, "nodename":"hostname3"}', + '{"shardid":6, "nodename":"hostname3"}' + ]::json[] +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432} + {"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname3","targetport":5432} + {"updatetype":1,"shardid":4,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432} +(3 rows) + +-- manually balance the cluster such that we have +-- a balanced cluster like above with 1,2,3,4,5,6 and hostname1/2/3 +-- shardid 85674051 (1) nodeid 61 
(hostname1) +-- shardid 85674052 (2) nodeid 61 (hostname1) +-- shardid 85674053 (3) nodeid 62 (hostname2) +-- shardid 85674054 (4) nodeid 62 (hostname2) +-- shardid 85674055 (5) nodeid 63 (hostname3) +-- shardid 85674056 (6) nodeid 63 (hostname3) +SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- now create another rebalance strategy in order to simulate moves +-- which use as target a node that has been previously used as source +CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int) + RETURNS boolean AS +$$ + -- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}' + select case when (shardid != 85674054 and nodeid = 61) + then false + -- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}' + -- AND '{"node_name": "hostname2", "disallowed_shards": "4"}' + when (shardid = 85674054 and nodeid != 61) + then false + else true + end; +$$ LANGUAGE sql; +-- insert the new test rebalance strategy +INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + default_strategy, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold, + improvement_threshold + ) VALUES ( + 'test_source_then_target', + false, + 'citus_shard_cost_1', + 'citus_node_capacity_1', + 'background_rebalance_parallel.test_shard_allowed_on_node', + 0, + 0, + 0 + ); +SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target'); + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674051 | 0 | localhost | 57637 | localhost | 57638 + table1_colg2 | 85674052 | 0 | localhost | 57637 | localhost | 57639 + table1_colg4 | 85674054 | 0 | localhost | 57638 | localhost | 57637 +(3 rows) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset +-- check that the third move is blocked and depends on the first two +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 17780 | 1021 | runnable | {61,62} + 17780 | 1022 | runnable | {61,63} + 17780 | 1023 | blocked | {62,61} +(3 rows) + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1023 | SELECT 
pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1021 | SELECT pg_catalog.citus_move_shard_placement(85674051,61,62,'auto') + 1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1022 | SELECT pg_catalog.citus_move_shard_placement(85674052,61,63,'auto') +(2 rows) + +SELECT citus_rebalance_stop(); + citus_rebalance_stop +--------------------------------------------------------------------- + +(1 row) + +DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target'; +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + -- keep the rest of the tests inact that depends node/group ids ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 9435af3d3..2b4f7de37 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -3,6 +3,7 @@ SET search_path TO background_task_queue_monitor; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; +SET client_min_messages TO ERROR; -- reset sequence values ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000; @@ -654,11 +655,268 @@ SELECT job_id, task_id, status FROM pg_dist_background_task 1450016 | 1450024 | done (2 rows) +-- TEST11 +-- verify that we do not allow parallel task executors involving a particular node +-- more than citus.max_background_task_executors_per_node +-- verify that we can change citus.max_background_task_executors_per_node on the fly +-- tests are done with dummy node ids +-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query +-- output i.e. 
to avoid flakiness +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset +COMMIT; +SELECT citus_task_wait(:task_id1, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id2, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 1 task per node is running + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 1450017 | 1450025 | running | {1,2} + 1450017 | 1450026 | running | {3,4} + 1450017 | 1450027 | runnable | {1,2} + 1450017 | 1450028 | runnable | {1,3} + 1450017 | 1450029 | runnable | {2,4} + 1450017 | 1450030 | runnable | {1,2} + 1450017 | 1450031 | runnable | {1,3} + 1450017 | 1450032 | runnable | {1,4} +(8 rows) + +SELECT citus_task_wait(:task_id1, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id2, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +-- increase max_background_task_executors_per_node on the fly +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_task_wait(:task_id3, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id4, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- 
+ +(1 row) + +SELECT citus_task_wait(:task_id5, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | running | {1,2} + 1450017 | 1450028 | running | {1,3} + 1450017 | 1450029 | running | {2,4} + 1450017 | 1450030 | runnable | {1,2} + 1450017 | 1450031 | runnable | {1,3} + 1450017 | 1450032 | runnable | {1,4} +(8 rows) + +-- increase to 3 max_background_task_executors_per_node on the fly +SELECT citus_task_wait(:task_id3, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id4, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id5, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_task_wait(:task_id6, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id7, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id8, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | running | {1,2} + 1450017 | 1450031 | running | {1,3} + 1450017 | 1450032 | running | {1,4} +(8 rows) + +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- if pg_cancel_backend is called on one of the running task PIDs +-- task doesn't restart because it's not allowed anymore by the limit. 
+-- node with id 1 can run only one task at a time, and other tasks involving it are still running +SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset +SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +-- the task goes back to the runnable state and is not running anymore. +SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that the cancelled task hasn't restarted because the limit doesn't allow it +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | runnable | {1,2} + 1450017 | 1450031 | running | {1,3} + 1450017 | 1450032 | running | {1,4} +(8 rows) + +SELECT citus_task_wait(:task_id7, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id8, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id6, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that the 6th task has restarted only after both 7 and 8 are done +-- since the limit is back to 1 background task executor per node and tasks 6, 7 and 8 all involve node with id 1 +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; + job_id | task_id | status | nodes_involved +--------------------------------------------------------------------- + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | running | {1,2} + 1450017 | 1450031 | done | {1,3} + 1450017 | 1450032 | done | {1,4} +(8 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + SET client_min_messages TO WARNING; TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; +RESET client_min_messages; ALTER SYSTEM RESET citus.background_task_queue_interval; ALTER SYSTEM RESET citus.max_background_task_executors; SELECT pg_reload_conf(); diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index
3ce512c2b..b54946d3f 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -9,7 +9,8 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass) + 'pg_dist_object'::regclass, + 'pg_dist_background_task'::regclass) ORDER BY attrelid, attname; attrelid | attname | atthasmissing | attmissingval --------------------------------------------------------------------- diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql index 8c5fb5bb1..5229e7f88 100644 --- a/src/test/regress/sql/background_rebalance_parallel.sql +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -1,12 +1,22 @@ -/* - Test to check if the background tasks scheduled by the background rebalancer - has the correct dependencies. -*/ +-- +-- BACKGROUND_REBALANCE_PARALLEL +-- +-- Test to check if the background tasks scheduled by the background rebalancer +-- have the correct dependencies +-- +-- Test to verify that we do not allow parallel rebalancer moves involving a +-- particular node (either as source or target) more than +-- citus.max_background_task_executors_per_node, and that we can change the GUC on +-- the fly, and that this affects the ongoing rebalance as it should +-- +-- Test to verify that there's a hard dependency when a specific node is first being +-- used as a source for a move, and then later as a target. +-- CREATE SCHEMA background_rebalance_parallel; SET search_path TO background_rebalance_parallel; SET citus.next_shard_id TO 85674000; SET citus.shard_replication_factor TO 1; -SET client_min_messages TO WARNING; +SET client_min_messages TO ERROR; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; @@ -26,34 +36,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; SELECT pg_reload_conf(); -/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +-- Colocation group 1: create two tables table1_colg1, table2_colg1 in a colocation group CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); CREATE TABLE table2_colg1 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); -/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +-- Colocation group 2: create two tables table1_colg2, table2_colg2 in a colocation group CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); CREATE TABLE table2_colg2 (b int primary key); -SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); -/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
+-- Colocation group 3: create two tables table1_colg3, table2_colg3 in a colocation group CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); CREATE TABLE table2_colg3 (b int primary key); -SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); -/* Add two new node so that we can rebalance */ +-- Add two new nodes so that we can rebalance SELECT 1 FROM citus_add_node('localhost', :worker_3_port); SELECT 1 FROM citus_add_node('localhost', :worker_4_port); @@ -63,10 +73,12 @@ SELECT * FROM citus_rebalance_start(); SELECT citus_rebalance_wait(); -/*Check that a move is dependent on - 1. any other move scheduled earlier in its colocation group. - 2. any other move scheduled earlier whose source node or target - node overlaps with the current moves nodes. */ +-- PART 1 +-- Test to check if the background tasks scheduled by the background rebalancer +-- have the correct dependencies + +-- Check that a move is dependent on +-- any other move scheduled earlier in its colocation group. SELECT S.shardid, P.colocationid FROM pg_dist_shard S, pg_dist_partition P WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; @@ -78,14 +90,14 @@ SELECT D.task_id, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; -/* Check that if there is a reference table that needs to be synched to a node, - any move without a dependency must depend on the move task for reference table. */ +-- Check that if there is a reference table that needs to be synched to a node, +-- any move without a dependency must depend on the move task for the reference table. SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); SELECT public.wait_for_resource_cleanup(); SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); -/* Drain worker_3 so that we can move only one colocation group to worker_3 - to create an unbalance that would cause parallel rebalancing. */ +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an imbalance that would cause parallel rebalancing. SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); @@ -95,7 +107,7 @@ CREATE TABLE ref_table(a int PRIMARY KEY); SELECT create_reference_table('ref_table'); -/* Move all the shards of Colocation group 3 to worker_3.*/ +-- Move all the shards of Colocation group 3 to worker_3. SELECT master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') FROM @@ -107,7 +119,7 @@ ORDER BY CALL citus_cleanup_orphaned_resources(); -/* Activate and new nodes so that we can rebalance. */ +-- Activate the new nodes so that we can rebalance.
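As an aside before the test re-activates the nodes and checks the second rebalance below: the dependency assertions in this file boil down to joining each scheduled move with the tasks it waits on. The following is a minimal inspection query in that spirit, not part of the patch and shown only for illustration, using the pg_dist_background_task and pg_dist_background_task_depend catalogs queried throughout this test (the literal job id 17777 comes from the sequence restart at the top of the file):

-- editorial sketch: list every task of a job together with the task(s) it depends on;
-- tasks without a dependency show depends_on as NULL
SELECT t.task_id, t.status, t.nodes_involved, d.depends_on
FROM pg_dist_background_task t
LEFT JOIN pg_dist_background_task_depend d USING (task_id)
WHERE t.job_id = 17777
ORDER BY t.task_id, d.depends_on;

The test itself sticks to the explicit pg_dist_background_task_depend queries; a join like this is mainly convenient for interactive debugging.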
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); @@ -128,13 +140,265 @@ SELECT D.task_id, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; +-- PART 2 +-- Test to verify that we do not allow parallel rebalancer moves involving a +-- particular node (either as source or target) +-- more than citus.max_background_task_executors_per_node +-- and that we can change the GUC on the fly +-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query +-- output i.e. to avoid flakiness + +-- First let's restart the scenario DROP SCHEMA background_rebalance_parallel CASCADE; TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_2_port); select citus_remove_node('localhost', :worker_3_port); select citus_remove_node('localhost', :worker_4_port); select citus_remove_node('localhost', :worker_5_port); select citus_remove_node('localhost', :worker_6_port); +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; + +-- Create 8 tables in 4 colocation groups, and populate them +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none'); +INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); +INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none'); +INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table2_colg2 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); +INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none'); +INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); +INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg4 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none'); +INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table2_colg4 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4'); +INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i; + +-- Add nodes so that we can rebalance +SELECT citus_add_node('localhost', :worker_2_port); +SELECT citus_add_node('localhost', :worker_3_port); + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset + +-- see dependent tasks to understand which tasks remain runnable because of +-- citus.max_background_task_executors_per_node +-- and which tasks are actually blocked from colocation group dependencies +SELECT D.task_id, + 
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + +-- default citus.max_background_task_executors_per_node is 1 +-- show that, at first, exactly one task per node is running +-- among the tasks that are not blocked +SELECT citus_task_wait(1013, desired_status => 'running'); +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + +-- increase citus.max_background_task_executors_per_node +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; +SELECT pg_reload_conf(); +SELECT citus_task_wait(1015, desired_status => 'running'); +SELECT citus_task_wait(1013, desired_status => 'done'); + +-- show that at most 2 tasks per node are running +-- among the tasks that are not blocked +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + +-- decrease to default (1) +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); +SELECT citus_task_wait(1015, desired_status => 'done'); +SELECT citus_task_wait(1014, desired_status => 'done'); +SELECT citus_task_wait(1016, desired_status => 'running'); + +-- show that exactly one task per node is running +-- among the tasks that are not blocked +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + +SELECT citus_rebalance_stop(); + +-- PART 3 +-- Test to verify that there's a hard dependency when a specific node is first being used as a +-- source for a move, and then later as a target.
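The PART 2 checks above amount to counting running tasks per involved node and verifying that the count never exceeds citus.max_background_task_executors_per_node. Purely as an illustrative aside (not part of the patch), a manual spot-check could unnest the nodes_involved column this patch introduces; the PART 3 scenario then continues right below:

-- editorial sketch: number of currently running background tasks per involved node
SELECT node_id, count(*) AS running_tasks
FROM pg_dist_background_task t,
     unnest(t.nodes_involved) AS node_id
WHERE t.status = 'running'
GROUP BY node_id
ORDER BY node_id;

With the default setting of 1, no node_id should ever report more than one running task in this query.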
+ +-- First let's restart the scenario +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_1_port); +select citus_remove_node('localhost', :worker_2_port); +select citus_remove_node('localhost', :worker_3_port); +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674051; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61; + +-- add the first node +-- nodeid here is 61 +select citus_add_node('localhost', :worker_1_port); + +-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg4 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg5 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i; + +CREATE TABLE table1_colg6 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none'); +INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i; + +-- add two other nodes +-- nodeid here is 62 +select citus_add_node('localhost', :worker_2_port); +-- nodeid here is 63 +select citus_add_node('localhost', :worker_3_port); + +CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( + worker_node_list json[], + shard_placement_list json[], + threshold float4 DEFAULT 0, + max_shard_moves int DEFAULT 1000000, + drain_only bool DEFAULT false, + improvement_threshold float4 DEFAULT 0.5 +) +RETURNS json[] +AS 'citus' +LANGUAGE C STRICT VOLATILE; + +-- we are simulating the following scenario from shard_rebalancer_unit.sql +-- the following steps all follow this scenario, +-- where the third move should be dependent on the first two +-- because the third move's target is the source of the first two +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}', + '{"node_name": "hostname2", "disallowed_shards": "4"}', + '{"node_name": "hostname3", "disallowed_shards": "4"}' + ]::json[], + ARRAY['{"shardid":1, "nodename":"hostname1"}', + '{"shardid":2, "nodename":"hostname1"}', + '{"shardid":3, "nodename":"hostname2"}', + '{"shardid":4, "nodename":"hostname2"}', + '{"shardid":5, "nodename":"hostname3"}', + '{"shardid":6, "nodename":"hostname3"}' + ]::json[] +)); + +-- manually balance the cluster such that we have +-- a balanced cluster like the scenario above with shards 1,2,3,4,5,6 and hostname1/2/3 +-- shardid 85674051 (1) nodeid 61 (hostname1)
+-- shardid 85674052 (2) nodeid 61 (hostname1) +-- shardid 85674053 (3) nodeid 62 (hostname2) +-- shardid 85674054 (4) nodeid 62 (hostname2) +-- shardid 85674055 (5) nodeid 63 (hostname3) +-- shardid 85674056 (6) nodeid 63 (hostname3) +SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto'); +SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto'); +SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto'); +SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto'); + +-- now create another rebalance strategy in order to simulate moves +-- which use as target a node that has previously been used as source +CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int) + RETURNS boolean AS +$$ + -- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}' + select case when (shardid != 85674054 and nodeid = 61) + then false + -- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}' + -- AND '{"node_name": "hostname3", "disallowed_shards": "4"}' + when (shardid = 85674054 and nodeid != 61) + then false + else true + end; +$$ LANGUAGE sql; + +-- insert the new test rebalance strategy +INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + default_strategy, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold, + improvement_threshold + ) VALUES ( + 'test_source_then_target', + false, + 'citus_shard_cost_1', + 'citus_node_capacity_1', + 'background_rebalance_parallel.test_shard_allowed_on_node', + 0, + 0, + 0 + ); + +SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target'); + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset + +-- check that the third move is blocked and depends on the first two +SELECT job_id, task_id, status, nodes_involved +FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + +SELECT citus_rebalance_stop(); +DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target'; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); -- keep the rest of the tests inact that depends node/group ids ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 04bb898db..9f6abb73a 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -3,6 +3,7 @@ SET search_path TO background_task_queue_monitor; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; +SET client_min_messages TO ERROR; -- reset sequence values ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; @@ -279,11
+280,106 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that task is cancelled +-- TEST11 +-- verify that we do not allow parallel task executors involving a particular node +-- more than citus.max_background_task_executors_per_node +-- verify that we can change citus.max_background_task_executors_per_node on the fly +-- tests are done with dummy node ids +-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query +-- output i.e. to avoid flakiness + +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset +INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset +COMMIT; + +SELECT citus_task_wait(:task_id1, desired_status => 'running'); +SELECT citus_task_wait(:task_id2, desired_status => 'running'); + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 1 task per node is running + +SELECT citus_task_wait(:task_id1, desired_status => 'done'); +SELECT citus_task_wait(:task_id2, desired_status => 'done'); +-- increase max_background_task_executors_per_node on the fly +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; +SELECT pg_reload_conf(); + +SELECT citus_task_wait(:task_id3, desired_status => 'running'); +SELECT citus_task_wait(:task_id4, desired_status => 'running'); +SELECT citus_task_wait(:task_id5, desired_status => 'running'); + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running + +-- increase to 3 max_background_task_executors_per_node on the fly +SELECT citus_task_wait(:task_id3, desired_status => 'done'); +SELECT citus_task_wait(:task_id4, desired_status => 'done'); +SELECT citus_task_wait(:task_id5, desired_status => 
'done'); +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3; +SELECT pg_reload_conf(); + +SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id7, desired_status => 'running'); +SELECT citus_task_wait(:task_id8, desired_status => 'running'); + +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running + +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); + +-- if pg_cancel_backend is called on one of the running task PIDs +-- task doesn't restart because it's not allowed anymore by the limit. +-- node with id 1 can run only one task at a time, and other tasks involving it are still running +SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset +SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process + +-- the task goes back to the runnable state and is not running anymore. +SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); + +-- show that the cancelled task hasn't restarted because the limit doesn't allow it +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; + +SELECT citus_task_wait(:task_id7, desired_status => 'done'); +SELECT citus_task_wait(:task_id8, desired_status => 'done'); +SELECT citus_task_wait(:task_id6, desired_status => 'running'); + +-- show that the 6th task has restarted only after both 7 and 8 are done +-- since the limit is back to 1 background task executor per node and tasks 6, 7 and 8 all involve node with id 1 +SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, + :task_id5, :task_id6, :task_id7, :task_id8) + ORDER BY job_id, task_id; + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_wait(:job_id1); + +ALTER SYSTEM RESET citus.max_background_task_executors_per_node; +SELECT pg_reload_conf(); + SET client_min_messages TO WARNING; TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; +RESET client_min_messages; ALTER SYSTEM RESET citus.background_task_queue_interval; ALTER SYSTEM RESET citus.max_background_task_executors; diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql index 58351310c..1a592d858 100644 --- a/src/test/regress/sql/multi_metadata_attributes.sql +++ b/src/test/regress/sql/multi_metadata_attributes.sql @@ -10,5 +10,6 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass) + 'pg_dist_object'::regclass, + 'pg_dist_background_task'::regclass) ORDER BY attrelid, attname;
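A closing note on the multi_metadata_attributes change above: that test flags catalog attributes with atthasmissing set, and pg_dist_background_task is added to its exclusion list, presumably because the upgrade path adds the new nodes_involved column to an already populated catalog. An illustrative query (not part of the patch) to see what that exclusion hides, using only standard pg_attribute columns:

-- editorial sketch: show missing-value attributes on the newly excluded catalog
SELECT attrelid::regclass AS catalog_name, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE attrelid = 'pg_dist_background_task'::regclass
  AND atthasmissing;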