Merge branch 'citusdata:main' into sqlancer-test-gha

pull/6697/head
Gokhan Gulbiz 2023-04-12 10:36:28 +03:00 committed by GitHub
commit 51c7e7ef7c
39 changed files with 1652 additions and 158 deletions


@ -6,7 +6,7 @@ orbs:
parameters:
image_suffix:
type: string
default: '-v087ecd7'
default: '-v3417e8d'
pg13_version:
type: string
default: '13.10'
@ -490,10 +490,6 @@ jobs:
pg_major: << parameters.pg_major >>
- configure
- enable_core
- run:
name: 'Install DBI.pm'
command: |
apt-get update && apt-get install libdbi-perl && apt-get install libdbd-pg-perl
- run:
name: 'Run Test'
command: |

.gitignore

@ -38,6 +38,9 @@ lib*.pc
/Makefile.global
/src/Makefile.custom
/compile_commands.json
/src/backend/distributed/cdc/build-cdc-*/*
/src/test/cdc/tmp_check/*
# temporary files vim creates
*.swp

configure

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 11.3devel.
# Generated by GNU Autoconf 2.69 for Citus 12.0devel.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='11.3devel'
PACKAGE_STRING='Citus 11.3devel'
PACKAGE_VERSION='12.0devel'
PACKAGE_STRING='Citus 12.0devel'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 11.3devel to adapt to many kinds of systems.
\`configure' configures Citus 12.0devel to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 11.3devel:";;
short | recursive ) echo "Configuration of Citus 12.0devel:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 11.3devel
Citus configure 12.0devel
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 11.3devel, which was
It was created by Citus $as_me 12.0devel, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 11.3devel, which was
This file was extended by Citus $as_me 12.0devel, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 11.3devel
Citus config.status 12.0devel
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"
@ -6160,4 +6160,3 @@ if test -n "$ac_unrecognized_opts" && test "$enable_option_checking" != no; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: WARNING: unrecognized options: $ac_unrecognized_opts" >&5
$as_echo "$as_me: WARNING: unrecognized options: $ac_unrecognized_opts" >&2;}
fi


@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [11.3devel])
AC_INIT([Citus], [12.0devel])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands


@ -37,8 +37,7 @@ OBJS += \
all: cdc
cdc:
echo "running cdc make"
$(MAKE) DECODER=pgoutput -C cdc all
$(MAKE) -C cdc all
NO_PGXS = 1
@ -85,12 +84,13 @@ ifneq (,$(SQL_Po_files))
include $(SQL_Po_files)
endif
.PHONY: clean-full install install-downgrades install-all
.PHONY: clean-full install install-downgrades install-all install-cdc clean-cdc
clean: clean-cdc
clean-cdc:
$(MAKE) DECODER=pgoutput -C cdc clean
$(MAKE) -C cdc clean
cleanup-before-install:
rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control
@ -99,7 +99,7 @@ cleanup-before-install:
install: cleanup-before-install install-cdc
install-cdc:
$(MAKE) DECODER=pgoutput -C cdc install
$(MAKE) -C cdc install
# install and install-downgrades should be run sequentially
install-all: install


@ -1,26 +1,45 @@
ifndef DECODER
DECODER = pgoutput
endif
MODULE_big = citus_$(DECODER)
citus_subdir = src/backend/distributed/cdc
citus_top_builddir = ../../../..
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
OBJS += cdc_decoder.o cdc_decoder_utils.o
include $(citus_top_builddir)/Makefile.global
override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
citus_subdir = src/backend/distributed/cdc
SRC_DIR = $(citus_abs_top_srcdir)/$(citus_subdir)
install: install-cdc
# List of supported base decoders. Add new decoders here.
cdc_base_decoders :=pgoutput wal2json
clean: clean-cdc
all: build-cdc-decoders
install-cdc:
mkdir -p '$(citus_decoders_dir)'
$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
copy-decoder-files-to-build-dir:
$(eval DECODER_BUILD_DIR=build-cdc-$(DECODER))
mkdir -p $(DECODER_BUILD_DIR)
@for file in $(SRC_DIR)/*.c $(SRC_DIR)/*.h; do \
if [ -f $$file ]; then \
if [ -f $(DECODER_BUILD_DIR)/$$(basename $$file) ]; then \
if ! diff -q $$file $(DECODER_BUILD_DIR)/$$(basename $$file); then \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
else \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
fi \
done
cp $(SRC_DIR)/Makefile.decoder $(DECODER_BUILD_DIR)/Makefile
build-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) build-cdc-decoder;)
install-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) -C build-cdc-$(base_decoder) install;)
clean-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),rm -rf build-cdc-$(base_decoder);)
build-cdc-decoder:
$(MAKE) DECODER=$(DECODER) copy-decoder-files-to-build-dir
$(MAKE) DECODER=$(DECODER) -C build-cdc-$(DECODER)
install: install-cdc-decoders
clean: clean-cdc-decoders
clean-cdc:
rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'
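The rewritten Makefile above loops over cdc_base_decoders (pgoutput and wal2json), copies the shared decoder sources into a per-decoder build-cdc-<decoder> directory, builds each one with Makefile.decoder, and installs the result as citus_decoders/<decoder>.so. As a rough illustration of how such a decoder is exercised end to end, here is a minimal SQL sketch modeled on the wal2json Perl test added later in this commit (the slot and table names are taken from that test and are otherwise arbitrary):

-- create a logical replication slot that decodes changes with wal2json
SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot', 'wal2json');
-- generate some changes on a table with REPLICA IDENTITY FULL
INSERT INTO data_100008 SELECT i, 'my test data ' || i FROM generate_series(-1, 1) i;
-- read the decoded changes back through the slot
SELECT * FROM pg_logical_slot_get_changes('cdc_replication_slot', NULL, NULL);
-- drop the slot when done
SELECT pg_drop_replication_slot('cdc_replication_slot');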


@ -0,0 +1,24 @@
MODULE_big = citus_$(DECODER)
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
citus_top_builddir = ../../../../..
citus_subdir = src/backend/distributed/cdc/cdc_$(DECODER)
OBJS += cdc_decoder.o cdc_decoder_utils.o
include $(citus_top_builddir)/Makefile.global
override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
install: install-cdc
clean: clean-cdc
install-cdc:
mkdir -p '$(citus_decoders_dir)'
$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
clean-cdc:
rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '11.3-1'
default_version = '12.0-1'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog


@ -32,6 +32,7 @@
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "distributed/background_jobs.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/citus_nodes.h"
@ -57,7 +58,9 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
@ -777,7 +780,6 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
}
/* for non-partitioned tables, we will use Postgres' size functions */
else
{
@ -2816,7 +2818,8 @@ CreateBackgroundJob(const char *jobType, const char *description)
*/
BackgroundTask *
ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount,
int64 dependingTaskIds[])
int64 dependingTaskIds[], int nodesInvolvedCount, int32
nodesInvolved[])
{
BackgroundTask *task = NULL;
@ -2890,6 +2893,11 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false;
values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
0);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple);
@ -3201,6 +3209,13 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
}
if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
{
ArrayType *nodesInvolvedArrayObject =
DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}
return task;
}
@ -3333,7 +3348,8 @@ GetRunnableBackgroundTask(void)
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
if (BackgroundTaskReadyToRun(task))
if (BackgroundTaskReadyToRun(task) &&
IncrementParallelTaskCountForNodesInvolved(task))
{
/* found task, close table and return */
break;


@ -108,7 +108,8 @@ static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
bool localOnly);
static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex,
@ -231,8 +232,8 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
* do not need to worry about concurrent changes (e.g. deletion) and
* can proceed to update immediately.
*/
UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);
bool localOnly = false;
UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort, localOnly);
/* clear cached plans that have the old host/port */
ResetPlanCache();
@ -1290,7 +1291,8 @@ citus_update_node(PG_FUNCTION_ARGS)
*/
ResetPlanCache();
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
bool localOnly = true;
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort, localOnly);
/* we should be able to find the new node from the metadata */
workerNode = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
@ -1352,7 +1354,7 @@ SetLockTimeoutLocally(int32 lockCooldown)
static void
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly)
{
const bool indexOK = true;
@ -1396,6 +1398,20 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
CommandCounterIncrement();
if (!localOnly && EnableMetadataSync)
{
WorkerNode *updatedNode = FindWorkerNodeAnyCluster(newNodeName, newNodePort);
Assert(updatedNode->nodeId == nodeId);
/* send the delete command to all primary nodes with metadata */
char *nodeDeleteCommand = NodeDeleteCommand(updatedNode->nodeId);
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
/* send the insert command to all primary nodes with metadata */
char *nodeInsertCommand = NodeListInsertCommand(list_make1(updatedNode));
SendCommandToWorkersWithMetadata(nodeInsertCommand);
}
systable_endscan(scanDescriptor);
table_close(pgDistNode, NoLock);
}


@ -190,13 +190,32 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;
/* ShardMoveDependencyHashEntry contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */
/*
* ShardMoveDependencyInfo contains the taskId which any new shard
* move task within the corresponding colocation group
* must take a dependency on.
*/
typedef struct ShardMoveDependencyInfo
{
int64 key;
int64 taskId;
} ShardMoveDependencyInfo;
/*
* ShardMoveSourceNodeHashEntry keeps track of the source nodes
* of the moves.
*/
typedef struct ShardMoveSourceNodeHashEntry
{
/* this is the key */
int32 node_id;
List *taskIds;
} ShardMoveSourceNodeHashEntry;
/*
* ShardMoveDependencies keeps track of all needed dependencies
* between shard moves.
*/
typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
@ -274,6 +293,15 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo
static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
colocationId,
ShardMoveDependencies shardMoveDependencies,
int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
int64 taskId,
ShardMoveDependencies shardMoveDependencies);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(rebalance_table_shards);
@ -1930,8 +1958,7 @@ GetColocationId(PlacementUpdateEvent *move)
* InitializeShardMoveDependencies function creates the hash maps that we use to track
* the latest moves so that subsequent moves with the same properties must take a dependency
* on them. There are two hash maps. One is for tracking the latest move scheduled in a
* given colocation group and the other one is for tracking the latest move which involves
* a given node either as its source node or its target node.
* given colocation group and the other one is for tracking source nodes of all moves.
*/
static ShardMoveDependencies
InitializeShardMoveDependencies()
@ -1941,18 +1968,17 @@ InitializeShardMoveDependencies()
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
ShardMoveSourceNodeHashEntry,
"nodeDependencyHashMap",
6);
return shardMoveDependencies;
}
/*
* GenerateTaskMoveDependencyList creates and returns a List of taskIds that
* the move must take a dependency on.
* the move must take a dependency on, given the shard move dependencies as input.
*/
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
@ -1972,27 +1998,24 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's source node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
/*
* Check if there exist moves scheduled earlier whose source node
* overlaps with the current move's target node.
* The earlier/first move might make space for the later/second move.
* So we could run out of disk space (or at least overload the node)
* if we move the second shard to it before the first one is moved away.
*/
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
&found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
int64 *taskId = NULL;
foreach_ptr(taskId, shardMoveSourceNodeHashEntry->taskIds)
{
hash_search(dependsList, taskId, HASH_ENTER, NULL);
}
/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's target node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
&found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
*nDepends = hash_get_num_entries(dependsList);
@ -2030,15 +2053,20 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->sourceNode->nodeId, HASH_ENTER, NULL);
bool found;
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);
shardMoveDependencyInfo->taskId = taskId;
if (!found)
{
shardMoveSourceNodeHashEntry->taskIds = NIL;
}
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->targetNode->nodeId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
int64 *newTaskId = palloc0(sizeof(int64));
*newTaskId = taskId;
shardMoveSourceNodeHashEntry->taskIds = lappend(
shardMoveSourceNodeHashEntry->taskIds, newTaskId);
}
@ -2135,8 +2163,10 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
int32 nodesInvolved[] = { 0 };
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL);
NULL, 0, nodesInvolved);
replicateRefTablesTaskId = task->taskid;
}
@ -2170,9 +2200,14 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
dependsArray[0] = replicateRefTablesTaskId;
}
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = move->sourceNode->nodeId;
nodesInvolved[1] = move->targetNode->nodeId;
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends,
dependsArray);
dependsArray, 2,
nodesInvolved);
UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies);
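With the hunks above, every scheduled move records its source and target node ids (nodesInvolved), and a move whose target node was used as a source by earlier moves now depends on those earlier tasks instead of on arbitrary node overlaps. A minimal sketch of observing the resulting dependency graph from psql, using the same catalog queries as the regression test further down (job and task ids will differ per cluster):

-- kick off a background rebalance and capture its job id
SELECT citus_rebalance_start() AS job_id \gset
-- list which move depends on which earlier move
SELECT D.task_id,
       (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
       D.depends_on,
       (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D
WHERE D.job_id = :job_id
ORDER BY D.task_id, D.depends_on ASC;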


@ -1810,7 +1810,7 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
/* we don't have value field as it's a set */
info.entrysize = info.keysize;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
HTAB *workerForPlacementSet = hash_create("worker placement set", 32, &info,
hashFlags);


@ -1793,6 +1793,18 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_background_task_executors_per_node",
gettext_noop(
"Sets the maximum number of parallel background task executor workers "
"for scheduled background tasks that involve a particular node"),
NULL,
&MaxBackgroundTaskExecutorsPerNode,
1, 1, 128,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_cached_connection_lifetime",
gettext_noop("Sets the maximum lifetime of cached connections to other nodes."),


@ -14,3 +14,7 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_
#include "udfs/citus_stat_tenants_local_reset/11.3-1.sql"
#include "udfs/citus_stat_tenants_reset/11.3-1.sql"
-- we introduce nodes_involved, which will be used internally to
-- limit the number of parallel tasks running per node
ALTER TABLE pg_catalog.pg_dist_background_task ADD COLUMN nodes_involved int[] DEFAULT NULL;
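Once background tasks are scheduled, the new column can be inspected directly; the query below is the one used in the updated regression test output later in this diff, where nodes_involved holds the source and target node ids of each move:

-- show each background task together with the node ids it touches
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task
ORDER BY task_id;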


@ -0,0 +1,3 @@
-- citus--11.3-1--12.0-1
-- bump version to 12.0-1


@ -28,3 +28,5 @@ DROP FUNCTION pg_catalog.citus_stat_tenants(boolean);
DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset();
DROP FUNCTION pg_catalog.citus_stat_tenants_reset();
ALTER TABLE pg_catalog.pg_dist_background_task DROP COLUMN nodes_involved;


@ -0,0 +1,2 @@
-- citus--12.0-1--11.3-1
-- this is an empty downgrade path since citus--11.3-1--12.0-1.sql is empty for now


@ -140,3 +140,34 @@ TextArrayTypeToIntegerList(ArrayType *arrayObject)
return list;
}
/*
* IntArrayToDatum
*
* Convert an integer array to the datum int array format
* (currently used for nodes_involved in pg_dist_background_task)
*
* Returns the array in the form of a Datum, or PointerGetDatum(NULL)
* if the int_array is empty.
*/
Datum
IntArrayToDatum(uint32 int_array_size, int int_array[])
{
if (int_array_size == 0)
{
return PointerGetDatum(NULL);
}
ArrayBuildState *astate = NULL;
for (int i = 0; i < int_array_size; i++)
{
Datum dvalue = Int32GetDatum(int_array[i]);
bool disnull = false;
Oid element_type = INT4OID;
astate = accumArrayResult(astate, dvalue, disnull, element_type,
CurrentMemoryContext);
}
return makeArrayResult(astate, CurrentMemoryContext);
}


@ -63,6 +63,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/resource_lock.h"
/* Table-of-contents constants for our dynamic shared memory segment. */
@ -115,12 +116,17 @@ static bool MonitorGotTerminationOrCancellationRequest();
static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
/* flags set by signal handlers */
static volatile sig_atomic_t GotSigterm = false;
static volatile sig_atomic_t GotSigint = false;
static volatile sig_atomic_t GotSighup = false;
/* keeping track of parallel background tasks per node */
HTAB *ParallelTasksPerNode = NULL;
int MaxBackgroundTaskExecutorsPerNode = 1;
PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait);
PG_FUNCTION_INFO_V1(citus_task_wait);
@ -211,7 +217,7 @@ citus_job_wait(PG_FUNCTION_ARGS)
* assume any terminal state as its desired status. The function returns if the
* desired_state was reached.
*
* The current implementation is a polling implementation with an interval of 1 second.
* The current implementation is a polling implementation with an interval of 0.1 seconds.
* Ideally we would have some synchronization between the background tasks queue monitor
* and any backend calling this function to receive a signal when the task changes state.
*/
@ -857,6 +863,7 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
UpdateBackgroundTask(task);
UpdateDependingTasks(task);
UpdateBackgroundJob(task->jobid);
DecrementParallelTaskCountForNodesInvolved(task);
/* we are sure that at least one task did not block on current iteration */
queueMonitorExecutionContext->allTasksWouldBlock = false;
@ -868,6 +875,77 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
}
/*
* IncrementParallelTaskCountForNodesInvolved
* Checks whether we have reached the limit of parallel tasks per node
* for each of the nodes involved in the task.
* If at least one limit is reached, it returns false.
* If no limit is reached, it increments the parallel task count
* for each of the nodes involved in the task, and returns true.
*/
bool
IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
if (task->nodesInvolved)
{
int node;
/* first check whether we have reached the limit for any of the nodes */
foreach_int(node, task->nodesInvolved)
{
bool found;
ParallelTasksPerNodeEntry *hashEntry = hash_search(
ParallelTasksPerNode, &(node), HASH_ENTER, &found);
if (!found)
{
hashEntry->counter = 0;
}
else if (hashEntry->counter >= MaxBackgroundTaskExecutorsPerNode)
{
/* at least one node's limit is reached */
return false;
}
}
/* then, increment the parallel task count per each node */
foreach_int(node, task->nodesInvolved)
{
ParallelTasksPerNodeEntry *hashEntry = hash_search(
ParallelTasksPerNode, &(node), HASH_FIND, NULL);
Assert(hashEntry);
hashEntry->counter += 1;
}
}
return true;
}
/*
* DecrementParallelTaskCountForNodesInvolved
* Decrements the parallel task count for each of the nodes involved
* with the task.
* We call this function after the task has gone through Running state
* and then has ended.
*/
static void
DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
if (task->nodesInvolved)
{
int node;
foreach_int(node, task->nodesInvolved)
{
ParallelTasksPerNodeEntry *hashEntry = hash_search(ParallelTasksPerNode,
&(node),
HASH_FIND, NULL);
hashEntry->counter -= 1;
}
}
}
/*
* QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params.
*/
@ -1023,7 +1101,7 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
/* handle SIGINT to properly cancel active task executors */
pqsignal(SIGINT, QueueMonitorSigIntHandler);
/* handle SIGHUP to update MaxBackgroundTaskExecutors */
/* handle SIGHUP to update MaxBackgroundTaskExecutors and MaxBackgroundTaskExecutorsPerNode */
pqsignal(SIGHUP, QueueMonitorSigHupHandler);
/* ready to handle signals */
@ -1167,10 +1245,15 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
{
GotSighup = false;
/* update max_background_task_executors if changed */
/* update max_background_task_executors and max_background_task_executors_per_node if changed */
ProcessConfigFile(PGC_SIGHUP);
}
if (ParallelTasksPerNode == NULL)
{
ParallelTasksPerNode = CreateSimpleHash(int32, ParallelTasksPerNodeEntry);
}
/* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */
if (!MonitorGotTerminationOrCancellationRequest())
{


@ -56,7 +56,7 @@ static int CompareTenantScore(const void *leftElement, const void *rightElement)
static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void EvictTenantsIfNecessary(TimestampTz queryTime);
static void RecordTenantStats(TenantStats *tenantStats);
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
static void CreateMultiTenantMonitor(void);
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static MultiTenantMonitor * GetMultiTenantMonitor(void);
@ -345,7 +345,7 @@ AttributeMetricsIfApplicable()
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
RecordTenantStats(tenantStats, queryTime);
LWLockRelease(&tenantStats->lock);
}
@ -372,7 +372,7 @@ AttributeMetricsIfApplicable()
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
RecordTenantStats(tenantStats, queryTime);
LWLockRelease(&tenantStats->lock);
}
@ -396,6 +396,7 @@ static void
UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
{
long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC;
long long int periodInMilliSeconds = StatTenantsPeriod * 1000;
TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);
/*
@ -416,14 +417,12 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
* If the last query is more than two periods ago, we clean the last period counts too.
*/
if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart,
periodInMicroSeconds))
periodInMilliSeconds))
{
tenantStats->writesInLastPeriod = 0;
tenantStats->readsInLastPeriod = 0;
}
tenantStats->lastQueryTime = queryTime;
}
@ -503,7 +502,7 @@ EvictTenantsIfNecessary(TimestampTz queryTime)
* RecordTenantStats records the query statistics for the tenant.
*/
static void
RecordTenantStats(TenantStats *tenantStats)
RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime)
{
if (tenantStats->score < LLONG_MAX - ONE_QUERY_SCORE)
{
@ -524,6 +523,8 @@ RecordTenantStats(TenantStats *tenantStats)
{
tenantStats->writesInThisPeriod++;
}
tenantStats->lastQueryTime = queryTime;
}


@ -85,6 +85,21 @@ typedef struct TaskExecutionContext
} TaskExecutionContext;
/*
* ParallelTasksPerNodeEntry is the struct used
* to track the number of concurrent background tasks that
* involve a particular node (the key to the entry)
*/
typedef struct ParallelTasksPerNodeEntry
{
/* Used as hash key. */
int32 node_id;
/* number of concurrent background tasks that involve node node_id */
uint32 counter;
} ParallelTasksPerNodeEntry;
extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database,
Oid extensionOwner);
extern void CitusBackgroundTaskQueueMonitorMain(Datum arg);
@ -95,5 +110,6 @@ extern Datum citus_job_wait(PG_FUNCTION_ARGS);
extern Datum citus_task_wait(PG_FUNCTION_ARGS);
extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus);
extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
#endif /*CITUS_BACKGROUND_JOBS_H */


@ -252,6 +252,7 @@ typedef struct BackgroundTask
int32 *retry_count;
TimestampTz *not_before;
char *message;
List *nodesInvolved;
/* extra space to store values for nullable value types above */
struct
@ -388,7 +389,9 @@ extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut);
extern int64 CreateBackgroundJob(const char *jobType, const char *description);
extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command,
int dependingTaskCount,
int64 dependingTaskIds[]);
int64 dependingTaskIds[],
int nodesInvolvedCount,
int32 nodesInvolved[]);
extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void);
extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId);


@ -15,7 +15,7 @@
* compiler constants for pg_dist_background_task
* ----------------
*/
#define Natts_pg_dist_background_task 9
#define Natts_pg_dist_background_task 10
#define Anum_pg_dist_background_task_job_id 1
#define Anum_pg_dist_background_task_task_id 2
#define Anum_pg_dist_background_task_owner 3
@ -25,5 +25,6 @@
#define Anum_pg_dist_background_task_retry_count 7
#define Anum_pg_dist_background_task_not_before 8
#define Anum_pg_dist_background_task_message 9
#define Anum_pg_dist_background_task_nodes_involved 10
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */


@ -190,6 +190,7 @@ extern char *VariablesToBePassedToNewConnections;
extern int MaxRebalancerLoggedIgnoredMoves;
extern bool RunningUnderIsolationTest;
extern bool PropagateSessionSettingsForLoopbackConnection;
extern int MaxBackgroundTaskExecutorsPerNode;
/* External function declarations */
extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS);


@ -22,5 +22,6 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
Oid datumTypeId);
extern List * IntegerArrayTypeToList(ArrayType *arrayObject);
extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject);
extern Datum IntArrayToDatum(uint32 int_array_size, int int_array[]);
#endif /* CITUS_ARRAY_TYPE_H */


@ -0,0 +1,51 @@
# Schema change CDC test for Citus
use strict;
use warnings;
use Test::More;
use lib './t';
use cdctestlib;
use threads;
# Initialize co-ordinator node
my $select_stmt = qq(SELECT * FROM data_100008 ORDER BY id;);
my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636);
print("coordinator port: " . $node_coordinator->port() . "\n");
print("worker0 port:" . $workers[0]->port() . "\n");
my $initial_schema = "
CREATE TABLE data_100008(
id integer,
data text,
PRIMARY KEY (data));";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE data_100008 REPLICA IDENTITY FULL;');
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','wal2json');");
#insert data into the data_100008 table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
INSERT INTO data_100008
SELECT i, 'my test data ' || i
FROM generate_series(-1,1)i;");
my $output = $node_coordinator->safe_psql('postgres',"SELECT * FROM pg_logical_slot_get_changes('cdc_replication_slot', NULL, NULL);");
print($output);
my $change_string_expected = '[0,"my test data 0"]';
if ($output =~ /$change_string_expected/) {
$result = 1;
} else {
$result = 0;
}
is($result, 1, 'CDC create_distributed_table - wal2json test');
$node_coordinator->safe_psql('postgres',"SELECT pg_drop_replication_slot('cdc_replication_slot');");
done_testing();


@ -39,7 +39,7 @@ CITUS_ARBITRARY_TEST_DIR = "./tmp_citus_test"
MASTER = "master"
# This should be updated when citus version changes
MASTER_VERSION = "11.3"
MASTER_VERSION = "12.0"
HOME = expanduser("~")


@ -1,12 +1,22 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
has the correct dependencies.
*/
--
-- BACKGROUND_REBALANCE_PARALLEL
--
-- Test to check if the background tasks scheduled by the background rebalancer
-- have the correct dependencies
--
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target) more than
-- citus.max_background_task_executors_per_node, and that we can change the GUC on
-- the fly, and that will affect the ongoing balance as it should
--
-- Test to verify that there's a hard dependency when a specific node is first being
-- used as a source for a move, and then later as a target.
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
@ -45,52 +55,52 @@ SELECT pg_reload_conf();
t
(1 row)
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
-- Colocation group 1: create two tables, table1_colg1 and table2_colg1, in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
-- Colocation group 2: create two tables, table1_colg2 and table2_colg2, in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
-- Colocation group 3: create two tables, table1_colg3 and table2_colg3, in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Add two new node so that we can rebalance */
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
@ -132,10 +142,11 @@ SELECT citus_rebalance_wait();
(1 row)
/*Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current moves nodes. */
-- PART 1
-- Test to check if the background tasks scheduled by the background rebalancer
-- have the correct dependencies
-- Check that a move is dependent on
-- any other move scheduled earlier in its colocation group.
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
@ -175,16 +186,12 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id,
task_id | command | depends_on | command
---------------------------------------------------------------------
1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto')
(7 rows)
(3 rows)
/* Check that if there is a reference table that needs to be synched to a node,
any move without a dependency must depend on the move task for reference table. */
-- Check that if there is a reference table that needs to be synched to a node,
-- any move without a dependency must depend on the move task for reference table.
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
?column?
---------------------------------------------------------------------
@ -203,8 +210,8 @@ SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true)
1
(1 row)
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an unbalance that would cause parallel rebalancing. */
-- Drain worker_3 so that we can move only one colocation group to worker_3
-- to create an unbalance that would cause parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column?
---------------------------------------------------------------------
@ -225,7 +232,7 @@ SELECT create_reference_table('ref_table');
(1 row)
/* Move all the shards of Colocation group 3 to worker_3.*/
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
@ -243,7 +250,7 @@ ORDER BY
(4 rows)
CALL citus_cleanup_orphaned_resources();
/* Activate and new nodes so that we can rebalance. */
-- Activate the new nodes so that we can rebalance.
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
@ -323,18 +330,34 @@ FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id,
1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto')
1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto')
(6 rows)
-- PART 2
-- Test to verify that we do not allow parallel rebalancer moves involving a
-- particular node (either as source or target)
-- more than citus.max_background_task_executors_per_node
-- and that we can change the GUC on the fly
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output i.e. to avoid flakiness
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
@ -359,6 +382,474 @@ select citus_remove_node('localhost', :worker_6_port);
(1 row)
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
-- Create 8 tables in 4 colocation groups, and populate them
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg2 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg4 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i;
-- Add nodes so that we can rebalance
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
56
(1 row)
SELECT citus_add_node('localhost', :worker_3_port);
citus_add_node
---------------------------------------------------------------------
57
(1 row)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked from colocation group dependencies
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
(4 rows)
-- default citus.max_background_task_executors_per_node is 1
-- show that first exactly one task per node is running
-- among the tasks that are not blocked
SELECT citus_task_wait(1013, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | running | {50,56}
17779 | 1014 | blocked | {50,57}
17779 | 1015 | runnable | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
-- increase citus.max_background_task_executors_per_node
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(1015, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1013, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that at most 2 tasks per node are running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | running | {50,57}
17779 | 1015 | running | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
-- decrease to default (1)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(1015, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1014, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(1016, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that exactly one task per node is running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | done | {50,57}
17779 | 1015 | done | {50,56}
17779 | 1016 | running | {50,57}
17779 | 1017 | runnable | {50,56}
17779 | 1018 | blocked | {50,57}
17779 | 1019 | runnable | {50,56}
17779 | 1020 | blocked | {50,57}
(8 rows)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
-- PART 3
-- Test to verify that there's a hard dependency when a specific node is first being used as a
-- source for a move, and then later as a target.
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_1_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674051;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61;
-- add the first node
-- nodeid here is 61
select citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
61
(1 row)
-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg5 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg6 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i;
-- add two other nodes
-- nodeid here is 62
select citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
62
(1 row)
-- nodeid here is 63
select citus_add_node('localhost', :worker_3_port);
citus_add_node
---------------------------------------------------------------------
63
(1 row)
CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
worker_node_list json[],
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
-- we are simulating the following from shard_rebalancer_unit.sql
-- the following steps are all according to this scenario
-- where the third move should be dependent of the first two
-- because the third move's target is the source of the first two
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}',
'{"node_name": "hostname2", "disallowed_shards": "4"}',
'{"node_name": "hostname3", "disallowed_shards": "4"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname2"}',
'{"shardid":4, "nodename":"hostname2"}',
'{"shardid":5, "nodename":"hostname3"}',
'{"shardid":6, "nodename":"hostname3"}'
]::json[]
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname3","targetport":5432}
{"updatetype":1,"shardid":4,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(3 rows)
-- manually balance the cluster such that we have
-- a balanced cluster like above with 1,2,3,4,5,6 and hostname1/2/3
-- shardid 85674051 (1) nodeid 61 (hostname1)
-- shardid 85674052 (2) nodeid 61 (hostname1)
-- shardid 85674053 (3) nodeid 62 (hostname2)
-- shardid 85674054 (4) nodeid 62 (hostname2)
-- shardid 85674055 (5) nodeid 63 (hostname3)
-- shardid 85674056 (6) nodeid 63 (hostname3)
SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- now create another rebalance strategy in order to simulate moves
-- whose target is a node that has previously been used as a source
CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int)
RETURNS boolean AS
$$
-- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}'
select case when (shardid != 85674054 and nodeid = 61)
then false
-- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}'
-- AND '{"node_name": "hostname3", "disallowed_shards": "4"}'
when (shardid = 85674054 and nodeid != 61)
then false
else true
end;
$$ LANGUAGE sql;
-- insert the new test rebalance strategy
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold,
improvement_threshold
) VALUES (
'test_source_then_target',
false,
'citus_shard_cost_1',
'citus_node_capacity_1',
'background_rebalance_parallel.test_shard_allowed_on_node',
0,
0,
0
);
SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674051 | 0 | localhost | 57637 | localhost | 57638
table1_colg2 | 85674052 | 0 | localhost | 57637 | localhost | 57639
table1_colg4 | 85674054 | 0 | localhost | 57638 | localhost | 57637
(3 rows)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset
-- check that the third move is blocked and depends on the first two
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17780 | 1021 | runnable | {61,62}
17780 | 1022 | runnable | {61,63}
17780 | 1023 | blocked | {62,61}
(3 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1021 | SELECT pg_catalog.citus_move_shard_placement(85674051,61,62,'auto')
1023 | SELECT pg_catalog.citus_move_shard_placement(85674054,62,61,'auto') | 1022 | SELECT pg_catalog.citus_move_shard_placement(85674052,61,63,'auto')
(2 rows)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target';
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- keep the rest of the tests intact; they depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -3,6 +3,7 @@ SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
SET client_min_messages TO ERROR;
-- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000;
@ -654,11 +655,268 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
1450016 | 1450024 | done
(2 rows)
-- TEST11
-- verify that we do not allow more parallel task executors involving a particular
-- node than citus.max_background_task_executors_per_node
-- verify that we can change citus.max_background_task_executors_per_node on the fly
-- tests are done with dummy node ids
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output, i.e. to avoid flakiness
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset
COMMIT;
SELECT citus_task_wait(:task_id1, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 1 task per node is running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | running | {1,2}
1450017 | 1450026 | running | {3,4}
1450017 | 1450027 | runnable | {1,2}
1450017 | 1450028 | runnable | {1,3}
1450017 | 1450029 | runnable | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
(8 rows)
SELECT citus_task_wait(:task_id1, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- increase max_background_task_executors_per_node on the fly
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(:task_id3, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | running | {1,2}
1450017 | 1450028 | running | {1,3}
1450017 | 1450029 | running | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
(8 rows)
-- increase max_background_task_executors_per_node to 3 on the fly
SELECT citus_task_wait(:task_id3, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id7, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id8, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- if pg_cancel_backend is called on one of the running task PIDs,
-- the task doesn't restart because the limit no longer allows it:
-- node with id 1 can run only one task at a time, unless tasks were already
-- running there when the limit was lowered
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
-- the task goes back to the runnable state only; it is no longer running.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that the cancelled task hasn't restarted because the limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
SELECT citus_task_wait(:task_id7, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id8, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-- show that the 6th task has restarted only after both the 7th and 8th tasks are done,
-- since we have a limit of 1 background task executor per node with id 1
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
(8 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE;
RESET client_min_messages;
ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors;
SELECT pg_reload_conf();

View File

@ -263,6 +263,19 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
5 | 0 | 0 | 0 | 1
(2 rows)
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 0 | 0
5 | 0 | 0 | 0 | 0
(2 rows)
\c - - - :master_port
SET search_path TO citus_stat_tenants;
-- test logs

View File

@ -52,12 +52,28 @@ CREATE TABLE loc1 (id int PRIMARY KEY);
INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
-- citus_set_coordinator_host with correct port
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
-- show coordinator port is correct on all workers
SELECT * FROM run_command_on_workers($$SELECT row(nodename,nodeport) FROM pg_dist_node WHERE groupid = 0$$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 9060 | t | (localhost,57636)
localhost | 57637 | t | (localhost,57636)
(2 rows)
SELECT citus_add_local_table_to_metadata('loc1', cascade_via_foreign_keys => true);
citus_add_local_table_to_metadata
---------------------------------------------------------------------

View File

@ -1330,12 +1330,28 @@ SELECT * FROM multi_extension.print_extension_changes();
| view citus_stat_tenants_local
(11 rows)
-- Test downgrade to 11.3-1 from 12.0-1
ALTER EXTENSION citus UPDATE TO '12.0-1';
ALTER EXTENSION citus UPDATE TO '11.3-1';
-- Should be an empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
-- Snapshot of state at 12.0-1
ALTER EXTENSION citus UPDATE TO '12.0-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
SHOW citus.version;
citus.version
---------------------------------------------------------------------
11.3devel
12.0devel
(1 row)
-- ensure no unexpected objects were created outside pg_catalog
@ -1368,7 +1384,7 @@ DROP EXTENSION citus;
DROP EXTENSION citus_columnar;
CREATE EXTENSION citus VERSION '8.0-1';
ERROR: specified version incompatible with loaded Citus library
DETAIL: Loaded library requires 11.3, but 8.0-1 was specified.
DETAIL: Loaded library requires 12.0, but 8.0-1 was specified.
HINT: If a newer library is present, restart the database and try the command again.
-- Test non-distributed queries work even in version mismatch
SET citus.enable_version_checks TO 'false';
@ -1413,7 +1429,7 @@ ORDER BY 1;
-- We should not distribute a table in a version mismatch
SELECT create_distributed_table('version_mismatch_table', 'column1');
ERROR: loaded Citus library version differs from installed extension version
DETAIL: Loaded library requires 11.3, but the installed extension version is 8.1-1.
DETAIL: Loaded library requires 12.0, but the installed extension version is 8.1-1.
HINT: Run ALTER EXTENSION citus UPDATE and try again.
-- This function will cause fail in next ALTER EXTENSION
CREATE OR REPLACE FUNCTION pg_catalog.relation_is_a_known_shard(regclass)

View File

@ -9,7 +9,8 @@ FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass,
'pg_dist_object'::regclass)
'pg_dist_object'::regclass,
'pg_dist_background_task'::regclass)
ORDER BY attrelid, attname;
attrelid | attname | atthasmissing | attmissingval
---------------------------------------------------------------------

View File

@ -1,12 +1,22 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
has the correct dependencies.
*/
--
-- BACKGROUND_REBALANCE_PARALLEL
--
-- Test to check if the background tasks scheduled by the background rebalancer
-- have the correct dependencies
--
-- Test to verify that we do not allow more parallel rebalancer moves involving a
-- particular node (either as source or target) than
-- citus.max_background_task_executors_per_node, that we can change the GUC on
-- the fly, and that doing so affects the ongoing rebalance as it should
--
-- Test to verify that there's a hard dependency when a specific node is first being
-- used as a source for a move, and then later as a target.
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
@ -26,34 +36,34 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
-- Colocation group 1: create two tables, table1_colg1 and table2_colg1, in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
-- Colocation group 2: create two tables, table1_colg2 and table2_colg2, in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
-- Colocation group 3: create two tables, table1_colg3 and table2_colg3, in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
/* Add two new node so that we can rebalance */
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
@ -63,10 +73,12 @@ SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
/*Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current moves nodes. */
-- PART 1
-- Test to check if the background tasks scheduled by the background rebalancer
-- have the correct dependencies
-- Check that a move is dependent on
-- any other move scheduled earlier in its colocation group.
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
@ -78,14 +90,14 @@ SELECT D.task_id,
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
/* Check that if there is a reference table that needs to be synched to a node,
any move without a dependency must depend on the move task for reference table. */
-- Check that if there is a reference table that needs to be synched to a node,
-- any move without a dependency must depend on the move task for reference table.
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
SELECT public.wait_for_resource_cleanup();
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an unbalance that would cause parallel rebalancing. */
-- Drain worker_3 so that we can move only one colocation group to worker_3,
-- creating an imbalance that would cause parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
@ -95,7 +107,7 @@ CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table');
/* Move all the shards of Colocation group 3 to worker_3.*/
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
@ -107,7 +119,7 @@ ORDER BY
CALL citus_cleanup_orphaned_resources();
/* Activate and new nodes so that we can rebalance. */
-- Activate the new nodes so that we can rebalance.
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
@ -128,13 +140,265 @@ SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
-- PART 2
-- Test to verify that we do not allow more parallel rebalancer moves involving a
-- particular node (either as source or target)
-- than citus.max_background_task_executors_per_node,
-- and that we can change the GUC on the fly
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output, i.e. to avoid flakiness
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_2_port);
select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port);
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
-- Create 8 tables in 4 colocation groups, and populate them
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
INSERT INTO table2_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg2 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
INSERT INTO table2_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
INSERT INTO table2_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 3, colocate_with => 'none');
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table2_colg4 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg4', 'b', colocate_with => 'table1_colg4');
INSERT INTO table2_colg4 SELECT i FROM generate_series(0, 100)i;
-- Add nodes so that we can rebalance
SELECT citus_add_node('localhost', :worker_2_port);
SELECT citus_add_node('localhost', :worker_3_port);
SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked by colocation group dependencies
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
-- default citus.max_background_task_executors_per_node is 1
-- show that, at first, exactly one task per node is running
-- among the tasks that are not blocked
SELECT citus_task_wait(1013, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
-- increase citus.max_background_task_executors_per_node
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
SELECT citus_task_wait(1015, desired_status => 'running');
SELECT citus_task_wait(1013, desired_status => 'done');
-- show that at most 2 tasks per node are running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
-- decrease to default (1)
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
SELECT citus_task_wait(1015, desired_status => 'done');
SELECT citus_task_wait(1014, desired_status => 'done');
SELECT citus_task_wait(1016, desired_status => 'running');
-- show that exactly one task per node is running
-- among the tasks that are not blocked
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
SELECT citus_rebalance_stop();
-- PART 3
-- Test to verify that there's a hard dependency when a specific node is first being used as a
-- source for a move, and then later as a target.
-- First let's restart the scenario
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_1_port);
select citus_remove_node('localhost', :worker_2_port);
select citus_remove_node('localhost', :worker_3_port);
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674051;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 61;
-- add the first node
-- nodeid here is 61
select citus_add_node('localhost', :worker_1_port);
-- create, populate and distribute 6 tables, each with 1 shard, none colocated with each other
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg1 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg2 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg3 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg4 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg4', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg4 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg5 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg5', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg5 SELECT i FROM generate_series(0, 100)i;
CREATE TABLE table1_colg6 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg6', 'a', shard_count => 1, colocate_with => 'none');
INSERT INTO table1_colg6 SELECT i FROM generate_series(0, 100)i;
-- add two other nodes
-- nodeid here is 62
select citus_add_node('localhost', :worker_2_port);
-- nodeid here is 63
select citus_add_node('localhost', :worker_3_port);
CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
worker_node_list json[],
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
-- we are simulating the following scenario from shard_rebalancer_unit.sql
-- the steps below all follow that scenario,
-- where the third move should be dependent on the first two
-- because the third move's target is the source of the first two
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}',
'{"node_name": "hostname2", "disallowed_shards": "4"}',
'{"node_name": "hostname3", "disallowed_shards": "4"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname2"}',
'{"shardid":4, "nodename":"hostname2"}',
'{"shardid":5, "nodename":"hostname3"}',
'{"shardid":6, "nodename":"hostname3"}'
]::json[]
));
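-- Illustrative sketch, not part of the original test: each element returned by
-- shard_placement_rebalance_array() is a JSON document describing one planned
-- move, so its fields can be pulled out with the standard json operators, e.g.
-- for one of the move documents shown in the expected output:
SELECT m->>'shardid' AS shardid,
       m->>'sourcename' AS source,
       m->>'targetname' AS target
FROM (SELECT '{"updatetype":1,"shardid":4,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}'::json AS m) AS move;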
-- manually balance the cluster so that we end up with the same balanced layout
-- as above, i.e. shards 1,2,3,4,5,6 spread over hostname1/2/3
-- shardid 85674051 (1) nodeid 61 (hostname1)
-- shardid 85674052 (2) nodeid 61 (hostname1)
-- shardid 85674053 (3) nodeid 62 (hostname2)
-- shardid 85674054 (4) nodeid 62 (hostname2)
-- shardid 85674055 (5) nodeid 63 (hostname3)
-- shardid 85674056 (6) nodeid 63 (hostname3)
SELECT pg_catalog.citus_move_shard_placement(85674053,61,62,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674054,61,62,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674055,61,63,'auto');
SELECT pg_catalog.citus_move_shard_placement(85674056,61,63,'auto');
-- now create another rebalance strategy in order to simulate moves
-- whose target is a node that has previously been used as a source
CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(shardid bigint, nodeid int)
RETURNS boolean AS
$$
-- analogous to '{"node_name": "hostname1", "disallowed_shards": "1,2,3,5,6"}'
select case when (shardid != 85674054 and nodeid = 61)
then false
-- analogous to '{"node_name": "hostname2", "disallowed_shards": "4"}'
-- AND '{"node_name": "hostname3", "disallowed_shards": "4"}'
when (shardid = 85674054 and nodeid != 61)
then false
else true
end;
$$ LANGUAGE sql;
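-- Illustrative sanity check, not part of the original test: per the CASE above,
-- shard 4 (85674054) is allowed only on node 61, while every other shard is
-- disallowed there
SELECT test_shard_allowed_on_node(85674054, 61) AS shard4_on_node61,
       test_shard_allowed_on_node(85674051, 61) AS shard1_on_node61;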
-- insert the new test rebalance strategy
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold,
improvement_threshold
) VALUES (
'test_source_then_target',
false,
'citus_shard_cost_1',
'citus_node_capacity_1',
'background_rebalance_parallel.test_shard_allowed_on_node',
0,
0,
0
);
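-- Illustrative check, not part of the original test: confirm the custom strategy
-- was registered with the expected shard_allowed_on_node_function
SELECT name, shard_allowed_on_node_function
FROM pg_catalog.pg_dist_rebalance_strategy
WHERE name = 'test_source_then_target';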
SELECT * FROM get_rebalance_table_shards_plan(rebalance_strategy := 'test_source_then_target');
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(rebalance_strategy := 'test_source_then_target') \gset
-- check that the third move is blocked and depends on the first two
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
SELECT citus_rebalance_stop();
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_source_then_target';
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
-- keep the rest of the tests intact; they depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -3,6 +3,7 @@ SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
SET client_min_messages TO ERROR;
-- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
@ -279,11 +280,106 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that task is cancelled
-- TEST11
-- verify that we do not allow more parallel task executors involving a particular
-- node than citus.max_background_task_executors_per_node
-- verify that we can change citus.max_background_task_executors_per_node on the fly
-- tests are done with dummy node ids
-- citus_task_wait calls are used to ensure consistent pg_dist_background_task query
-- output, i.e. to avoid flakiness
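-- Illustrative check, not part of the original test: the per-node executor cap
-- is a regular GUC, so its current value can be inspected at any time
-- (it defaults to 1, per the comments in background_rebalance_parallel.sql)
SHOW citus.max_background_task_executors_per_node;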
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify changing max background task executors per node on the fly') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(2); $job$, ARRAY [3, 4]) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(4); $job$, ARRAY [2, 4]) RETURNING task_id AS task_id5 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(7); $job$, ARRAY [1, 2]) RETURNING task_id AS task_id6 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 3]) RETURNING task_id AS task_id7 \gset
INSERT INTO pg_dist_background_task (job_id, command, nodes_involved) VALUES (:job_id1, $job$ SELECT pg_sleep(6); $job$, ARRAY [1, 4]) RETURNING task_id AS task_id8 \gset
COMMIT;
SELECT citus_task_wait(:task_id1, desired_status => 'running');
SELECT citus_task_wait(:task_id2, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 1 task per node is running
SELECT citus_task_wait(:task_id1, desired_status => 'done');
SELECT citus_task_wait(:task_id2, desired_status => 'done');
-- increase max_background_task_executors_per_node on the fly
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
SELECT citus_task_wait(:task_id3, desired_status => 'running');
SELECT citus_task_wait(:task_id4, desired_status => 'running');
SELECT citus_task_wait(:task_id5, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 2 tasks per node are running
-- increase max_background_task_executors_per_node to 3 on the fly
SELECT citus_task_wait(:task_id3, desired_status => 'done');
SELECT citus_task_wait(:task_id4, desired_status => 'done');
SELECT citus_task_wait(:task_id5, desired_status => 'done');
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 3;
SELECT pg_reload_conf();
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id7, desired_status => 'running');
SELECT citus_task_wait(:task_id8, desired_status => 'running');
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id; -- show that at most 3 tasks per node are running
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs,
-- the task doesn't restart because the limit no longer allows it:
-- node with id 1 can run only one task at a time, unless tasks were already
-- running there when the limit was lowered
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
-- the task goes back to the runnable state only; it is no longer running.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
-- show that the cancelled task hasn't restarted because the limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
SELECT citus_task_wait(:task_id7, desired_status => 'done');
SELECT citus_task_wait(:task_id8, desired_status => 'done');
SELECT citus_task_wait(:task_id6, desired_status => 'running');
-- show that the 6th task has restarted only after both the 7th and 8th tasks are done,
-- since we have a limit of 1 background task executor per node with id 1
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_wait(:job_id1);
ALTER SYSTEM RESET citus.max_background_task_executors_per_node;
SELECT pg_reload_conf();
SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE;
RESET client_min_messages;
ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors;

View File

@ -92,6 +92,10 @@ SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stat_tenants;

View File

@ -42,7 +42,12 @@ INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
-- citus_set_coordinator_host with correct port
SELECT citus_set_coordinator_host('localhost', :master_port);
-- show coordinator port is correct on all workers
SELECT * FROM run_command_on_workers($$SELECT row(nodename,nodeport) FROM pg_dist_node WHERE groupid = 0$$);
SELECT citus_add_local_table_to_metadata('loc1', cascade_via_foreign_keys => true);
-- Create partitioned distributed table

View File

@ -591,6 +591,16 @@ SELECT * FROM multi_extension.print_extension_changes();
ALTER EXTENSION citus UPDATE TO '11.3-1';
SELECT * FROM multi_extension.print_extension_changes();
-- Test downgrade to 11.3-1 from 12.0-1
ALTER EXTENSION citus UPDATE TO '12.0-1';
ALTER EXTENSION citus UPDATE TO '11.3-1';
-- Should be an empty result since upgrade+downgrade should be a no-op
SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 12.0-1
ALTER EXTENSION citus UPDATE TO '12.0-1';
SELECT * FROM multi_extension.print_extension_changes();
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -10,5 +10,6 @@ FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass,
'pg_dist_object'::regclass)
'pg_dist_object'::regclass,
'pg_dist_background_task'::regclass)
ORDER BY attrelid, attname;