diff --git a/.circleci/config.yml b/.circleci/config.yml
index 225195d27..d5eadd94f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -6,7 +6,7 @@ orbs:
 parameters:
   image_suffix:
     type: string
-    default: '-v3417e8d'
+    default: '-v087ecd7'
   pg13_version:
     type: string
     default: '13.10'
@@ -490,6 +490,10 @@ jobs:
       pg_major: << parameters.pg_major >>
     - configure
     - enable_core
+    - run:
+        name: 'Install DBI.pm'
+        command: |
+          apt-get update && apt-get install -y libdbi-perl libdbd-pg-perl
     - run:
         name: 'Run Test'
         command: |
diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index 8f28b04b0..1cefb5769 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -37,7 +37,7 @@ OBJS += \
 all: cdc
 
 cdc:
-	$(MAKE) -C cdc all
+	$(MAKE) DECODER=pgoutput -C cdc all
 
 NO_PGXS = 1
 
@@ -84,13 +85,12 @@ ifneq (,$(SQL_Po_files))
 include $(SQL_Po_files)
 endif
 
-
-.PHONY: clean-full install install-downgrades install-all install-cdc clean-cdc
+.PHONY: clean-full install install-downgrades install-all
 
 clean: clean-cdc
 
 clean-cdc:
-	$(MAKE) -C cdc clean
+	$(MAKE) DECODER=pgoutput -C cdc clean
 
 cleanup-before-install:
 	rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control
@@ -99,7 +99,7 @@ cleanup-before-install:
 install: cleanup-before-install install-cdc
 
 install-cdc:
-	$(MAKE) -C cdc install
+	$(MAKE) DECODER=pgoutput -C cdc install
 
 # install and install-downgrades should be run sequentially
 install-all: install
diff --git a/src/backend/distributed/cdc/Makefile b/src/backend/distributed/cdc/Makefile
index 8a7449476..76aa28726 100644
--- a/src/backend/distributed/cdc/Makefile
+++ b/src/backend/distributed/cdc/Makefile
@@ -1,45 +1,26 @@
+ifndef DECODER
+	DECODER = pgoutput
+endif
+
+MODULE_big = citus_$(DECODER)
+citus_subdir = src/backend/distributed/cdc
 citus_top_builddir = ../../../..
+citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
+
+OBJS += cdc_decoder.o cdc_decoder_utils.o
+
 include $(citus_top_builddir)/Makefile.global
 
-citus_subdir = src/backend/distributed/cdc
-SRC_DIR = $(citus_abs_top_srcdir)/$(citus_subdir)
+override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
+override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
 
-#List of supported based decoders. Add new decoders here.
-cdc_base_decoders :=pgoutput wal2json
+install: install-cdc
 
-all: build-cdc-decoders
+clean: clean-cdc
 
-copy-decoder-files-to-build-dir:
-	$(eval DECODER_BUILD_DIR=build-cdc-$(DECODER))
-	mkdir -p $(DECODER_BUILD_DIR)
-	@for file in $(SRC_DIR)/*.c $(SRC_DIR)/*.h; do \
-		if [ -f $$file ]; then \
-			if [ -f $(DECODER_BUILD_DIR)/$$(basename $$file) ]; then \
-				if ! diff -q $$file $(DECODER_BUILD_DIR)/$$(basename $$file); then \
-					cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
-				fi \
-			else \
-				cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
-			fi \
-		fi \
-	done
-	cp $(SRC_DIR)/Makefile.decoder $(DECODER_BUILD_DIR)/Makefile
-
-build-cdc-decoders:
-	$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) build-cdc-decoder;)
-
-install-cdc-decoders:
-	$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) -C build-cdc-$(base_decoder) install;)
-
-clean-cdc-decoders:
-	$(foreach base_decoder,$(cdc_base_decoders),rm -rf build-cdc-$(base_decoder);)
-
-
-build-cdc-decoder:
-	$(MAKE) DECODER=$(DECODER) copy-decoder-files-to-build-dir
-	$(MAKE) DECODER=$(DECODER) -C build-cdc-$(DECODER)
-
-install: install-cdc-decoders
-
-clean: clean-cdc-decoders
+install-cdc:
+	mkdir -p '$(citus_decoders_dir)'
+	$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
+
+clean-cdc:
+	rm -f '$(citus_decoders_dir)/$(DECODER).so'
diff --git a/src/backend/distributed/cdc/Makefile.decoder b/src/backend/distributed/cdc/Makefile.decoder
deleted file mode 100644
index 963f8bf15..000000000
--- a/src/backend/distributed/cdc/Makefile.decoder
+++ /dev/null
@@ -1,24 +0,0 @@
-MODULE_big = citus_$(DECODER)
-
-citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
-
-citus_top_builddir = ../../../../..
-citus_subdir = src/backend/distributed/cdc/cdc_$(DECODER)
-
-OBJS += cdc_decoder.o cdc_decoder_utils.o
-
-include $(citus_top_builddir)/Makefile.global
-
-override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
-override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
-
-install: install-cdc
-
-clean: clean-cdc
-
-install-cdc:
-	mkdir -p '$(citus_decoders_dir)'
-	$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
-
-clean-cdc:
-	rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 9fd4290ba..b25da1ebd 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -32,7 +32,6 @@
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "commands/sequence.h"
-#include "distributed/background_jobs.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_nodes.h"
@@ -58,9 +57,7 @@
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "distributed/remote_commands.h"
-#include "distributed/shard_rebalancer.h"
 #include "distributed/tuplestore.h"
-#include "distributed/utils/array_type.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/version_compat.h"
@@ -780,6 +777,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 	{
 		partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
 	}
+
 	/* for non-partitioned tables, we will use Postgres' size functions */
 	else
 	{
@@ -2818,8 +2816,7 @@ CreateBackgroundJob(const char *jobType, const char *description)
  */
 BackgroundTask *
 ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount,
-					   int64 dependingTaskIds[], int nodesInvolvedCount, int32
-					   nodesInvolved[])
+					   int64 dependingTaskIds[])
 {
 	BackgroundTask *task = NULL;
@@ -2893,11 +2890,6 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
 	values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
 	nulls[Anum_pg_dist_background_task_message - 1] = false;
 
-	values[Anum_pg_dist_background_task_nodes_involved - 1] =
-		IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
-	nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
-															  0);
-
 	HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
 										 values, nulls);
 	CatalogTupleInsert(pgDistBackgroundTask, newTuple);
@@ -3209,13 +3201,6 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
 			TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
 	}
 
-	if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
-	{
-		ArrayType *nodesInvolvedArrayObject =
-			DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
-		task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
-	}
-
 	return task;
 }
 
@@ -3348,8 +3333,7 @@ GetRunnableBackgroundTask(void)
 	while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
 	{
 		task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
-		if (BackgroundTaskReadyToRun(task) &&
-			IncrementParallelTaskCountForNodesInvolved(task))
+		if (BackgroundTaskReadyToRun(task))
 		{
 			/* found task, close table and return */
 			break;
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index b5ec9b7ba..5d30ff8be 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -190,32 +190,16 @@ typedef struct WorkerShardStatistics
 	HTAB *statistics;
 } WorkerShardStatistics;
 
-/*
- * ShardMoveDependencyHashEntry contains the taskId which any new shard
- * move task within the corresponding colocation group
- * must take a dependency on
- */
+/*
+ * ShardMoveDependencyHashEntry contains the taskId which any new shard move
+ * task within the corresponding colocation group must take a dependency on.
+ */
 typedef struct ShardMoveDependencyInfo
 {
 	int64 key;
 	int64 taskId;
 } ShardMoveDependencyInfo;
 
-/*
- * ShardMoveSourceNodeHashEntry keeps track of the source nodes
- * of the moves.
- */
-typedef struct ShardMoveSourceNodeHashEntry
-{
-	/* this is the key */
-	int32 node_id;
-	List *taskIds;
-} ShardMoveSourceNodeHashEntry;
-
-/*
- * ShardMoveDependencies keeps track of all needed dependencies
- * between shard moves.
- */
 typedef struct ShardMoveDependencies
 {
 	HTAB *colocationDependencies;
@@ -293,15 +274,6 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo
 static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
 static void ErrorOnConcurrentRebalance(RebalanceOptions *);
 static List * GetSetCommandListForNewConnections(void);
-static int64 GetColocationId(PlacementUpdateEvent *move);
-static ShardMoveDependencies InitializeShardMoveDependencies();
-static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
-											  colocationId,
-											  ShardMoveDependencies shardMoveDependencies,
-											  int *nDepends);
-static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
-										int64 taskId,
-										ShardMoveDependencies shardMoveDependencies);
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(rebalance_table_shards);
@@ -1958,7 +1930,8 @@ GetColocationId(PlacementUpdateEvent *move)
  * InitializeShardMoveDependencies function creates the hash maps that we use to track
  * the latest moves so that subsequent moves with the same properties must take a dependency
  * on them. There are two hash maps. One is for tracking the latest move scheduled in a
- * given colocation group and the other one is for tracking source nodes of all moves.
+ * given colocation group and the other one is for tracking the latest move which involves
+ * a given node either as its source node or its target node.
  */
 static ShardMoveDependencies
 InitializeShardMoveDependencies()
@@ -1968,17 +1941,18 @@ InitializeShardMoveDependencies()
 															 ShardMoveDependencyInfo,
 															 "colocationDependencyHashMap",
 															 6);
-	shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
-																			 ShardMoveSourceNodeHashEntry,
+	shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
																			 ShardMoveDependencyInfo,
 																			 "nodeDependencyHashMap",
 																			 6);
+
 	return shardMoveDependencies;
 }
 
 
 /*
  * GenerateTaskMoveDependencyList creates and returns a List of taskIds that
- * the move must take a dependency on, given the shard move dependencies as input.
+ * the move must take a dependency on.
  */
 static int64 *
 GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
@@ -1998,24 +1972,27 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
 		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
 	}
 
-	/*
-	 * Check if there exists moves scheduled earlier whose source node
-	 * overlaps with the current move's target node.
-	 * The earlier/first move might make space for the later/second move.
-	 * So we could run out of disk space (or at least overload the node)
-	 * if we move the second shard to it before the first one is moved away.
-	 */
-	ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
-		shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
+	/* Check if there exists a move scheduled earlier whose source or target node
+	 * overlaps with the current move's source node. 
*/ + shardMoveDependencyInfo = hash_search( + shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, &found); if (found) { - int64 *taskId = NULL; - foreach_ptr(taskId, shardMoveSourceNodeHashEntry->taskIds) - { - hash_search(dependsList, taskId, HASH_ENTER, NULL); - } + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } + + /* Check if there exists a move scheduled earlier whose source or target node + * overlaps with the current move's target node. */ + shardMoveDependencyInfo = hash_search( + shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER, + &found); + + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); } *nDepends = hash_get_num_entries(dependsList); @@ -2053,20 +2030,15 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); shardMoveDependencyInfo->taskId = taskId; - bool found; - ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( - shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, - &found); + shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, + &move->sourceNode->nodeId, HASH_ENTER, NULL); - if (!found) - { - shardMoveSourceNodeHashEntry->taskIds = NIL; - } + shardMoveDependencyInfo->taskId = taskId; - int64 *newTaskId = palloc0(sizeof(int64)); - *newTaskId = taskId; - shardMoveSourceNodeHashEntry->taskIds = lappend( - shardMoveSourceNodeHashEntry->taskIds, newTaskId); + shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, + &move->targetNode->nodeId, HASH_ENTER, NULL); + + shardMoveDependencyInfo->taskId = taskId; } @@ -2163,10 +2135,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo appendStringInfo(&buf, "SELECT pg_catalog.replicate_reference_tables(%s)", quote_literal_cstr(shardTranferModeLabel)); - - int32 nodesInvolved[] = { 0 }; BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0, - NULL, 0, nodesInvolved); + NULL); replicateRefTablesTaskId = task->taskid; } @@ -2200,14 +2170,9 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo dependsArray[0] = replicateRefTablesTaskId; } - int32 nodesInvolved[2] = { 0 }; - nodesInvolved[0] = move->sourceNode->nodeId; - nodesInvolved[1] = move->targetNode->nodeId; - BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, nDepends, - dependsArray, 2, - nodesInvolved); + dependsArray); UpdateShardMoveDependencies(move, colocationId, task->taskid, shardMoveDependencies); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e7fedd1d8..76e0ae9b9 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1793,18 +1793,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - DefineCustomIntVariable( - "citus.max_background_task_executors_per_node", - gettext_noop( - "Sets the maximum number of parallel background task executor workers " - "for scheduled background tasks that involve a particular node"), - NULL, - &MaxBackgroundTaskExecutorsPerNode, - 1, 1, 128, - PGC_SIGHUP, - GUC_STANDARD, - NULL, NULL, NULL); - DefineCustomIntVariable( "citus.max_cached_connection_lifetime", gettext_noop("Sets the maximum lifetime of cached connections to other nodes."), diff 
--git a/src/backend/distributed/sql/citus--11.2-1--11.2-2.sql b/src/backend/distributed/sql/citus--11.2-1--11.2-2.sql deleted file mode 100644 index 2a4051541..000000000 --- a/src/backend/distributed/sql/citus--11.2-1--11.2-2.sql +++ /dev/null @@ -1,2 +0,0 @@ --- citus--11.2-1--11.2-2 -#include "udfs/worker_adjust_identity_column_seq_ranges/11.2-2.sql" diff --git a/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql similarity index 83% rename from src/backend/distributed/sql/citus--11.2-2--11.3-1.sql rename to src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 9b45cc0fe..fd14cd90d 100644 --- a/src/backend/distributed/sql/citus--11.2-2--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -1,5 +1,6 @@ -- citus--11.2-1--11.3-1 #include "udfs/repl_origin_helper/11.3-1.sql" +#include "udfs/worker_adjust_identity_column_seq_ranges/11.3-1.sql" ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index; ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index; @@ -14,7 +15,3 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_ #include "udfs/citus_stat_tenants_local_reset/11.3-1.sql" #include "udfs/citus_stat_tenants_reset/11.3-1.sql" - --- we introduce nodes_involved, which will be used internally to --- limit the number of parallel tasks running per node -ALTER TABLE pg_catalog.pg_dist_background_task ADD COLUMN nodes_involved int[] DEFAULT NULL; diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-2--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-2--11.2-1.sql deleted file mode 100644 index 92ba854da..000000000 --- a/src/backend/distributed/sql/downgrades/citus--11.2-2--11.2-1.sql +++ /dev/null @@ -1,2 +0,0 @@ --- citus--11.2-2--11.2-1 -DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass); diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql similarity index 94% rename from src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql rename to src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 0550354e1..15afec40a 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-2.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -3,6 +3,7 @@ DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active(); +DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass); ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING; @@ -28,5 +29,3 @@ DROP FUNCTION pg_catalog.citus_stat_tenants(boolean); DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset(); DROP FUNCTION pg_catalog.citus_stat_tenants_reset(); - -ALTER TABLE pg_catalog.pg_dist_background_task DROP COLUMN nodes_involved; diff --git a/src/backend/distributed/sql/udfs/worker_adjust_identity_column_seq_ranges/11.2-2.sql 
b/src/backend/distributed/sql/udfs/worker_adjust_identity_column_seq_ranges/11.2-2.sql deleted file mode 100644 index b444bfa22..000000000 --- a/src/backend/distributed/sql/udfs/worker_adjust_identity_column_seq_ranges/11.2-2.sql +++ /dev/null @@ -1,3 +0,0 @@ --- Since we backported the UDF below from version 11.3, the definition is the same -#include "11.3-1.sql" - diff --git a/src/backend/distributed/utils/array_type.c b/src/backend/distributed/utils/array_type.c index 70c7dde14..348b25b4a 100644 --- a/src/backend/distributed/utils/array_type.c +++ b/src/backend/distributed/utils/array_type.c @@ -140,34 +140,3 @@ TextArrayTypeToIntegerList(ArrayType *arrayObject) return list; } - - -/* - * IntArrayToDatum - * - * Convert an integer array to the datum int array format - * (currently used for nodes_involved in pg_dist_background_task) - * - * Returns the array in the form of a Datum, or PointerGetDatum(NULL) - * if the int_array is empty. - */ -Datum -IntArrayToDatum(uint32 int_array_size, int int_array[]) -{ - if (int_array_size == 0) - { - return PointerGetDatum(NULL); - } - - ArrayBuildState *astate = NULL; - for (int i = 0; i < int_array_size; i++) - { - Datum dvalue = Int32GetDatum(int_array[i]); - bool disnull = false; - Oid element_type = INT4OID; - astate = accumArrayResult(astate, dvalue, disnull, element_type, - CurrentMemoryContext); - } - - return makeArrayResult(astate, CurrentMemoryContext); -} diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 789732d21..d7a5a31bd 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -63,7 +63,6 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_utility.h" #include "distributed/shard_cleaner.h" -#include "distributed/shard_rebalancer.h" #include "distributed/resource_lock.h" /* Table-of-contents constants for our dynamic shared memory segment. */ @@ -116,17 +115,12 @@ static bool MonitorGotTerminationOrCancellationRequest(); static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS); -static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); /* flags set by signal handlers */ static volatile sig_atomic_t GotSigterm = false; static volatile sig_atomic_t GotSigint = false; static volatile sig_atomic_t GotSighup = false; -/* keeping track of parallel background tasks per node */ -HTAB *ParallelTasksPerNode = NULL; -int MaxBackgroundTaskExecutorsPerNode = 1; - PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); PG_FUNCTION_INFO_V1(citus_task_wait); @@ -217,7 +211,7 @@ citus_job_wait(PG_FUNCTION_ARGS) * assume any terminal state as its desired status. The function returns if the * desired_state was reached. * - * The current implementation is a polling implementation with an interval of 0.1 seconds. + * The current implementation is a polling implementation with an interval of 1 second. * Ideally we would have some synchronization between the background tasks queue monitor * and any backend calling this function to receive a signal when the task changes state. 
*/ @@ -863,7 +857,6 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) UpdateBackgroundTask(task); UpdateDependingTasks(task); UpdateBackgroundJob(task->jobid); - DecrementParallelTaskCountForNodesInvolved(task); /* we are sure that at least one task did not block on current iteration */ queueMonitorExecutionContext->allTasksWouldBlock = false; @@ -875,77 +868,6 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) } -/* - * IncrementParallelTaskCountForNodesInvolved - * Checks whether we have reached the limit of parallel tasks per node - * per each of the nodes involved with the task - * If at least one limit is reached, it returns false. - * If limits aren't reached, it increments the parallel task count - * for each of the nodes involved with the task, and returns true. - */ -bool -IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task) -{ - if (task->nodesInvolved) - { - int node; - - /* first check whether we have reached the limit for any of the nodes */ - foreach_int(node, task->nodesInvolved) - { - bool found; - ParallelTasksPerNodeEntry *hashEntry = hash_search( - ParallelTasksPerNode, &(node), HASH_ENTER, &found); - if (!found) - { - hashEntry->counter = 0; - } - else if (hashEntry->counter >= MaxBackgroundTaskExecutorsPerNode) - { - /* at least one node's limit is reached */ - return false; - } - } - - /* then, increment the parallel task count per each node */ - foreach_int(node, task->nodesInvolved) - { - ParallelTasksPerNodeEntry *hashEntry = hash_search( - ParallelTasksPerNode, &(node), HASH_FIND, NULL); - Assert(hashEntry); - hashEntry->counter += 1; - } - } - - return true; -} - - -/* - * DecrementParallelTaskCountForNodesInvolved - * Decrements the parallel task count for each of the nodes involved - * with the task. - * We call this function after the task has gone through Running state - * and then has ended. - */ -static void -DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task) -{ - if (task->nodesInvolved) - { - int node; - foreach_int(node, task->nodesInvolved) - { - ParallelTasksPerNodeEntry *hashEntry = hash_search(ParallelTasksPerNode, - &(node), - HASH_FIND, NULL); - - hashEntry->counter -= 1; - } - } -} - - /* * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. 
*/ @@ -1101,7 +1023,7 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle SIGINT to properly cancel active task executors */ pqsignal(SIGINT, QueueMonitorSigIntHandler); - /* handle SIGHUP to update MaxBackgroundTaskExecutors and MaxBackgroundTaskExecutorsPerNode */ + /* handle SIGHUP to update MaxBackgroundTaskExecutors */ pqsignal(SIGHUP, QueueMonitorSigHupHandler); /* ready to handle signals */ @@ -1245,15 +1167,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) { GotSighup = false; - /* update max_background_task_executors and max_background_task_executors_per_node if changed */ + /* update max_background_task_executors if changed */ ProcessConfigFile(PGC_SIGHUP); } - if (ParallelTasksPerNode == NULL) - { - ParallelTasksPerNode = CreateSimpleHash(int32, ParallelTasksPerNodeEntry); - } - /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ if (!MonitorGotTerminationOrCancellationRequest()) { diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index 35745c014..3a14b6207 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -85,21 +85,6 @@ typedef struct TaskExecutionContext } TaskExecutionContext; -/* - * ParallelTasksPerNodeEntry is the struct used - * to track the number of concurrent background tasks that - * involve a particular node (the key to the entry) - */ -typedef struct ParallelTasksPerNodeEntry -{ - /* Used as hash key. */ - int32 node_id; - - /* number of concurrent background tasks that involve node node_id */ - uint32 counter; -} ParallelTasksPerNodeEntry; - - extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner); extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); @@ -110,6 +95,5 @@ extern Datum citus_job_wait(PG_FUNCTION_ARGS); extern Datum citus_task_wait(PG_FUNCTION_ARGS); extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus); extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus); -extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task); #endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index e27f3df22..f7b2038ee 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -252,7 +252,6 @@ typedef struct BackgroundTask int32 *retry_count; TimestampTz *not_before; char *message; - List *nodesInvolved; /* extra space to store values for nullable value types above */ struct @@ -389,9 +388,7 @@ extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut); extern int64 CreateBackgroundJob(const char *jobType, const char *description); extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount, - int64 dependingTaskIds[], - int nodesInvolvedCount, - int32 nodesInvolved[]); + int64 dependingTaskIds[]); extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId); diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h index 9e6673a64..b6d132e59 100644 --- a/src/include/distributed/pg_dist_background_task.h +++ b/src/include/distributed/pg_dist_background_task.h @@ -15,7 +15,7 @@ * compiler constants for 
pg_dist_background_task * ---------------- */ -#define Natts_pg_dist_background_task 10 +#define Natts_pg_dist_background_task 9 #define Anum_pg_dist_background_task_job_id 1 #define Anum_pg_dist_background_task_task_id 2 #define Anum_pg_dist_background_task_owner 3 @@ -25,6 +25,5 @@ #define Anum_pg_dist_background_task_retry_count 7 #define Anum_pg_dist_background_task_not_before 8 #define Anum_pg_dist_background_task_message 9 -#define Anum_pg_dist_background_task_nodes_involved 10 #endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 705196ad4..90f73e2f3 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -190,7 +190,6 @@ extern char *VariablesToBePassedToNewConnections; extern int MaxRebalancerLoggedIgnoredMoves; extern bool RunningUnderIsolationTest; extern bool PropagateSessionSettingsForLoopbackConnection; -extern int MaxBackgroundTaskExecutorsPerNode; /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/utils/array_type.h b/src/include/distributed/utils/array_type.h index 43826076e..4599b8a9f 100644 --- a/src/include/distributed/utils/array_type.h +++ b/src/include/distributed/utils/array_type.h @@ -22,6 +22,5 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); extern List * IntegerArrayTypeToList(ArrayType *arrayObject); extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject); -extern Datum IntArrayToDatum(uint32 int_array_size, int int_array[]); #endif /* CITUS_ARRAY_TYPE_H */ diff --git a/src/test/cdc/t/016_cdc_wal2json.pl b/src/test/cdc/t/016_cdc_wal2json.pl deleted file mode 100644 index ab384df64..000000000 --- a/src/test/cdc/t/016_cdc_wal2json.pl +++ /dev/null @@ -1,51 +0,0 @@ -# Schema change CDC test for Citus -use strict; -use warnings; - -use Test::More; - -use lib './t'; -use cdctestlib; - -use threads; - -# Initialize co-ordinator node -my $select_stmt = qq(SELECT * FROM data_100008 ORDER BY id;); -my $result = 0; - -### Create the citus cluster with coordinator and two worker nodes -our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636); - -print("coordinator port: " . $node_coordinator->port() . "\n"); -print("worker0 port:" . $workers[0]->port() . "\n"); - -my $initial_schema = " - CREATE TABLE data_100008( - id integer, - data text, - PRIMARY KEY (data));"; - -$node_coordinator->safe_psql('postgres',$initial_schema); -$node_coordinator->safe_psql('postgres','ALTER TABLE data_100008 REPLICA IDENTITY FULL;'); -$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','wal2json');"); - -#insert data into the data_100008 table in the coordinator node before distributing the table. 
-$node_coordinator->safe_psql('postgres'," - INSERT INTO data_100008 - SELECT i, 'my test data ' || i - FROM generate_series(-1,1)i;"); - -my $output = $node_coordinator->safe_psql('postgres',"SELECT * FROM pg_logical_slot_get_changes('cdc_replication_slot', NULL, NULL);"); -print($output); - -my $change_string_expected = '[0,"my test data 0"]'; -if ($output =~ /$change_string_expected/) { - $result = 1; -} else { - $result = 0; -} - -is($result, 1, 'CDC create_distributed_table - wal2json test'); -$node_coordinator->safe_psql('postgres',"SELECT pg_drop_replication_slot('cdc_replication_slot');"); - -done_testing(); diff --git a/src/test/regress/after_citus_upgrade_coord_schedule b/src/test/regress/after_citus_upgrade_coord_schedule index 6a2a5255a..b36151ce6 100644 --- a/src/test/regress/after_citus_upgrade_coord_schedule +++ b/src/test/regress/after_citus_upgrade_coord_schedule @@ -1,7 +1,6 @@ # this schedule is to be run only on coordinators test: upgrade_citus_finish_citus_upgrade -test: upgrade_pg_dist_cleanup_after test: upgrade_basic_after test: upgrade_partition_constraints_after test: upgrade_pg_dist_object_test_after diff --git a/src/test/regress/before_citus_upgrade_coord_schedule b/src/test/regress/before_citus_upgrade_coord_schedule index 0e0eaa091..169a7f418 100644 --- a/src/test/regress/before_citus_upgrade_coord_schedule +++ b/src/test/regress/before_citus_upgrade_coord_schedule @@ -4,5 +4,4 @@ test: upgrade_basic_before test: upgrade_partition_constraints_before test: upgrade_pg_dist_object_test_before test: upgrade_columnar_metapage_before -test: upgrade_pg_dist_cleanup_before test: upgrade_post_11_before diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index 35385f952..7c9cd599a 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -110,7 +110,7 @@ class CitusBaseClusterConfig(object, metaclass=NewInitCaller): "max_connections": 1200, } self.new_settings = {} - self.add_coordinator_to_metadata = True + self.add_coordinator_to_metadata = False self.env_variables = {} self.skip_tests = [] @@ -166,6 +166,7 @@ class CitusDefaultClusterConfig(CitusBaseClusterConfig): "citus.use_citus_managed_tables": True, } self.settings.update(new_settings) + self.add_coordinator_to_metadata = True self.skip_tests = [ # Alter Table statement cannot be run from an arbitrary node so this test will fail "arbitrary_configs_alter_table_add_constraint_without_name_create", @@ -379,3 +380,4 @@ class PGUpgradeConfig(CitusBaseClusterConfig): self.old_datadir = self.temp_dir + "/oldData" self.new_datadir = self.temp_dir + "/newData" self.user = SUPER_USER_NAME + self.add_coordinator_to_metadata = True diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index 9c43fab9b..862beb57e 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -1,22 +1,12 @@ --- --- BACKGROUND_REBALANCE_PARALLEL --- --- Test to check if the background tasks scheduled by the background rebalancer --- have the correct dependencies --- --- Test to verify that we do not allow parallel rebalancer moves involving a --- particular node (either as source or target) more than --- citus.max_background_task_executors_per_node, and that we can change the GUC on --- the fly, and that will affect the ongoing balance as it should --- --- Test to verify that there's a hard dependency when 
a specific node is first being
--- used as a source for a move, and then later as a target.
---
+/*
+  Test to check if the background tasks scheduled by the background rebalancer
+  have the correct dependencies.
+*/
 CREATE SCHEMA background_rebalance_parallel;
 SET search_path TO background_rebalance_parallel;
 SET citus.next_shard_id TO 85674000;
 SET citus.shard_replication_factor TO 1;
-SET client_min_messages TO ERROR;
+SET client_min_messages TO WARNING;
 ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
 ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
 ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
@@ -55,52 +45,52 @@ SELECT pg_reload_conf();
  t
 (1 row)
 
--- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group
+/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
 CREATE TABLE table1_colg1 (a int PRIMARY KEY);
-SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
+SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
 CREATE TABLE table2_colg1 (b int PRIMARY KEY);
-SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
+SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
--- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group
+/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
 CREATE TABLE table1_colg2 (a int PRIMARY KEY);
-SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
+SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
 CREATE TABLE table2_colg2 (b int primary key);
-SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
+SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
--- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group
+/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
 CREATE TABLE table1_colg3 (a int PRIMARY KEY);
-SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
+SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
 CREATE TABLE table2_colg3 (b int primary key);
-SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
+SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
  create_distributed_table
---------------------------------------------------------------------
 
(1 row)
 
--- Add two new nodes so that we can rebalance
+/* Add two new nodes so that we can rebalance */
 SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
  ?column? 
--------------------------------------------------------------------- @@ -142,11 +132,10 @@ SELECT citus_rebalance_wait(); (1 row) --- PART 1 --- Test to check if the background tasks scheduled by the background rebalancer --- have the correct dependencies --- Check that a move is dependent on --- any other move scheduled earlier in its colocation group. +/*Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current moves nodes. */ SELECT S.shardid, P.colocationid FROM pg_dist_shard S, pg_dist_partition P WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; @@ -186,12 +175,16 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, task_id | command | depends_on | command --------------------------------------------------------------------- 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') -(3 rows) +(7 rows) --- Check that if there is a reference table that needs to be synched to a node, --- any move without a dependency must depend on the move task for reference table. +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for reference table. */ SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); ?column? --------------------------------------------------------------------- @@ -210,8 +203,8 @@ SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true) 1 (1 row) --- Drain worker_3 so that we can move only one colocation group to worker_3 --- to create an unbalance that would cause parallel rebalancing. +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an unbalance that would cause parallel rebalancing. */ SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); ?column? --------------------------------------------------------------------- @@ -232,7 +225,7 @@ SELECT create_reference_table('ref_table'); (1 row) --- Move all the shards of Colocation group 3 to worker_3. +/* Move all the shards of Colocation group 3 to worker_3.*/ SELECT master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') FROM @@ -250,7 +243,7 @@ ORDER BY (4 rows) CALL citus_cleanup_orphaned_resources(); --- Activate and new nodes so that we can rebalance. +/* Activate and new nodes so that we can rebalance. 
*/ SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); ?column? --------------------------------------------------------------------- @@ -330,34 +323,18 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') 1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') - 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') + 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') 1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') (6 rows) --- PART 2 --- Test to verify that we do not allow parallel rebalancer moves involving a --- particular node (either as source or target) --- more than citus.max_background_task_executors_per_node --- and that we can change the GUC on the fly --- citus_task_wait calls are used to ensure consistent pg_dist_background_task query --- output i.e. to avoid flakiness --- First let's restart the scenario DROP SCHEMA background_rebalance_parallel CASCADE; TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -select citus_remove_node('localhost', :worker_2_port); - citus_remove_node ---------------------------------------------------------------------- - -(1 row) - select citus_remove_node('localhost', :worker_3_port); citus_remove_node --------------------------------------------------------------------- @@ -382,474 +359,6 @@ select citus_remove_node('localhost', :worker_6_port); (1 row) -CREATE SCHEMA background_rebalance_parallel; -SET search_path TO background_rebalance_parallel; --- Create 8 tables in 4 colocation groups, and populate them -CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table2_colg1 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table2_colg2 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); - create_distributed_table 
---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table2_colg3 (b int primary key); -SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg4 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table2_colg4 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i; --- Add nodes so that we can rebalance -SELECT citus_add_node('localhost', :worker_2_port); - citus_add_node ---------------------------------------------------------------------- - 56 -(1 row) - -SELECT citus_add_node('localhost', :worker_3_port); - citus_add_node ---------------------------------------------------------------------- - 57 -(1 row) - -SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset --- see dependent tasks to understand which tasks remain runnable because of --- citus.max_background_task_executors_per_node --- and which tasks are actually blocked from colocation group dependencies -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; - task_id | command | depends_on | command ---------------------------------------------------------------------- - 1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') - 1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') - 1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') - 1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') -(4 rows) - --- default citus.max_background_task_executors_per_node is 1 --- show that first exactly one task per node is running --- among the tasks that are not blocked -SELECT citus_task_wait(1013, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - job_id | 
task_id | status | nodes_involved ---------------------------------------------------------------------- - 17779 | 1013 | running | {50,56} - 17779 | 1014 | blocked | {50,57} - 17779 | 1015 | runnable | {50,56} - 17779 | 1016 | blocked | {50,57} - 17779 | 1017 | runnable | {50,56} - 17779 | 1018 | blocked | {50,57} - 17779 | 1019 | runnable | {50,56} - 17779 | 1020 | blocked | {50,57} -(8 rows) - --- increase citus.max_background_task_executors_per_node -ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - -SELECT citus_task_wait(1015, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(1013, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - --- show that at most 2 tasks per node are running --- among the tasks that are not blocked -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 17779 | 1013 | done | {50,56} - 17779 | 1014 | running | {50,57} - 17779 | 1015 | running | {50,56} - 17779 | 1016 | blocked | {50,57} - 17779 | 1017 | runnable | {50,56} - 17779 | 1018 | blocked | {50,57} - 17779 | 1019 | runnable | {50,56} - 17779 | 1020 | blocked | {50,57} -(8 rows) - --- decrease to default (1) -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - -SELECT citus_task_wait(1015, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(1014, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(1016, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - --- show that exactly one task per node is running --- among the tasks that are not blocked -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 17779 | 1013 | done | {50,56} - 17779 | 1014 | done | {50,57} - 17779 | 1015 | done | {50,56} - 17779 | 1016 | running | {50,57} - 17779 | 1017 | runnable | {50,56} - 17779 | 1018 | blocked | {50,57} - 17779 | 1019 | runnable | {50,56} - 17779 | 1020 | blocked | {50,57} -(8 rows) - -SELECT citus_rebalance_stop(); - citus_rebalance_stop ---------------------------------------------------------------------- - -(1 row) - --- PART 3 --- Test to verify that there's a hard dependency when A specific node is first being used as a --- source for a move, and then later as a target. 
--- First let's restart the scenario -DROP SCHEMA background_rebalance_parallel CASCADE; -TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; -SELECT public.wait_for_resource_cleanup(); - wait_for_resource_cleanup ---------------------------------------------------------------------- - -(1 row) - -select citus_remove_node('localhost', :worker_1_port); - citus_remove_node ---------------------------------------------------------------------- - -(1 row) - -select citus_remove_node('localhost', :worker_2_port); - citus_remove_node ---------------------------------------------------------------------- - -(1 row) - -select citus_remove_node('localhost', :worker_3_port); - citus_remove_node ---------------------------------------------------------------------- - -(1 row) - -CREATE SCHEMA background_rebalance_parallel; -SET search_path TO background_rebalance_parallel; -SET citus.next_shard_id TO 85674051; -ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61; --- add the first node --- nodeid here is 61 -select citus_add_node('localhost', :worker_1_port); - citus_add_node ---------------------------------------------------------------------- - 61 -(1 row) - --- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other -CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg4 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg5 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i; -CREATE TABLE table1_colg6 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i; --- add two other nodes --- nodeid here is 62 -select citus_add_node('localhost', :worker_2_port); - citus_add_node ---------------------------------------------------------------------- - 62 -(1 row) - --- nodeid here is 63 -select citus_add_node('localhost', 
:worker_3_port); - citus_add_node ---------------------------------------------------------------------- - 63 -(1 row) - -CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( - worker_node_list json[], - shard_placement_list json[], - threshold float4 DEFAULT 0, - max_shard_moves int DEFAULT 1000000, - drain_only bool DEFAULT false, - improvement_threshold float4 DEFAULT 0.5 -) -RETURNS json[] -AS 'citus' -LANGUAGE C STRICT VOLATILE; --- we are simulating the following from shard_rebalancer_unit.sql --- the following steps are all according to this scenario --- where the third move should be dependent of the first two --- because the third move's target is the source of the first two -SELECT unnest(shard_placement_rebalance_array( - ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}', - '{"node_name": "hostname2", "disallowed_shards": "4"}', - '{"node_name": "hostname3", "disallowed_shards": "4"}' - ]::json[], - ARRAY['{"shardid":1, "nodename":"hostname1"}', - '{"shardid":2, "nodename":"hostname1"}', - '{"shardid":3, "nodename":"hostname2"}', - '{"shardid":4, "nodename":"hostname2"}', - '{"shardid":5, "nodename":"hostname3"}', - '{"shardid":6, "nodename":"hostname3"}' - ]::json[] -)); - unnest ---------------------------------------------------------------------- - {"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432} - {"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname3","targetport":5432} - {"updatetype":1,"shardid":4,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432} -(3 rows) - --- manually balance the cluster such that we have --- a balanced cluster like above with 1,2,3,4,5,6 and hostname1/2/3 --- shardid 85674051 (1) nodeid 61 (hostname1) --- shardid 85674052 (2) nodeid 61 (hostname1) --- shardid 85674053 (3) nodeid 62 (hostname2) --- shardid 85674054 (4) nodeid 62 (hostname2) --- shardid 85674055 (5) nodeid 63 (hostname3) --- shardid 85674056 (6) nodeid 63 (hostname3) -SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto'); - citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto'); - citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto'); - citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto'); - citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - --- now create another rebalance strategy in order to simulate moves --- which use as target a node that has been previously used as source -CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int) - RETURNS boolean AS -$$ - -- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}' - select case when (shardid != 85674054 and nodeid = 61) - then false - -- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}' - -- AND '{"node_name": "hostname2", "disallowed_shards": "4"}' - when (shardid = 85674054 and nodeid != 61) - then false - else true - end; -$$ LANGUAGE sql; --- insert the new test rebalance strategy -INSERT INTO - pg_catalog.pg_dist_rebalance_strategy( - name, - 
default_strategy, - shard_cost_function, - node_capacity_function, - shard_allowed_on_node_function, - default_threshold, - minimum_threshold, - improvement_threshold - ) VALUES ( - 'test_source_then_target', - false, - 'citus_shard_cost_1', - 'citus_node_capacity_1', - 'background_rebalance_parallel.test_shard_allowed_on_node', - 0, - 0, - 0 - ); -SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target'); - table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport ---------------------------------------------------------------------- - table1_colg1 | 85674051 | 0 | localhost | 57637 | localhost | 57638 - table1_colg2 | 85674052 | 0 | localhost | 57637 | localhost | 57639 - table1_colg4 | 85674054 | 0 | localhost | 57638 | localhost | 57637 -(3 rows) - -SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset --- check that the third move is blocked and depends on the first two -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 17780 | 1021 | runnable | {61,62} - 17780 | 1022 | runnable | {61,63} - 17780 | 1023 | blocked | {62,61} -(3 rows) - -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; - task_id | command | depends_on | command ---------------------------------------------------------------------- - 1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1021 | SELECT pg_catalog.citus_move_shard_placement(85674051,61,62,'auto') - 1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1022 | SELECT pg_catalog.citus_move_shard_placement(85674052,61,63,'auto') -(2 rows) - -SELECT citus_rebalance_stop(); - citus_rebalance_stop ---------------------------------------------------------------------- - -(1 row) - -DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target'; -DROP SCHEMA background_rebalance_parallel CASCADE; -TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; -SELECT public.wait_for_resource_cleanup(); - wait_for_resource_cleanup ---------------------------------------------------------------------- - -(1 row) - -select citus_remove_node('localhost', :worker_3_port); - citus_remove_node ---------------------------------------------------------------------- - -(1 row) - -- keep the rest of the tests inact that depends node/group ids ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 2b4f7de37..9435af3d3 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -3,7 +3,6 @@ SET search_path TO background_task_queue_monitor; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; -SET client_min_messages TO 
ERROR; -- reset sequence values ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000; @@ -655,268 +654,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task 1450016 | 1450024 | done (2 rows) --- TEST11 --- verify that we do not allow parallel task executors involving a particular node --- more than citus.max_background_task_executors_per_node --- verify that we can change citus.max_background_task_executors_per_node on the fly --- tests are done with dummy node ids --- citus_task_wait calls are used to ensure consistent pg_dist_background_task query --- output i.e. to avoid flakiness -BEGIN; -INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset -COMMIT; -SELECT citus_task_wait(:task_id1, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id2, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 1 task per node is running - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 1450017 | 1450025 | running | {1,2} - 1450017 | 1450026 | running | {3,4} - 1450017 | 1450027 | runnable | {1,2} - 1450017 | 1450028 | runnable | {1,3} - 1450017 | 1450029 | runnable | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | runnable | {1,3} - 1450017 | 1450032 | runnable | {1,4} -(8 rows) - -SELECT citus_task_wait(:task_id1, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id2, desired_status => 'done'); - citus_task_wait 
---------------------------------------------------------------------- - -(1 row) - --- increase max_background_task_executors_per_node on the fly -ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - -SELECT citus_task_wait(:task_id3, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id4, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id5, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | running | {1,2} - 1450017 | 1450028 | running | {1,3} - 1450017 | 1450029 | running | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | runnable | {1,3} - 1450017 | 1450032 | runnable | {1,4} -(8 rows) - --- increase to 3 max_background_task_executors_per_node on the fly -SELECT citus_task_wait(:task_id3, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id4, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id5, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - -SELECT citus_task_wait(:task_id6, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id7, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id8, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} - 1450017 | 1450031 | running | {1,3} - 1450017 | 1450032 | running | {1,4} -(8 rows) - -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT 
pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - --- if pg_cancel_backend is called on one of the running task PIDs --- task doesn't restart because it's not allowed anymore by the limit. --- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process - pg_cancel_backend ---------------------------------------------------------------------- - t -(1 row) - --- task goes to only runnable state, not running anymore. -SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - --- show that cancelled task hasn't restarted because limit doesn't allow it -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | running | {1,3} - 1450017 | 1450032 | running | {1,4} -(8 rows) - -SELECT citus_task_wait(:task_id7, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id8, desired_status => 'done'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_task_wait(:task_id6, desired_status => 'running'); - citus_task_wait ---------------------------------------------------------------------- - -(1 row) - --- show that the 6th task has restarted only after both 6 and 7 are done --- since we have a limit of 1 background task executor per node with id 1 -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; - job_id | task_id | status | nodes_involved ---------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} - 1450017 | 1450031 | done | {1,3} - 1450017 | 1450032 | done | {1,4} -(8 rows) - -SELECT citus_job_cancel(:job_id1); - citus_job_cancel ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_job_wait(:job_id1); - citus_job_wait ---------------------------------------------------------------------- - -(1 row) - -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - SET client_min_messages TO WARNING; TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; -RESET client_min_messages; ALTER SYSTEM RESET 
citus.background_task_queue_interval; ALTER SYSTEM RESET citus.max_background_task_executors; SELECT pg_reload_conf(); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index cd9987bd6..85e77160a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1197,7 +1197,44 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Test downgrade to 11.1-1 from 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; +-- create a table with orphaned shards to see if orphaned shards will be dropped +-- and cleanup records will be created for them +SET citus.shard_replication_factor to 1; +CREATE TABLE table_with_orphaned_shards (a int); +SELECT create_distributed_table('table_with_orphaned_shards', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- show there are 4 placements +SELECT * FROM pg_dist_placement ORDER BY shardid; + placementid | shardid | shardstate | shardlength | groupid +--------------------------------------------------------------------- + 1 | 102008 | 1 | 0 | 0 + 2 | 102009 | 1 | 0 | 0 + 3 | 102010 | 1 | 0 | 0 + 4 | 102011 | 1 | 0 | 0 +(4 rows) + +-- mark two of them as orphaned +UPDATE pg_dist_placement SET shardstate = 4 WHERE shardid % 2 = 1; ALTER EXTENSION citus UPDATE TO '11.1-1'; +-- show placements and cleanup records +SELECT * FROM pg_dist_placement ORDER BY shardid; + placementid | shardid | shardstate | shardlength | groupid +--------------------------------------------------------------------- + 1 | 102008 | 1 | 0 | 0 + 2 | 102009 | 4 | 0 | 0 + 3 | 102010 | 1 | 0 | 0 + 4 | 102011 | 4 | 0 | 0 +(4 rows) + +SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object @@ -1206,6 +1243,21 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; +-- verify that the placements are deleted and cleanup records are created +SELECT * FROM pg_dist_placement ORDER BY shardid; + placementid | shardid | shardstate | shardlength | groupid +--------------------------------------------------------------------- + 1 | 102008 | 1 | 0 | 0 + 3 | 102010 | 1 | 0 | 0 +(2 rows) + +SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 1 | 0 | 1 | table_with_orphaned_shards_102009 | 0 | 0 + 2 | 0 | 1 | table_with_orphaned_shards_102011 | 0 | 0 +(2 rows) + ALTER EXTENSION citus_columnar UPDATE TO '11.2-1'; -- Make sure that we defined dependencies from all rel objects (tables, -- indexes, sequences ..) to columnar table access method ... 
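As an aside for anyone replaying the scenario above outside the regression suite: the hunk fabricates orphaned placements by setting shardstate = 4 and then inspects pg_dist_cleanup after the extension update. A minimal sketch of how to list which relations still hold orphaned placements, joining the same pg_dist_placement and pg_dist_shard catalogs the test touches (the query is illustrative and not part of this patch):

SELECT s.logicalrelid::regclass AS table_name,
       p.placementid,
       p.shardid
FROM pg_dist_placement p
JOIN pg_dist_shard s USING (shardid)
WHERE p.shardstate = 4  -- 4 = orphaned, the state the UPDATE above sets
ORDER BY s.logicalrelid::text, p.shardid;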
@@ -1243,6 +1295,16 @@ SELECT COUNT(*)=5 FROM columnar_schema_members_pg_depend; (1 row) DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend; +-- error out as cleanup records remain +ALTER EXTENSION citus UPDATE TO '11.0-4'; +ERROR: pg_dist_cleanup is introduced in Citus 11.1 +HINT: To downgrade Citus to an older version, you should first cleanup all the orphaned resources and make sure pg_dist_cleanup is empty, by executing CALL citus_cleanup_orphaned_resources(); +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE +-- cleanup +SET client_min_messages TO ERROR; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE table_with_orphaned_shards; +RESET client_min_messages; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- @@ -1286,26 +1348,9 @@ SELECT * FROM multi_extension.print_extension_changes(); | type cluster_clock (38 rows) --- Test downgrade to 11.2-1 from 11.2-2 -ALTER EXTENSION citus UPDATE TO '11.2-2'; -ALTER EXTENSION citus UPDATE TO '11.2-1'; --- Should be empty result since upgrade+downgrade should be a no-op -SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- -(0 rows) - --- Snapshot of state at 11.2-2 -ALTER EXTENSION citus UPDATE TO '11.2-2'; -SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- - | function worker_adjust_identity_column_seq_ranges(regclass) void -(1 row) - --- Test downgrade to 11.2-2 from 11.3-1 +-- Test downgrade to 11.2-1 from 11.3-1 ALTER EXTENSION citus UPDATE TO '11.3-1'; -ALTER EXTENSION citus UPDATE TO '11.2-2'; +ALTER EXTENSION citus UPDATE TO '11.2-1'; -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object @@ -1325,10 +1370,11 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_stat_tenants_local(boolean) SETOF record | function citus_stat_tenants_local_reset() void | function citus_stat_tenants_reset() void + | function worker_adjust_identity_column_seq_ranges(regclass) void | function worker_drop_all_shell_tables(boolean) | view citus_stat_tenants | view citus_stat_tenants_local -(11 rows) +(12 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version @@ -1750,4 +1796,4 @@ DROP TABLE version_mismatch_table; DROP SCHEMA multi_extension; ERROR: cannot drop schema multi_extension because other objects depend on it DETAIL: function multi_extension.print_extension_changes() depends on schema multi_extension -HINT: Use DROP ... CASCADE to drop the dependent objects too. \ No newline at end of file +HINT: Use DROP ... CASCADE to drop the dependent objects too. 
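The HINT in the downgrade error above spells out the pre-downgrade procedure. A sketch of that checklist as runnable SQL, assuming a session on the coordinator; the downgrade target shown is only an example:

-- drain pending cleanup work first, as the HINT instructs
CALL citus_cleanup_orphaned_resources();
-- a downgrade is only expected to succeed once this returns 0
SELECT count(*) AS remaining_cleanup_records FROM pg_dist_cleanup;
-- then the downgrade itself, e.g.:
-- ALTER EXTENSION citus UPDATE TO '11.1-1';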
diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index b54946d3f..3ce512c2b 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -9,8 +9,7 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass, - 'pg_dist_background_task'::regclass) + 'pg_dist_object'::regclass) ORDER BY attrelid, attname; attrelid | attname | atthasmissing | attmissingval --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 8c74045be..f371e11e7 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -497,7 +497,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f @@ -635,7 +635,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f @@ -872,7 +872,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text; + logicalrelid; logicalrelid | repmodel --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | s @@ -888,7 +888,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 @@ -921,9 +921,7 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass - OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; logicalrelid | repmodel --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | s @@ -939,7 +937,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 
'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 @@ -1083,7 +1081,7 @@ FROM WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY logicalrelid::text; +ORDER BY logicalrelid; logicalrelid | colocationid --------------------------------------------------------------------- mx_colocation_test_1 | 10000 @@ -1103,13 +1101,11 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; logicalrelid | colocationid --------------------------------------------------------------------- - mx_colocation_test_1 | 10001 mx_colocation_test_2 | 10001 + mx_colocation_test_1 | 10001 (2 rows) \c - - - :worker_1_port @@ -1119,13 +1115,11 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; logicalrelid | colocationid --------------------------------------------------------------------- - mx_colocation_test_1 | 10001 mx_colocation_test_2 | 10001 + mx_colocation_test_1 | 10001 (2 rows) \c - - - :master_port diff --git a/src/test/regress/expected/multi_metadata_sync_0.out b/src/test/regress/expected/multi_metadata_sync_0.out index 5fb08bbc3..5d5aa56dd 100644 --- a/src/test/regress/expected/multi_metadata_sync_0.out +++ b/src/test/regress/expected/multi_metadata_sync_0.out @@ -497,7 +497,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f @@ -635,7 +635,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f @@ -872,7 +872,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text; + logicalrelid; logicalrelid | repmodel --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | s @@ -888,7 +888,7 @@ WHERE logicalrelid = 
'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 @@ -921,9 +921,7 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass - OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; logicalrelid | repmodel --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | s @@ -939,7 +937,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport --------------------------------------------------------------------- mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 @@ -1083,7 +1081,7 @@ FROM WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY logicalrelid::text; +ORDER BY logicalrelid; logicalrelid | colocationid --------------------------------------------------------------------- mx_colocation_test_1 | 10000 @@ -1103,13 +1101,11 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; logicalrelid | colocationid --------------------------------------------------------------------- - mx_colocation_test_1 | 10001 mx_colocation_test_2 | 10001 + mx_colocation_test_1 | 10001 (2 rows) \c - - - :worker_1_port @@ -1119,13 +1115,11 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; logicalrelid | colocationid --------------------------------------------------------------------- - mx_colocation_test_1 | 10001 mx_colocation_test_2 | 10001 + mx_colocation_test_1 | 10001 (2 rows) \c - - - :master_port diff --git a/src/test/regress/expected/upgrade_basic_after.out b/src/test/regress/expected/upgrade_basic_after.out index ff118c593..e1fab2685 100644 --- a/src/test/regress/expected/upgrade_basic_after.out +++ b/src/test/regress/expected/upgrade_basic_after.out @@ -39,29 +39,16 @@ SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_co t (1 row) --- while testing sequences on pg_dist_cleanup, they return null in pg upgrade schedule --- but return a valid value in citus upgrade schedule --- that's why we accept both NULL and MAX()+1 here -SELECT - CASE WHEN MAX(operation_id) IS NULL - THEN true - ELSE nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 - END AS check_operationid - FROM pg_dist_cleanup; - check_operationid +SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup; + ?column? 
--------------------------------------------------------------------- - t + (1 row) -SELECT - CASE WHEN MAX(record_id) IS NULL - THEN true - ELSE nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 - END AS check_recordid - FROM pg_dist_cleanup; - check_recordid +SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup; + ?column? --------------------------------------------------------------------- - t + (1 row) SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; diff --git a/src/test/regress/expected/upgrade_pg_dist_cleanup_after.out b/src/test/regress/expected/upgrade_pg_dist_cleanup_after.out deleted file mode 100644 index a55945e55..000000000 --- a/src/test/regress/expected/upgrade_pg_dist_cleanup_after.out +++ /dev/null @@ -1,13 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2) -AS upgrade_test_old_citus_version_gte_11_2; - upgrade_test_old_citus_version_gte_11_2 ---------------------------------------------------------------------- - t -(1 row) - -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q \ No newline at end of file diff --git a/src/test/regress/expected/upgrade_pg_dist_cleanup_after_0.out b/src/test/regress/expected/upgrade_pg_dist_cleanup_after_0.out deleted file mode 100644 index d71fad887..000000000 --- a/src/test/regress/expected/upgrade_pg_dist_cleanup_after_0.out +++ /dev/null @@ -1,30 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2) -AS upgrade_test_old_citus_version_gte_11_2; - upgrade_test_old_citus_version_gte_11_2 ---------------------------------------------------------------------- - f -(1 row) - -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q -\endif --- verify that the orphaned placement is deleted and cleanup record is created -SELECT COUNT(*) FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass); - count ---------------------------------------------------------------------- - 32 -(1 row) - -SELECT * FROM pg_dist_cleanup; - record_id | operation_id | object_type | object_name | node_group_id | policy_type ---------------------------------------------------------------------- - 1 | 0 | 1 | table_with_orphaned_shards_980001 | 1 | 0 -(1 row) - -CALL citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 1 orphaned resources -DROP TABLE table_with_orphaned_shards; diff --git a/src/test/regress/expected/upgrade_pg_dist_cleanup_before.out b/src/test/regress/expected/upgrade_pg_dist_cleanup_before.out deleted file mode 100644 index ebfe15ebe..000000000 --- a/src/test/regress/expected/upgrade_pg_dist_cleanup_before.out +++ /dev/null @@ -1,13 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 
2) -AS upgrade_test_old_citus_version_gte_11_2; - upgrade_test_old_citus_version_gte_11_2 ---------------------------------------------------------------------- - t -(1 row) - -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q diff --git a/src/test/regress/expected/upgrade_pg_dist_cleanup_before_0.out b/src/test/regress/expected/upgrade_pg_dist_cleanup_before_0.out deleted file mode 100644 index a0cf9ceb1..000000000 --- a/src/test/regress/expected/upgrade_pg_dist_cleanup_before_0.out +++ /dev/null @@ -1,36 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2) -AS upgrade_test_old_citus_version_gte_11_2; - upgrade_test_old_citus_version_gte_11_2 ---------------------------------------------------------------------- - f -(1 row) - -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q -\endif --- create a table with orphaned shards to see if orphaned shards will be dropped --- and cleanup records will be created for them -SET citus.next_shard_id TO 980000; -CREATE TABLE table_with_orphaned_shards (a int); -SELECT create_distributed_table('table_with_orphaned_shards', 'a'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- show all 32 placements are active -SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1 AND shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass); - count ---------------------------------------------------------------------- - 32 -(1 row) - --- create an orphaned placement based on an existing one -INSERT INTO pg_dist_placement(placementid, shardid, shardstate, shardlength, groupid) - SELECT nextval('pg_dist_placement_placementid_seq'::regclass), shardid, 4, shardlength, 3-groupid - FROM pg_dist_placement - WHERE shardid = 980001; diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql index 5229e7f88..8c5fb5bb1 100644 --- a/src/test/regress/sql/background_rebalance_parallel.sql +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -1,22 +1,12 @@ --- --- BACKGROUND_REBALANCE_PARALLEL --- --- Test to check if the background tasks scheduled by the background rebalancer --- have the correct dependencies --- --- Test to verify that we do not allow parallel rebalancer moves involving a --- particular node (either as source or target) more than --- citus.max_background_task_executors_per_node, and that we can change the GUC on --- the fly, and that will affect the ongoing balance as it should --- --- Test to verify that there's a hard dependency when a specific node is first being --- used as a source for a move, and then later as a target. --- +/* + Test to check if the background tasks scheduled by the background rebalancer + have the correct dependencies. 
+*/ CREATE SCHEMA background_rebalance_parallel; SET search_path TO background_rebalance_parallel; SET citus.next_shard_id TO 85674000; SET citus.shard_replication_factor TO 1; -SET client_min_messages TO ERROR; +SET client_min_messages TO WARNING; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; @@ -36,34 +26,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; SELECT pg_reload_conf(); --- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +/* Colocation group 1: create two tables table1_colg1, table2_colg1 in a colocation group */ CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); CREATE TABLE table2_colg1 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); +SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); --- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +/* Colocation group 2: create two tables table1_colg2, table2_colg2 in a colocation group */ CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); CREATE TABLE table2_colg2 (b int primary key); -SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); +SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); --- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +/* Colocation group 3: create two tables table1_colg3, table2_colg3 in a colocation group */ CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); +SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none'); CREATE TABLE table2_colg3 (b int primary key); -SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); +SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); --- Add two new nodes so that we can rebalance +/* Add two new nodes so that we can rebalance */ SELECT 1 FROM citus_add_node('localhost', :worker_3_port); SELECT 1 FROM citus_add_node('localhost', :worker_4_port); @@ -73,12 +63,10 @@ SELECT * FROM citus_rebalance_start(); SELECT citus_rebalance_wait(); --- PART 1 --- Test to check if the background tasks scheduled by the background rebalancer --- have the correct dependencies - --- Check that a move is dependent on --- any other move scheduled earlier in its colocation group. +/* Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current move's nodes. 
*/ SELECT S.shardid, P.colocationid FROM pg_dist_shard S, pg_dist_partition P WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; @@ -90,14 +78,14 @@ SELECT D.task_id, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), D.depends_on, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; --- Check that if there is a reference table that needs to be synched to a node, --- any move without a dependency must depend on the move task for reference table. +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for the reference table. */ SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); SELECT public.wait_for_resource_cleanup(); SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); --- Drain worker_3 so that we can move only one colocation group to worker_3 --- to create an unbalance that would cause parallel rebalancing. +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an imbalance that would cause parallel rebalancing. */ SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); @@ -107,7 +95,7 @@ CREATE TABLE ref_table(a int PRIMARY KEY); SELECT create_reference_table('ref_table'); --- Move all the shards of Colocation group 3 to worker_3. +/* Move all the shards of Colocation group 3 to worker_3. */ SELECT master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') FROM @@ -119,7 +107,7 @@ ORDER BY CALL citus_cleanup_orphaned_resources(); --- Activate and new nodes so that we can rebalance. +/* Activate the new nodes so that we can rebalance. */ SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); @@ -140,265 +128,13 @@ SELECT D.task_id, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; --- PART 2 --- Test to verify that we do not allow parallel rebalancer moves involving a --- particular node (either as source or target) --- more than citus.max_background_task_executors_per_node --- and that we can change the GUC on the fly --- citus_task_wait calls are used to ensure consistent pg_dist_background_task query --- output i.e. 
to avoid flakiness - --- First let's restart the scenario DROP SCHEMA background_rebalance_parallel CASCADE; TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; SELECT public.wait_for_resource_cleanup(); -select citus_remove_node('localhost', :worker_2_port); select citus_remove_node('localhost', :worker_3_port); select citus_remove_node('localhost', :worker_4_port); select citus_remove_node('localhost', :worker_5_port); select citus_remove_node('localhost', :worker_6_port); -CREATE SCHEMA background_rebalance_parallel; -SET search_path TO background_rebalance_parallel; - --- Create 8 tables in 4 colocation groups, and populate them -CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none'); -INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table2_colg1 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); -INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none'); -INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table2_colg2 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); -INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none'); -INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table2_colg3 (b int primary key); -SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); -INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg4 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none'); -INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table2_colg4 (b int PRIMARY KEY); -SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4'); -INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i; - --- Add nodes so that we can rebalance -SELECT citus_add_node('localhost', :worker_2_port); -SELECT citus_add_node('localhost', :worker_3_port); - -SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset - --- see dependent tasks to understand which tasks remain runnable because of --- citus.max_background_task_executors_per_node --- and which tasks are actually blocked from colocation group dependencies -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; - --- default citus.max_background_task_executors_per_node is 1 --- show that first exactly one task per node is running --- among the tasks that are not blocked -SELECT citus_task_wait(1013, desired_status => 'running'); -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - --- increase citus.max_background_task_executors_per_node -ALTER SYSTEM SET 
citus.max_background_task_executors_per_node = 2; -SELECT pg_reload_conf(); -SELECT citus_task_wait(1015, desired_status => 'running'); -SELECT citus_task_wait(1013, desired_status => 'done'); - --- show that at most 2 tasks per node are running --- among the tasks that are not blocked -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - --- decrease to default (1) -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT pg_reload_conf(); -SELECT citus_task_wait(1015, desired_status => 'done'); -SELECT citus_task_wait(1014, desired_status => 'done'); -SELECT citus_task_wait(1016, desired_status => 'running'); - --- show that exactly one task per node is running --- among the tasks that are not blocked -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - -SELECT citus_rebalance_stop(); - --- PART 3 --- Test to verify that there's a hard dependency when A specific node is first being used as a --- source for a move, and then later as a target. - --- First let's restart the scenario -DROP SCHEMA background_rebalance_parallel CASCADE; -TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; -SELECT public.wait_for_resource_cleanup(); -select citus_remove_node('localhost', :worker_1_port); -select citus_remove_node('localhost', :worker_2_port); -select citus_remove_node('localhost', :worker_3_port); -CREATE SCHEMA background_rebalance_parallel; -SET search_path TO background_rebalance_parallel; -SET citus.next_shard_id TO 85674051; -ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61; - --- add the first node --- nodeid here is 61 -select citus_add_node('localhost', :worker_1_port); - --- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other -CREATE TABLE table1_colg1 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg2 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg3 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg4 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg5 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i; - -CREATE TABLE table1_colg6 (a int PRIMARY KEY); -SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none'); -INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i; - --- add two other nodes --- nodeid here is 62 -select citus_add_node('localhost', :worker_2_port); --- nodeid here is 63 -select citus_add_node('localhost', :worker_3_port); - -CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( - worker_node_list json[], - shard_placement_list json[], - threshold float4 DEFAULT 0, - 
max_shard_moves int DEFAULT 1000000, - drain_only bool DEFAULT false, - improvement_threshold float4 DEFAULT 0.5 -) -RETURNS json[] -AS 'citus' -LANGUAGE C STRICT VOLATILE; - --- we are simulating the following from shard_rebalancer_unit.sql --- the following steps are all according to this scenario --- where the third move should be dependent of the first two --- because the third move's target is the source of the first two -SELECT unnest(shard_placement_rebalance_array( - ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}', - '{"node_name": "hostname2", "disallowed_shards": "4"}', - '{"node_name": "hostname3", "disallowed_shards": "4"}' - ]::json[], - ARRAY['{"shardid":1, "nodename":"hostname1"}', - '{"shardid":2, "nodename":"hostname1"}', - '{"shardid":3, "nodename":"hostname2"}', - '{"shardid":4, "nodename":"hostname2"}', - '{"shardid":5, "nodename":"hostname3"}', - '{"shardid":6, "nodename":"hostname3"}' - ]::json[] -)); - --- manually balance the cluster such that we have --- a balanced cluster like above with 1,2,3,4,5,6 and hostname1/2/3 --- shardid 85674051 (1) nodeid 61 (hostname1) --- shardid 85674052 (2) nodeid 61 (hostname1) --- shardid 85674053 (3) nodeid 62 (hostname2) --- shardid 85674054 (4) nodeid 62 (hostname2) --- shardid 85674055 (5) nodeid 63 (hostname3) --- shardid 85674056 (6) nodeid 63 (hostname3) -SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto'); -SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto'); -SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto'); -SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto'); - --- now create another rebalance strategy in order to simulate moves --- which use as target a node that has been previously used as source -CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int) - RETURNS boolean AS -$$ - -- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}' - select case when (shardid != 85674054 and nodeid = 61) - then false - -- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}' - -- AND '{"node_name": "hostname2", "disallowed_shards": "4"}' - when (shardid = 85674054 and nodeid != 61) - then false - else true - end; -$$ LANGUAGE sql; - --- insert the new test rebalance strategy -INSERT INTO - pg_catalog.pg_dist_rebalance_strategy( - name, - default_strategy, - shard_cost_function, - node_capacity_function, - shard_allowed_on_node_function, - default_threshold, - minimum_threshold, - improvement_threshold - ) VALUES ( - 'test_source_then_target', - false, - 'citus_shard_cost_1', - 'citus_node_capacity_1', - 'background_rebalance_parallel.test_shard_allowed_on_node', - 0, - 0, - 0 - ); - -SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target'); - -SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset - --- check that the third move is blocked and depends on the first two -SELECT job_id, task_id, status, nodes_involved -FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; - -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; - -SELECT citus_rebalance_stop(); -DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE 
name='test_source_then_target'; - -DROP SCHEMA background_rebalance_parallel CASCADE; -TRUNCATE pg_dist_background_job CASCADE; -TRUNCATE pg_dist_background_task CASCADE; -TRUNCATE pg_dist_background_task_depend; -SELECT public.wait_for_resource_cleanup(); -select citus_remove_node('localhost', :worker_3_port); -- keep the rest of the tests inact that depends node/group ids ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 9f6abb73a..04bb898db 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -3,7 +3,6 @@ SET search_path TO background_task_queue_monitor; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; -SET client_min_messages TO ERROR; -- reset sequence values ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; @@ -280,106 +279,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that task is cancelled --- TEST11 --- verify that we do not allow parallel task executors involving a particular node --- more than citus.max_background_task_executors_per_node --- verify that we can change citus.max_background_task_executors_per_node on the fly --- tests are done with dummy node ids --- citus_task_wait calls are used to ensure consistent pg_dist_background_task query --- output i.e. to avoid flakiness - -BEGIN; -INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset -INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset -COMMIT; - -SELECT citus_task_wait(:task_id1, desired_status => 'running'); -SELECT citus_task_wait(:task_id2, desired_status => 'running'); - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, 
- :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 1 task per node is running - -SELECT citus_task_wait(:task_id1, desired_status => 'done'); -SELECT citus_task_wait(:task_id2, desired_status => 'done'); --- increase max_background_task_executors_per_node on the fly -ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; -SELECT pg_reload_conf(); - -SELECT citus_task_wait(:task_id3, desired_status => 'running'); -SELECT citus_task_wait(:task_id4, desired_status => 'running'); -SELECT citus_task_wait(:task_id5, desired_status => 'running'); - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running - --- increase max_background_task_executors_per_node to 3 on the fly -SELECT citus_task_wait(:task_id3, desired_status => 'done'); -SELECT citus_task_wait(:task_id4, desired_status => 'done'); -SELECT citus_task_wait(:task_id5, desired_status => 'done'); -ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3; -SELECT pg_reload_conf(); - -SELECT citus_task_wait(:task_id6, desired_status => 'running'); -SELECT citus_task_wait(:task_id7, desired_status => 'running'); -SELECT citus_task_wait(:task_id8, desired_status => 'running'); - -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running - -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT pg_reload_conf(); - --- if pg_cancel_backend is called on one of the running task PIDs, --- the task doesn't restart because the limit doesn't allow it anymore. --- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process - --- the task goes back to the runnable state only; it is not running anymore. 
-SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); - --- show that the cancelled task hasn't restarted because the limit doesn't allow it -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; - -SELECT citus_task_wait(:task_id7, desired_status => 'done'); -SELECT citus_task_wait(:task_id8, desired_status => 'done'); -SELECT citus_task_wait(:task_id6, desired_status => 'running'); - --- show that the 6th task has restarted only after both 7 and 8 are done --- since we have a limit of 1 background task executor per node with id 1 -SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task - WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, - :task_id5, :task_id6, :task_id7, :task_id8) - ORDER BY job_id, task_id; - -SELECT citus_job_cancel(:job_id1); -SELECT citus_job_wait(:job_id1); - -ALTER SYSTEM RESET citus.max_background_task_executors_per_node; -SELECT pg_reload_conf(); - SET client_min_messages TO WARNING; TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; -RESET client_min_messages; ALTER SYSTEM RESET citus.background_task_queue_interval; ALTER SYSTEM RESET citus.max_background_task_executors; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 6a99dc0b4..d202227ae 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -529,13 +529,33 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Test downgrade to 11.1-1 from 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; + +-- create a table with orphaned shards to see if orphaned shards will be dropped +-- and cleanup records will be created for them +SET citus.shard_replication_factor TO 1; +CREATE TABLE table_with_orphaned_shards (a int); +SELECT create_distributed_table('table_with_orphaned_shards', 'a'); +-- show there are 4 placements +SELECT * FROM pg_dist_placement ORDER BY shardid; +-- mark two of them as orphaned +UPDATE pg_dist_placement SET shardstate = 4 WHERE shardid % 2 = 1; + ALTER EXTENSION citus UPDATE TO '11.1-1'; + +-- show placements and cleanup records +SELECT * FROM pg_dist_placement ORDER BY shardid; +SELECT * FROM pg_dist_cleanup; + -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; +-- verify that the placements are deleted and cleanup records are created +SELECT * FROM pg_dist_placement ORDER BY shardid; +SELECT * FROM pg_dist_cleanup; + ALTER EXTENSION citus_columnar UPDATE TO '11.2-1'; -- Make sure that we defined dependencies from all rel objects (tables, @@ -569,21 +589,20 @@ SELECT COUNT(*)=5 FROM columnar_schema_members_pg_depend; DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend; +-- should error out as cleanup records remain +ALTER EXTENSION citus UPDATE TO '11.0-4'; + +-- cleanup +SET client_min_messages TO ERROR; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE table_with_orphaned_shards; +RESET client_min_messages; + SELECT * FROM multi_extension.print_extension_changes(); --- Test downgrade to 11.2-1 from 11.2-2 -ALTER EXTENSION citus UPDATE TO '11.2-2'; -ALTER EXTENSION citus UPDATE TO '11.2-1'; --- Should be empty 
result since upgrade+downgrade should be a no-op -SELECT * FROM multi_extension.print_extension_changes(); - --- Snapshot of state at 11.2-2 -ALTER EXTENSION citus UPDATE TO '11.2-2'; -SELECT * FROM multi_extension.print_extension_changes(); - --- Test downgrade to 11.2-2 from 11.3-1 +-- Test downgrade to 11.2-1 from 11.3-1 ALTER EXTENSION citus UPDATE TO '11.3-1'; -ALTER EXTENSION citus UPDATE TO '11.2-2'; +ALTER EXTENSION citus UPDATE TO '11.2-1'; -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql index 1a592d858..58351310c 100644 --- a/src/test/regress/sql/multi_metadata_attributes.sql +++ b/src/test/regress/sql/multi_metadata_attributes.sql @@ -10,6 +10,5 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass, - 'pg_dist_background_task'::regclass) + 'pg_dist_object'::regclass) ORDER BY attrelid, attname; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 0b9d46fe2..f90270cf6 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -111,7 +111,7 @@ SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND node \c - - - :worker_1_port SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass; @@ -161,7 +161,7 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); \c - - - :worker_1_port SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; -SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; +SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass; @@ -252,7 +252,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text; + logicalrelid; -- See the shards and placements of the mx tables SELECT @@ -263,7 +263,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; -- Check that metadata of MX tables exist on the metadata worker \c - - - :worker_1_port @@ -278,9 +278,7 @@ FROM 
pg_dist_partition WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass - OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; -- Check that shard and placement data are created SELECT @@ -291,7 +289,7 @@ WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass ORDER BY - logicalrelid::text, shardid; + logicalrelid, shardid; -- Check that metadata of MX tables don't exist on the non-metadata worker \c - - - :worker_2_port @@ -383,7 +381,7 @@ FROM WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY logicalrelid::text; +ORDER BY logicalrelid; -- Update colocation and see the changes on the master and the worker SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2'); @@ -393,9 +391,7 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; \c - - - :worker_1_port SELECT logicalrelid, colocationid @@ -403,9 +399,7 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY - logicalrelid::text; + OR logicalrelid = 'mx_colocation_test_2'::regclass; \c - - - :master_port diff --git a/src/test/regress/sql/upgrade_basic_after.sql b/src/test/regress/sql/upgrade_basic_after.sql index 03c218a06..2a4c20b3a 100644 --- a/src/test/regress/sql/upgrade_basic_after.sql +++ b/src/test/regress/sql/upgrade_basic_after.sql @@ -8,21 +8,8 @@ SELECT nextval('pg_dist_placement_placementid_seq') = MAX(placementid)+1 FROM pg SELECT nextval('pg_dist_groupid_seq') = MAX(groupid)+1 FROM pg_dist_node; SELECT nextval('pg_dist_node_nodeid_seq') = MAX(nodeid)+1 FROM pg_dist_node; SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_colocation; --- while testing sequences on pg_dist_cleanup, they return null in pg upgrade schedule --- but return a valid value in citus upgrade schedule --- that's why we accept both NULL and MAX()+1 here -SELECT - CASE WHEN MAX(operation_id) IS NULL - THEN true - ELSE nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 - END AS check_operationid - FROM pg_dist_cleanup; -SELECT - CASE WHEN MAX(record_id) IS NULL - THEN true - ELSE nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 - END AS check_recordid - FROM pg_dist_cleanup; +SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup; +SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup; SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task; SELECT last_value > 0 FROM pg_dist_clock_logical_seq; diff --git a/src/test/regress/sql/upgrade_pg_dist_cleanup_after.sql b/src/test/regress/sql/upgrade_pg_dist_cleanup_after.sql deleted file mode 100644 index e84c35b60..000000000 --- a/src/test/regress/sql/upgrade_pg_dist_cleanup_after.sql +++ /dev/null @@ -1,15 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 
'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2) -AS upgrade_test_old_citus_version_gte_11_2; -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q -\endif - --- verify that the orphaned placement is deleted and cleanup record is created -SELECT COUNT(*) FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass); -SELECT * FROM pg_dist_cleanup; -CALL citus_cleanup_orphaned_resources(); -DROP TABLE table_with_orphaned_shards; diff --git a/src/test/regress/sql/upgrade_pg_dist_cleanup_before.sql b/src/test/regress/sql/upgrade_pg_dist_cleanup_before.sql deleted file mode 100644 index 62ec8a1fb..000000000 --- a/src/test/regress/sql/upgrade_pg_dist_cleanup_before.sql +++ /dev/null @@ -1,22 +0,0 @@ -\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"` -SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR - (substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND - substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2) -AS upgrade_test_old_citus_version_gte_11_2; -\gset -\if :upgrade_test_old_citus_version_gte_11_2 -\q -\endif - --- create a table with orphaned shards to see if orphaned shards will be dropped --- and cleanup records will be created for them -SET citus.next_shard_id TO 980000; -CREATE TABLE table_with_orphaned_shards (a int); -SELECT create_distributed_table('table_with_orphaned_shards', 'a'); --- show all 32 placements are active -SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1 AND shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass); --- create an orphaned placement based on an existing one -INSERT INTO pg_dist_placement(placementid, shardid, shardstate, shardlength, groupid) - SELECT nextval('pg_dist_placement_placementid_seq'::regclass), shardid, 4, shardlength, 3-groupid - FROM pg_dist_placement - WHERE shardid = 980001;
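
The deleted upgrade_pg_dist_cleanup_before/after pair is folded into multi_extension.sql above. For anyone reproducing that scenario by hand, a minimal sketch follows, assuming a regression-style cluster whose citus extension currently sits at schema version 11.1-1; the table name and the half-of-the-placements predicate are illustrative only, mirroring the inline test.

-- mark placements as orphaned (shardstate 4), then let the 11.2-1 upgrade
-- script convert them into pg_dist_cleanup records
SET citus.shard_replication_factor TO 1;
CREATE TABLE table_with_orphaned_shards (a int);
SELECT create_distributed_table('table_with_orphaned_shards', 'a');
UPDATE pg_dist_placement SET shardstate = 4 WHERE shardid % 2 = 1;

ALTER EXTENSION citus UPDATE TO '11.2-1';
-- orphaned placements are gone; cleanup records exist in their place
SELECT count(*) FROM pg_dist_placement WHERE shardstate = 4; -- expect 0
SELECT * FROM pg_dist_cleanup;

-- a downgrade below 11.2-1 (e.g. to 11.0-4) errors out while cleanup records
-- remain, so process them before tearing the table down
CALL citus_cleanup_orphaned_resources();
DROP TABLE table_with_orphaned_shards;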