mirror of https://github.com/citusdata/citus.git
Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
|
b7ae596fe8 | |
|
6f4324623c | |
|
d5db0adc17 | |
|
099523452e | |
|
af448da1a7 | |
|
acccad9879 | |
|
77947da17c | |
|
7d56c25e28 | |
|
eba70af7a2 | |
|
3f33390f45 | |
|
7b51f3eee2 |
125
CHANGELOG.md
125
CHANGELOG.md
|
@ -1,3 +1,128 @@
|
||||||
|
### citus v11.1.1 (September 16, 2022) ###
|
||||||
|
|
||||||
|
* Fixes a bug that prevents `create_distributed_table_concurrently()` working
|
||||||
|
on an empty node
|
||||||
|
|
||||||
|
### citus v11.1.0 (September 15, 2022) ###
|
||||||
|
|
||||||
|
* Adds support for PostgreSQL 15beta4
|
||||||
|
|
||||||
|
* Adds ability to run shard rebalancer in the background
|
||||||
|
|
||||||
|
* Adds `create_distributed_table_concurrently()` UDF to distribute tables
|
||||||
|
without interrupting the application
|
||||||
|
|
||||||
|
* Adds `citus_split_shard_by_split_points()` UDF that allows
|
||||||
|
splitting a shard to specified set of nodes without blocking writes
|
||||||
|
and based on given split points
|
||||||
|
|
||||||
|
* Adds support for non-blocking tenant isolation
|
||||||
|
|
||||||
|
* Adds support for isolation tenants that use partitioned tables
|
||||||
|
or columnar tables
|
||||||
|
|
||||||
|
* Separates columnar table access method into a separate logical extension
|
||||||
|
|
||||||
|
* Adds support for online replication in `replicate_reference_tables()`
|
||||||
|
|
||||||
|
* Improves performance of blocking shard moves
|
||||||
|
|
||||||
|
* Improves non-blocking shard moves with a faster custom copy logic
|
||||||
|
|
||||||
|
* Creates all foreign keys quickly at the end of a shard move
|
||||||
|
|
||||||
|
* Limits `get_rebalance_progress()` to show shards in moving state
|
||||||
|
|
||||||
|
* Makes `citus_move_shard_placement()` idempotent if shard already exists
|
||||||
|
on target node
|
||||||
|
|
||||||
|
* Shows `citus_copy_shard_placement()` progress in `get_rebalance_progres()`
|
||||||
|
|
||||||
|
* Supports changing CPU priorities for backends and shard moves
|
||||||
|
|
||||||
|
* Adds the GUC `citus.allow_unsafe_constraints` to allow unique/exclusion/
|
||||||
|
primary key constraints without distribution column
|
||||||
|
|
||||||
|
* Introduces GUC `citus.skip_constraint_validation`
|
||||||
|
|
||||||
|
* Introduces `citus_locks` view
|
||||||
|
|
||||||
|
* Improves `citus_tables` view by showing local tables added to metadata
|
||||||
|
|
||||||
|
* Improves columnar table access method by moving old catalog tables into
|
||||||
|
an internal schema and introduces more secure & informative views based
|
||||||
|
on them
|
||||||
|
|
||||||
|
* Adds support for `GRANT/REVOKE` on aggregates
|
||||||
|
|
||||||
|
* Adds support for `NULLS NOT DISTINCT` clauses for indexes for PG15+
|
||||||
|
|
||||||
|
* Adds support for setting relation options for columnar tables using
|
||||||
|
`ALTER TABLE`
|
||||||
|
|
||||||
|
* Adds support for unlogged distributed sequences
|
||||||
|
|
||||||
|
* Removes `do_repair` option from `citus_copy_shard_placement()`
|
||||||
|
|
||||||
|
* Removes deprecated re-partitioning functions like
|
||||||
|
`worker_hash_partition_table()`
|
||||||
|
|
||||||
|
* Drops support for isolation tenants that use replicated tables
|
||||||
|
|
||||||
|
* Checks existence of the shards before insert, delete, and update
|
||||||
|
|
||||||
|
* Hides tables owned by extensions from `citus_tables` and `citus_shards`
|
||||||
|
|
||||||
|
* Propagates `VACUUM` and `ANALYZE` to worker nodes
|
||||||
|
|
||||||
|
* Makes non-partitioned table size calculation quicker
|
||||||
|
|
||||||
|
* Improves `create_distributed_table()` by creating new colocation entries when
|
||||||
|
using `colocate_with => 'none'`
|
||||||
|
|
||||||
|
* Ensures that `SELECT .. FOR UPDATE` opens a transaction block when used in
|
||||||
|
a function call
|
||||||
|
|
||||||
|
* Prevents a segfault by disallowing usage of SQL functions referencing to a
|
||||||
|
distributed table
|
||||||
|
|
||||||
|
* Prevents creating a new colocation entry when replicating reference tables
|
||||||
|
|
||||||
|
* Fixes a bug in query escaping in `undistribute_table()` and
|
||||||
|
`alter_distributed_table()`
|
||||||
|
|
||||||
|
* Fixes a bug preventing the usage of `isolate_tenant_to_new_shard()` with text
|
||||||
|
column
|
||||||
|
|
||||||
|
* Fixes a bug that may cause `GRANT` to propagate within `CREATE EXTENSION`
|
||||||
|
|
||||||
|
* Fixes a bug that causes incorrectly marking `metadatasynced` flag for
|
||||||
|
coordinator
|
||||||
|
|
||||||
|
* Fixes a bug that may prevent Citus from creating function in transaction
|
||||||
|
block properly
|
||||||
|
|
||||||
|
* Fixes a bug that prevents promoting read-replicas as primaries
|
||||||
|
|
||||||
|
* Fixes a bug that prevents setting colocation group of a partitioned
|
||||||
|
distributed table to `none`
|
||||||
|
|
||||||
|
* Fixes a bug that prevents using `AUTO` option for `VACUUM (INDEX_CLEANUP)`
|
||||||
|
operation
|
||||||
|
|
||||||
|
* Fixes a segfault in `citus_copy_shard_placement()`
|
||||||
|
|
||||||
|
* Fixes an issue that can cause logical reference table replication to fail
|
||||||
|
|
||||||
|
* Fixes schema name qualification for `RENAME SEQUENCE` statement
|
||||||
|
|
||||||
|
* Fixes several small memory leaks
|
||||||
|
|
||||||
|
* Fixes the transaction timestamp column of the `get_current_transaction_id()`
|
||||||
|
on coordinator
|
||||||
|
|
||||||
|
* Maps any unused parameters to a generic type in prepared statements
|
||||||
|
|
||||||
### citus v10.2.8 (August 19, 2022) ###
|
### citus v10.2.8 (August 19, 2022) ###
|
||||||
|
|
||||||
* Fixes compilation warning caused by latest upgrade script changes
|
* Fixes compilation warning caused by latest upgrade script changes
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# Guess values for system-dependent variables and create Makefiles.
|
# Guess values for system-dependent variables and create Makefiles.
|
||||||
# Generated by GNU Autoconf 2.69 for Citus 11.1devel.
|
# Generated by GNU Autoconf 2.69 for Citus 11.1.1.
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
||||||
# Identity of this package.
|
# Identity of this package.
|
||||||
PACKAGE_NAME='Citus'
|
PACKAGE_NAME='Citus'
|
||||||
PACKAGE_TARNAME='citus'
|
PACKAGE_TARNAME='citus'
|
||||||
PACKAGE_VERSION='11.1devel'
|
PACKAGE_VERSION='11.1.1'
|
||||||
PACKAGE_STRING='Citus 11.1devel'
|
PACKAGE_STRING='Citus 11.1.1'
|
||||||
PACKAGE_BUGREPORT=''
|
PACKAGE_BUGREPORT=''
|
||||||
PACKAGE_URL=''
|
PACKAGE_URL=''
|
||||||
|
|
||||||
|
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
|
||||||
# Omit some internal or obsolete options to make the list less imposing.
|
# Omit some internal or obsolete options to make the list less imposing.
|
||||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||||
cat <<_ACEOF
|
cat <<_ACEOF
|
||||||
\`configure' configures Citus 11.1devel to adapt to many kinds of systems.
|
\`configure' configures Citus 11.1.1 to adapt to many kinds of systems.
|
||||||
|
|
||||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
|
|
||||||
|
@ -1324,7 +1324,7 @@ fi
|
||||||
|
|
||||||
if test -n "$ac_init_help"; then
|
if test -n "$ac_init_help"; then
|
||||||
case $ac_init_help in
|
case $ac_init_help in
|
||||||
short | recursive ) echo "Configuration of Citus 11.1devel:";;
|
short | recursive ) echo "Configuration of Citus 11.1.1:";;
|
||||||
esac
|
esac
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
|
|
||||||
|
@ -1429,7 +1429,7 @@ fi
|
||||||
test -n "$ac_init_help" && exit $ac_status
|
test -n "$ac_init_help" && exit $ac_status
|
||||||
if $ac_init_version; then
|
if $ac_init_version; then
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
Citus configure 11.1devel
|
Citus configure 11.1.1
|
||||||
generated by GNU Autoconf 2.69
|
generated by GNU Autoconf 2.69
|
||||||
|
|
||||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||||
|
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
|
||||||
This file contains any messages produced by compilers while
|
This file contains any messages produced by compilers while
|
||||||
running configure, to aid debugging if configure makes a mistake.
|
running configure, to aid debugging if configure makes a mistake.
|
||||||
|
|
||||||
It was created by Citus $as_me 11.1devel, which was
|
It was created by Citus $as_me 11.1.1, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
$ $0 $@
|
$ $0 $@
|
||||||
|
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
||||||
# report actual input values of CONFIG_FILES etc. instead of their
|
# report actual input values of CONFIG_FILES etc. instead of their
|
||||||
# values after options handling.
|
# values after options handling.
|
||||||
ac_log="
|
ac_log="
|
||||||
This file was extended by Citus $as_me 11.1devel, which was
|
This file was extended by Citus $as_me 11.1.1, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
CONFIG_FILES = $CONFIG_FILES
|
CONFIG_FILES = $CONFIG_FILES
|
||||||
|
@ -5455,7 +5455,7 @@ _ACEOF
|
||||||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||||
ac_cs_version="\\
|
ac_cs_version="\\
|
||||||
Citus config.status 11.1devel
|
Citus config.status 11.1.1
|
||||||
configured by $0, generated by GNU Autoconf 2.69,
|
configured by $0, generated by GNU Autoconf 2.69,
|
||||||
with options \\"\$ac_cs_config\\"
|
with options \\"\$ac_cs_config\\"
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
# everyone needing autoconf installed, the resulting files are checked
|
# everyone needing autoconf installed, the resulting files are checked
|
||||||
# into the SCM.
|
# into the SCM.
|
||||||
|
|
||||||
AC_INIT([Citus], [11.1devel])
|
AC_INIT([Citus], [11.1.1])
|
||||||
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
||||||
|
|
||||||
# we'll need sed and awk for some of the version commands
|
# we'll need sed and awk for some of the version commands
|
||||||
|
|
|
@ -1604,6 +1604,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
|
||||||
}
|
}
|
||||||
else if (ShouldSyncTableMetadata(sourceId))
|
else if (ShouldSyncTableMetadata(sourceId))
|
||||||
{
|
{
|
||||||
|
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We are converting a citus local table to a distributed/reference table,
|
* We are converting a citus local table to a distributed/reference table,
|
||||||
* so we should prevent dropping the sequence on the table. Otherwise, we'd
|
* so we should prevent dropping the sequence on the table. Otherwise, we'd
|
||||||
|
@ -1612,8 +1614,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
|
||||||
StringInfo command = makeStringInfo();
|
StringInfo command = makeStringInfo();
|
||||||
|
|
||||||
appendStringInfo(command,
|
appendStringInfo(command,
|
||||||
"SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
|
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
|
||||||
quote_qualified_identifier(schemaName, sourceName));
|
quote_literal_cstr(qualifiedTableName));
|
||||||
|
|
||||||
SendCommandToWorkersWithMetadata(command->data);
|
SendCommandToWorkersWithMetadata(command->data);
|
||||||
}
|
}
|
||||||
|
@ -1903,11 +1905,17 @@ CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequ
|
||||||
char *sourceSchemaName, char *sourceName,
|
char *sourceSchemaName, char *sourceName,
|
||||||
char *targetSchemaName, char *targetName)
|
char *targetSchemaName, char *targetName)
|
||||||
{
|
{
|
||||||
|
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
|
||||||
|
sequenceName);
|
||||||
|
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
|
||||||
|
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
|
||||||
|
|
||||||
StringInfo query = makeStringInfo();
|
StringInfo query = makeStringInfo();
|
||||||
appendStringInfo(query, "SELECT worker_change_sequence_dependency('%s', '%s', '%s')",
|
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
|
||||||
quote_qualified_identifier(sequenceSchemaName, sequenceName),
|
quote_literal_cstr(qualifiedSchemaName),
|
||||||
quote_qualified_identifier(sourceSchemaName, sourceName),
|
quote_literal_cstr(qualifiedSourceName),
|
||||||
quote_qualified_identifier(targetSchemaName, targetName));
|
quote_literal_cstr(qualifiedTargetName));
|
||||||
|
|
||||||
return query->data;
|
return query->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,6 +128,9 @@ citus_add_local_table_to_metadata_internal(Oid relationId, bool cascadeViaForeig
|
||||||
{
|
{
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* enable citus_add_local_table_to_metadata on an empty node */
|
||||||
|
InsertCoordinatorIfClusterEmpty();
|
||||||
|
|
||||||
bool autoConverted = false;
|
bool autoConverted = false;
|
||||||
CreateCitusLocalTable(relationId, cascadeViaForeignKeys, autoConverted);
|
CreateCitusLocalTable(relationId, cascadeViaForeignKeys, autoConverted);
|
||||||
}
|
}
|
||||||
|
|
|
@ -382,7 +382,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
|
||||||
"citus.shard_replication_factor > 1")));
|
"citus.shard_replication_factor > 1")));
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureCoordinatorIsInMetadata();
|
|
||||||
EnsureCitusTableCanBeCreated(relationId);
|
EnsureCitusTableCanBeCreated(relationId);
|
||||||
|
|
||||||
EnsureValidDistributionColumn(relationId, distributionColumnName);
|
EnsureValidDistributionColumn(relationId, distributionColumnName);
|
||||||
|
@ -528,6 +527,14 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
|
||||||
colocatedTableId = ColocatedTableId(colocationId);
|
colocatedTableId = ColocatedTableId(colocationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
|
||||||
|
if (workerNodeList == NIL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
errmsg("no worker nodes are available for placing shards"),
|
||||||
|
errhint("Add more worker nodes.")));
|
||||||
|
}
|
||||||
|
|
||||||
List *workersForPlacementList;
|
List *workersForPlacementList;
|
||||||
List *shardSplitPointsList;
|
List *shardSplitPointsList;
|
||||||
|
|
||||||
|
@ -555,7 +562,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
|
||||||
/*
|
/*
|
||||||
* Place shards in a round-robin fashion across all data nodes.
|
* Place shards in a round-robin fashion across all data nodes.
|
||||||
*/
|
*/
|
||||||
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
|
|
||||||
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
|
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -856,6 +862,8 @@ WorkerNodesForShardList(List *shardList)
|
||||||
static List *
|
static List *
|
||||||
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
|
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
|
||||||
{
|
{
|
||||||
|
Assert(workerNodeList != NIL);
|
||||||
|
|
||||||
List *nodeIdList = NIL;
|
List *nodeIdList = NIL;
|
||||||
|
|
||||||
for (int idx = 0; idx < listLength; idx++)
|
for (int idx = 0; idx < listLength; idx++)
|
||||||
|
|
|
@ -4022,7 +4022,7 @@ CancelTasksForJob(int64 jobid)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* make sure the current user has the rights to cancel this task */
|
/* make sure the current user has the rights to cancel this task */
|
||||||
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner]);
|
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]);
|
||||||
if (superuser_arg(taskOwner) && !superuser())
|
if (superuser_arg(taskOwner) && !superuser())
|
||||||
{
|
{
|
||||||
/* must be a superuser to cancel tasks owned by superuser */
|
/* must be a superuser to cancel tasks owned by superuser */
|
||||||
|
|
|
@ -670,10 +670,15 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
|
||||||
AppendShardIdToName(&shardName, shardInterval->shardId);
|
AppendShardIdToName(&shardName, shardInterval->shardId);
|
||||||
|
|
||||||
StringInfo checkShardExistsQuery = makeStringInfo();
|
StringInfo checkShardExistsQuery = makeStringInfo();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We pass schemaName and shardName without quote_identifier, since
|
||||||
|
* they are used as strings here.
|
||||||
|
*/
|
||||||
appendStringInfo(checkShardExistsQuery,
|
appendStringInfo(checkShardExistsQuery,
|
||||||
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
|
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = %s AND tablename = %s);",
|
||||||
schemaName,
|
quote_literal_cstr(schemaName),
|
||||||
shardName);
|
quote_literal_cstr(shardName));
|
||||||
|
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
|
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||||
|
@ -691,11 +696,13 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
|
||||||
ReportResultError(connection, result, ERROR);
|
ReportResultError(connection, result, ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *checkExists = PQgetvalue(result, 0, 0);
|
char *existsString = PQgetvalue(result, 0, 0);
|
||||||
|
bool tableExists = strcmp(existsString, "t") == 0;
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
|
|
||||||
return strcmp(checkExists, "t") == 0;
|
return tableExists;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1897,14 +1897,14 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
|
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
|
||||||
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
|
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
|
||||||
|
|
||||||
bool distributedTable = IsCitusTable(rte->relid);
|
bool isCitusTable = IsCitusTable(rte->relid);
|
||||||
|
|
||||||
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
|
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
|
||||||
relationRestriction->index = restrictionIndex;
|
relationRestriction->index = restrictionIndex;
|
||||||
relationRestriction->relationId = rte->relid;
|
relationRestriction->relationId = rte->relid;
|
||||||
relationRestriction->rte = rte;
|
relationRestriction->rte = rte;
|
||||||
relationRestriction->relOptInfo = relOptInfo;
|
relationRestriction->relOptInfo = relOptInfo;
|
||||||
relationRestriction->distributedRelation = distributedTable;
|
relationRestriction->citusTable = isCitusTable;
|
||||||
relationRestriction->plannerInfo = root;
|
relationRestriction->plannerInfo = root;
|
||||||
|
|
||||||
/* see comments on GetVarFromAssignedParam() */
|
/* see comments on GetVarFromAssignedParam() */
|
||||||
|
@ -1919,10 +1919,42 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
* We're also keeping track of whether all participant
|
* We're also keeping track of whether all participant
|
||||||
* tables are reference tables.
|
* tables are reference tables.
|
||||||
*/
|
*/
|
||||||
if (distributedTable)
|
if (isCitusTable)
|
||||||
{
|
{
|
||||||
cacheEntry = GetCitusTableCacheEntry(rte->relid);
|
cacheEntry = GetCitusTableCacheEntry(rte->relid);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The statistics objects of the distributed table are not relevant
|
||||||
|
* for the distributed planning, so we can override it.
|
||||||
|
*
|
||||||
|
* Normally, we should not need this. However, the combination of
|
||||||
|
* Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
|
||||||
|
* Citus function AdjustPartitioningForDistributedPlanning()
|
||||||
|
* forces us to do this. The commit expects statistics objects
|
||||||
|
* of partitions to have "inh" flag set properly. Whereas, the
|
||||||
|
* function overrides "inh" flag. To avoid Postgres to throw error,
|
||||||
|
* we override statlist such that Postgres does not try to process
|
||||||
|
* any statistics objects during the standard_planner() on the
|
||||||
|
* coordinator. In the end, we do not need the standard_planner()
|
||||||
|
* on the coordinator to generate an optimized plan. We call
|
||||||
|
* into standard_planner() for other purposes, such as generating the
|
||||||
|
* relationRestrictionContext here.
|
||||||
|
*
|
||||||
|
* AdjustPartitioningForDistributedPlanning() is a hack that we use
|
||||||
|
* to prevent Postgres' standard_planner() to expand all the partitions
|
||||||
|
* for the distributed planning when a distributed partitioned table
|
||||||
|
* is queried. It is required for both correctness and performance
|
||||||
|
* reasons. Although we can eliminate the use of the function for
|
||||||
|
* the correctness (e.g., make sure that rest of the planner can handle
|
||||||
|
* partitions), it's performance implication is hard to avoid. Certain
|
||||||
|
* planning logic of Citus (such as router or query pushdown) relies
|
||||||
|
* heavily on the relationRestrictionList. If
|
||||||
|
* AdjustPartitioningForDistributedPlanning() is removed, all the
|
||||||
|
* partitions show up in the, causing high planning times for
|
||||||
|
* such queries.
|
||||||
|
*/
|
||||||
|
relOptInfo->statlist = NIL;
|
||||||
|
|
||||||
relationRestrictionContext->allReferenceTables &=
|
relationRestrictionContext->allReferenceTables &=
|
||||||
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
|
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3692,7 +3692,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
|
||||||
|
|
||||||
newRestriction->index = oldRestriction->index;
|
newRestriction->index = oldRestriction->index;
|
||||||
newRestriction->relationId = oldRestriction->relationId;
|
newRestriction->relationId = oldRestriction->relationId;
|
||||||
newRestriction->distributedRelation = oldRestriction->distributedRelation;
|
newRestriction->citusTable = oldRestriction->citusTable;
|
||||||
newRestriction->rte = copyObject(oldRestriction->rte);
|
newRestriction->rte = copyObject(oldRestriction->rte);
|
||||||
|
|
||||||
/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */
|
/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */
|
||||||
|
|
|
@ -224,7 +224,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
|
||||||
{
|
{
|
||||||
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
|
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
|
||||||
|
|
||||||
if (!relationRestriction->distributedRelation)
|
if (!relationRestriction->citusTable)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
|
||||||
HeapTuple tuple,
|
HeapTuple tuple,
|
||||||
char *currentSlotName);
|
char *currentSlotName);
|
||||||
|
|
||||||
|
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
|
||||||
|
TupleDesc sourceTupleDesc,
|
||||||
|
TupleDesc targetTupleDesc);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Postgres uses 'pgoutput' as default plugin for logical replication.
|
* Postgres uses 'pgoutput' as default plugin for logical replication.
|
||||||
* We want to reuse Postgres pgoutput's functionality as much as possible.
|
* We want to reuse Postgres pgoutput's functionality as much as possible.
|
||||||
|
@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
}
|
}
|
||||||
|
|
||||||
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
|
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If any columns from source relation have been dropped, then the tuple needs to
|
||||||
|
* be formatted according to the target relation.
|
||||||
|
*/
|
||||||
|
TupleDesc sourceRelationDesc = RelationGetDescr(relation);
|
||||||
|
TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
|
||||||
|
if (sourceRelationDesc->natts > targetRelationDesc->natts)
|
||||||
|
{
|
||||||
|
switch (change->action)
|
||||||
|
{
|
||||||
|
case REORDER_BUFFER_CHANGE_INSERT:
|
||||||
|
{
|
||||||
|
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
|
||||||
|
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
|
||||||
|
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||||
|
|
||||||
|
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||||
|
{
|
||||||
|
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
|
||||||
|
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
|
||||||
|
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||||
|
|
||||||
|
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Format oldtuple according to the target relation. If the column values of replica
|
||||||
|
* identiy change, then the old tuple is non-null and needs to be formatted according
|
||||||
|
* to the target relation schema.
|
||||||
|
*/
|
||||||
|
if (change->data.tp.oldtuple != NULL)
|
||||||
|
{
|
||||||
|
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
|
||||||
|
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
|
||||||
|
sourceRelationOldTuple,
|
||||||
|
sourceRelationDesc,
|
||||||
|
targetRelationDesc);
|
||||||
|
|
||||||
|
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case REORDER_BUFFER_CHANGE_DELETE:
|
||||||
|
{
|
||||||
|
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
|
||||||
|
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
|
||||||
|
sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
|
||||||
|
|
||||||
|
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
|
||||||
|
default:
|
||||||
|
ereport(ERROR, errmsg(
|
||||||
|
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
|
||||||
|
change->action));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pgoutputChangeCB(ctx, txn, targetRelation, change);
|
pgoutputChangeCB(ctx, txn, targetRelation, change);
|
||||||
RelationClose(targetRelation);
|
RelationClose(targetRelation);
|
||||||
}
|
}
|
||||||
|
@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
|
||||||
|
|
||||||
return DatumGetInt32(hashedValueDatum);
|
return DatumGetInt32(hashedValueDatum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetTupleForTargetSchema returns a tuple with the schema of the target relation.
|
||||||
|
* If some columns within the source relations are dropped, we would have to reformat
|
||||||
|
* the tuple to match the schema of the target relation.
|
||||||
|
*
|
||||||
|
* Consider the below scenario:
|
||||||
|
* Session1 : Drop column followed by create_distributed_table_concurrently
|
||||||
|
* Session2 : Concurrent insert workload
|
||||||
|
*
|
||||||
|
* The child shards created by create_distributed_table_concurrently will have less columns
|
||||||
|
* than the source shard because some column were dropped.
|
||||||
|
* The incoming tuple from session2 will have more columns as the writes
|
||||||
|
* happened on source shard. But now the tuple needs to be applied on child shard. So we need to format
|
||||||
|
* it according to child schema.
|
||||||
|
*/
|
||||||
|
static HeapTuple
|
||||||
|
GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
|
||||||
|
TupleDesc sourceRelDesc,
|
||||||
|
TupleDesc targetRelDesc)
|
||||||
|
{
|
||||||
|
/* Deform the tuple */
|
||||||
|
Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
|
||||||
|
bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
|
||||||
|
heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
|
||||||
|
oldNulls);
|
||||||
|
|
||||||
|
|
||||||
|
/* Create new tuple by skipping dropped columns */
|
||||||
|
int nextAttributeIndex = 0;
|
||||||
|
Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
|
||||||
|
bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
|
||||||
|
for (int i = 0; i < sourceRelDesc->natts; i++)
|
||||||
|
{
|
||||||
|
if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
newValues[nextAttributeIndex] = oldValues[i];
|
||||||
|
newNulls[nextAttributeIndex] = oldNulls[i];
|
||||||
|
nextAttributeIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
|
||||||
|
return targetRelationTuple;
|
||||||
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ typedef struct RelationRestriction
|
||||||
{
|
{
|
||||||
Index index;
|
Index index;
|
||||||
Oid relationId;
|
Oid relationId;
|
||||||
bool distributedRelation;
|
bool citusTable;
|
||||||
RangeTblEntry *rte;
|
RangeTblEntry *rte;
|
||||||
RelOptInfo *relOptInfo;
|
RelOptInfo *relOptInfo;
|
||||||
PlannerInfo *plannerInfo;
|
PlannerInfo *plannerInfo;
|
||||||
|
|
|
@ -8,5 +8,6 @@ test: isolation_cluster_management
|
||||||
test: isolation_logical_replication_single_shard_commands
|
test: isolation_logical_replication_single_shard_commands
|
||||||
test: isolation_logical_replication_multi_shard_commands
|
test: isolation_logical_replication_multi_shard_commands
|
||||||
test: isolation_non_blocking_shard_split
|
test: isolation_non_blocking_shard_split
|
||||||
|
test: isolation_create_distributed_concurrently_after_drop_column
|
||||||
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
|
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
|
||||||
test: isolation_non_blocking_shard_split_fkey
|
test: isolation_non_blocking_shard_split_fkey
|
||||||
|
|
|
@ -176,5 +176,39 @@ SELECT citus_rebalance_wait();
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE t1;
|
||||||
|
-- make sure a non-super user can stop rebalancing
|
||||||
|
CREATE USER non_super_user_rebalance WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
|
||||||
|
SET ROLE non_super_user_rebalance;
|
||||||
|
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
|
||||||
|
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_rebalance_start();
|
||||||
|
NOTICE: Scheduled 1 moves as job xxx
|
||||||
|
DETAIL: Rebalance scheduled as background job
|
||||||
|
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_rebalance_stop();
|
||||||
|
citus_rebalance_stop
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA background_rebalance CASCADE;
|
DROP SCHEMA background_rebalance CASCADE;
|
||||||
|
|
|
@ -57,6 +57,35 @@ ERROR: cannot colocate tables nocolo and test
|
||||||
DETAIL: Distribution column types don't match for nocolo and test.
|
DETAIL: Distribution column types don't match for nocolo and test.
|
||||||
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
|
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
|
||||||
ERROR: relation "noexists" does not exist
|
ERROR: relation "noexists" does not exist
|
||||||
|
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
|
||||||
|
citus_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
|
||||||
|
citus_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select create_distributed_table_concurrently('test','key');
|
||||||
|
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
|
||||||
|
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
|
||||||
|
ERROR: no worker nodes are available for placing shards
|
||||||
|
HINT: Add more worker nodes.
|
||||||
|
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
|
||||||
|
citus_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
|
||||||
|
citus_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- use colocate_with "default"
|
-- use colocate_with "default"
|
||||||
select create_distributed_table_concurrently('test','key', shard_count := 11);
|
select create_distributed_table_concurrently('test','key', shard_count := 11);
|
||||||
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
|
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
|
||||||
|
|
|
@ -0,0 +1,667 @@
|
||||||
|
Parsed test spec with 3 sessions
|
||||||
|
|
||||||
|
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
tenant_id|dummy|measurement_id|payload|observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-alter-table:
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
|
||||||
|
step s1-set-factor-1:
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
|
||||||
|
citus_set_coordinator_host
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert-observations_with_pk:
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
|
||||||
|
step s2-update-observations_with_pk:
|
||||||
|
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
|
||||||
|
step s2-end:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57636|1500004|t | 4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||||
|
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57637|1500006|t | 4
|
||||||
|
57637|1500008|t | 0
|
||||||
|
57638|1500005|t | 0
|
||||||
|
57638|1500007|t | 0
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||||
|
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
tenant_id|dummy|measurement_id|payload|observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-alter-table:
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
|
||||||
|
step s1-set-factor-1:
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
|
||||||
|
citus_set_coordinator_host
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert-observations_with_pk:
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
|
||||||
|
step s2-update-primary-key-observations_with_pk:
|
||||||
|
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
|
||||||
|
|
||||||
|
step s2-end:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57636|1500009|t | 4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57637|1500011|t | 4
|
||||||
|
57637|1500013|t | 0
|
||||||
|
57638|1500010|t | 0
|
||||||
|
57638|1500012|t | 0
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
tenant_id|dummy|measurement_id|payload|observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-alter-table:
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
|
||||||
|
step s1-set-factor-1:
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
|
||||||
|
citus_set_coordinator_host
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert-observations_with_pk:
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
|
||||||
|
step s2-update-observations_with_pk:
|
||||||
|
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
|
||||||
|
step s2-delete-observations_with_pk:
|
||||||
|
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||||
|
|
||||||
|
step s2-end:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57636|1500014|t | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-print-cluster-1:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57637|1500016|t | 3
|
||||||
|
57637|1500018|t | 0
|
||||||
|
57638|1500015|t | 0
|
||||||
|
57638|1500017|t | 0
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
tenant_id|dummy|measurement_id|payload|observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-alter-table:
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
|
||||||
|
step s1-set-factor-1:
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
|
||||||
|
citus_set_coordinator_host
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations-2-concurrently:
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert-observations_with_full_replica_identity:
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
|
||||||
|
step s2-update-observations_with_full_replica_identity:
|
||||||
|
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
|
||||||
|
step s2-end:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57636|1500019|t | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations-2-concurrently: <... completed>
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57637|1500021|t | 3
|
||||||
|
57637|1500023|t | 0
|
||||||
|
57638|1500020|t | 0
|
||||||
|
57638|1500022|t | 0
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
tenant_id|dummy|measurement_id|payload|observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-alter-table:
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
|
||||||
|
step s1-set-factor-1:
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
|
||||||
|
citus_set_coordinator_host
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations-2-concurrently:
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert-observations_with_full_replica_identity:
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
|
||||||
|
step s2-update-observations_with_full_replica_identity:
|
||||||
|
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
|
||||||
|
step s2-delete-observations_with_full_replica_identity:
|
||||||
|
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||||
|
|
||||||
|
step s2-end:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57636|1500024|t | 2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-create-distributed-table-observations-2-concurrently: <... completed>
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-print-cluster-2:
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
|
||||||
|
nodeport|shardid|success|result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
57637|1500026|t | 2
|
||||||
|
57637|1500028|t | 0
|
||||||
|
57638|1500025|t | 0
|
||||||
|
57638|1500027|t | 0
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
tenant_id|measurement_id|payload |observation_time
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||||
|
(2 rows)
|
||||||
|
|
|
@ -1177,7 +1177,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
11.1devel
|
11.1.1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- ensure no unexpected objects were created outside pg_catalog
|
-- ensure no unexpected objects were created outside pg_catalog
|
||||||
|
@ -1521,6 +1521,66 @@ SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenanc
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- confirm that we can create a distributed table concurrently on an empty node
|
||||||
|
DROP EXTENSION citus;
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.defer_drop_after_shard_split TO off;
|
||||||
|
SELECT create_distributed_table_concurrently('test','x');
|
||||||
|
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
|
||||||
|
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
|
||||||
|
create_distributed_table_concurrently
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
-- confirm that we can create a distributed table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('test','x');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
-- confirm that we can create a reference table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SELECT create_reference_table('test');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
-- confirm that we can create a local table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SELECT citus_add_local_table_to_metadata('test');
|
||||||
|
citus_add_local_table_to_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
|
DROP EXTENSION citus;
|
||||||
|
CREATE EXTENSION citus;
|
||||||
DROP TABLE version_mismatch_table;
|
DROP TABLE version_mismatch_table;
|
||||||
DROP SCHEMA multi_extension;
|
DROP SCHEMA multi_extension;
|
||||||
ERROR: cannot drop schema multi_extension because other objects depend on it
|
ERROR: cannot drop schema multi_extension because other objects depend on it
|
||||||
|
|
|
@ -521,9 +521,9 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' O
|
||||||
tablename | indexname
|
tablename | indexname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx
|
date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx
|
||||||
date_partitioned_citus_local_table_361369 | date_partitioned_citus_local_table_measureid_idx_361369
|
date_partitioned_citus_local_table_361377 | date_partitioned_citus_local_table_measureid_idx_361377
|
||||||
partition_local_table | partition_local_table_measureid_idx
|
partition_local_table | partition_local_table_measureid_idx
|
||||||
partition_local_table_361370 | partition_local_table_measureid_idx_361370
|
partition_local_table_361378 | partition_local_table_measureid_idx_361378
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
-- creating a single object should only need to trigger fixing the single object
|
-- creating a single object should only need to trigger fixing the single object
|
||||||
|
@ -753,7 +753,7 @@ DETAIL: drop cascades to table not_partitioned
|
||||||
drop cascades to table not_distributed
|
drop cascades to table not_distributed
|
||||||
drop cascades to table fk_table
|
drop cascades to table fk_table
|
||||||
drop cascades to table p
|
drop cascades to table p
|
||||||
drop cascades to table date_partitioned_citus_local_table_361369
|
drop cascades to table date_partitioned_citus_local_table_361377
|
||||||
drop cascades to table date_partitioned_citus_local_table
|
drop cascades to table date_partitioned_citus_local_table
|
||||||
drop cascades to table parent_table
|
drop cascades to table parent_table
|
||||||
SELECT citus_remove_node('localhost', :master_port);
|
SELECT citus_remove_node('localhost', :master_port);
|
||||||
|
|
|
@ -4324,12 +4324,66 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%
|
||||||
(6 rows)
|
(6 rows)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO partitioning_schema;
|
||||||
|
-- create parent table
|
||||||
|
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
|
||||||
|
-- create partition
|
||||||
|
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
|
||||||
|
-- populate table
|
||||||
|
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
|
||||||
|
-- create extended statistics
|
||||||
|
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
|
||||||
|
-- distribute parent table
|
||||||
|
SELECT create_distributed_table('stxdinp', 'i');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$partitioning_schema.stxdinp1$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- run select query, works fine
|
||||||
|
SELECT a, b FROM stxdinp GROUP BY 1, 2;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
3 | 3
|
||||||
|
7 | 7
|
||||||
|
2 | 2
|
||||||
|
8 | 8
|
||||||
|
0 | 0
|
||||||
|
5 | 5
|
||||||
|
6 | 6
|
||||||
|
9 | 9
|
||||||
|
4 | 4
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
-- partitions are processed recursively for PG15+
|
||||||
|
VACUUM ANALYZE stxdinp;
|
||||||
|
SELECT a, b FROM stxdinp GROUP BY 1, 2;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
3 | 3
|
||||||
|
7 | 7
|
||||||
|
2 | 2
|
||||||
|
8 | 8
|
||||||
|
0 | 0
|
||||||
|
5 | 5
|
||||||
|
6 | 6
|
||||||
|
9 | 9
|
||||||
|
4 | 4
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
DROP SCHEMA partitioning_schema CASCADE;
|
DROP SCHEMA partitioning_schema CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
NOTICE: drop cascades to 5 other objects
|
||||||
DETAIL: drop cascades to table partitioning_schema."schema-test"
|
DETAIL: drop cascades to table "schema-test"
|
||||||
drop cascades to table partitioning_schema.another_distributed_table
|
drop cascades to table another_distributed_table
|
||||||
drop cascades to table partitioning_schema.distributed_parent_table
|
drop cascades to table distributed_parent_table
|
||||||
drop cascades to table partitioning_schema.part_table_with_very_long_name
|
drop cascades to table part_table_with_very_long_name
|
||||||
|
drop cascades to table stxdinp
|
||||||
RESET search_path;
|
RESET search_path;
|
||||||
DROP TABLE IF EXISTS
|
DROP TABLE IF EXISTS
|
||||||
partitioning_hash_test,
|
partitioning_hash_test,
|
||||||
|
|
|
@ -0,0 +1,176 @@
|
||||||
|
#include "isolation_mx_common.include.spec"
|
||||||
|
|
||||||
|
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
|
||||||
|
// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities
|
||||||
|
// session s2 - Does concurrent inserts/update/delete
|
||||||
|
// session s3 - Holds advisory locks
|
||||||
|
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE TABLE observations_with_pk (
|
||||||
|
tenant_id text not null,
|
||||||
|
dummy int,
|
||||||
|
measurement_id bigserial not null,
|
||||||
|
payload jsonb not null,
|
||||||
|
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP,
|
||||||
|
PRIMARY KEY (tenant_id, measurement_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE observations_with_full_replica_identity (
|
||||||
|
tenant_id text not null,
|
||||||
|
dummy int,
|
||||||
|
measurement_id bigserial not null,
|
||||||
|
payload jsonb not null,
|
||||||
|
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP
|
||||||
|
);
|
||||||
|
ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
teardown
|
||||||
|
{
|
||||||
|
DROP TABLE observations_with_pk;
|
||||||
|
DROP TABLE observations_with_full_replica_identity;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s1"
|
||||||
|
|
||||||
|
step "s1-alter-table"
|
||||||
|
{
|
||||||
|
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||||
|
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-set-factor-1"
|
||||||
|
{
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus_set_coordinator_host('localhost');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-create-distributed-table-observations_with_pk-concurrently"
|
||||||
|
{
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-create-distributed-table-observations-2-concurrently"
|
||||||
|
{
|
||||||
|
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s2"
|
||||||
|
|
||||||
|
step "s2-begin"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-insert-observations_with_pk"
|
||||||
|
{
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-insert-observations_with_full_replica_identity"
|
||||||
|
{
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-update-observations_with_pk"
|
||||||
|
{
|
||||||
|
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-update-primary-key-observations_with_pk"
|
||||||
|
{
|
||||||
|
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-update-observations_with_full_replica_identity"
|
||||||
|
{
|
||||||
|
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-delete-observations_with_pk"
|
||||||
|
{
|
||||||
|
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-delete-observations_with_full_replica_identity"
|
||||||
|
{
|
||||||
|
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-end"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-print-cluster-1"
|
||||||
|
{
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_pk
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-print-cluster-2"
|
||||||
|
{
|
||||||
|
-- row count per shard
|
||||||
|
SELECT
|
||||||
|
nodeport, shardid, success, result
|
||||||
|
FROM
|
||||||
|
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||||
|
ORDER BY
|
||||||
|
nodeport, shardid;
|
||||||
|
|
||||||
|
SELECT *
|
||||||
|
FROM
|
||||||
|
observations_with_full_replica_identity
|
||||||
|
ORDER BY
|
||||||
|
measurement_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
session "s3"
|
||||||
|
|
||||||
|
// this advisory lock with (almost) random values are only used
|
||||||
|
// for testing purposes. For details, check Citus' logical replication
|
||||||
|
// source code
|
||||||
|
step "s3-acquire-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-release-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Concurrent Insert/Update with create_distributed_table_concurrently(with primary key as replica identity) after dropping a column:
|
||||||
|
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
|
||||||
|
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
|
||||||
|
// -> s1 complete create_distributed_table_concurrently -> result is reflected in new shards
|
||||||
|
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
|
||||||
|
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
|
||||||
|
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
|
||||||
|
|
||||||
|
|
||||||
|
// Concurrent Insert/Update with create_distributed_table_concurrently(with replica identity full) after dropping a column:
|
||||||
|
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
|
||||||
|
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
|
||||||
|
// -> s1 complete create_distributed_table_concurrently -> result is reflected in new shards
|
||||||
|
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
|
||||||
|
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
|
|
@ -59,6 +59,24 @@ SELECT 1 FROM citus_rebalance_start();
|
||||||
SELECT rebalance_table_shards();
|
SELECT rebalance_table_shards();
|
||||||
SELECT citus_rebalance_wait();
|
SELECT citus_rebalance_wait();
|
||||||
|
|
||||||
|
DROP TABLE t1;
|
||||||
|
|
||||||
|
|
||||||
|
-- make sure a non-super user can stop rebalancing
|
||||||
|
CREATE USER non_super_user_rebalance WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
|
||||||
|
|
||||||
|
SET ROLE non_super_user_rebalance;
|
||||||
|
|
||||||
|
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
|
||||||
|
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
|
||||||
|
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_rebalance_start();
|
||||||
|
SELECT citus_rebalance_stop();
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA background_rebalance CASCADE;
|
DROP SCHEMA background_rebalance CASCADE;
|
||||||
|
|
|
@ -38,6 +38,12 @@ select create_distributed_table_concurrently('nocolo','x');
|
||||||
select create_distributed_table_concurrently('test','key', colocate_with := 'nocolo');
|
select create_distributed_table_concurrently('test','key', colocate_with := 'nocolo');
|
||||||
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
|
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
|
||||||
|
|
||||||
|
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
|
||||||
|
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
|
||||||
|
select create_distributed_table_concurrently('test','key');
|
||||||
|
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
|
||||||
|
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
|
||||||
|
|
||||||
-- use colocate_with "default"
|
-- use colocate_with "default"
|
||||||
select create_distributed_table_concurrently('test','key', shard_count := 11);
|
select create_distributed_table_concurrently('test','key', shard_count := 11);
|
||||||
|
|
||||||
|
|
|
@ -795,5 +795,39 @@ FROM test.maintenance_worker();
|
||||||
-- confirm that there is only one maintenance daemon
|
-- confirm that there is only one maintenance daemon
|
||||||
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
|
||||||
|
-- confirm that we can create a distributed table concurrently on an empty node
|
||||||
|
DROP EXTENSION citus;
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.defer_drop_after_shard_split TO off;
|
||||||
|
SELECT create_distributed_table_concurrently('test','x');
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
|
||||||
|
-- confirm that we can create a distributed table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('test','x');
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
|
||||||
|
-- confirm that we can create a reference table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SELECT create_reference_table('test');
|
||||||
|
DROP TABLE test;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
|
||||||
|
-- confirm that we can create a local table on an empty node
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
INSERT INTO test VALUES (1,2);
|
||||||
|
SELECT citus_add_local_table_to_metadata('test');
|
||||||
|
DROP TABLE test;
|
||||||
|
DROP EXTENSION citus;
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
|
||||||
DROP TABLE version_mismatch_table;
|
DROP TABLE version_mismatch_table;
|
||||||
DROP SCHEMA multi_extension;
|
DROP SCHEMA multi_extension;
|
||||||
|
|
|
@ -2002,6 +2002,30 @@ SELECT tablename, indexname FROM pg_indexes
|
||||||
WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2;
|
WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO partitioning_schema;
|
||||||
|
|
||||||
|
-- create parent table
|
||||||
|
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
|
||||||
|
|
||||||
|
-- create partition
|
||||||
|
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
|
||||||
|
|
||||||
|
-- populate table
|
||||||
|
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
|
||||||
|
|
||||||
|
-- create extended statistics
|
||||||
|
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
|
||||||
|
|
||||||
|
-- distribute parent table
|
||||||
|
SELECT create_distributed_table('stxdinp', 'i');
|
||||||
|
|
||||||
|
-- run select query, works fine
|
||||||
|
SELECT a, b FROM stxdinp GROUP BY 1, 2;
|
||||||
|
|
||||||
|
-- partitions are processed recursively for PG15+
|
||||||
|
VACUUM ANALYZE stxdinp;
|
||||||
|
SELECT a, b FROM stxdinp GROUP BY 1, 2;
|
||||||
|
|
||||||
DROP SCHEMA partitioning_schema CASCADE;
|
DROP SCHEMA partitioning_schema CASCADE;
|
||||||
RESET search_path;
|
RESET search_path;
|
||||||
DROP TABLE IF EXISTS
|
DROP TABLE IF EXISTS
|
||||||
|
|
Loading…
Reference in New Issue