Compare commits

...

12 Commits

Author SHA1 Message Date
Hanefi Onaldi fcd3b6c12f Changelog entries for 11.3.0 (#6856)
In this release, I tried something different. I experimented with adding
the PR number and title to the changelog right before each changelog
entry. This way, it is easier to track where a particular changelog
entry comes from. After reviews are over, I plan to remove those lines
with PR numbers and titles.

I went through all the PRs that were merged after the 11.2.0 release and
came up with a list of PRs that may need help with changelog entries. You can
see details on PRs grouped in several sections below.

The following PRs do not have a changelog entry. If you think this is a
mistake, please share it in this PR along with a suggestion on what the
changelog item should be.

PR #6846 : fix 3 flaky tests in failure schedule
PR #6844 : Add CPU usage to citus_stat_tenants
PR #6833 : Fix citus_stat_tenants period updating bug
PR #6787 : Add more tests for ddl coverage
PR #6842 : Add build-cdc-* temporary directories to .gitignore
PR #6841 : Add build-cdc-* temporary directories to .gitignore
PR #6840 : Bump Citus to 12.0devel
PR #6824 : Fixes flakiness in multi_metadata_sync test
PR #6811 : Backport identity column improvements to v11.2
PR #6830 : In run_test.py actually return worker_count
PR #6825 : Fixes flakiness in multi_cluster_management test
PR #6816 : Refactor run_test.py
PR #6817 : Explicitly disallow local rels when inserting into dist table
PR #6821 : Rename citus stats tenants
PR #6822 : Add some more tests for initial sql support
PR #6819 : Fix flakyness in
citus_split_shard_by_split_points_deferred_drop
PR #6814 : Make python-regress based tests runnable with run_test.py
PR #6813 : Fix flaky multi_mx_schema_support test
PR #6720 : Convert columnar tap tests to pytest
PR #6812 : Revoke statistics permissions from public and grant them to
pg_monitor
PR #6769 : Citus stats tenants guc
PR #6807 : Fix the incorrect (constant) value passed to pointer-to-bool
parameter, pass a NULL as the value is not used
PR #6797 : Attribute local queries and cached plans on local execution
PR #6796 : Parse the annotation string correctly
PR #6762 : Add logs to citus_stats_tenants
PR #6773 : Add initial sql support for distributed tables that don't
have a shard key
PR #6792 : Disentangle MERGE planning code from the modify-planning code
path
PR #6761 : Citus stats tenants collector view
PR #6791 : Make 8 more tests runnable multiple times via run_test.py
PR #6786 : Refactor some of the planning code to accommodate a new
planning path for MERGE SQL
PR #6789 : Rename AllRelations.. functions to AllDistributedRelations..
PR #6788 : Actually skip arbitrary_configs_router & nested_execution for
AllNullDistKeyDefaultConfig
PR #6783 : Add a config for arbitrary config tests where all the tables
are null-shard-key tables
PR #6784 : Fix attach partition: citus local to null distributed
PR #6782 : Add an arbitrary config test heavily based on
multi_router_planner_fast_path.sql
PR #6781 : Decide what to do with router planner error at one place
PR #6778 : Support partitioning for dist tables with null dist keys
PR #6766 : fix pip lock file
PR #6764 : Make workerCount configurable for regression tests
PR #6745 : Add support for creating distributed tables with a null shard
key
PR #6696 : This implements MERGE phase-III
PR #6767 : Add pytest depedencies to Pipfile
PR #6760 : Decide core distribution params in CreateCitusTable
PR #6759 : Add multi_create_fdw into minimal_schedule
PR #6743 : Replace CITUS_TABLE_WITH_NO_DIST_KEY checks with
HasDistributionKey()
PR #6751 : Stabilize single_node.sql and others that report illegal node
removal
PR #6742 : Refactor CreateDistributedTable()
PR #6747 : Remove unused lock functions
PR #6744 : Fix multiple output version arbitrary config tests
PR #6741 : Stabilize single node tests
PR #6740 : Fix string eval bug in migration files check
PR #6736 : Make run_test.py and create_test.py importable without errors
PR #6734 : Don't blanket ignore flake8 E402 error
PR #6737 : Fixes bookworm packaging pipeline problem
PR #6735 : Fix run_test.py on python 3.9
PR #6733 : MERGE: In deparser, add missing check for RETURNING clause.
PR #6714 : Remove auto_explain workaround in citus explain hook for
ALTER TABLE
PR #6719 : Fix flaky test
PR #6718 : Add more powerfull dependency tracking to run_test.py
PR #6710 : Install non-vulnerable cryptography package
PR #6711 : Support compilation and run tests on latest PG versions
PR #6700 : Add auto-formatting and linting to our python code
PR #6707 : Allow multi_insert_select to run repeatably
PR #6708 : Fix flakyness in failure_create_distributed_table_non_empty
PR #6698 : Miscellaneous cleanup
PR #6704 : Update README for 11.2
PR #6703 : Fix dubious ownership error from git
PR #6690 : Bump Citus to 11.3devel

The following PRs have changelog entries that are too long to fit in a
single line. I'd expect authors to supply changelog entries in
`DESCRIPTION:` lines that are at most 78 characters. If you want to
supply multi-line changelog items, you can have multiple lines that
start with `DESCRIPTION:` instead.
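
For example, PR #6837's entry (quoted from its commit message further
below) could be split across two such lines:

DESCRIPTION: Fixes update propagation bug when
DESCRIPTION: `citus_set_coordinator_host` is called more than once.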

PR #6837 : fixes update propagation bug when
`citus_set_coordinator_host` is called more than once
PR #6738 :  Identity column implementation refactorings
PR #6756 : Schedule parallel shard moves in background rebalancer by
removing task dependencies between shard moves across colocation groups.
PR #6793 : Add a GUC to disallow planning the queries that reference
non-colocated tables via router planner
PR #6726 : fix memory leak during altering distributed table with a lot
of partition and shards
PR #6722 : fix memory leak during distribution of a table with a lot of
partitions
PR #6693 : prevent memory leak during ConvertTable with a lot of
partitions

The following PRs had an empty `DESCRIPTION:` line. This generates an
empty changelog line that needs to be removed manually. Please either
provide a short entry, or remove the `DESCRIPTION:` line completely.

PR #6810 : Make CDC decoder an independent extension
PR #6827 : Makefile changes to build CDC in builddir for pgoutput and
wal2json.

---------

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
(cherry picked from commit 934430003e)
2023-05-02 12:39:15 +03:00
Hanefi Onaldi 6d833a90e5 Bump columnar to 11.3
(cherry picked from commit eca70b29a67ed0b004c820bcb9f716ee2b6013eb)
2023-05-02 12:39:15 +03:00
Ahmet Gedemenli 12c27ace2f Ignore nodes not allowed for shards, when planning rebalance steps (#6887)
We handle colocation groups whose shard group count is less than the
worker node count using a method different from the usual rebalancer;
see #6739.
While deciding whether to use this method, we should have ignored the
nodes that are marked `shouldhaveshards = false`. This PR excludes those
nodes when making the decision.

Adds a test such that:
 coordinator: []
 worker 1: [1_1, 1_2]
 worker 2: [2_1, 2_2]
(rebalance)
 coordinator: []
 worker 1: [1_1, 2_1]
 worker 2: [1_2, 2_2]

If we take the coordinator into account, the rebalancer considers the
first state balanced and does nothing (because shard_count <
worker_count). But with this PR, we ignore the coordinator because it
has `shouldhaveshards = false`, so the rebalancer distributes each
colocation group across both workers.
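
A minimal SQL sketch of that scenario (host/port values are
illustrative; `citus_set_node_property` and
`get_rebalance_table_shards_plan` are the usual knobs, assuming this
workflow):

-- mark the coordinator as not eligible for shards
SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', false);
-- with this fix the coordinator no longer counts toward the node count,
-- so the proposed moves spread each colocation group over both workers
SELECT * FROM get_rebalance_table_shards_plan();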

Also, fixes an unrelated flaky test in the same file

(cherry picked from commit 59ccf364df)
2023-05-01 12:55:08 +02:00
aykut-bozkurt 262c335860 break sequence dependency during table creation (#6889)
We need to break the sequence dependency for a table while creating it
during non-transactional metadata sync, to ensure that the table
creation is idempotent.

**Problem:**
When we send `SELECT
pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text)
FROM pg_dist_partition` to workers during the non-transactional sync,
the table might not yet be in `pg_dist_partition` on the worker, so its
sequence dependency is not broken there.

**Solution:**
We break the sequence dependency via `SELECT
pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text)`
for each table while creating it at the workers. This is safe to send
since the UDF is a no-op when there is no sequence dependency.
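
A sketch of the two command forms involved (both are defined in
metadata_sync.h in the diff below; the table name is illustrative):

-- catalog-wide form, used when dropping metadata; it misses tables that
-- are not yet in pg_dist_partition on the worker:
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text)
FROM pg_dist_partition;
-- per-table form, now sent while creating each table at the workers
-- (a no-op if the table has no sequence dependency):
SELECT pg_catalog.worker_drop_sequence_dependency('my_schema.my_table');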

DESCRIPTION: Fixes a bug related to sequence idempotency at
non-transactional sync.

Fixes https://github.com/citusdata/citus/issues/6888.

(cherry picked from commit 8cb69cfd13)
2023-04-28 15:12:02 +03:00
Emel Şimşek 7b98fbb05e When creating a HTAB we need to use HASH_COMPARE flag in order to set a user defined comparison function. (#6845)
DESCRIPTION: Fixes memory errors, caught by valgrind, of type
"conditional jump or move depends on uninitialized value"

When running Citus tests under Postgres with valgrind, the test cases
that call into the `NonBlockingShardSplit` function produce valgrind
errors of type "conditional jump or move depends on uninitialized
value".

The issue is caused by creating the HTAB incorrectly: the HASH_COMPARE
flag should be passed when creating an HTAB with a user-defined
comparison function. Without HASH_COMPARE, the HTAB falls back to the
built-in string comparison function, and valgrind detects that the
match function is never set to the intended user-defined function. The
fix is the one-line addition of HASH_COMPARE to `hashFlags` in the diff
below.

Fixes #6835

(cherry picked from commit e7a25d82c9)
2023-04-18 14:50:09 +03:00
Gokhan Gulbiz 58155c5779 Backport to 11.3 - Ensure partitionKeyValue and colocationId are set for proper tenant stats gathering (#6834) (#6862)
This PR updates the tenant stats implementation to set partitionKeyValue
and colocationId in ExecuteLocalTaskListExtended, in addition to
LocallyExecuteTaskPlan. This ensures that tenant stats can be properly
gathered regardless of the code path taken. The changes were initially
made while testing stored procedure calls for tenant stats.
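
A usage sketch based on the regression test further below (procedure
and tenant names are taken from that test):

-- runs through the local task execution path on the worker
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
-- with this change the call is still attributed to tenant '/b*c/de'
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;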

(cherry picked from commit 8782ea1582)
2023-04-17 15:24:29 +03:00
aykut-bozkurt 17149b92b2 fix 3 flaky tests in failure schedule (#6846)
Fixed 3 flaky tests in the failure schedule that caused flakiness in
other tests due to node and group sequence IDs changing during node
addition and removal.

(cherry picked from commit 3286ec59e9)
2023-04-13 13:19:35 +03:00
aykut-bozkurt 1a9066c34a fixes update propagation bug when `citus_set_coordinator_host` is called more than once (#6837)
DESCRIPTION: Fixes update propagation bug when
`citus_set_coordinator_host` is called more than once.

Fixes https://github.com/citusdata/citus/issues/6731.
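
A reproduction sketch mirroring the regression test further below
(ports are illustrative):

SELECT citus_set_coordinator_host('localhost', 9999);  -- first call
SELECT citus_set_coordinator_host('localhost', 5432);  -- must propagate too
-- workers should now report the updated coordinator host/port
SELECT * FROM run_command_on_workers(
  $$SELECT row(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0$$);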

(cherry picked from commit a20f7e1a55)
2023-04-13 13:18:28 +03:00
Halil Ozan Akgül e14f4c3dee Add CPU usage to citus_stat_tenants (#6844)
This PR adds CPU usage to the `citus_stat_tenants` monitor.
CPU usage is tracked in periods, similar to query counts.
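
A sketch of querying the new columns (column names are from the SQL
migration in the diff below; per the implementation, the values are
seconds of CPU time accumulated per period):

SELECT tenant_attribute,
       cpu_usage_in_this_period,
       cpu_usage_in_last_period
FROM citus_stat_tenants
ORDER BY cpu_usage_in_this_period DESC;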

(cherry picked from commit 9ba70696f7)
2023-04-12 17:46:00 +03:00
Halil Ozan Akgül 5525676aad Fix citus_stat_tenants period updating bug (#6833)
Fixes the bug that caused the citus_stat_tenants periods to be updated
incorrectly.

`TimestampDifferenceExceeds` expects the difference in milliseconds but
it was given microseconds; this is fixed.
`tenantStats->lastQueryTime` was also being updated during monitoring;
now it is updated only when there are tenant queries.
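
The regression test further below exercises the period rollover like
this (`sleep_until_next_period` is a test-local helper, not a shipped
UDF):

SET citus.stat_tenants_period TO 5;  -- period length in seconds
SELECT sleep_until_next_period();    -- test-local helper
-- counts now move from *_in_this_period to *_in_last_period exactly once
SELECT tenant_attribute, query_count_in_this_period, query_count_in_last_period
FROM citus_stat_tenants_local ORDER BY tenant_attribute;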

(cherry picked from commit 8b50e95dc8)
2023-04-12 17:45:44 +03:00
rajeshkt78 234df62106 Add build-cdc-* temporary directories to .gitignore (#6842)
The CDC decoder build produces different versions of the CDC base
decoders. Since the source files are copied into temporary directories,
they show up in `git status` as files to be added. These directories
and a temporary CDC TAP test directory (tmp_check) are therefore added
to the .gitignore file.
2023-04-11 08:54:43 +05:30
Onur Tirtir 5dd08835df Bump citus version to 11.3.0 2023-04-10 11:16:58 +03:00
35 changed files with 1363 additions and 141 deletions

2
.gitignore vendored
View File

@ -38,6 +38,8 @@ lib*.pc
/Makefile.global
/src/Makefile.custom
/compile_commands.json
/src/backend/distributed/cdc/build-cdc-*/*
/src/test/cdc/tmp_check/*
# temporary files vim creates
*.swp

View File

@ -1,3 +1,200 @@
### citus v11.3.0 (May 2, 2023) ###
* Introduces CDC implementation for Citus using logical replication
(#6623, #6810, #6827)
* Adds support for `MERGE` command on co-located distributed tables joined on
distribution column (#6696, #6733)
* Adds the view `citus_stats_tenants` that monitors statistics on tenant usage
(#6725)
* Adds the GUC `citus.max_background_task_executors_per_node` to control the
number of background task executors involving a node (#6771)
* Allows parallel shard moves in background rebalancer (#6756)
* Introduces the GUC `citus.metadata_sync_mode` that adds a nontransactional
mode for metadata sync (#6728, #6889)
* Propagates CREATE/ALTER/DROP PUBLICATION statements for distributed tables
(#6776)
* Adds the GUC `citus.enable_non_colocated_router_query_pushdown` to ensure
generating a consistent distributed plan for the queries that reference
non-colocated distributed tables when set to "false" (#6793)
* Checks whether all moves can be done via logical replication for the
rebalancer (#6754)
* Correctly reports shard size in `citus_shards` view (#6748)
* Fixes a bug in shard copy operations (#6721)
* Fixes a bug that prevents enforcing identity column restrictions on worker
nodes (#6738)
* Fixes a bug with `INSERT .. SELECT` queries with identity columns (#6802)
* Fixes an issue that caused some queries with custom aggregates to fail (#6805)
* Fixes an issue when `citus_set_coordinator_host` is called more than once
(#6837)
* Fixes an uninitialized memory access in shard split API (#6845)
* Fixes memory leak and max allocation block errors during metadata syncing
(#6728)
* Fixes memory leak in `undistribute_table` (#6693)
* Fixes memory leak in `alter_distributed_table` (#6726)
* Fixes memory leak in `create_distributed_table` (#6722)
* Fixes a memory leak with query results that return a single row (#6724)
* Improves rebalancer when shard groups have placement count less than worker
count (#6739)
* Makes sure to stop maintenance daemon when dropping a database even without
Citus extension (#6688)
* Prevents using `alter_distributed_table` and `undistribute_table` UDFs when a
table has identity columns (#6738)
* Prevents using identity columns on data types other than `bigint` on
distributed tables (#6738)

### citus v11.2.1 (April 20, 2023) ###
* Correctly reports shard size in `citus_shards` view (#6748)
* Fixes a bug in shard copy operations (#6721)
* Fixes a bug with `INSERT .. SELECT` queries with identity columns (#6802)
* Fixes an uninitialized memory access in shard split API (#6845)
* Fixes compilation for PG13.10 and PG14.7 (#6711)
* Fixes memory leak in `alter_distributed_table` (#6726)
* Fixes a memory leak with query results that return a single row (#6724)
* Prevents using `alter_distributed_table` and `undistribute_table` UDFs when a
table has identity columns (#6738)
* Prevents using identity columns on data types other than `bigint` on
distributed tables (#6738)

### citus v11.1.6 (April 20, 2023) ###
* Correctly reports shard size in `citus_shards` view (#6748)
* Fixes a bug in shard copy operations (#6721)
* Fixes a bug that breaks pg upgrades if the user has a columnar table (#6624)
* Fixes a bug that causes background rebalancer to fail when a reference table
doesn't have a primary key (#6682)
* Fixes a regression in allowed foreign keys on distributed tables (#6550)
* Fixes a use-after-free bug in connection management (#6685)
* Fixes an unexpected foreign table error by disallowing dropping the
`table_name` option (#6669)
* Fixes an uninitialized memory access in shard split API (#6845)
* Fixes compilation for PG13.10 and PG14.7 (#6711)
* Fixes crash that happens when trying to replicate a reference table that is
actually dropped (#6595)
* Fixes a memory leak with query results that return a single row (#6724)
* Fixes the modifiers for subscription and role creation (#6603)
* Makes sure to quote all identifiers used for logical replication to prevent
potential issues (#6604)
* Makes sure to skip foreign key validations at the end of shard moves (#6640)

### citus v11.0.8 (April 20, 2023) ###
* Correctly reports shard size in `citus_shards` view (#6748)
* Fixes a bug that breaks pg upgrades if the user has a columnar table (#6624)
* Fixes an unexpected foreign table error by disallowing dropping the
`table_name` option (#6669)
* Fixes compilation warning on PG13 + OpenSSL 3.0 (#6038, #6502)
* Fixes crash that happens when trying to replicate a reference table that is
actually dropped (#6595)
* Fixes a memory leak with query results that return a single row (#6724)
* Fixes the modifiers for subscription and role creation (#6603)
* Fixes two potential dangling pointer issues (#6504, #6507)
* Makes sure to quote all identifiers used for logical replication to prevent
potential issues (#6604)

### citus v10.2.9 (April 20, 2023) ###
* Correctly reports shard size in `citus_shards` view (#6748)
* Fixes a bug in `ALTER EXTENSION citus UPDATE` (#6383)
* Fixes a bug that breaks pg upgrades if the user has a columnar table (#6624)
* Fixes a bug that prevents retaining columnar table options after a
table-rewrite (#6337)
* Fixes a memory leak with query results that return a single row (#6724)
* Raises memory limits in columnar from 256MB to 1GB for reads and writes
(#6419)

### citus v10.1.6 (April 20, 2023) ###
* Fixes a crash that occurs when an aggregate that cannot be pushed down
returns an empty result from a worker (#5679)
* Fixes columnar freezing/wraparound bug (#5962)
* Fixes a memory leak with query results that return a single row (#6724)
* Prevents alter table functions from dropping extensions (#5974)

### citus v10.0.8 (April 20, 2023) ###
* Fixes a bug that could break `DROP SCHEMA/EXTENSION` commands when there is a
columnar table (#5458)
* Fixes a crash that occurs when an aggregate that cannot be pushed down
returns an empty result from a worker (#5679)
* Fixes columnar freezing/wraparound bug (#5962)
* Fixes a memory leak with query results that return a single row (#6724)
* Prevents alter table functions from dropping extensions (#5974)

### citus v9.5.12 (April 20, 2023) ###
* Fixes a crash that occurs when an aggregate that cannot be pushed down
returns an empty result from a worker (#5679)
* Fixes a memory leak with query results that return a single row (#6724)
* Prevents alter table functions from dropping extensions (#5974)

### citus v11.2.0 (January 30, 2023) ###
* Adds support for outer joins with reference tables / complex subquery-CTEs

18
configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 11.3devel.
# Generated by GNU Autoconf 2.69 for Citus 11.3.0.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='11.3devel'
PACKAGE_STRING='Citus 11.3devel'
PACKAGE_VERSION='11.3.0'
PACKAGE_STRING='Citus 11.3.0'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 11.3devel to adapt to many kinds of systems.
\`configure' configures Citus 11.3.0 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 11.3devel:";;
short | recursive ) echo "Configuration of Citus 11.3.0:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 11.3devel
Citus configure 11.3.0
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 11.3devel, which was
It was created by Citus $as_me 11.3.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 11.3devel, which was
This file was extended by Citus $as_me 11.3.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 11.3devel
Citus config.status 11.3.0
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [11.3devel])
AC_INIT([Citus], [11.3.0])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -1,6 +1,6 @@
# Columnar extension
comment = 'Citus Columnar extension'
default_version = '11.2-1'
default_version = '11.3-1'
module_pathname = '$libdir/citus_columnar'
relocatable = false
schema = pg_catalog

View File

@ -0,0 +1 @@
-- citus_columnar--11.2-1--11.3-1

View File

@ -0,0 +1 @@
-- citus_columnar--11.3-1--11.2-1

View File

@ -1710,20 +1710,13 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
else if (ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
* lose track of the previous changes in the sequence.
*/
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
char *command = WorkerDropSequenceDependencyCommand(sourceId);
SendCommandToWorkersWithMetadata(command);
}
}

View File

@ -393,9 +393,17 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
tableDDLCommand));
}
/* we need to drop table, if exists, first to make table creation idempotent */
/*
* We need to drop table, if exists, first to make table creation
* idempotent. Before dropping the table, we should also break
* dependencies with sequences since `drop cascade table` would also
* drop depended sequences. This is safe as we still record dependency
* with the sequence during table creation.
*/
commandList = lcons(DropTableIfExistsCommand(relationId),
commandList);
commandList = lcons(WorkerDropSequenceDependencyCommand(relationId),
commandList);
}
return commandList;

View File

@ -129,6 +129,8 @@ static void LogLocalCommand(Task *task);
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
TupleDestination *tupleDest,
Task *task);
static void SetColocationIdAndPartitionKeyValueForTasks(List *taskList,
Job *distributedPlan);
static void LocallyExecuteUtilityTask(Task *task);
static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery);
static void EnsureTransitionPossible(LocalExecutionStatus from,
@ -228,6 +230,17 @@ ExecuteLocalTaskListExtended(List *taskList,
EnsureTaskExecutionAllowed(isRemote);
}
/*
* If workerJob has a partitionKeyValue, we need to set the colocation id
* and partition key value for each task before we start executing them
* because tenant stats are collected based on these values of a task.
*/
if (distributedPlan != NULL && distributedPlan->workerJob != NULL && taskList != NIL)
{
SetJobColocationId(distributedPlan->workerJob);
SetColocationIdAndPartitionKeyValueForTasks(taskList, distributedPlan->workerJob);
}
/*
* Use a new memory context that gets reset after every task to free
* the deparsed query string and query plan.
@ -367,6 +380,26 @@ ExecuteLocalTaskListExtended(List *taskList,
}
/*
* SetColocationIdAndPartitionKeyValueForTasks sets colocationId and partitionKeyValue
* for the tasks in the taskList if workerJob has a colocationId and partitionKeyValue.
*/
static void
SetColocationIdAndPartitionKeyValueForTasks(List *taskList, Job *workerJob)
{
if (workerJob->colocationId != 0 &&
workerJob->partitionKeyValue != NULL)
{
Task *task = NULL;
foreach_ptr(task, taskList)
{
task->colocationId = workerJob->colocationId;
task->partitionKeyValue = workerJob->partitionKeyValue;
}
}
}
/*
* LocallyPlanAndExecuteMultipleQueries plans and executes the given query strings
* one by one.

View File

@ -686,7 +686,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
bool singleTransaction = true;
List *dropMetadataCommandList = DetachPartitionCommandList();
dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
WorkerDropAllShellTablesCommand(singleTransaction));
dropMetadataCommandList = list_concat(dropMetadataCommandList,
@ -4235,6 +4235,22 @@ WorkerDropAllShellTablesCommand(bool singleTransaction)
}
/*
* WorkerDropSequenceDependencyCommand returns command to drop sequence dependencies for
* given table.
*/
char *
WorkerDropSequenceDependencyCommand(Oid relationId)
{
char *qualifiedTableName = generate_qualified_relation_name(relationId);
StringInfo breakSequenceDepCommand = makeStringInfo();
appendStringInfo(breakSequenceDepCommand,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND,
quote_literal_cstr(qualifiedTableName));
return breakSequenceDepCommand->data;
}
/*
* PropagateNodeWideObjectsCommandList is called during node activation to
* propagate any object that should be propagated for every node. These are
@ -4352,8 +4368,8 @@ SendNodeWideObjectsSyncCommands(MetadataSyncContext *context)
void
SendShellTableDeletionCommands(MetadataSyncContext *context)
{
/* break all sequence deps for citus tables and remove all shell tables */
char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
/* break all sequence deps for citus tables */
char *breakSeqDepsCommand = BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand));
/* remove shell tables */

View File

@ -108,7 +108,8 @@ static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
bool localOnly);
static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex,
@ -231,8 +232,8 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
* do not need to worry about concurrent changes (e.g. deletion) and
* can proceed to update immediately.
*/
UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);
bool localOnly = false;
UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort, localOnly);
/* clear cached plans that have the old host/port */
ResetPlanCache();
@ -1290,7 +1291,8 @@ citus_update_node(PG_FUNCTION_ARGS)
*/
ResetPlanCache();
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
bool localOnly = true;
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort, localOnly);
/* we should be able to find the new node from the metadata */
workerNode = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
@ -1352,7 +1354,7 @@ SetLockTimeoutLocally(int32 lockCooldown)
static void
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly)
{
const bool indexOK = true;
@ -1396,6 +1398,20 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
CommandCounterIncrement();
if (!localOnly && EnableMetadataSync)
{
WorkerNode *updatedNode = FindWorkerNodeAnyCluster(newNodeName, newNodePort);
Assert(updatedNode->nodeId == nodeId);
/* send the delete command to all primary nodes with metadata */
char *nodeDeleteCommand = NodeDeleteCommand(updatedNode->nodeId);
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
/* send the insert command to all primary nodes with metadata */
char *nodeInsertCommand = NodeListInsertCommand(list_make1(updatedNode));
SendCommandToWorkersWithMetadata(nodeInsertCommand);
}
systable_endscan(scanDescriptor);
table_close(pgDistNode, NoLock);
}

View File

@ -515,6 +515,16 @@ GetRebalanceSteps(RebalanceOptions *options)
/* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers();
int shardAllowedNodeCount = 0;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, activeWorkerList)
{
if (workerNode->shouldHaveShards)
{
shardAllowedNodeCount++;
}
}
List *activeShardPlacementListList = NIL;
List *unbalancedShards = NIL;
@ -532,8 +542,7 @@ GetRebalanceSteps(RebalanceOptions *options)
shardPlacementList, options->workerNode);
}
if (list_length(activeShardPlacementListForRelation) >= list_length(
activeWorkerList))
if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
{
activeShardPlacementListList = lappend(activeShardPlacementListList,
activeShardPlacementListForRelation);

View File

@ -1810,7 +1810,7 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
/* we don't have value field as it's a set */
info.entrysize = info.keysize;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
HTAB *workerForPlacementSet = hash_create("worker placement set", 32, &info,
hashFlags);

View File

@ -8,6 +8,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants (
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT cpu_usage_in_this_period DOUBLE PRECISION,
OUT cpu_usage_in_last_period DOUBLE PRECISION,
OUT score BIGINT
)
RETURNS SETOF record
@ -51,6 +53,8 @@ AS (
read_count_in_last_period INT,
query_count_in_this_period INT,
query_count_in_last_period INT,
cpu_usage_in_this_period DOUBLE PRECISION,
cpu_usage_in_last_period DOUBLE PRECISION,
score BIGINT
)
ORDER BY score DESC
@ -66,7 +70,9 @@ SELECT
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
query_count_in_last_period,
cpu_usage_in_this_period,
cpu_usage_in_last_period
FROM pg_catalog.citus_stat_tenants(FALSE);
ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog;

View File

@ -8,6 +8,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants (
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT cpu_usage_in_this_period DOUBLE PRECISION,
OUT cpu_usage_in_last_period DOUBLE PRECISION,
OUT score BIGINT
)
RETURNS SETOF record
@ -51,6 +53,8 @@ AS (
read_count_in_last_period INT,
query_count_in_this_period INT,
query_count_in_last_period INT,
cpu_usage_in_this_period DOUBLE PRECISION,
cpu_usage_in_last_period DOUBLE PRECISION,
score BIGINT
)
ORDER BY score DESC
@ -66,7 +70,9 @@ SELECT
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
query_count_in_last_period,
cpu_usage_in_this_period,
cpu_usage_in_last_period
FROM pg_catalog.citus_stat_tenants(FALSE);
ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog;

View File

@ -6,6 +6,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT cpu_usage_in_this_period DOUBLE PRECISION,
OUT cpu_usage_in_last_period DOUBLE PRECISION,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE C
@ -19,7 +21,9 @@ SELECT
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
query_count_in_last_period,
cpu_usage_in_this_period,
cpu_usage_in_last_period
FROM pg_catalog.citus_stat_tenants_local()
ORDER BY score DESC;

View File

@ -6,6 +6,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT cpu_usage_in_this_period DOUBLE PRECISION,
OUT cpu_usage_in_last_period DOUBLE PRECISION,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE C
@ -19,7 +21,9 @@ SELECT
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
query_count_in_last_period,
cpu_usage_in_this_period,
cpu_usage_in_last_period
FROM pg_catalog.citus_stat_tenants_local()
ORDER BY score DESC;

View File

@ -12,13 +12,14 @@
#include "unistd.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h"
#include "distributed/distributed_planner.h"
#include "distributed/jsonbutils.h"
#include "distributed/log_utils.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/jsonbutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/multi_executor.h"
#include "distributed/tuplestore.h"
#include "distributed/colocation_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "executor/execdesc.h"
#include "storage/ipc.h"
@ -38,12 +39,14 @@ ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
#define ATTRIBUTE_PREFIX "/*{\"tId\":"
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define STAT_TENANTS_COLUMNS 7
#define STAT_TENANTS_COLUMNS 9
#define ONE_QUERY_SCORE 1000000000
static char AttributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = "";
static CmdType AttributeToCommandType = CMD_UNKNOWN;
static int AttributeToColocationGroupId = INVALID_COLOCATION_ID;
static clock_t QueryStartClock = { 0 };
static clock_t QueryEndClock = { 0 };
static const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory for multi tenant monitor";
@ -56,7 +59,7 @@ static int CompareTenantScore(const void *leftElement, const void *rightElement)
static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void EvictTenantsIfNecessary(TimestampTz queryTime);
static void RecordTenantStats(TenantStats *tenantStats);
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
static void CreateMultiTenantMonitor(void);
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static MultiTenantMonitor * GetMultiTenantMonitor(void);
@ -142,7 +145,9 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
tenantStats->writesInThisPeriod);
values[5] = Int32GetDatum(tenantStats->readsInLastPeriod +
tenantStats->writesInLastPeriod);
values[6] = Int64GetDatum(tenantStats->score);
values[6] = Float8GetDatum(tenantStats->cpuUsageInThisPeriod);
values[7] = Float8GetDatum(tenantStats->cpuUsageInLastPeriod);
values[8] = Int64GetDatum(tenantStats->score);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
@ -225,6 +230,7 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
AttributeToCommandType = commandType;
QueryStartClock = clock();
}
@ -316,6 +322,17 @@ AttributeMetricsIfApplicable()
return;
}
/*
* return if we are not in the top level to make sure we are not
* stopping counting time for a sub-level execution
*/
if (ExecutorLevel != 0 || PlannerLevel != 0)
{
return;
}
QueryEndClock = clock();
TimestampTz queryTime = GetCurrentTimestamp();
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
@ -345,7 +362,7 @@ AttributeMetricsIfApplicable()
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
RecordTenantStats(tenantStats, queryTime);
LWLockRelease(&tenantStats->lock);
}
@ -372,7 +389,7 @@ AttributeMetricsIfApplicable()
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
RecordTenantStats(tenantStats, queryTime);
LWLockRelease(&tenantStats->lock);
}
@ -396,6 +413,7 @@ static void
UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
{
long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC;
long long int periodInMilliSeconds = StatTenantsPeriod * 1000;
TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);
/*
@ -410,20 +428,23 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod;
tenantStats->readsInThisPeriod = 0;
tenantStats->cpuUsageInLastPeriod = tenantStats->cpuUsageInThisPeriod;
tenantStats->cpuUsageInThisPeriod = 0;
}
/*
* If the last query is more than two periods ago, we clean the last period counts too.
*/
if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart,
periodInMicroSeconds))
periodInMilliSeconds))
{
tenantStats->writesInLastPeriod = 0;
tenantStats->readsInLastPeriod = 0;
}
tenantStats->lastQueryTime = queryTime;
tenantStats->cpuUsageInLastPeriod = 0;
}
}
@ -503,7 +524,7 @@ EvictTenantsIfNecessary(TimestampTz queryTime)
* RecordTenantStats records the query statistics for the tenant.
*/
static void
RecordTenantStats(TenantStats *tenantStats)
RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime)
{
if (tenantStats->score < LLONG_MAX - ONE_QUERY_SCORE)
{
@ -524,6 +545,11 @@ RecordTenantStats(TenantStats *tenantStats)
{
tenantStats->writesInThisPeriod++;
}
double queryCpuTime = ((double) (QueryEndClock - QueryStartClock)) / CLOCKS_PER_SEC;
tenantStats->cpuUsageInThisPeriod += queryCpuTime;
tenantStats->lastQueryTime = queryTime;
}

View File

@ -156,6 +156,7 @@ extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
extern void ActivateNodeList(MetadataSyncContext *context);
extern char * WorkerDropAllShellTablesCommand(bool singleTransaction);
extern char * WorkerDropSequenceDependencyCommand(Oid relationId);
extern void SyncDistributedObjects(MetadataSyncContext *context);
extern void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context);
@ -180,8 +181,10 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
#define BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"

View File

@ -42,6 +42,13 @@ typedef struct TenantStats
int writesInLastPeriod;
int writesInThisPeriod;
/*
* CPU time usage of this tenant in this and last periods.
*/
double cpuUsageInLastPeriod;
double cpuUsageInThisPeriod;
/*
* The latest time this tenant ran a query. This value is used to update the score later.
*/

View File

@ -71,14 +71,17 @@ INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants(true)
ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period | cpu_is_used_in_this_period | cpu_is_used_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 1 | 0
2 | 0 | 0 | 1 | 0
3 | 0 | 0 | 1 | 0
4 | 0 | 0 | 1 | 0
5 | 0 | 0 | 1 | 0
1 | 0 | 0 | 1 | 0 | t | f
2 | 0 | 0 | 1 | 0 | t | f
3 | 0 | 0 | 1 | 0 | t | f
4 | 0 | 0 | 1 | 0 | t | f
5 | 0 | 0 | 1 | 0 | t | f
(5 rows)
SELECT citus_stat_tenants_reset();
@ -241,26 +244,48 @@ SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period | cpu_is_used_in_this_period | cpu_is_used_in_last_period
---------------------------------------------------------------------
1 | 1 | 0 | 1 | 0
5 | 0 | 0 | 1 | 0
1 | 1 | 0 | 1 | 0 | t | f
5 | 0 | 0 | 1 | 0 | t | f
(2 rows)
-- simulate passing the period
SET citus.stat_tenants_period TO 2;
SET citus.stat_tenants_period TO 5;
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period | cpu_is_used_in_this_period | cpu_is_used_in_last_period
---------------------------------------------------------------------
1 | 0 | 1 | 0 | 1
5 | 0 | 0 | 0 | 1
1 | 0 | 1 | 0 | 1 | f | t
5 | 0 | 0 | 0 | 1 | f | t
(2 rows)
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period | cpu_is_used_in_this_period | cpu_is_used_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 0 | 0 | f | f
5 | 0 | 0 | 0 | 0 | f | f
(2 rows)
\c - - - :master_port
@ -478,13 +503,17 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
t
(1 row)
DELETE FROM dist_tbl_text WHERE a = '/b*c/de';
DELETE FROM dist_tbl_text WHERE a = '/bcde';
DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
DELETE FROM dist_tbl_text WHERE a = 'bcde*';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
/b*c/de | 1 | 0 | 2 | 0
/bcde | 1 | 0 | 2 | 0
äbc | 1 | 0 | 2 | 0
bcde* | 1 | 0 | 2 | 0
(4 rows)
-- test local cached queries & prepared statements
@ -564,10 +593,10 @@ EXECUTE dist_tbl_text_select_plan('bcde*');
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 4 | 0 | 4 | 0
/bcde | 4 | 0 | 4 | 0
äbc | 4 | 0 | 4 | 0
bcde* | 4 | 0 | 4 | 0
/b*c/de | 4 | 0 | 5 | 0
/bcde | 4 | 0 | 5 | 0
äbc | 4 | 0 | 5 | 0
bcde* | 4 | 0 | 5 | 0
(4 rows)
\c - - - :master_port
@ -650,10 +679,10 @@ SET search_path TO citus_stat_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 7 | 0 | 7 | 0
/bcde | 7 | 0 | 7 | 0
äbc | 7 | 0 | 7 | 0
bcde* | 7 | 0 | 7 | 0
/b*c/de | 7 | 0 | 8 | 0
/bcde | 7 | 0 | 8 | 0
äbc | 7 | 0 | 8 | 0
bcde* | 7 | 0 | 8 | 0
(4 rows)
\c - - - :master_port
@ -716,5 +745,131 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local();
RESET ROLE;
DROP ROLE stats_non_superuser;
-- test function push down
CREATE OR REPLACE FUNCTION
select_from_dist_tbl_text(p_keyword text)
RETURNS boolean LANGUAGE plpgsql AS $fn$
BEGIN
RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1);
END;
$fn$;
SELECT create_distributed_function(
'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
SELECT select_from_dist_tbl_text('/b*c/de');
select_from_dist_tbl_text
---------------------------------------------------------------------
t
(1 row)
SELECT select_from_dist_tbl_text('/b*c/de');
select_from_dist_tbl_text
---------------------------------------------------------------------
t
(1 row)
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
select_from_dist_tbl_text
---------------------------------------------------------------------
t
(1 row)
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
select_from_dist_tbl_text
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 2
äbc | 2
(2 rows)
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
p_keyword text
)
LANGUAGE plpgsql
AS $$
BEGIN
PERFORM select_from_dist_tbl_text(p_keyword);
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0;
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text;
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword;
COMMIT;
END;$$;
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 8
äbc | 8
(2 rows)
CREATE OR REPLACE VIEW
select_from_dist_tbl_text_view
AS
SELECT * FROM citus_stat_tenants.dist_tbl_text;
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 11
äbc | 11
(2 rows)
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;

View File

@ -187,6 +187,8 @@ ORDER BY placementid;
(1 row)
-- reset cluster to original state
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 2;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 2;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
@ -196,7 +198,7 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT master_add_node('localhost', :worker_2_proxy_port);
master_add_node
---------------------------------------------------------------------
4
2
(1 row)
-- verify node is added

View File

@ -12,6 +12,8 @@ SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SELECT pg_backend_pid() as pid \gset
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 222222;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 333333;
-- make sure coordinator is in the metadata
SELECT citus_set_coordinator_host('localhost', 57636);
citus_set_coordinator_host
@ -189,8 +191,8 @@ SELECT create_distributed_table_concurrently('table_1', 'id');
SELECT * FROM pg_dist_shard WHERE logicalrelid = 'table_1'::regclass;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
table_1 | 1880080 | t | -2147483648 | -1
table_1 | 1880081 | t | 0 | 2147483647
table_1 | 222247 | t | -2147483648 | -1
table_1 | 222248 | t | 0 | 2147483647
(2 rows)
DROP SCHEMA create_dist_tbl_con CASCADE;
@ -201,3 +203,5 @@ SELECT citus_remove_node('localhost', 57636);
(1 row)
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 3;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 3;

View File

@ -19,10 +19,30 @@ SET client_min_messages TO ERROR;
-- Create roles
CREATE ROLE foo1;
CREATE ROLE foo2;
-- Create collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
-- Create type
CREATE TYPE pair_type AS (a int, b int);
-- Create function
CREATE FUNCTION one_as_result() RETURNS INT LANGUAGE SQL AS
$$
SELECT 1;
$$;
-- Create text search dictionary
CREATE TEXT SEARCH DICTIONARY my_german_dict (
template = snowball,
language = german,
stopwords = german
);
-- Create text search config
CREATE TEXT SEARCH CONFIGURATION my_ts_config ( parser = default );
ALTER TEXT SEARCH CONFIGURATION my_ts_config ALTER MAPPING FOR asciiword WITH my_german_dict;
-- Create sequence
CREATE SEQUENCE seq;
-- Create colocated distributed tables
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'));
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'), col int default (one_as_result()), myserial serial, phone text COLLATE german_phonebook, initials pair_type);
CREATE SEQUENCE seq_owned OWNED BY dist1.id;
CREATE INDEX dist1_search_phone_idx ON dist1 USING gin (to_tsvector('my_ts_config'::regconfig, (COALESCE(phone, ''::text))::text));
SELECT create_distributed_table('dist1', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -52,12 +72,30 @@ CREATE TABLE loc1 (id int PRIMARY KEY);
INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- Create publication
CREATE PUBLICATION pub_all;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
-- citus_set_coordinator_host with correct port
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
-- show coordinator port is correct on all workers
SELECT * FROM run_command_on_workers($$SELECT row(nodename,nodeport) FROM pg_dist_node WHERE groupid = 0$$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 9060 | t | (localhost,57636)
localhost | 57637 | t | (localhost,57636)
(2 rows)
SELECT citus_add_local_table_to_metadata('loc1', cascade_via_foreign_keys => true);
citus_add_local_table_to_metadata
---------------------------------------------------------------------
@ -152,8 +190,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").cancel(' || :pid || ')');
-- Failure to drop sequence dependency for all tables
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -161,7 +199,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequen
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").kill()');
mitmproxy
---------------------------------------------------------------------
@ -305,7 +343,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER DATABASE.*OWNER TO").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Filure to create schema
-- Failure to create schema
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -320,6 +358,108 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metad
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create collation
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create function
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create text search dictionary
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create text search config
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create type
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create publication
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create sequence
@ -337,6 +477,40 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop sequence dependency for distributed table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop distributed table if exists
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create distributed table
@ -354,6 +528,40 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to record sequence dependency for table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create index for table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create reference table
@ -524,6 +732,125 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT citus_internal_add_object_met
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark function as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark collation as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark text search dictionary as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark text search configuration as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark type as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark sequence as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark publication as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to set isactive to true
@ -581,8 +908,8 @@ ERROR: connection not open
SELECT * FROM pg_dist_node ORDER BY nodeport;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
4 | 4 | localhost | 9060 | default | f | t | primary | default | f | t
6 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
2 | 2 | localhost | 9060 | default | f | t | primary | default | f | t
3 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
(3 rows)
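The hasmetadata and metadatasynced columns above are what mark the half-activated node; a small sketch, assuming the same cluster, for listing nodes whose metadata is not yet fully synced:
SELECT nodename, nodeport, hasmetadata, metadatasynced
FROM pg_dist_node
WHERE NOT metadatasynced;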
@ -610,24 +937,14 @@ UPDATE dist1 SET id = :failed_node_val WHERE id = :failed_node_val;
-- Show that we can still delete from a shard at the node from the coordinator
DELETE FROM dist1 WHERE id = :failed_node_val;
-- Show that DDL would still propagate to the node
SET client_min_messages TO NOTICE;
SET citus.log_remote_commands TO 1;
CREATE SCHEMA dummy;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
NOTICE: issuing CREATE SCHEMA dummy
NOTICE: issuing SET citus.enable_ddl_propagation TO 'on'
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
NOTICE: issuing CREATE SCHEMA dummy
NOTICE: issuing SET citus.enable_ddl_propagation TO 'on'
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['dummy']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
SET citus.log_remote_commands TO 0;
SET client_min_messages TO ERROR;
SELECT * FROM run_command_on_workers($$SELECT nspname FROM pg_namespace WHERE nspname = 'dummy'$$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 9060 | t | dummy
localhost | 57637 | t | dummy
(2 rows)
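The NOTICE lines above come from citus.log_remote_commands; the same switch can be flipped ad hoc to watch what any DDL statement sends to the workers. A sketch, where dummy2 is a hypothetical schema name used only for illustration:
SET client_min_messages TO notice;
SET citus.log_remote_commands TO on;
CREATE SCHEMA dummy2;
RESET citus.log_remote_commands;
RESET client_min_messages;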
-- Successfully activate the node after many failures
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
@ -638,14 +955,14 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
citus_activate_node
---------------------------------------------------------------------
4
2
(1 row)
-- Activate the node once more to verify it works again with already synced metadata
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
citus_activate_node
---------------------------------------------------------------------
4
2
(1 row)
-- Show node metadata info on worker2 and coordinator after success
@ -653,8 +970,8 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT * FROM pg_dist_node ORDER BY nodeport;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
4 | 4 | localhost | 9060 | default | t | t | primary | default | t | t
6 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
2 | 2 | localhost | 9060 | default | t | t | primary | default | t | t
3 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
(3 rows)
@ -662,8 +979,8 @@ SELECT * FROM pg_dist_node ORDER BY nodeport;
SELECT * FROM pg_dist_node ORDER BY nodeport;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
4 | 4 | localhost | 9060 | default | t | t | primary | default | t | t
6 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
2 | 2 | localhost | 9060 | default | t | t | primary | default | t | t
3 | 0 | localhost | 57636 | default | t | t | primary | default | t | f
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
(3 rows)
@ -674,9 +991,10 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
RESET citus.metadata_sync_mode;
DROP PUBLICATION pub_all;
DROP SCHEMA dummy;
DROP SCHEMA mx_metadata_sync_multi_trans CASCADE;
NOTICE: drop cascades to 10 other objects
NOTICE: drop cascades to 15 other objects
DROP ROLE foo1;
DROP ROLE foo2;
SELECT citus_remove_node('localhost', :master_port);
@ -685,3 +1003,5 @@ SELECT citus_remove_node('localhost', :master_port);
(1 row)
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 3;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 3;


@ -1335,7 +1335,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
SHOW citus.version;
citus.version
---------------------------------------------------------------------
11.3devel
11.3.0
(1 row)
-- ensure no unexpected objects were created outside pg_catalog
@ -1750,4 +1750,4 @@ DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension;
ERROR: cannot drop schema multi_extension because other objects depend on it
DETAIL: function multi_extension.print_extension_changes() depends on schema multi_extension
HINT: Use DROP ... CASCADE to drop the dependent objects too.
HINT: Use DROP ... CASCADE to drop the dependent objects too.
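For reference, the cascade form the HINT points at (deliberately not run by this test, which only checks the error) would be:
DROP SCHEMA multi_extension CASCADE;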


@ -650,7 +650,7 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.2-1";
NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.3-1";
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -658,7 +658,7 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.2-1";
NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.3-1";
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx


@ -149,6 +149,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -174,7 +175,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(49 rows)
(50 rows)
-- Show that CREATE INDEX commands are included in the activate node snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -206,6 +207,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -231,7 +233,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(50 rows)
(51 rows)
-- Show that schema changes are included in the activate node snapshot
CREATE SCHEMA mx_testing_schema;
@ -265,6 +267,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -291,7 +294,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that append distributed tables are not included in the activate node snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -331,6 +334,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -357,7 +361,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that range distributed tables are not included in the activate node snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -390,6 +394,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -416,7 +421,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Test start_metadata_sync_to_node and citus_activate_node UDFs
-- Ensure that hasmetadata=false for all nodes
@ -1943,6 +1948,12 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10010, 's')
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10009, 't')
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10010, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_1.mx_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_2.mx_table_2');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency('public.dist_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_ref');
SELECT pg_catalog.worker_drop_sequence_dependency('public.test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -1997,7 +2008,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(111 rows)
(117 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;


@ -149,6 +149,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -174,7 +175,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(49 rows)
(50 rows)
-- Show that CREATE INDEX commands are included in the activate node snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -206,6 +207,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -231,7 +233,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(50 rows)
(51 rows)
-- Show that schema changes are included in the activate node snapshot
CREATE SCHEMA mx_testing_schema;
@ -265,6 +267,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -291,7 +294,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that append distributed tables are not included in the activate node snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -331,6 +334,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -357,7 +361,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that range distributed tables are not included in the activate node snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -390,6 +394,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -416,7 +421,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Test start_metadata_sync_to_node and citus_activate_node UDFs
-- Ensure that hasmetadata=false for all nodes
@ -1943,6 +1948,12 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10010, 's')
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10009, 't')
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10010, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_1.mx_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_2.mx_table_2');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency('public.dist_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_ref');
SELECT pg_catalog.worker_drop_sequence_dependency('public.test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -1997,7 +2008,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(111 rows)
(117 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;


@ -20,13 +20,14 @@ SELECT create_distributed_table('dist_table_test', 'a');
CREATE TABLE postgres_table_test(a int primary key);
-- make sure that all rebalance operations work fine when
-- reference tables are replicated to the coordinator
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
-- should just be no-ops even if we add the coordinator to pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
rebalance_table_shards
@ -2713,6 +2714,113 @@ SELECT sh.logicalrelid, pl.nodeport
(5 rows)
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
-- test the same with coordinator shouldhaveshards = false and shard_count = 2
-- so that the number of nodes allowed for shard placements would be 2 when rebalancing
-- for such cases, we only count the nodes that are allowed for shard placements
UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
create table two_shard_colocation_1a (a int primary key);
create table two_shard_colocation_1b (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create table two_shard_colocation_2a (a int primary key);
create table two_shard_colocation_2b (a int primary key);
select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- move shards of colocation group 1 to worker1
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
AND pl.nodeport = :worker_2_port
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- move shards of colocation group 2 to worker2
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
AND pl.nodeport = :worker_1_port
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- current state:
-- coordinator: []
-- worker 1: [1_1, 1_2]
-- worker 2: [2_1, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
logicalrelid | nodeport
---------------------------------------------------------------------
two_shard_colocation_1a | 57637
two_shard_colocation_1a | 57637
two_shard_colocation_1b | 57637
two_shard_colocation_1b | 57637
two_shard_colocation_2a | 57638
two_shard_colocation_2a | 57638
two_shard_colocation_2b | 57638
two_shard_colocation_2b | 57638
(8 rows)
-- If we take the coordinator into account, the rebalancer considers this balanced and does nothing (shard_count < worker_count)
-- but because the coordinator is not allowed to hold shards, the rebalancer distributes each colocation group across both workers
select rebalance_table_shards(shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- final state:
-- coordinator: []
-- worker 1: [1_1, 2_1]
-- worker 2: [1_2, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
logicalrelid | nodeport
---------------------------------------------------------------------
two_shard_colocation_1a | 57637
two_shard_colocation_1a | 57638
two_shard_colocation_1b | 57637
two_shard_colocation_1b | 57638
two_shard_colocation_2a | 57637
two_shard_colocation_2a | 57638
two_shard_colocation_2b | 57637
two_shard_colocation_2b | 57638
(8 rows)
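The final placement matches the comment above: with the coordinator excluded, each colocation group ends up with one shard per worker. A minimal sketch of how to list the nodes the rebalancer may actually use (assuming the standard pg_dist_node columns):

-- Illustrative only: nodes eligible for shard placements.
SELECT nodename, nodeport
FROM pg_dist_node
WHERE isactive AND shouldhaveshards
ORDER BY nodeport;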
-- cleanup
DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
-- verify we detect if one of the tables does not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);

View File

@ -35,7 +35,10 @@ UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants(true)
ORDER BY tenant_attribute;
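The CPU usage columns are projected as booleans because absolute CPU times are not deterministic across runs; the expected output can only assert that some CPU time was attributed to the tenant. These counters populate only while tenant tracking is enabled; a sketch of the assumed prerequisite (treat the setting name as an assumption, set elsewhere in the schedule):

-- Assumed prerequisite for the counters above:
SET citus.stat_tenants_track TO 'all';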
SELECT citus_stat_tenants_reset();
@ -84,13 +87,26 @@ SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
-- simulate passing the period
SET citus.stat_tenants_period TO 2;
SET citus.stat_tenants_period TO 5;
SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
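Calling sleep_until_next_period twice exercises the rollover: counts observed in the this-period columns should move to the last-period columns after the first sleep, and should be gone from those as well after the second, since no new queries ran in between. The comparison the expected output encodes boils down to something like:

-- Illustrative rollover check: current vs. rolled-over counts.
SELECT tenant_attribute,
       query_count_in_this_period AS current_count,
       query_count_in_last_period AS rolled_over_count
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;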
\c - - - :master_port
SET search_path TO citus_stat_tenants;
@ -158,6 +174,11 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
DELETE FROM dist_tbl_text WHERE a = '/b*c/de';
DELETE FROM dist_tbl_text WHERE a = '/bcde';
DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
DELETE FROM dist_tbl_text WHERE a = 'bcde*';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
-- test local cached queries & prepared statements
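The cached-query statements themselves fall outside this hunk; a minimal hypothetical shape of such a test (the tenant_probe name is invented here for illustration) would be:

-- Hypothetical sketch; the real statements are in the unshown context.
PREPARE tenant_probe(text) AS
  SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE tenant_probe('/b*c/de');
EXECUTE tenant_probe('/b*c/de');  -- repeated executions exercise the cached-plan path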
@ -231,5 +252,64 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local();
RESET ROLE;
DROP ROLE stats_non_superuser;
-- test function push down
CREATE OR REPLACE FUNCTION
select_from_dist_tbl_text(p_keyword text)
RETURNS boolean LANGUAGE plpgsql AS $fn$
BEGIN
RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1);
END;
$fn$;
SELECT create_distributed_function(
'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text'
);
SELECT citus_stat_tenants_reset();
SELECT select_from_dist_tbl_text('/b*c/de');
SELECT select_from_dist_tbl_text('/b*c/de');
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
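Because the function is colocated with dist_tbl_text on its p_keyword argument, each call is pushed down to the node holding the matching shard and attributed to that tenant, so the counters above should line up with running the function body's query directly:

-- Direct equivalent of one pushed-down call, for comparison:
SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = '/b*c/de';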
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
p_keyword text
)
LANGUAGE plpgsql
AS $$
BEGIN
PERFORM select_from_dist_tbl_text(p_keyword);
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0;
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text;
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword;
COMMIT;
END;$$;
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
CREATE OR REPLACE VIEW
select_from_dist_tbl_text_view
AS
SELECT * FROM citus_stat_tenants.dist_tbl_text;
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;

View File

@ -97,6 +97,8 @@ WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
-- reset cluster to original state
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 2;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 2;
SELECT citus.mitmproxy('conn.allow()');
SELECT master_add_node('localhost', :worker_2_proxy_port);
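Restarting both sequences before re-adding the node should hand the worker the same nodeid and groupid it had before the failure run; that, at least, is the usual reason these resets appear in cleanup, since it keeps later expected output against pg_dist_node stable.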

View File

@ -15,6 +15,9 @@ SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SELECT pg_backend_pid() as pid \gset
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 222222;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 333333;
-- make sure the coordinator is in the metadata
SELECT citus_set_coordinator_host('localhost', 57636);
@ -108,3 +111,5 @@ SELECT * FROM pg_dist_shard WHERE logicalrelid = 'table_1'::regclass;
DROP SCHEMA create_dist_tbl_con CASCADE;
SET search_path TO default;
SELECT citus_remove_node('localhost', 57636);
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 3;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 3;

View File

@ -18,11 +18,36 @@ SET client_min_messages TO ERROR;
CREATE ROLE foo1;
CREATE ROLE foo2;
-- Create collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
-- Create type
CREATE TYPE pair_type AS (a int, b int);
-- Create function
CREATE FUNCTION one_as_result() RETURNS INT LANGUAGE SQL AS
$$
SELECT 1;
$$;
-- Create text search dictionary
CREATE TEXT SEARCH DICTIONARY my_german_dict (
template = snowball,
language = german,
stopwords = german
);
-- Create text search config
CREATE TEXT SEARCH CONFIGURATION my_ts_config ( parser = default );
ALTER TEXT SEARCH CONFIGURATION my_ts_config ALTER MAPPING FOR asciiword WITH my_german_dict;
-- Create sequence
CREATE SEQUENCE seq;
-- Create colocated distributed tables
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'));
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'), col int default (one_as_result()), myserial serial, phone text COLLATE german_phonebook, initials pair_type);
CREATE SEQUENCE seq_owned OWNED BY dist1.id;
CREATE INDEX dist1_search_phone_idx ON dist1 USING gin (to_tsvector('my_ts_config'::regconfig, (COALESCE(phone, ''::text))::text));
SELECT create_distributed_table('dist1', 'id');
INSERT INTO dist1 SELECT i FROM generate_series(1,100) i;
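Every object created in this hunk (collation, type, function, text search dictionary and configuration, sequences, the table, and its index) gets a matching failure-injection pair later in the file. Once activation finally succeeds, all of them should be tracked as distributed objects; a quick check, assuming the pg_dist_object catalog Citus uses for this bookkeeping:

-- Illustrative only: list objects recorded as distributed.
SELECT classid::regclass, objid
FROM pg_catalog.pg_dist_object;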
@ -42,7 +67,15 @@ INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- Create publication
CREATE PUBLICATION pub_all;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
-- citus_set_coordinator_host with correct port
SELECT citus_set_coordinator_host('localhost', :master_port);
-- show coordinator port is correct on all workers
SELECT * FROM run_command_on_workers($$SELECT row(nodename,nodeport) FROM pg_dist_node WHERE groupid = 0$$);
SELECT citus_add_local_table_to_metadata('loc1', cascade_via_foreign_keys => true);
-- Create partitioned distributed table
@ -83,10 +116,10 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").cancel(' || :pid || ')');
-- Failure to drop sequence dependency for all tables
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop shell table
@ -137,24 +170,84 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="ALTER DATABASE.*OWNER TO").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Filure to create schema
-- Failure to create schema
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
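Every failure case in this file follows the same two-step pattern: first a cancel() that aborts the backend by pid, then a kill() that severs the connection mid-statement, each followed by a retry of citus_activate_node. The point is that metadata sync must fail cleanly and remain retryable either way. Schematically (the <regex> placeholder stands for each query pattern exercised below):

-- Generic shape of each failure pair in this file:
SELECT citus.mitmproxy('conn.onQuery(query="<regex>").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="<regex>").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- and at the end, traffic is restored with:
SELECT citus.mitmproxy('conn.allow()');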
-- Failure to create collation
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create function
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create text search dictionary
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create text search config
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create type
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create publication
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop sequence dependency for distributed table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop distributed table if exists
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create distributed table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to record sequence dependency for table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create index for table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create reference table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.ref").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
@ -215,6 +308,48 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT citus_internal_add_object_metadata").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark function as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark collation as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark text search dictionary as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark text search configuration as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark type as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark sequence as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark publication as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to set isactive to true
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE pg_dist_node SET isactive = TRUE").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
@ -255,11 +390,8 @@ UPDATE dist1 SET id = :failed_node_val WHERE id = :failed_node_val;
DELETE FROM dist1 WHERE id = :failed_node_val;
-- Show that DDL would still propagate to the node
SET client_min_messages TO NOTICE;
SET citus.log_remote_commands TO 1;
CREATE SCHEMA dummy;
SET citus.log_remote_commands TO 0;
SET client_min_messages TO ERROR;
SELECT * FROM run_command_on_workers($$SELECT nspname FROM pg_namespace WHERE nspname = 'dummy'$$);
-- Successfully activate the node after many failures
SELECT citus.mitmproxy('conn.allow()');
@ -275,8 +407,11 @@ SELECT * FROM pg_dist_node ORDER BY nodeport;
SELECT citus.mitmproxy('conn.allow()');
RESET citus.metadata_sync_mode;
DROP PUBLICATION pub_all;
DROP SCHEMA dummy;
DROP SCHEMA mx_metadata_sync_multi_trans CASCADE;
DROP ROLE foo1;
DROP ROLE foo2;
SELECT citus_remove_node('localhost', :master_port);
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 3;
ALTER SEQUENCE pg_dist_groupid_seq RESTART 3;

View File

@ -13,7 +13,9 @@ CREATE TABLE postgres_table_test(a int primary key);
-- make sure that all rebalance operations work fine when
-- reference tables are replicated to the coordinator
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
RESET client_min_messages;
-- should just be noops even if we add the coordinator to pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
@ -1497,6 +1499,61 @@ SELECT sh.logicalrelid, pl.nodeport
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
-- test the same with coordinator shouldhaveshards = false and shard_count = 2
-- so that the number of nodes allowed for shards would be 2 when rebalancing
-- in such cases, we only count the nodes that are allowed to hold shard placements
UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
create table two_shard_colocation_1a (a int primary key);
create table two_shard_colocation_1b (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
create table two_shard_colocation_2a (a int primary key);
create table two_shard_colocation_2b (a int primary key);
select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
-- move shards of colocation group 1 to worker1
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
AND pl.nodeport = :worker_2_port
LIMIT 1;
-- move shards of colocation group 2 to worker2
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
AND pl.nodeport = :worker_1_port
LIMIT 1;
-- current state:
-- coordinator: []
-- worker 1: [1_1, 1_2]
-- worker 2: [2_1, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
-- If we take the coordinator into account, the rebalancer considers this balanced and does nothing (shard_count < worker_count)
-- but because the coordinator is not allowed to hold shards, the rebalancer distributes each colocation group across both workers
select rebalance_table_shards(shard_transfer_mode:='block_writes');
-- final state:
-- coordinator: []
-- worker 1: [1_1, 2_1]
-- worker 2: [1_2, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
-- cleanup
DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
-- verify we detect if one of the tables does not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);