Revert "Fixes flakiness in multi_metadata_sync test (#6824)"

This reverts commit 6cfcc3709ffe1e869e4b606fd592a0b63cfbc66a.
pull/6843/head
Emel Şimşek 2023-04-10 16:12:03 +03:00
parent b49cd144b0
commit 1aea5b1a51
42 changed files with 270 additions and 1789 deletions

View File

@ -6,7 +6,7 @@ orbs:
parameters: parameters:
image_suffix: image_suffix:
type: string type: string
default: '-v3417e8d' default: '-v087ecd7'
pg13_version: pg13_version:
type: string type: string
default: '13.10' default: '13.10'
@ -490,6 +490,10 @@ jobs:
pg_major: << parameters.pg_major >> pg_major: << parameters.pg_major >>
- configure - configure
- enable_core - enable_core
- run:
name: 'Install DBI.pm'
command: |
apt-get update && apt-get install libdbi-perl && apt-get install libdbd-pg-perl
- run: - run:
name: 'Run Test' name: 'Run Test'
command: | command: |

View File

@ -37,7 +37,8 @@ OBJS += \
all: cdc all: cdc
cdc: cdc:
$(MAKE) -C cdc all echo "running cdc make"
$(MAKE) DECODER=pgoutput -C cdc all
NO_PGXS = 1 NO_PGXS = 1
@ -84,13 +85,12 @@ ifneq (,$(SQL_Po_files))
include $(SQL_Po_files) include $(SQL_Po_files)
endif endif
.PHONY: clean-full install install-downgrades install-all
.PHONY: clean-full install install-downgrades install-all install-cdc clean-cdc
clean: clean-cdc clean: clean-cdc
clean-cdc: clean-cdc:
$(MAKE) -C cdc clean $(MAKE) DECODER=pgoutput -C cdc clean
cleanup-before-install: cleanup-before-install:
rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control
@ -99,7 +99,7 @@ cleanup-before-install:
install: cleanup-before-install install-cdc install: cleanup-before-install install-cdc
install-cdc: install-cdc:
$(MAKE) -C cdc install $(MAKE) DECODER=pgoutput -C cdc install
# install and install-downgrades should be run sequentially # install and install-downgrades should be run sequentially
install-all: install install-all: install

View File

@ -1,45 +1,26 @@
ifndef DECODER
DECODER = pgoutput
endif
MODULE_big = citus_$(DECODER)
citus_subdir = src/backend/distributed/cdc
citus_top_builddir = ../../../.. citus_top_builddir = ../../../..
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
OBJS += cdc_decoder.o cdc_decoder_utils.o
include $(citus_top_builddir)/Makefile.global include $(citus_top_builddir)/Makefile.global
citus_subdir = src/backend/distributed/cdc override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
SRC_DIR = $(citus_abs_top_srcdir)/$(citus_subdir) override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
#List of supported based decoders. Add new decoders here. install: install-cdc
cdc_base_decoders :=pgoutput wal2json
all: build-cdc-decoders clean: clean-cdc
copy-decoder-files-to-build-dir: install-cdc:
$(eval DECODER_BUILD_DIR=build-cdc-$(DECODER)) mkdir -p '$(citus_decoders_dir)'
mkdir -p $(DECODER_BUILD_DIR) $(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
@for file in $(SRC_DIR)/*.c $(SRC_DIR)/*.h; do \
if [ -f $$file ]; then \
if [ -f $(DECODER_BUILD_DIR)/$$(basename $$file) ]; then \
if ! diff -q $$file $(DECODER_BUILD_DIR)/$$(basename $$file); then \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
else \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
fi \
done
cp $(SRC_DIR)/Makefile.decoder $(DECODER_BUILD_DIR)/Makefile
build-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) build-cdc-decoder;)
install-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) -C build-cdc-$(base_decoder) install;)
clean-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),rm -rf build-cdc-$(base_decoder);)
build-cdc-decoder:
$(MAKE) DECODER=$(DECODER) copy-decoder-files-to-build-dir
$(MAKE) DECODER=$(DECODER) -C build-cdc-$(DECODER)
install: install-cdc-decoders
clean: clean-cdc-decoders
clean-cdc:
rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'
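
The removed cdc/Makefile above (left-hand side) fanned a single generic decoder build out over the base decoders pgoutput and wal2json, compiling each copy in its own build-cdc-<decoder> directory and installing the result into the citus_decoders/ directory, while the restored Makefile builds only the one decoder named by DECODER (pgoutput by default). As a rough sketch of what the removed $(foreach ...) rules amount to, the standalone C program below simply prints the per-decoder steps; the decoder names and directory layout are taken from the diff, everything else is invented for illustration:

#include <stdio.h>

int
main(void)
{
    /* base decoders listed in the removed cdc/Makefile */
    const char *baseDecoders[] = { "pgoutput", "wal2json" };
    int decoderCount = sizeof(baseDecoders) / sizeof(baseDecoders[0]);

    for (int i = 0; i < decoderCount; i++)
    {
        const char *decoder = baseDecoders[i];

        /* build-cdc-decoders: one sub-make per decoder, in its own build dir */
        printf("make DECODER=%s -C build-cdc-%s\n", decoder, decoder);

        /* install-cdc-decoders: each shared library lands in citus_decoders/ */
        printf("install citus_%s.so -> citus_decoders/%s.so\n", decoder, decoder);
    }

    return 0;
}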

View File

@ -1,24 +0,0 @@
MODULE_big = citus_$(DECODER)
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
citus_top_builddir = ../../../../..
citus_subdir = src/backend/distributed/cdc/cdc_$(DECODER)
OBJS += cdc_decoder.o cdc_decoder_utils.o
include $(citus_top_builddir)/Makefile.global
override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
install: install-cdc
clean: clean-cdc
install-cdc:
mkdir -p '$(citus_decoders_dir)'
$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
clean-cdc:
rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'

View File

@ -32,7 +32,6 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/background_jobs.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
@ -58,9 +57,7 @@
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -780,6 +777,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{ {
partitionedShardNames = lappend(partitionedShardNames, quotedShardName); partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
} }
/* for non-partitioned tables, we will use Postgres' size functions */ /* for non-partitioned tables, we will use Postgres' size functions */
else else
{ {
@ -2818,8 +2816,7 @@ CreateBackgroundJob(const char *jobType, const char *description)
*/ */
BackgroundTask * BackgroundTask *
ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount, ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount,
int64 dependingTaskIds[], int nodesInvolvedCount, int32 int64 dependingTaskIds[])
nodesInvolved[])
{ {
BackgroundTask *task = NULL; BackgroundTask *task = NULL;
@ -2893,11 +2890,6 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum(""); values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false; nulls[Anum_pg_dist_background_task_message - 1] = false;
values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
0);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls); values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple); CatalogTupleInsert(pgDistBackgroundTask, newTuple);
@ -3209,13 +3201,6 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]); TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
} }
if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
{
ArrayType *nodesInvolvedArrayObject =
DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}
return task; return task;
} }
@ -3348,8 +3333,7 @@ GetRunnableBackgroundTask(void)
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{ {
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
if (BackgroundTaskReadyToRun(task) && if (BackgroundTaskReadyToRun(task))
IncrementParallelTaskCountForNodesInvolved(task))
{ {
/* found task, close table and return */ /* found task, close table and return */
break; break;
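
The hunks above restore the old ScheduleBackgroundTask signature: the two trailing arguments that told the scheduler which nodes a task involves are dropped, the nodes_involved column is no longer written when the task tuple is formed, and GetRunnableBackgroundTask stops consulting the per-node executor limit. A minimal standalone sketch of the pre-revert calling convention follows; only the parameter shape and the source/target pairing come from the diff, while the stub body, the typedefs and the concrete values are invented for illustration:

#include <stdio.h>

/* hypothetical stand-ins for the PostgreSQL/Citus typedefs; illustration only */
typedef long long int64;
typedef int int32;
typedef unsigned int Oid;

/*
 * Stub with the pre-revert parameter list: the last two arguments describe
 * which nodes the task touches, so the queue monitor can cap per-node
 * parallelism before picking the task up.
 */
static int64
ScheduleBackgroundTask(int64 jobId, Oid owner, const char *command,
                       int dependingTaskCount, int64 dependingTaskIds[],
                       int nodesInvolvedCount, int32 nodesInvolved[])
{
    (void) owner;
    (void) dependingTaskIds;
    (void) nodesInvolved;

    printf("job %lld runs \"%s\" with %d dependency(ies) on %d node(s)\n",
           jobId, command, dependingTaskCount, nodesInvolvedCount);
    return 1000;                    /* pretend task id */
}

int
main(void)
{
    /*
     * Mirrors the removed call site in RebalanceTableShardsBackground:
     * a shard move involves its source node and its target node.
     */
    int32 nodesInvolved[2] = { 50, 53 };
    int64 dependsArray[1] = { 999 };

    ScheduleBackgroundTask(17777, 10,
                           "SELECT pg_catalog.citus_move_shard_placement(...)",
                           1, dependsArray, 2, nodesInvolved);

    /* after the revert, the two trailing arguments are gone again */
    return 0;
}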

View File

@ -190,32 +190,13 @@ typedef struct WorkerShardStatistics
HTAB *statistics; HTAB *statistics;
} WorkerShardStatistics; } WorkerShardStatistics;
/* /* ShardMoveDependencyHashEntry contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */
* ShardMoveDependencyHashEntry contains the taskId which any new shard
* move task within the corresponding colocation group
* must take a dependency on
*/
typedef struct ShardMoveDependencyInfo typedef struct ShardMoveDependencyInfo
{ {
int64 key; int64 key;
int64 taskId; int64 taskId;
} ShardMoveDependencyInfo; } ShardMoveDependencyInfo;
/*
* ShardMoveSourceNodeHashEntry keeps track of the source nodes
* of the moves.
*/
typedef struct ShardMoveSourceNodeHashEntry
{
/* this is the key */
int32 node_id;
List *taskIds;
} ShardMoveSourceNodeHashEntry;
/*
* ShardMoveDependencies keeps track of all needed dependencies
* between shard moves.
*/
typedef struct ShardMoveDependencies typedef struct ShardMoveDependencies
{ {
HTAB *colocationDependencies; HTAB *colocationDependencies;
@ -293,15 +274,6 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo
static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
static void ErrorOnConcurrentRebalance(RebalanceOptions *); static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void); static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
colocationId,
ShardMoveDependencies shardMoveDependencies,
int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
int64 taskId,
ShardMoveDependencies shardMoveDependencies);
/* declarations for dynamic loading */ /* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(rebalance_table_shards); PG_FUNCTION_INFO_V1(rebalance_table_shards);
@ -1958,7 +1930,8 @@ GetColocationId(PlacementUpdateEvent *move)
* InitializeShardMoveDependencies function creates the hash maps that we use to track * InitializeShardMoveDependencies function creates the hash maps that we use to track
* the latest moves so that subsequent moves with the same properties must take a dependency * the latest moves so that subsequent moves with the same properties must take a dependency
* on them. There are two hash maps. One is for tracking the latest move scheduled in a * on them. There are two hash maps. One is for tracking the latest move scheduled in a
* given colocation group and the other one is for tracking source nodes of all moves. * given colocation group and the other one is for tracking the latest move which involves
* a given node either as its source node or its target node.
*/ */
static ShardMoveDependencies static ShardMoveDependencies
InitializeShardMoveDependencies() InitializeShardMoveDependencies()
@ -1968,17 +1941,18 @@ InitializeShardMoveDependencies()
ShardMoveDependencyInfo, ShardMoveDependencyInfo,
"colocationDependencyHashMap", "colocationDependencyHashMap",
6); 6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveSourceNodeHashEntry, ShardMoveDependencyInfo,
"nodeDependencyHashMap", "nodeDependencyHashMap",
6); 6);
return shardMoveDependencies; return shardMoveDependencies;
} }
/* /*
* GenerateTaskMoveDependencyList creates and returns a List of taskIds that * GenerateTaskMoveDependencyList creates and returns a List of taskIds that
* the move must take a dependency on, given the shard move dependencies as input. * the move must take a dependency on.
*/ */
static int64 * static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
@ -1998,24 +1972,27 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
} }
/* /* Check if there exists a move scheduled earlier whose source or target node
* Check if there exists moves scheduled earlier whose source node * overlaps with the current move's source node. */
* overlaps with the current move's target node. shardMoveDependencyInfo = hash_search(
* The earlier/first move might make space for the later/second move. shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
* So we could run out of disk space (or at least overload the node)
* if we move the second shard to it before the first one is moved away. 
*/
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
&found); &found);
if (found) if (found)
{ {
int64 *taskId = NULL; hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
foreach_ptr(taskId, shardMoveSourceNodeHashEntry->taskIds) }
{
hash_search(dependsList, taskId, HASH_ENTER, NULL); /* Check if there exists a move scheduled earlier whose source or target node
} * overlaps with the current move's target node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
&found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
} }
*nDepends = hash_get_num_entries(dependsList); *nDepends = hash_get_num_entries(dependsList);
@ -2053,20 +2030,15 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId; shardMoveDependencyInfo->taskId = taskId;
bool found; shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( &move->sourceNode->nodeId, HASH_ENTER, NULL);
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);
if (!found) shardMoveDependencyInfo->taskId = taskId;
{
shardMoveSourceNodeHashEntry->taskIds = NIL;
}
int64 *newTaskId = palloc0(sizeof(int64)); shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
*newTaskId = taskId; &move->targetNode->nodeId, HASH_ENTER, NULL);
shardMoveSourceNodeHashEntry->taskIds = lappend(
shardMoveSourceNodeHashEntry->taskIds, newTaskId); shardMoveDependencyInfo->taskId = taskId;
} }
@ -2163,10 +2135,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf, appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)", "SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel)); quote_literal_cstr(shardTranferModeLabel));
int32 nodesInvolved[] = { 0 };
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0, BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL, 0, nodesInvolved); NULL);
replicateRefTablesTaskId = task->taskid; replicateRefTablesTaskId = task->taskid;
} }
@ -2200,14 +2170,9 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
dependsArray[0] = replicateRefTablesTaskId; dependsArray[0] = replicateRefTablesTaskId;
} }
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = move->sourceNode->nodeId;
nodesInvolved[1] = move->targetNode->nodeId;
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends, nDepends,
dependsArray, 2, dependsArray);
nodesInvolved);
UpdateShardMoveDependencies(move, colocationId, task->taskid, UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies); shardMoveDependencies);
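
The removed code above kept, per source node, the list of earlier move tasks leaving that node, and made any later move into that node depend on all of them, so a target is drained before new shards land on it; the restored code instead remembers only the latest task per node. A small standalone model of the removed dependency rules, with plain arrays standing in for the dynahash tables and made-up task, node and colocation ids, looks like this:

#include <stdio.h>

/*
 * Simplified model of the rules the revert removes from
 * GenerateTaskMoveDependencyList / UpdateShardMoveDependencies:
 *   1. a move depends on the latest earlier move in its colocation group;
 *   2. a move depends on every earlier move whose source node equals this
 *      move's target node, so the target is drained before it is filled.
 * The real code collects dependencies in a hash set, which also removes
 * duplicates; a fixed-size array is enough for this sketch.
 */
typedef struct Move
{
    int taskId;
    int colocationId;
    int sourceNode;
    int targetNode;
} Move;

int
main(void)
{
    /* moves in scheduling order; all ids are made up */
    Move moves[] = {
        { 1000, 1, 51, 52 },
        { 1001, 1, 50, 53 },
        { 1002, 2, 53, 51 },
        { 1003, 2, 52, 50 },
    };
    int moveCount = sizeof(moves) / sizeof(moves[0]);

    for (int i = 0; i < moveCount; i++)
    {
        int depends[16];
        int nDepends = 0;
        int latestColocatedTask = -1;

        for (int j = 0; j < i; j++)
        {
            /* rule 1: only the latest colocated move matters */
            if (moves[j].colocationId == moves[i].colocationId)
            {
                latestColocatedTask = moves[j].taskId;
            }

            /* rule 2: every earlier move leaving this move's target node */
            if (moves[j].sourceNode == moves[i].targetNode)
            {
                depends[nDepends++] = moves[j].taskId;
            }
        }

        if (latestColocatedTask != -1)
        {
            depends[nDepends++] = latestColocatedTask;
        }

        printf("task %d depends on:", moves[i].taskId);
        for (int d = 0; d < nDepends; d++)
        {
            printf(" %d", depends[d]);
        }
        printf("%s\n", nDepends == 0 ? " (nothing)" : "");
    }

    return 0;
}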

View File

@ -1793,18 +1793,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_background_task_executors_per_node",
gettext_noop(
"Sets the maximum number of parallel background task executor workers "
"for scheduled background tasks that involve a particular node"),
NULL,
&MaxBackgroundTaskExecutorsPerNode,
1, 1, 128,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_cached_connection_lifetime", "citus.max_cached_connection_lifetime",
gettext_noop("Sets the maximum lifetime of cached connections to other nodes."), gettext_noop("Sets the maximum lifetime of cached connections to other nodes."),

View File

@ -1,2 +0,0 @@
-- citus--11.2-1--11.2-2
#include "udfs/worker_adjust_identity_column_seq_ranges/11.2-2.sql"

View File

@ -1,5 +1,6 @@
-- citus--11.2-1--11.3-1 -- citus--11.2-1--11.3-1
#include "udfs/repl_origin_helper/11.3-1.sql" #include "udfs/repl_origin_helper/11.3-1.sql"
#include "udfs/worker_adjust_identity_column_seq_ranges/11.3-1.sql"
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index; ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index; ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index;
@ -14,7 +15,3 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_
#include "udfs/citus_stat_tenants_local_reset/11.3-1.sql" #include "udfs/citus_stat_tenants_local_reset/11.3-1.sql"
#include "udfs/citus_stat_tenants_reset/11.3-1.sql" #include "udfs/citus_stat_tenants_reset/11.3-1.sql"
-- we introduce nodes_involved, which will be used internally to
-- limit the number of parallel tasks running per node
ALTER TABLE pg_catalog.pg_dist_background_task ADD COLUMN nodes_involved int[] DEFAULT NULL;

View File

@ -1,2 +0,0 @@
-- citus--11.2-2--11.2-1
DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass);

View File

@ -3,6 +3,7 @@
DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active(); DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active();
DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass);
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
@ -28,5 +29,3 @@ DROP FUNCTION pg_catalog.citus_stat_tenants(boolean);
DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset(); DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset();
DROP FUNCTION pg_catalog.citus_stat_tenants_reset(); DROP FUNCTION pg_catalog.citus_stat_tenants_reset();
ALTER TABLE pg_catalog.pg_dist_background_task DROP COLUMN nodes_involved;

View File

@ -1,3 +0,0 @@
-- Since we backported the UDF below from version 11.3, the definition is the same
#include "11.3-1.sql"

View File

@ -140,34 +140,3 @@ TextArrayTypeToIntegerList(ArrayType *arrayObject)
return list; return list;
} }
/*
* IntArrayToDatum
*
* Convert an integer array to the datum int array format
* (currently used for nodes_involved in pg_dist_background_task)
*
* Returns the array in the form of a Datum, or PointerGetDatum(NULL)
* if the int_array is empty.
*/
Datum
IntArrayToDatum(uint32 int_array_size, int int_array[])
{
if (int_array_size == 0)
{
return PointerGetDatum(NULL);
}
ArrayBuildState *astate = NULL;
for (int i = 0; i < int_array_size; i++)
{
Datum dvalue = Int32GetDatum(int_array[i]);
bool disnull = false;
Oid element_type = INT4OID;
astate = accumArrayResult(astate, dvalue, disnull, element_type,
CurrentMemoryContext);
}
return makeArrayResult(astate, CurrentMemoryContext);
}
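
IntArrayToDatum, removed above, packed the nodes involved in a task into an int4[] datum and represented an empty array as NULL, which DeformBackgroundTaskHeapTuple later read back through IntegerArrayTypeToList, leaving the task's node list empty when the column is NULL. A rough standalone model of that NULL-for-empty round trip follows; it uses no PostgreSQL APIs, and the struct and helper names are invented for illustration:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* toy stand-in for a nullable int4[] column value */
typedef struct NullableIntArray
{
    bool isNull;
    int count;
    int *values;
} NullableIntArray;

/* mirrors IntArrayToDatum: an empty input becomes NULL, otherwise a copy */
static NullableIntArray
IntArrayToValue(int count, const int *values)
{
    NullableIntArray result = { 0 };

    if (count == 0)
    {
        result.isNull = true;
        return result;
    }

    result.count = count;
    result.values = malloc(sizeof(int) * count);
    memcpy(result.values, values, sizeof(int) * count);
    return result;
}

/* mirrors the deform path: a NULL column yields an empty node list */
static int
ValueToIntArray(NullableIntArray value, int *out)
{
    if (value.isNull)
    {
        return 0;
    }

    memcpy(out, value.values, sizeof(int) * value.count);
    return value.count;
}

int
main(void)
{
    int nodes[] = { 50, 56 };
    NullableIntArray stored = IntArrayToValue(2, nodes);

    int restored[8];
    int restoredCount = ValueToIntArray(stored, restored);
    printf("restored %d node(s), first is %d\n", restoredCount, restored[0]);

    NullableIntArray empty = IntArrayToValue(0, NULL);
    printf("empty node list stored as %s\n", empty.isNull ? "NULL" : "an array");

    free(stored.values);
    return 0;
}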

View File

@ -63,7 +63,6 @@
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/shard_cleaner.h" #include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
/* Table-of-contents constants for our dynamic shared memory segment. */ /* Table-of-contents constants for our dynamic shared memory segment. */
@ -116,17 +115,12 @@ static bool MonitorGotTerminationOrCancellationRequest();
static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
static void QueueMonitorSigHupHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
/* flags set by signal handlers */ /* flags set by signal handlers */
static volatile sig_atomic_t GotSigterm = false; static volatile sig_atomic_t GotSigterm = false;
static volatile sig_atomic_t GotSigint = false; static volatile sig_atomic_t GotSigint = false;
static volatile sig_atomic_t GotSighup = false; static volatile sig_atomic_t GotSighup = false;
/* keeping track of parallel background tasks per node */
HTAB *ParallelTasksPerNode = NULL;
int MaxBackgroundTaskExecutorsPerNode = 1;
PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait); PG_FUNCTION_INFO_V1(citus_job_wait);
PG_FUNCTION_INFO_V1(citus_task_wait); PG_FUNCTION_INFO_V1(citus_task_wait);
@ -217,7 +211,7 @@ citus_job_wait(PG_FUNCTION_ARGS)
* assume any terminal state as its desired status. The function returns if the * assume any terminal state as its desired status. The function returns if the
* desired_state was reached. * desired_state was reached.
* *
* The current implementation is a polling implementation with an interval of 0.1 seconds. * The current implementation is a polling implementation with an interval of 1 second.
* Ideally we would have some synchronization between the background tasks queue monitor * Ideally we would have some synchronization between the background tasks queue monitor
* and any backend calling this function to receive a signal when the task changes state. * and any backend calling this function to receive a signal when the task changes state.
*/ */
@ -863,7 +857,6 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
UpdateBackgroundTask(task); UpdateBackgroundTask(task);
UpdateDependingTasks(task); UpdateDependingTasks(task);
UpdateBackgroundJob(task->jobid); UpdateBackgroundJob(task->jobid);
DecrementParallelTaskCountForNodesInvolved(task);
/* we are sure that at least one task did not block on current iteration */ /* we are sure that at least one task did not block on current iteration */
queueMonitorExecutionContext->allTasksWouldBlock = false; queueMonitorExecutionContext->allTasksWouldBlock = false;
@ -875,77 +868,6 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
} }
/*
* IncrementParallelTaskCountForNodesInvolved
* Checks whether we have reached the limit of parallel tasks per node
* per each of the nodes involved with the task
* If at least one limit is reached, it returns false.
* If limits aren't reached, it increments the parallel task count
* for each of the nodes involved with the task, and returns true.
*/
bool
IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
if (task->nodesInvolved)
{
int node;
/* first check whether we have reached the limit for any of the nodes */
foreach_int(node, task->nodesInvolved)
{
bool found;
ParallelTasksPerNodeEntry *hashEntry = hash_search(
ParallelTasksPerNode, &(node), HASH_ENTER, &found);
if (!found)
{
hashEntry->counter = 0;
}
else if (hashEntry->counter >= MaxBackgroundTaskExecutorsPerNode)
{
/* at least one node's limit is reached */
return false;
}
}
/* then, increment the parallel task count per each node */
foreach_int(node, task->nodesInvolved)
{
ParallelTasksPerNodeEntry *hashEntry = hash_search(
ParallelTasksPerNode, &(node), HASH_FIND, NULL);
Assert(hashEntry);
hashEntry->counter += 1;
}
}
return true;
}
/*
* DecrementParallelTaskCountForNodesInvolved
* Decrements the parallel task count for each of the nodes involved
* with the task.
* We call this function after the task has gone through Running state
* and then has ended.
*/
static void
DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
if (task->nodesInvolved)
{
int node;
foreach_int(node, task->nodesInvolved)
{
ParallelTasksPerNodeEntry *hashEntry = hash_search(ParallelTasksPerNode,
&(node),
HASH_FIND, NULL);
hashEntry->counter -= 1;
}
}
}
/* /*
* QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params.
*/ */
@ -1101,7 +1023,7 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
/* handle SIGINT to properly cancel active task executors */ /* handle SIGINT to properly cancel active task executors */
pqsignal(SIGINT, QueueMonitorSigIntHandler); pqsignal(SIGINT, QueueMonitorSigIntHandler);
/* handle SIGHUP to update MaxBackgroundTaskExecutors and MaxBackgroundTaskExecutorsPerNode */ /* handle SIGHUP to update MaxBackgroundTaskExecutors */
pqsignal(SIGHUP, QueueMonitorSigHupHandler); pqsignal(SIGHUP, QueueMonitorSigHupHandler);
/* ready to handle signals */ /* ready to handle signals */
@ -1245,15 +1167,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
{ {
GotSighup = false; GotSighup = false;
/* update max_background_task_executors and max_background_task_executors_per_node if changed */ /* update max_background_task_executors if changed */
ProcessConfigFile(PGC_SIGHUP); ProcessConfigFile(PGC_SIGHUP);
} }
if (ParallelTasksPerNode == NULL)
{
ParallelTasksPerNode = CreateSimpleHash(int32, ParallelTasksPerNodeEntry);
}
/* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */
if (!MonitorGotTerminationOrCancellationRequest()) if (!MonitorGotTerminationOrCancellationRequest())
{ {
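
The two functions removed above gate task admission on a per-node counter: a runnable task is picked up only if every node it involves is still below citus.max_background_task_executors_per_node, the counters are bumped when it starts, and they are released again when the task ends. A small standalone model of that gate follows, using a plain array indexed by node id instead of the ParallelTasksPerNode hash table; the helper names and the admission sequence are invented for illustration:

#include <stdbool.h>
#include <stdio.h>

#define MAX_NODES 64

static int TasksPerNode[MAX_NODES];
static int MaxBackgroundTaskExecutorsPerNode = 1;   /* the GUC's default */

/* mirrors IncrementParallelTaskCountForNodesInvolved */
static bool
TryAdmitTask(const int *nodesInvolved, int nodeCount)
{
    /* first pass: refuse if any involved node is already at the limit */
    for (int i = 0; i < nodeCount; i++)
    {
        if (TasksPerNode[nodesInvolved[i]] >= MaxBackgroundTaskExecutorsPerNode)
        {
            return false;
        }
    }

    /* second pass: charge every involved node */
    for (int i = 0; i < nodeCount; i++)
    {
        TasksPerNode[nodesInvolved[i]]++;
    }
    return true;
}

/* mirrors DecrementParallelTaskCountForNodesInvolved */
static void
ReleaseTask(const int *nodesInvolved, int nodeCount)
{
    for (int i = 0; i < nodeCount; i++)
    {
        TasksPerNode[nodesInvolved[i]]--;
    }
}

int
main(void)
{
    int moveA[] = { 50, 56 };          /* source and target node ids */
    int moveB[] = { 50, 57 };          /* shares node 50 with moveA */

    printf("admit A: %d\n", TryAdmitTask(moveA, 2));         /* 1: nodes free */
    printf("admit B: %d\n", TryAdmitTask(moveB, 2));         /* 0: node 50 busy */

    ReleaseTask(moveA, 2);                                    /* A finished */
    printf("admit B again: %d\n", TryAdmitTask(moveB, 2));    /* 1 */

    return 0;
}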

View File

@ -85,21 +85,6 @@ typedef struct TaskExecutionContext
} TaskExecutionContext; } TaskExecutionContext;
/*
* ParallelTasksPerNodeEntry is the struct used
* to track the number of concurrent background tasks that
* involve a particular node (the key to the entry)
*/
typedef struct ParallelTasksPerNodeEntry
{
/* Used as hash key. */
int32 node_id;
/* number of concurrent background tasks that involve node node_id */
uint32 counter;
} ParallelTasksPerNodeEntry;
extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database,
Oid extensionOwner); Oid extensionOwner);
extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); extern void CitusBackgroundTaskQueueMonitorMain(Datum arg);
@ -110,6 +95,5 @@ extern Datum citus_job_wait(PG_FUNCTION_ARGS);
extern Datum citus_task_wait(PG_FUNCTION_ARGS); extern Datum citus_task_wait(PG_FUNCTION_ARGS);
extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus); extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus); extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus);
extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
#endif /*CITUS_BACKGROUND_JOBS_H */ #endif /*CITUS_BACKGROUND_JOBS_H */

View File

@ -252,7 +252,6 @@ typedef struct BackgroundTask
int32 *retry_count; int32 *retry_count;
TimestampTz *not_before; TimestampTz *not_before;
char *message; char *message;
List *nodesInvolved;
/* extra space to store values for nullable value types above */ /* extra space to store values for nullable value types above */
struct struct
@ -389,9 +388,7 @@ extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut);
extern int64 CreateBackgroundJob(const char *jobType, const char *description); extern int64 CreateBackgroundJob(const char *jobType, const char *description);
extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command,
int dependingTaskCount, int dependingTaskCount,
int64 dependingTaskIds[], int64 dependingTaskIds[]);
int nodesInvolvedCount,
int32 nodesInvolved[]);
extern BackgroundTask * GetRunnableBackgroundTask(void); extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void); extern void ResetRunningBackgroundTasks(void);
extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId); extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId);

View File

@ -15,7 +15,7 @@
* compiler constants for pg_dist_background_task * compiler constants for pg_dist_background_task
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_background_task 10 #define Natts_pg_dist_background_task 9
#define Anum_pg_dist_background_task_job_id 1 #define Anum_pg_dist_background_task_job_id 1
#define Anum_pg_dist_background_task_task_id 2 #define Anum_pg_dist_background_task_task_id 2
#define Anum_pg_dist_background_task_owner 3 #define Anum_pg_dist_background_task_owner 3
@ -25,6 +25,5 @@
#define Anum_pg_dist_background_task_retry_count 7 #define Anum_pg_dist_background_task_retry_count 7
#define Anum_pg_dist_background_task_not_before 8 #define Anum_pg_dist_background_task_not_before 8
#define Anum_pg_dist_background_task_message 9 #define Anum_pg_dist_background_task_message 9
#define Anum_pg_dist_background_task_nodes_involved 10
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */ #endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */

View File

@ -190,7 +190,6 @@ extern char *VariablesToBePassedToNewConnections;
extern int MaxRebalancerLoggedIgnoredMoves; extern int MaxRebalancerLoggedIgnoredMoves;
extern bool RunningUnderIsolationTest; extern bool RunningUnderIsolationTest;
extern bool PropagateSessionSettingsForLoopbackConnection; extern bool PropagateSessionSettingsForLoopbackConnection;
extern int MaxBackgroundTaskExecutorsPerNode;
/* External function declarations */ /* External function declarations */
extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS);

View File

@ -22,6 +22,5 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
Oid datumTypeId); Oid datumTypeId);
extern List * IntegerArrayTypeToList(ArrayType *arrayObject); extern List * IntegerArrayTypeToList(ArrayType *arrayObject);
extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject); extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject);
extern Datum IntArrayToDatum(uint32 int_array_size, int int_array[]);
#endif /* CITUS_ARRAY_TYPE_H */ #endif /* CITUS_ARRAY_TYPE_H */

View File

@ -1,51 +0,0 @@
# Schema change CDC test for Citus
use strict;
use warnings;
use Test::More;
use lib './t';
use cdctestlib;
use threads;
# Initialize co-ordinator node
my $select_stmt = qq(SELECT * FROM data_100008 ORDER BY id;);
my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636);
print("coordinator port: " . $node_coordinator->port() . "\n");
print("worker0 port:" . $workers[0]->port() . "\n");
my $initial_schema = "
CREATE TABLE data_100008(
id integer,
data text,
PRIMARY KEY (data));";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE data_100008 REPLICA IDENTITY FULL;');
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','wal2json');");
#insert data into the data_100008 table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
INSERT INTO data_100008
SELECT i, 'my test data ' || i
FROM generate_series(-1,1)i;");
my $output = $node_coordinator->safe_psql('postgres',"SELECT * FROM pg_logical_slot_get_changes('cdc_replication_slot', NULL, NULL);");
print($output);
my $change_string_expected = '[0,"my test data 0"]';
if ($output =~ /$change_string_expected/) {
$result = 1;
} else {
$result = 0;
}
is($result, 1, 'CDC create_distributed_table - wal2json test');
$node_coordinator->safe_psql('postgres',"SELECT pg_drop_replication_slot('cdc_replication_slot');");
done_testing();

View File

@ -1,7 +1,6 @@
# this schedule is to be run only on coordinators # this schedule is to be run only on coordinators
test: upgrade_citus_finish_citus_upgrade test: upgrade_citus_finish_citus_upgrade
test: upgrade_pg_dist_cleanup_after
test: upgrade_basic_after test: upgrade_basic_after
test: upgrade_partition_constraints_after test: upgrade_partition_constraints_after
test: upgrade_pg_dist_object_test_after test: upgrade_pg_dist_object_test_after

View File

@ -4,5 +4,4 @@ test: upgrade_basic_before
test: upgrade_partition_constraints_before test: upgrade_partition_constraints_before
test: upgrade_pg_dist_object_test_before test: upgrade_pg_dist_object_test_before
test: upgrade_columnar_metapage_before test: upgrade_columnar_metapage_before
test: upgrade_pg_dist_cleanup_before
test: upgrade_post_11_before test: upgrade_post_11_before

View File

@ -110,7 +110,7 @@ class CitusBaseClusterConfig(object, metaclass=NewInitCaller):
"max_connections": 1200, "max_connections": 1200,
} }
self.new_settings = {} self.new_settings = {}
self.add_coordinator_to_metadata = True self.add_coordinator_to_metadata = False
self.env_variables = {} self.env_variables = {}
self.skip_tests = [] self.skip_tests = []
@ -166,6 +166,7 @@ class CitusDefaultClusterConfig(CitusBaseClusterConfig):
"citus.use_citus_managed_tables": True, "citus.use_citus_managed_tables": True,
} }
self.settings.update(new_settings) self.settings.update(new_settings)
self.add_coordinator_to_metadata = True
self.skip_tests = [ self.skip_tests = [
# Alter Table statement cannot be run from an arbitrary node so this test will fail # Alter Table statement cannot be run from an arbitrary node so this test will fail
"arbitrary_configs_alter_table_add_constraint_without_name_create", "arbitrary_configs_alter_table_add_constraint_without_name_create",
@ -379,3 +380,4 @@ class PGUpgradeConfig(CitusBaseClusterConfig):
self.old_datadir = self.temp_dir + "/oldData" self.old_datadir = self.temp_dir + "/oldData"
self.new_datadir = self.temp_dir + "/newData" self.new_datadir = self.temp_dir + "/newData"
self.user = SUPER_USER_NAME self.user = SUPER_USER_NAME
self.add_coordinator_to_metadata = True

View File

@ -1,22 +1,12 @@
-- /*
-- BACKGROUND_REBALANCE_PARALLEL Test to check if the background tasks scheduled by the background rebalancer
-- has the correct dependencies.
-- Test to check if the background tasks scheduled by the background rebalancer */
-- have the correct dependencies
--
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target) more than
-- citus.max_background_task_executors_per_node, and that we can change the GUC on
-- the fly, and that will affect the ongoing balance as it should
--
-- Test to verify that there's a hard dependency when a specific node is first being
-- used as a source for a move, and then later as a target.
--
CREATE SCHEMA background_rebalance_parallel; CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel; SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000; SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR; SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
@ -55,52 +45,52 @@ SELECT pg_reload_conf();
t t
(1 row) (1 row)
-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group /* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY); CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY); CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group /* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY); CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
CREATE TABLE table2_colg2 (b int primary key); CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group /* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY); CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
CREATE TABLE table2_colg3 (b int primary key); CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
-- Add two new nodes so that we can rebalance /* Add two new node so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port); SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -142,11 +132,10 @@ SELECT citus_rebalance_wait();
(1 row) (1 row)
-- PART 1 /*Check that a move is dependent on
-- Test to check if the background tasks scheduled by the background rebalancer 1. any other move scheduled earlier in its colocation group.
-- have the correct dependencies 2. any other move scheduled earlier whose source node or target
-- Check that a move is dependent on node overlaps with the current moves nodes. */
-- any other move scheduled earlier in its colocation group.
SELECT S.shardid, P.colocationid SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
@ -186,12 +175,16 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id,
task_id | command | depends_on | command task_id | command | depends_on | command
--------------------------------------------------------------------- ---------------------------------------------------------------------
1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto')
(3 rows) (7 rows)
-- Check that if there is a reference table that needs to be synched to a node, /* Check that if there is a reference table that needs to be synched to a node,
-- any move without a dependency must depend on the move task for reference table. any move without a dependency must depend on the move task for reference table. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -210,8 +203,8 @@ SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true)
1 1
(1 row) (1 row)
-- Drain worker_3 so that we can move only one colocation group to worker_3 /* Drain worker_3 so that we can move only one colocation group to worker_3
-- to create an unbalance that would cause parallel rebalancing. to create an unbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -232,7 +225,7 @@ SELECT create_reference_table('ref_table');
(1 row) (1 row)
-- Move all the shards of Colocation group 3 to worker_3. /* Move all the shards of Colocation group 3 to worker_3.*/
SELECT SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM FROM
@ -250,7 +243,7 @@ ORDER BY
(4 rows) (4 rows)
CALL citus_cleanup_orphaned_resources(); CALL citus_cleanup_orphaned_resources();
-- Activate and new nodes so that we can rebalance. /* Activate and new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -330,34 +323,18 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id,
1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') 1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto')
1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') 1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto')
(6 rows) (6 rows)
-- PART 2
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target)
-- more than citus.max_background_task_executors_per_node
-- and that we can change the GUC on the fly
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output i.e. to avoid flakiness
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE; DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup wait_for_resource_cleanup
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
select citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port); select citus_remove_node('localhost', :worker_3_port);
citus_remove_node citus_remove_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -382,474 +359,6 @@ select citus_remove_node('localhost', :worker_6_port);
(1 row) (1 row)
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
-- Create 8 tables in 4 colocation groups, and populate them
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg2 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg4 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i;
-- Add nodes so that we can rebalance
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
56
(1 row)
SELECT citus_add_node('localhost', :worker_3_port);
citus_add_node
---------------------------------------------------------------------
57
(1 row)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked from colocation group dependencies
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
(4 rows)
-- default citus.max_background_task_executors_per_node is 1
-- show that first exactly one task per node is running
-- among the tasks that are not blocked
SELECT citus_task_wait(1013, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | running | {50,56}
17779 | 1014 | blocked | {50,57}
17779 | 1015 | runnable | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
-- increase citus.max_background_task_executors_per_node
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(1015, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1013, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that at most 2 tasks per node are running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | running | {50,57}
17779 | 1015 | running | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
-- decrease to default (1)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(1015, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1014, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1016, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that exactly one task per node is running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | done | {50,57}
17779 | 1015 | done | {50,56}
17779 | 1016 | running | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
-- PART 3
-- Test to verify that there's a hard dependency when a specific node is first being used as a
-- source for a move, and then later as a target.
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_1_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674051;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61;
-- add the first node
-- nodeid here is 61
select citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
61
(1 row)
-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg5 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg6 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i;
-- add two other nodes
-- nodeid here is 62
select citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
62
(1 row)
-- nodeid here is 63
select citus_add_node('localhost', :worker_3_port);
citus_add_node
---------------------------------------------------------------------
63
(1 row)
CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
worker_node_list json[],
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
-- we are simulating the following scenario from shard_rebalancer_unit.sql
-- the steps below all follow that scenario,
-- where the third move should be dependent on the first two
-- because the third move's target is the source of the first two
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}',
'{"node_name": "hostname2", "disallowed_shards": "4"}',
'{"node_name": "hostname3", "disallowed_shards": "4"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname2"}',
'{"shardid":4, "nodename":"hostname2"}',
'{"shardid":5, "nodename":"hostname3"}',
'{"shardid":6, "nodename":"hostname3"}'
]::json[]
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname3","targetport":5432}
{"updatetype":1,"shardid":4,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(3 rows)
-- manually balance the cluster such that we have
-- a balanced cluster like above with 1,2,3,4,5,6 and hostname1/2/3
-- shardid 85674051 (1) nodeid 61 (hostname1)
-- shardid 85674052 (2) nodeid 61 (hostname1)
-- shardid 85674053 (3) nodeid 62 (hostname2)
-- shardid 85674054 (4) nodeid 62 (hostname2)
-- shardid 85674055 (5) nodeid 63 (hostname3)
-- shardid 85674056 (6) nodeid 63 (hostname3)
SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- now create another rebalance strategy in order to simulate moves
-- which use as a target a node that was previously used as a source
CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int)
RETURNS boolean AS
$$
-- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}'
select case when (shardid != 85674054 and nodeid = 61)
then false
-- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}'
-- AND '{"node_name": "hostname3", "disallowed_shards": "4"}'
when (shardid = 85674054 and nodeid != 61)
then false
else true
end;
$$ LANGUAGE sql;
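-- Illustrative spot checks (assumed shard/node ids from the setup above, not part of
-- the original test): shard 85674054 is allowed only on node 61, while every other
-- shard is disallowed on node 61.
SELECT test_shard_allowed_on_node(85674054, 61); -- expected: t
SELECT test_shard_allowed_on_node(85674051, 61); -- expected: f
SELECT test_shard_allowed_on_node(85674051, 62); -- expected: t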
-- insert the new test rebalance strategy
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold,
improvement_threshold
) VALUES (
'test_source_then_target',
false,
'citus_shard_cost_1',
'citus_node_capacity_1',
'background_rebalance_parallel.test_shard_allowed_on_node',
0,
0,
0
);
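-- Illustrative lookup, not part of the original test: the new strategy should now be
-- visible in pg_dist_rebalance_strategy alongside the built-in ones.
SELECT name, default_strategy, shard_allowed_on_node_function
FROM pg_catalog.pg_dist_rebalance_strategy
WHERE name = 'test_source_then_target';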
SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674051 | 0 | localhost | 57637 | localhost | 57638
table1_colg2 | 85674052 | 0 | localhost | 57637 | localhost | 57639
table1_colg4 | 85674054 | 0 | localhost | 57638 | localhost | 57637
(3 rows)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset
-- check that the third move is blocked and depends on the first two
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17780 | 1021 | runnable | {61,62}
17780 | 1022 | runnable | {61,63}
17780 | 1023 | blocked | {62,61}
(3 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1021 | SELECT pg_catalog.citus_move_shard_placement(85674051,61,62,'auto')
1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1022 | SELECT pg_catalog.citus_move_shard_placement(85674052,61,63,'auto')
(2 rows)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target';
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- keep the rest of the tests intact that depend on node/group ids -- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -3,7 +3,6 @@ SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400; SET citus.next_shard_id TO 3536400;
SET client_min_messages TO ERROR;
-- reset sequence values -- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000;
@ -655,268 +654,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
1450016 | 1450024 | done 1450016 | 1450024 | done
(2 rows) (2 rows)
-- TEST11
-- verify that we do not allow parallel task executors involving a particular node
-- more than citus.max_background_task_executors_per_node
-- verify that we can change citus.max_background_task_executors_per_node on the fly
-- tests are done with dummy node ids
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output i.e. to avoid flakiness
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset
COMMIT;
SELECT citus_task_wait(:task_id1, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 1 task per node is running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | running | {1,2}
1450017 | 1450026 | running | {3,4}
1450017 | 1450027 | runnable | {1,2}
1450017 | 1450028 | runnable | {1,3}
1450017 | 1450029 | runnable | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
(8 rows)
SELECT citus_task_wait(:task_id1, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- increase max_background_task_executors_per_node on the fly
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(:task_id3, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | running | {1,2}
1450017 | 1450028 | running | {1,3}
1450017 | 1450029 | running | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
(8 rows)
-- increase max_background_task_executors_per_node to 3 on the fly
SELECT citus_task_wait(:task_id3, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id7, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id8, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- if pg_cancel_backend is called on one of the running tasks' PIDs,
-- the task doesn't restart because the limit no longer allows it.
-- node with id 1 can be used only once, unless there are previously running tasks
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
-- the task only goes to the runnable state, not running anymore.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that cancelled task hasn't restarted because limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
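-- Illustrative helper query, not part of the original test: listing the running tasks
-- that involve node 1 shows why the cancelled task cannot be picked up again while
-- the per-node executor limit is back at its default of 1.
SELECT task_id, status, nodes_involved
FROM pg_dist_background_task
WHERE status = 'running' AND 1 = ANY(nodes_involved)
ORDER BY task_id;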
SELECT citus_task_wait(:task_id7, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id8, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that the 6th task has restarted only after both 7 and 8 are done
-- since we have a limit of 1 background task executor per node with id 1
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
(8 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend; TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE; DROP SCHEMA background_task_queue_monitor CASCADE;
RESET client_min_messages;
ALTER SYSTEM RESET citus.background_task_queue_interval; ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors; ALTER SYSTEM RESET citus.max_background_task_executors;
SELECT pg_reload_conf(); SELECT pg_reload_conf();

View File

@ -1197,7 +1197,44 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Test downgrade to 11.1-1 from 11.2-1 -- Test downgrade to 11.1-1 from 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- create a table with orphaned shards to see if orphaned shards will be dropped
-- and cleanup records will be created for them
SET citus.shard_replication_factor to 1;
CREATE TABLE table_with_orphaned_shards (a int);
SELECT create_distributed_table('table_with_orphaned_shards', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- show there are 4 placements
SELECT * FROM pg_dist_placement ORDER BY shardid;
placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
1 | 102008 | 1 | 0 | 0
2 | 102009 | 1 | 0 | 0
3 | 102010 | 1 | 0 | 0
4 | 102011 | 1 | 0 | 0
(4 rows)
-- mark two of them as orphaned
UPDATE pg_dist_placement SET shardstate = 4 WHERE shardid % 2 = 1;
ALTER EXTENSION citus UPDATE TO '11.1-1'; ALTER EXTENSION citus UPDATE TO '11.1-1';
-- show placements and cleanup records
SELECT * FROM pg_dist_placement ORDER BY shardid;
placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
1 | 102008 | 1 | 0 | 0
2 | 102009 | 4 | 0 | 0
3 | 102010 | 1 | 0 | 0
4 | 102011 | 4 | 0 | 0
(4 rows)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object previous_object | current_object
@ -1206,6 +1243,21 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.2-1 -- Snapshot of state at 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- verify that the orphaned placements are deleted and cleanup records are created
SELECT * FROM pg_dist_placement ORDER BY shardid;
placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
1 | 102008 | 1 | 0 | 0
3 | 102010 | 1 | 0 | 0
(2 rows)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
1 | 0 | 1 | table_with_orphaned_shards_102009 | 0 | 0
2 | 0 | 1 | table_with_orphaned_shards_102011 | 0 | 0
(2 rows)
ALTER EXTENSION citus_columnar UPDATE TO '11.2-1'; ALTER EXTENSION citus_columnar UPDATE TO '11.2-1';
-- Make sure that we defined dependencies from all rel objects (tables, -- Make sure that we defined dependencies from all rel objects (tables,
-- indexes, sequences ..) to columnar table access method ... -- indexes, sequences ..) to columnar table access method ...
@ -1243,6 +1295,16 @@ SELECT COUNT(*)=5 FROM columnar_schema_members_pg_depend;
(1 row) (1 row)
DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend; DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend;
-- error out as cleanup records remain
ALTER EXTENSION citus UPDATE TO '11.0-4';
ERROR: pg_dist_cleanup is introduced in Citus 11.1
HINT: To downgrade Citus to an older version, you should first cleanup all the orphaned resources and make sure pg_dist_cleanup is empty, by executing CALL citus_cleanup_orphaned_resources();
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
-- cleanup
SET client_min_messages TO ERROR;
CALL citus_cleanup_orphaned_resources();
DROP TABLE table_with_orphaned_shards;
RESET client_min_messages;
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1286,26 +1348,9 @@ SELECT * FROM multi_extension.print_extension_changes();
| type cluster_clock | type cluster_clock
(38 rows) (38 rows)
-- Test downgrade to 11.2-1 from 11.2-2 -- Test downgrade to 11.2-1 from 11.3-1
ALTER EXTENSION citus UPDATE TO '11.2-2';
ALTER EXTENSION citus UPDATE TO '11.2-1';
-- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
-- Snapshot of state at 11.2-2
ALTER EXTENSION citus UPDATE TO '11.2-2';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
| function worker_adjust_identity_column_seq_ranges(regclass) void
(1 row)
-- Test downgrade to 11.2-2 from 11.3-1
ALTER EXTENSION citus UPDATE TO '11.3-1'; ALTER EXTENSION citus UPDATE TO '11.3-1';
ALTER EXTENSION citus UPDATE TO '11.2-2'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object previous_object | current_object
@ -1325,10 +1370,11 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_stat_tenants_local(boolean) SETOF record | function citus_stat_tenants_local(boolean) SETOF record
| function citus_stat_tenants_local_reset() void | function citus_stat_tenants_local_reset() void
| function citus_stat_tenants_reset() void | function citus_stat_tenants_reset() void
| function worker_adjust_identity_column_seq_ranges(regclass) void
| function worker_drop_all_shell_tables(boolean) | function worker_drop_all_shell_tables(boolean)
| view citus_stat_tenants | view citus_stat_tenants
| view citus_stat_tenants_local | view citus_stat_tenants_local
(11 rows) (12 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version
@ -1750,4 +1796,4 @@ DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension; DROP SCHEMA multi_extension;
ERROR: cannot drop schema multi_extension because other objects depend on it ERROR: cannot drop schema multi_extension because other objects depend on it
DETAIL: function multi_extension.print_extension_changes() depends on schema multi_extension DETAIL: function multi_extension.print_extension_changes() depends on schema multi_extension
HINT: Use DROP ... CASCADE to drop the dependent objects too. HINT: Use DROP ... CASCADE to drop the dependent objects too.

View File

@ -9,8 +9,7 @@ FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass, 'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass, 'pg_dist_partition'::regclass,
'pg_dist_object'::regclass, 'pg_dist_object'::regclass)
'pg_dist_background_task'::regclass)
ORDER BY attrelid, attname; ORDER BY attrelid, attname;
attrelid | attname | atthasmissing | attmissingval attrelid | attname | atthasmissing | attmissingval
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -497,7 +497,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
(4 rows) (4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
@ -635,7 +635,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
(4 rows) (4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
@ -872,7 +872,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text; logicalrelid;
logicalrelid | repmodel logicalrelid | repmodel
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | s mx_test_schema_1.mx_table_1 | s
@ -888,7 +888,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport logicalrelid | shardid | nodename | nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637
@ -921,9 +921,7 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | repmodel logicalrelid | repmodel
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | s mx_test_schema_1.mx_table_1 | s
@ -939,7 +937,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport logicalrelid | shardid | nodename | nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637
@ -1083,7 +1081,7 @@ FROM
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass
ORDER BY logicalrelid::text; ORDER BY logicalrelid;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10000 mx_colocation_test_1 | 10000
@ -1103,13 +1101,11 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001 mx_colocation_test_2 | 10001
mx_colocation_test_1 | 10001
(2 rows) (2 rows)
\c - - - :worker_1_port \c - - - :worker_1_port
@ -1119,13 +1115,11 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001 mx_colocation_test_2 | 10001
mx_colocation_test_1 | 10001
(2 rows) (2 rows)
\c - - - :master_port \c - - - :master_port

View File

@ -497,7 +497,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
(4 rows) (4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
@ -635,7 +635,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t
(4 rows) (4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
@ -872,7 +872,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text; logicalrelid;
logicalrelid | repmodel logicalrelid | repmodel
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | s mx_test_schema_1.mx_table_1 | s
@ -888,7 +888,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport logicalrelid | shardid | nodename | nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637
@ -921,9 +921,7 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | repmodel logicalrelid | repmodel
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | s mx_test_schema_1.mx_table_1 | s
@ -939,7 +937,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport logicalrelid | shardid | nodename | nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637 mx_test_schema_1.mx_table_1 | 1310020 | localhost | 57637
@ -1083,7 +1081,7 @@ FROM
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass
ORDER BY logicalrelid::text; ORDER BY logicalrelid;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10000 mx_colocation_test_1 | 10000
@ -1103,13 +1101,11 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001 mx_colocation_test_2 | 10001
mx_colocation_test_1 | 10001
(2 rows) (2 rows)
\c - - - :worker_1_port \c - - - :worker_1_port
@ -1119,13 +1115,11 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
logicalrelid | colocationid logicalrelid | colocationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001 mx_colocation_test_2 | 10001
mx_colocation_test_1 | 10001
(2 rows) (2 rows)
\c - - - :master_port \c - - - :master_port

View File

@ -39,29 +39,16 @@ SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_co
t t
(1 row) (1 row)
-- while testing sequences on pg_dist_cleanup, they return null in pg upgrade schedule SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup;
-- but return a valid value in citus upgrade schedule ?column?
-- that's why we accept both NULL and MAX()+1 here
SELECT
CASE WHEN MAX(operation_id) IS NULL
THEN true
ELSE nextval('pg_dist_operationid_seq') = MAX(operation_id)+1
END AS check_operationid
FROM pg_dist_cleanup;
check_operationid
--------------------------------------------------------------------- ---------------------------------------------------------------------
t
(1 row) (1 row)
SELECT SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup;
CASE WHEN MAX(record_id) IS NULL ?column?
THEN true
ELSE nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1
END AS check_recordid
FROM pg_dist_cleanup;
check_recordid
--------------------------------------------------------------------- ---------------------------------------------------------------------
t
(1 row) (1 row)
SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job;

View File

@ -1,13 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
upgrade_test_old_citus_version_gte_11_2
---------------------------------------------------------------------
t
(1 row)
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q

View File

@ -1,30 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
upgrade_test_old_citus_version_gte_11_2
---------------------------------------------------------------------
f
(1 row)
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q
\endif
-- verify that the orphaned placement is deleted and a cleanup record is created
SELECT COUNT(*) FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass);
count
---------------------------------------------------------------------
32
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
1 | 0 | 1 | table_with_orphaned_shards_980001 | 1 | 0
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 1 orphaned resources
DROP TABLE table_with_orphaned_shards;

View File

@ -1,13 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
upgrade_test_old_citus_version_gte_11_2
---------------------------------------------------------------------
t
(1 row)
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q

View File

@ -1,36 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
upgrade_test_old_citus_version_gte_11_2
---------------------------------------------------------------------
f
(1 row)
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q
\endif
-- create a table with orphaned shards to see if orphaned shards will be dropped
-- and cleanup records will be created for them
SET citus.next_shard_id TO 980000;
CREATE TABLE table_with_orphaned_shards (a int);
SELECT create_distributed_table('table_with_orphaned_shards', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- show all 32 placements are active
SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1 AND shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass);
count
---------------------------------------------------------------------
32
(1 row)
-- create an orphaned placement based on an existing one
INSERT INTO pg_dist_placement(placementid, shardid, shardstate, shardlength, groupid)
SELECT nextval('pg_dist_placement_placementid_seq'::regclass), shardid, 4, shardlength, 3-groupid
FROM pg_dist_placement
WHERE shardid = 980001;
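-- Illustrative check (assumed, not part of the original test): the copied placement
-- should now appear with shardstate 4, i.e. marked as orphaned, for shard 980001.
SELECT placementid, shardid, shardstate, groupid
FROM pg_dist_placement
WHERE shardid = 980001
ORDER BY placementid;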

View File

@ -1,22 +1,12 @@
-- /*
-- BACKGROUND_REBALANCE_PARALLEL Test to check if the background tasks scheduled by the background rebalancer
-- has the correct dependencies.
-- Test to check if the background tasks scheduled by the background rebalancer */
-- have the correct dependencies
--
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target) more than
-- citus.max_background_task_executors_per_node, and that we can change the GUC on
-- the fly, and that will affect the ongoing balance as it should
--
-- Test to verify that there's a hard dependency when a specific node is first being
-- used as a source for a move, and then later as a target.
--
CREATE SCHEMA background_rebalance_parallel; CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel; SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000; SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR; SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
@ -36,34 +26,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf(); SELECT pg_reload_conf();
-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group /* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY); CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY); CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group /* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY); CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key); CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group /* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY); CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key); CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
-- Add two new nodes so that we can rebalance /* Add two new node so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port); SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port); SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
@ -73,12 +63,10 @@ SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait(); SELECT citus_rebalance_wait();
-- PART 1 /*Check that a move is dependent on
-- Test to check if the background tasks scheduled by the background rebalancer 1. any other move scheduled earlier in its colocation group.
-- have the correct dependencies 2. any other move scheduled earlier whose source node or target
node overlaps with the current moves nodes. */
-- Check that a move is dependent on
-- any other move scheduled earlier in its colocation group.
SELECT S.shardid, P.colocationid SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
@ -90,14 +78,14 @@ SELECT D.task_id,
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
-- Check that if there is a reference table that needs to be synched to a node, /* Check that if there is a reference table that needs to be synched to a node,
-- any move without a dependency must depend on the move task for reference table. any move without a dependency must depend on the move task for reference table. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
SELECT public.wait_for_resource_cleanup(); SELECT public.wait_for_resource_cleanup();
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
-- Drain worker_3 so that we can move only one colocation group to worker_3 /* Drain worker_3 so that we can move only one colocation group to worker_3
-- to create an unbalance that would cause parallel rebalancing. to create an unbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
@ -107,7 +95,7 @@ CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table'); SELECT create_reference_table('ref_table');
-- Move all the shards of Colocation group 3 to worker_3. /* Move all the shards of Colocation group 3 to worker_3.*/
SELECT SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM FROM
@ -119,7 +107,7 @@ ORDER BY
CALL citus_cleanup_orphaned_resources(); CALL citus_cleanup_orphaned_resources();
-- Activate and new nodes so that we can rebalance. /* Activate and new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
@ -140,265 +128,13 @@ SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
-- PART 2
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target)
-- more than citus.max_background_task_executors_per_node
-- and that we can change the GUC on the fly
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output i.e. to avoid flakiness
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE; DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup(); SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_2_port);
select citus_remove_node('localhost', :worker_3_port); select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port); select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port); select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port); select citus_remove_node('localhost', :worker_6_port);
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
-- Create 8 tables in 4 colocation groups, and populate them
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg2 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg4 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4');
INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i;
-- Add nodes so that we can rebalance
SELECT citus_add_node('localhost', :worker_2_port);
SELECT citus_add_node('localhost', :worker_3_port);
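-- start the rebalance as a background job and capture its job id via \gset for the checks below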
SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked by colocation group dependencies
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
-- default citus.max_background_task_executors_per_node is 1
-- show that, at first, exactly one task per node is running
-- among the tasks that are not blocked
SELECT citus_task_wait(1013, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
-- increase citus.max_background_task_executors_per_node
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
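-- pg_reload_conf() applies the ALTER SYSTEM change without a restart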
SELECT pg_reload_conf();
SELECT citus_task_wait(1015, desired_status => 'running');
SELECT citus_task_wait(1013, desired_status => 'done');
-- show that at most 2 tasks per node are running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
-- decrease to default (1)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
SELECT citus_task_wait(1015, desired_status => 'done');
SELECT citus_task_wait(1014, desired_status => 'done');
SELECT citus_task_wait(1016, desired_status => 'running');
-- show that exactly one task per node is running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
SELECT citus_rebalance_stop();
-- PART 3
-- Test to verify that there's a hard dependency when a specific node is first used as a
-- source for a move, and then later as a target.
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_1_port);
select citus_remove_node('localhost', :worker_2_port);
select citus_remove_node('localhost', :worker_3_port);
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674051;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61;
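-- pin the shard id and node id sequences so the ids referenced below stay deterministic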
-- add the first node
-- nodeid here is 61
select citus_add_node('localhost', :worker_1_port);
-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg5 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg6 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i;
-- add two other nodes
-- nodeid here is 62
select citus_add_node('localhost', :worker_2_port);
-- nodeid here is 63
select citus_add_node('localhost', :worker_3_port);
CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
worker_node_list json[],
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
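-- shard_placement_rebalance_array is a test helper backed by the citus shared library;
-- it returns the rebalancer's planned moves for the given node/placement configuration as json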
-- we are simulating the following scenario from shard_rebalancer_unit.sql
-- the following steps all follow that scenario,
-- where the third move should be dependent on the first two
-- because the third move's target is the source of the first two
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}',
'{"node_name": "hostname2", "disallowed_shards": "4"}',
'{"node_name": "hostname3", "disallowed_shards": "4"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname2"}',
'{"shardid":4, "nodename":"hostname2"}',
'{"shardid":5, "nodename":"hostname3"}',
'{"shardid":6, "nodename":"hostname3"}'
]::json[]
));
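-- the plan above is informational only; the equivalent placement layout is now recreated on the real nodes below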
-- manually move the shards so that the cluster matches the scenario above,
-- with shards 1,2,3,4,5,6 balanced across hostname1/2/3 as follows:
-- shardid 85674051 (1) nodeid 61 (hostname1)
-- shardid 85674052 (2) nodeid 61 (hostname1)
-- shardid 85674053 (3) nodeid 62 (hostname2)
-- shardid 85674054 (4) nodeid 62 (hostname2)
-- shardid 85674055 (5) nodeid 63 (hostname3)
-- shardid 85674056 (6) nodeid 63 (hostname3)
SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto');
-- now create another rebalance strategy in order to simulate moves
-- whose target is a node that has previously been used as a source
CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int)
RETURNS boolean AS
$$
-- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}'
select case when (shardid != 85674054 and nodeid = 61)
then false
-- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}'
-- AND '{"node_name": "hostname3", "disallowed_shards": "4"}'
when (shardid = 85674054 and nodeid != 61)
then false
else true
end;
$$ LANGUAGE sql;
-- insert the new test rebalance strategy
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold,
improvement_threshold
) VALUES (
'test_source_then_target',
false,
'citus_shard_cost_1',
'citus_node_capacity_1',
'background_rebalance_parallel.test_shard_allowed_on_node',
0,
0,
0
);
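-- get_rebalance_table_shards_plan only reports the moves the strategy would generate, without executing them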
SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target');
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset
-- check that the third move is blocked and depends on the first two
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
SELECT citus_rebalance_stop();
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target';
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
-- keep the rest of the tests intact that depend on node/group ids -- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;
View File
@ -3,7 +3,6 @@ SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400; SET citus.next_shard_id TO 3536400;
SET client_min_messages TO ERROR;
-- reset sequence values -- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
@ -280,106 +279,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2) WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that task is cancelled ORDER BY job_id, task_id; -- show that task is cancelled
-- TEST11
-- verify that we do not allow more parallel task executors involving a particular node
-- than citus.max_background_task_executors_per_node
-- verify that we can change citus.max_background_task_executors_per_node on the fly
-- tests are done with dummy node ids
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output, i.e. to avoid flakiness
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset
COMMIT;
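-- the tasks become visible to the background task queue monitor once the transaction commits,
-- and are scheduled subject to citus.max_background_task_executors_per_node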
SELECT citus_task_wait(:task_id1, desired_status => 'running');
SELECT citus_task_wait(:task_id2, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 1 task per node is running
SELECT citus_task_wait(:task_id1, desired_status => 'done');
SELECT citus_task_wait(:task_id2, desired_status => 'done');
-- increase max_background_task_executors_per_node on the fly
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
SELECT citus_task_wait(:task_id3, desired_status => 'running');
SELECT citus_task_wait(:task_id4, desired_status => 'running');
SELECT citus_task_wait(:task_id5, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running
-- increase to 3 max_background_task_executors_per_node on the fly
SELECT citus_task_wait(:task_id3, desired_status => 'done');
SELECT citus_task_wait(:task_id4, desired_status => 'done');
SELECT citus_task_wait(:task_id5, desired_status => 'done');
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3;
SELECT pg_reload_conf();
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id7, desired_status => 'running');
SELECT citus_task_wait(:task_id8, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs,
-- the task doesn't restart because the limit no longer allows it:
-- node id 1 can be used by only one executor at a time, and it is still
-- occupied by the previously started tasks that keep running
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
-- the task only goes back to the runnable state; it is not running anymore
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
-- show that cancelled task hasn't restarted because limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
SELECT citus_task_wait(:task_id7, desired_status => 'done');
SELECT citus_task_wait(:task_id8, desired_status => 'done');
SELECT citus_task_wait(:task_id6, desired_status => 'running');
-- show that the 6th task has restarted only after both the 7th and the 8th are done
-- since we have a limit of 1 background task executor for the node with id 1
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
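-- cancel the whole job and wait for it to reach a terminal state before cleanup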
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_wait(:job_id1);
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend; TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE; DROP SCHEMA background_task_queue_monitor CASCADE;
RESET client_min_messages;
ALTER SYSTEM RESET citus.background_task_queue_interval; ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors; ALTER SYSTEM RESET citus.max_background_task_executors;
View File
@ -529,13 +529,33 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Test downgrade to 11.1-1 from 11.2-1 -- Test downgrade to 11.1-1 from 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- create a table with orphaned shards to see if orphaned shards will be dropped
-- and cleanup records will be created for them
SET citus.shard_replication_factor to 1;
CREATE TABLE table_with_orphaned_shards (a int);
SELECT create_distributed_table('table_with_orphaned_shards', 'a');
-- show there are 4 placements
SELECT * FROM pg_dist_placement ORDER BY shardid;
-- mark two of them as orphaned
UPDATE pg_dist_placement SET shardstate = 4 WHERE shardid % 2 = 1;
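-- shardstate 4 marks a placement as orphaned (to be deleted)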
ALTER EXTENSION citus UPDATE TO '11.1-1'; ALTER EXTENSION citus UPDATE TO '11.1-1';
-- show placements and cleanup records
SELECT * FROM pg_dist_placement ORDER BY shardid;
SELECT * FROM pg_dist_cleanup;
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.2-1 -- Snapshot of state at 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- verify that the placements are deleted and cleanup records are created
SELECT * FROM pg_dist_placement ORDER BY shardid;
SELECT * FROM pg_dist_cleanup;
ALTER EXTENSION citus_columnar UPDATE TO '11.2-1'; ALTER EXTENSION citus_columnar UPDATE TO '11.2-1';
-- Make sure that we defined dependencies from all rel objects (tables, -- Make sure that we defined dependencies from all rel objects (tables,
@ -569,21 +589,20 @@ SELECT COUNT(*)=5 FROM columnar_schema_members_pg_depend;
DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend; DROP TABLE columnar_schema_members, columnar_schema_members_pg_depend;
-- error out as cleanup records remain
ALTER EXTENSION citus UPDATE TO '11.0-4';
-- cleanup
SET client_min_messages TO ERROR;
CALL citus_cleanup_orphaned_resources();
DROP TABLE table_with_orphaned_shards;
RESET client_min_messages;
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
-- Test downgrade to 11.2-1 from 11.2-2 -- Test downgrade to 11.2-1 from 11.3-1
ALTER EXTENSION citus UPDATE TO '11.2-2';
ALTER EXTENSION citus UPDATE TO '11.2-1';
-- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.2-2
ALTER EXTENSION citus UPDATE TO '11.2-2';
SELECT * FROM multi_extension.print_extension_changes();
-- Test downgrade to 11.2-2 from 11.3-1
ALTER EXTENSION citus UPDATE TO '11.3-1'; ALTER EXTENSION citus UPDATE TO '11.3-1';
ALTER EXTENSION citus UPDATE TO '11.2-2'; ALTER EXTENSION citus UPDATE TO '11.2-1';
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
View File
@ -10,6 +10,5 @@ FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass, 'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass, 'pg_dist_partition'::regclass,
'pg_dist_object'::regclass, 'pg_dist_object'::regclass)
'pg_dist_background_task'::regclass)
ORDER BY attrelid, attname; ORDER BY attrelid, attname;
View File
@ -111,7 +111,7 @@ SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND node
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_local_group;
SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_node ORDER BY nodeid;
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport; SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass;
@ -161,7 +161,7 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_local_group;
SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_node ORDER BY nodeid;
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport; SELECT * FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%') ORDER BY shardid, nodename, nodeport;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_schema.mx_test_table'::regclass;
@ -252,7 +252,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text; logicalrelid;
-- See the shards and placements of the mx tables -- See the shards and placements of the mx tables
SELECT SELECT
@ -263,7 +263,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
-- Check that metadata of MX tables exist on the metadata worker -- Check that metadata of MX tables exist on the metadata worker
\c - - - :worker_1_port \c - - - :worker_1_port
@ -278,9 +278,7 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
ORDER BY
logicalrelid::text;
-- Check that shard and placement data are created -- Check that shard and placement data are created
SELECT SELECT
@ -291,7 +289,7 @@ WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY ORDER BY
logicalrelid::text, shardid; logicalrelid, shardid;
-- Check that metadata of MX tables don't exist on the non-metadata worker -- Check that metadata of MX tables don't exist on the non-metadata worker
\c - - - :worker_2_port \c - - - :worker_2_port
@ -383,7 +381,7 @@ FROM
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass
ORDER BY logicalrelid::text; ORDER BY logicalrelid;
-- Update colocation and see the changes on the master and the worker -- Update colocation and see the changes on the master and the worker
SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2'); SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2');
@ -393,9 +391,7 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT SELECT
logicalrelid, colocationid logicalrelid, colocationid
@ -403,9 +399,7 @@ FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid = 'mx_colocation_test_1'::regclass logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass;
ORDER BY
logicalrelid::text;
\c - - - :master_port \c - - - :master_port
View File
@ -8,21 +8,8 @@ SELECT nextval('pg_dist_placement_placementid_seq') = MAX(placementid)+1 FROM pg
SELECT nextval('pg_dist_groupid_seq') = MAX(groupid)+1 FROM pg_dist_node; SELECT nextval('pg_dist_groupid_seq') = MAX(groupid)+1 FROM pg_dist_node;
SELECT nextval('pg_dist_node_nodeid_seq') = MAX(nodeid)+1 FROM pg_dist_node; SELECT nextval('pg_dist_node_nodeid_seq') = MAX(nodeid)+1 FROM pg_dist_node;
SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_colocation; SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_colocation;
-- while testing sequences on pg_dist_cleanup, they return null in pg upgrade schedule SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup;
-- but return a valid value in citus upgrade schedule SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup;
-- that's why we accept both NULL and MAX()+1 here
SELECT
CASE WHEN MAX(operation_id) IS NULL
THEN true
ELSE nextval('pg_dist_operationid_seq') = MAX(operation_id)+1
END AS check_operationid
FROM pg_dist_cleanup;
SELECT
CASE WHEN MAX(record_id) IS NULL
THEN true
ELSE nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1
END AS check_recordid
FROM pg_dist_cleanup;
SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job;
SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task; SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task;
SELECT last_value > 0 FROM pg_dist_clock_logical_seq; SELECT last_value > 0 FROM pg_dist_clock_logical_seq;
View File
@ -1,15 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q
\endif
-- verify that the orphaned placement is deleted and cleanup record is created
SELECT COUNT(*) FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass);
SELECT * FROM pg_dist_cleanup;
CALL citus_cleanup_orphaned_resources();
DROP TABLE table_with_orphaned_shards;
View File
@ -1,22 +0,0 @@
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int > 11 OR
(substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 11 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 2)
AS upgrade_test_old_citus_version_gte_11_2;
\gset
\if :upgrade_test_old_citus_version_gte_11_2
\q
\endif
-- create a table with orphaned shards to see if orphaned shards will be dropped
-- and cleanup records will be created for them
SET citus.next_shard_id TO 980000;
CREATE TABLE table_with_orphaned_shards (a int);
SELECT create_distributed_table('table_with_orphaned_shards', 'a');
-- show all 32 placements are active
SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1 AND shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='table_with_orphaned_shards'::regclass);
-- create an orphaned placement based on an existing one
INSERT INTO pg_dist_placement(placementid, shardid, shardstate, shardlength, groupid)
SELECT nextval('pg_dist_placement_placementid_seq'::regclass), shardid, 4, shardlength, 3-groupid
FROM pg_dist_placement
WHERE shardid = 980001;
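-- 3-groupid places the orphaned copy on the other worker's group (assuming group ids 1 and 2)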