Compare commits

...

11 Commits

Author SHA1 Message Date
Onur Tirtir b7ae596fe8
Bump citus version to 11.1.1 (#6356) 2022-09-16 12:33:09 +03:00
Onur Tirtir 6f4324623c Add changelog entries for 11.1.1 (#6354)
(cherry picked from commit 8b5cdaf0e9)
2022-09-16 12:17:25 +03:00
Marco Slot d5db0adc17 Allow create_distributed_table_concurrently on an empty node (#6353)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-16 12:17:25 +03:00
Onur Tirtir 099523452e Add changelog entries for 11.1.0 (#6349)
Created by executing `prepare_changelog.pl citus 11.1.0 2022-03-29`.

(cherry picked from commit 57e354ac91)
2022-09-16 11:20:00 +03:00
Onder Kalaci af448da1a7 Prevent failures on partitioned distributed tables with statistics objects on PG 15
Comment from the code is clear on this:
/*
 * The statistics objects of the distributed table are not relevant
 * for the distributed planning, so we can override it.
 *
 * Normally, we should not need this. However, the combination of
 * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
 * Citus function AdjustPartitioningForDistributedPlanning()
 * forces us to do this. The commit expects statistics objects
 * of partitions to have "inh" flag set properly. Whereas, the
 * function overrides "inh" flag. To avoid Postgres to throw error,
 * we override statlist such that Postgres does not try to process
 * any statistics objects during the standard_planner() on the
 * coordinator. In the end, we do not need the standard_planner()
 * on the coordinator to generate an optimized plan. We call
 * into standard_planner() for other purposes, such as generating the
 * relationRestrictionContext here.
 *
 * AdjustPartitioningForDistributedPlanning() is a hack that we use
 * to prevent Postgres' standard_planner() to expand all the partitions
 * for the distributed planning when a distributed partitioned table
 * is queried. It is required for both correctness and performance
 * reasons. Although we can eliminate the use of the function for
 * the correctness (e.g., make sure that rest of the planner can handle
 * partitions), it's performance implication is hard to avoid. Certain
 * planning logic of Citus (such as router or query pushdown) relies
 * heavily on the relationRestrictionList. If
 * AdjustPartitioningForDistributedPlanning() is removed, all the
* partitions show up in the relationRestrictionList, causing high planning times for
 * such queries.
 */
2022-09-15 14:37:28 +03:00
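
The fix for this is a single assignment in multi_relation_restriction_hook(), visible in the planner diff further down this page: once a Citus table's cache entry has been looked up, the hook clears the statistics list so the coordinator-side standard_planner() never processes those objects.

relOptInfo->statlist = NIL;   /* the comment above explains why the coordinator plan does not need them */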
Sameer Awasekar acccad9879 Introduce code changes to fix Issue:6303 (#6328)
The PR introduces code changes to fix Issue
[6303](https://github.com/citusdata/citus/issues/6303).

Running `create_distributed_table_concurrently` after dropping a column
creates a buggy situation in the split decoder.
* Consider the below scenario:
* Session1: drop a column, then run
create_distributed_table_concurrently
* Session2: concurrent insert workload

The child shards created by `create_distributed_table_concurrently` have
fewer columns than the source shard because some columns were dropped.
The incoming tuple from session2 has more columns, since the writes
happened on the source shard, but that tuple now needs to be applied on a
child shard. So we need to reformat the existing tuple according to the
child schema and skip the dropped column values.
The PR fixes this by reformatting the tuple according to the target child
schema.

Test:
1) isolation_create_distributed_concurrently_after_drop_column -
reproduces the issue and tests the fix.
2022-09-15 09:48:43 +02:00
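
In essence, the decoder deforms the incoming tuple with the source shard's descriptor, copies only the attributes that are not marked dropped, and re-forms the tuple with the target shard's descriptor. A condensed sketch of that approach follows (sourceTuple, sourceDesc and targetDesc are placeholder names; the full helper, GetTupleForTargetSchema, appears in the shard-split decoder diff further down):

/* rebuild a tuple so it matches a target descriptor that lacks the dropped columns */
Datum *values = (Datum *) palloc0(sourceDesc->natts * sizeof(Datum));
bool *nulls = (bool *) palloc0(sourceDesc->natts * sizeof(bool));
heap_deform_tuple(sourceTuple, sourceDesc, values, nulls);

int targetIndex = 0;
for (int sourceIndex = 0; sourceIndex < sourceDesc->natts; sourceIndex++)
{
    if (TupleDescAttr(sourceDesc, sourceIndex)->attisdropped)
    {
        continue;               /* skip values of dropped source columns */
    }
    values[targetIndex] = values[sourceIndex];
    nulls[targetIndex] = nulls[sourceIndex];
    targetIndex++;
}

HeapTuple targetTuple = heap_form_tuple(targetDesc, values, nulls);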
aykut-bozkurt 77947da17c ensure we have more active nodes than replication factor. (#6341)
DESCRIPTION: Fixes a floating point exception during
create_distributed_table_concurrently.

Fixes #6332.
During create_distributed_table_concurrently, when there is no active
primary node, the command fails with a floating point exception. We added
a check similar to the one in create_distributed_table, so it now fails
with a proper message when the number of active nodes is less than the
replication factor.

(cherry picked from commit 739b91afa6)
2022-09-14 18:22:58 +03:00
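
The "floating point exception" here is a SIGFPE from an integer modulo by zero rather than anything floating point: round-robin shard placement presumably indexes into the worker node list with something like nodeIndex = shardIndex % list_length(workerNodeList), and with no active primary nodes the divisor is zero (the diff below adds both an explicit ereport before that point and an Assert inside RoundRobinWorkerNodeList). A minimal standalone illustration of the failure mode, not Citus code:

#include <stdio.h>

int
main(int argc, char **argv)
{
    int shardIndex = 3;
    int workerNodeCount = argc - 1;              /* run with no arguments: zero "nodes" */
    int nodeIndex = shardIndex % workerNodeCount; /* integer division by zero -> SIGFPE, reported as "floating point exception" */
    printf("%d\n", nodeIndex);
    return 0;
}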
Marco Slot 7d56c25e28 Fix escaping in sequence dependency queries (#6345)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-14 16:52:25 +02:00
Marco Slot eba70af7a2 Fix bugs in CheckIfRelationWithSameNameExists (#6343)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-14 15:59:43 +02:00
Hanefi Onaldi 3f33390f45 Bump Citus version to 11.1.0 2022-09-14 14:21:43 +03:00
Nils Dijk 7b51f3eee2
Fix: rebalance stop non super user (#6334)
No description needed; this fixes an issue introduced with a new feature
in 11.1.

Fixes #6333 

Because Postgres' C API is 0-indexed while Postgres attribute numbers are
1-indexed, we were reading the wrong Datum as the task owner when
cancelling. Here we add a test to show the error and fix the off-by-one
error.
2022-09-13 23:20:06 +02:00
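
The mismatch is between Postgres catalog attribute numbers (the Anum_* constants, which start at 1) and the plain C arrays filled by heap_deform_tuple(), which start at 0, so every array lookup needs a "- 1"; the corrected read is in the CancelTasksForJob() hunk further down. A short sketch of the convention, using hypothetical catalog constants and placeholder taskTuple/tupleDescriptor variables:

/* hypothetical constants for illustration; the real ones live in the pg_dist_background_task headers */
#define Natts_pg_dist_background_task 9
#define Anum_pg_dist_background_task_owner 2

Datum values[Natts_pg_dist_background_task];
bool isnull[Natts_pg_dist_background_task];
heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);

/* attribute numbers are 1-based, the deformed arrays are 0-based */
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]);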
25 changed files with 1442 additions and 39 deletions

View File

@ -1,3 +1,128 @@
### citus v11.1.1 (September 16, 2022) ###

* Fixes a bug that prevents `create_distributed_table_concurrently()` working
  on an empty node

### citus v11.1.0 (September 15, 2022) ###
* Adds support for PostgreSQL 15beta4
* Adds ability to run shard rebalancer in the background
* Adds `create_distributed_table_concurrently()` UDF to distribute tables
without interrupting the application
* Adds `citus_split_shard_by_split_points()` UDF that allows
splitting a shard to specified set of nodes without blocking writes
and based on given split points
* Adds support for non-blocking tenant isolation
* Adds support for isolating tenants that use partitioned tables
or columnar tables
* Separates columnar table access method into a separate logical extension
* Adds support for online replication in `replicate_reference_tables()`
* Improves performance of blocking shard moves
* Improves non-blocking shard moves with a faster custom copy logic
* Creates all foreign keys quickly at the end of a shard move
* Limits `get_rebalance_progress()` to show shards in moving state
* Makes `citus_move_shard_placement()` idempotent if shard already exists
on target node
* Shows `citus_copy_shard_placement()` progress in `get_rebalance_progress()`
* Supports changing CPU priorities for backends and shard moves
* Adds the GUC `citus.allow_unsafe_constraints` to allow unique/exclusion/
primary key constraints without distribution column
* Introduces GUC `citus.skip_constraint_validation`
* Introduces `citus_locks` view
* Improves `citus_tables` view by showing local tables added to metadata
* Improves columnar table access method by moving old catalog tables into
an internal schema and introduces more secure & informative views based
on them
* Adds support for `GRANT/REVOKE` on aggregates
* Adds support for `NULLS NOT DISTINCT` clauses for indexes for PG15+
* Adds support for setting relation options for columnar tables using
`ALTER TABLE`
* Adds support for unlogged distributed sequences
* Removes `do_repair` option from `citus_copy_shard_placement()`
* Removes deprecated re-partitioning functions like
`worker_hash_partition_table()`
* Drops support for isolating tenants that use replicated tables
* Checks existence of the shards before insert, delete, and update
* Hides tables owned by extensions from `citus_tables` and `citus_shards`
* Propagates `VACUUM` and `ANALYZE` to worker nodes
* Makes non-partitioned table size calculation quicker
* Improves `create_distributed_table()` by creating new colocation entries when
using `colocate_with => 'none'`
* Ensures that `SELECT .. FOR UPDATE` opens a transaction block when used in
a function call
* Prevents a segfault by disallowing usage of SQL functions referencing to a
distributed table
* Prevents creating a new colocation entry when replicating reference tables
* Fixes a bug in query escaping in `undistribute_table()` and
`alter_distributed_table()`
* Fixes a bug preventing the usage of `isolate_tenant_to_new_shard()` with text
column
* Fixes a bug that may cause `GRANT` to propagate within `CREATE EXTENSION`
* Fixes a bug that causes incorrectly marking `metadatasynced` flag for
coordinator
* Fixes a bug that may prevent Citus from creating function in transaction
block properly
* Fixes a bug that prevents promoting read-replicas as primaries
* Fixes a bug that prevents setting colocation group of a partitioned
distributed table to `none`
* Fixes a bug that prevents using `AUTO` option for `VACUUM (INDEX_CLEANUP)`
operation
* Fixes a segfault in `citus_copy_shard_placement()`
* Fixes an issue that can cause logical reference table replication to fail
* Fixes schema name qualification for `RENAME SEQUENCE` statement
* Fixes several small memory leaks
* Fixes the transaction timestamp column of the `get_current_transaction_id()`
on coordinator
* Maps any unused parameters to a generic type in prepared statements

### citus v10.2.8 (August 19, 2022) ###

* Fixes compilation warning caused by latest upgrade script changes

configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
- # Generated by GNU Autoconf 2.69 for Citus 11.1devel.
+ # Generated by GNU Autoconf 2.69 for Citus 11.1.1.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
- PACKAGE_VERSION='11.1devel'
- PACKAGE_STRING='Citus 11.1devel'
+ PACKAGE_VERSION='11.1.1'
+ PACKAGE_STRING='Citus 11.1.1'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
- \`configure' configures Citus 11.1devel to adapt to many kinds of systems.
+ \`configure' configures Citus 11.1.1 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
- short | recursive ) echo "Configuration of Citus 11.1devel:";;
+ short | recursive ) echo "Configuration of Citus 11.1.1:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
- Citus configure 11.1devel
+ Citus configure 11.1.1
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
- It was created by Citus $as_me 11.1devel, which was
+ It was created by Citus $as_me 11.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
- This file was extended by Citus $as_me 11.1devel, which was
+ This file was extended by Citus $as_me 11.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
- Citus config.status 11.1devel
+ Citus config.status 11.1.1
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
- AC_INIT([Citus], [11.1devel])
+ AC_INIT([Citus], [11.1.1])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -1604,6 +1604,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
else if (ShouldSyncTableMetadata(sourceId))
{
+ char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
@ -1612,8 +1614,8 @@
StringInfo command = makeStringInfo();
appendStringInfo(command,
- "SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
- quote_qualified_identifier(schemaName, sourceName));
+ "SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
+ quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
@ -1903,11 +1905,17 @@ CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequ
char *sourceSchemaName, char *sourceName,
char *targetSchemaName, char *targetName)
{
+ char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
+ sequenceName);
+ char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
+ char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo();
- appendStringInfo(query, "SELECT worker_change_sequence_dependency('%s', '%s', '%s')",
- quote_qualified_identifier(sequenceSchemaName, sequenceName),
- quote_qualified_identifier(sourceSchemaName, sourceName),
- quote_qualified_identifier(targetSchemaName, targetName));
+ appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
+ quote_literal_cstr(qualifiedSchemaName),
+ quote_literal_cstr(qualifiedSourceName),
+ quote_literal_cstr(qualifiedTargetName));
return query->data;
}
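
The problem with the old queries was that they pushed the output of quote_qualified_identifier() into a single-quoted '%s' placeholder: that output is an identifier (possibly containing double quotes, or even a single quote if the relation name has one), not an escaped string literal, so the generated SQL could end up malformed. Routing the already-qualified name through quote_literal_cstr() and a bare %s yields a correctly escaped literal instead. A small illustration with a hypothetical table name containing a single quote:

/* hypothetical relation public."weird'name" */
char *qualifiedName = quote_qualified_identifier("public", "weird'name");

/* old pattern: ...dependency('%s')  ->  ...dependency('public."weird'name"')  -- the embedded quote ends the literal early */
/* new pattern: quote_literal_cstr(qualifiedName) -> 'public."weird''name"'    -- the single quote is doubled, the literal stays intact */
appendStringInfo(command,
                 "SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
                 quote_literal_cstr(qualifiedName));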

View File

@ -128,6 +128,9 @@ citus_add_local_table_to_metadata_internal(Oid relationId, bool cascadeViaForeig
{
CheckCitusVersion(ERROR);
+ /* enable citus_add_local_table_to_metadata on an empty node */
+ InsertCoordinatorIfClusterEmpty();
bool autoConverted = false;
CreateCitusLocalTable(relationId, cascadeViaForeignKeys, autoConverted);
}

View File

@ -382,7 +382,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
"citus.shard_replication_factor > 1"))); "citus.shard_replication_factor > 1")));
} }
EnsureCoordinatorIsInMetadata();
EnsureCitusTableCanBeCreated(relationId); EnsureCitusTableCanBeCreated(relationId);
EnsureValidDistributionColumn(relationId, distributionColumnName); EnsureValidDistributionColumn(relationId, distributionColumnName);
@ -528,6 +527,14 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId); colocatedTableId = ColocatedTableId(colocationId);
} }
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}
List *workersForPlacementList; List *workersForPlacementList;
List *shardSplitPointsList; List *shardSplitPointsList;
@ -555,7 +562,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
/* /*
* Place shards in a round-robin fashion across all data nodes. * Place shards in a round-robin fashion across all data nodes.
*/ */
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount); workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
} }
@ -856,6 +862,8 @@ WorkerNodesForShardList(List *shardList)
static List * static List *
RoundRobinWorkerNodeList(List *workerNodeList, int listLength) RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
{ {
Assert(workerNodeList != NIL);
List *nodeIdList = NIL; List *nodeIdList = NIL;
for (int idx = 0; idx < listLength; idx++) for (int idx = 0; idx < listLength; idx++)

View File

@ -4022,7 +4022,7 @@ CancelTasksForJob(int64 jobid)
}
/* make sure the current user has the rights to cancel this task */
- Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner]);
+ Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]);
if (superuser_arg(taskOwner) && !superuser())
{
/* must be a superuser to cancel tasks owned by superuser */

View File

@ -670,10 +670,15 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
AppendShardIdToName(&shardName, shardInterval->shardId);
StringInfo checkShardExistsQuery = makeStringInfo();
+ /*
+ * We pass schemaName and shardName without quote_identifier, since
+ * they are used as strings here.
+ */
appendStringInfo(checkShardExistsQuery,
- "SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
- schemaName,
- shardName);
+ "SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = %s AND tablename = %s);",
+ quote_literal_cstr(schemaName),
+ quote_literal_cstr(shardName));
int connectionFlags = 0;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
@ -691,11 +696,13 @@
ReportResultError(connection, result, ERROR);
}
- char *checkExists = PQgetvalue(result, 0, 0);
+ char *existsString = PQgetvalue(result, 0, 0);
+ bool tableExists = strcmp(existsString, "t") == 0;
PQclear(result);
ForgetResults(connection);
- return strcmp(checkExists, "t") == 0;
+ return tableExists;
}

View File

@ -1897,14 +1897,14 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
- bool distributedTable = IsCitusTable(rte->relid);
+ bool isCitusTable = IsCitusTable(rte->relid);
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
relationRestriction->index = restrictionIndex;
relationRestriction->relationId = rte->relid;
relationRestriction->rte = rte;
relationRestriction->relOptInfo = relOptInfo;
- relationRestriction->distributedRelation = distributedTable;
+ relationRestriction->citusTable = isCitusTable;
relationRestriction->plannerInfo = root;
/* see comments on GetVarFromAssignedParam() */
@ -1919,10 +1919,42 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
* We're also keeping track of whether all participant
* tables are reference tables.
*/
- if (distributedTable)
+ if (isCitusTable)
{
cacheEntry = GetCitusTableCacheEntry(rte->relid);
+ /*
+ * The statistics objects of the distributed table are not relevant
+ * for the distributed planning, so we can override it.
+ *
+ * Normally, we should not need this. However, the combination of
+ * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
+ * Citus function AdjustPartitioningForDistributedPlanning()
+ * forces us to do this. The commit expects statistics objects
+ * of partitions to have "inh" flag set properly. Whereas, the
+ * function overrides "inh" flag. To avoid Postgres to throw error,
+ * we override statlist such that Postgres does not try to process
+ * any statistics objects during the standard_planner() on the
+ * coordinator. In the end, we do not need the standard_planner()
+ * on the coordinator to generate an optimized plan. We call
+ * into standard_planner() for other purposes, such as generating the
+ * relationRestrictionContext here.
+ *
+ * AdjustPartitioningForDistributedPlanning() is a hack that we use
+ * to prevent Postgres' standard_planner() to expand all the partitions
+ * for the distributed planning when a distributed partitioned table
+ * is queried. It is required for both correctness and performance
+ * reasons. Although we can eliminate the use of the function for
+ * the correctness (e.g., make sure that rest of the planner can handle
+ * partitions), it's performance implication is hard to avoid. Certain
+ * planning logic of Citus (such as router or query pushdown) relies
+ * heavily on the relationRestrictionList. If
+ * AdjustPartitioningForDistributedPlanning() is removed, all the
+ * partitions show up in the relationRestrictionList, causing high
+ * planning times for such queries.
+ */
+ relOptInfo->statlist = NIL;
relationRestrictionContext->allReferenceTables &=
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
}

View File

@ -3692,7 +3692,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
newRestriction->index = oldRestriction->index;
newRestriction->relationId = oldRestriction->relationId;
- newRestriction->distributedRelation = oldRestriction->distributedRelation;
+ newRestriction->citusTable = oldRestriction->citusTable;
newRestriction->rte = copyObject(oldRestriction->rte);
/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */

View File

@ -224,7 +224,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
- if (!relationRestriction->distributedRelation)
+ if (!relationRestriction->citusTable)
{
return true;
}

View File

@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple, HeapTuple tuple,
char *currentSlotName); char *currentSlotName);
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc);
/*
* Postgres uses 'pgoutput' as default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible.
@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
/*
* If any columns from source relation have been dropped, then the tuple needs to
* be formatted according to the target relation.
*/
TupleDesc sourceRelationDesc = RelationGetDescr(relation);
TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
if (sourceRelationDesc->natts > targetRelationDesc->natts)
{
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
/*
* Format oldtuple according to the target relation. If the column values of replica
* identity change, then the old tuple is non-null and needs to be formatted according
* to the target relation schema.
*/
if (change->data.tp.oldtuple != NULL)
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple,
sourceRelationDesc,
targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
break;
}
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}
@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
return DatumGetInt32(hashedValueDatum);
}
/*
* GetTupleForTargetSchema returns a tuple with the schema of the target relation.
* If some columns within the source relations are dropped, we would have to reformat
* the tuple to match the schema of the target relation.
*
* Consider the below scenario:
* Session1 : Drop column followed by create_distributed_table_concurrently
* Session2 : Concurrent insert workload
*
* The child shards created by create_distributed_table_concurrently will have less columns
* than the source shard because some column were dropped.
* The incoming tuple from session2 will have more columns as the writes
* happened on source shard. But now the tuple needs to be applied on child shard. So we need to format
* it according to child schema.
*/
static HeapTuple
GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceRelDesc,
TupleDesc targetRelDesc)
{
/* Deform the tuple */
Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
oldNulls);
/* Create new tuple by skipping dropped columns */
int nextAttributeIndex = 0;
Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
for (int i = 0; i < sourceRelDesc->natts; i++)
{
if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
{
continue;
}
newValues[nextAttributeIndex] = oldValues[i];
newNulls[nextAttributeIndex] = oldNulls[i];
nextAttributeIndex++;
}
HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
return targetRelationTuple;
}

View File

@ -56,7 +56,7 @@ typedef struct RelationRestriction
{
Index index;
Oid relationId;
- bool distributedRelation;
+ bool citusTable;
RangeTblEntry *rte;
RelOptInfo *relOptInfo;
PlannerInfo *plannerInfo;

View File

@ -8,5 +8,6 @@ test: isolation_cluster_management
test: isolation_logical_replication_single_shard_commands
test: isolation_logical_replication_multi_shard_commands
test: isolation_non_blocking_shard_split
+ test: isolation_create_distributed_concurrently_after_drop_column
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
test: isolation_non_blocking_shard_split_fkey

View File

@ -176,5 +176,39 @@ SELECT citus_rebalance_wait();
(1 row)
DROP TABLE t1;
-- make sure a non-super user can stop rebalancing
CREATE USER non_super_user_rebalance WITH LOGIN;
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
SET ROLE non_super_user_rebalance;
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
RESET ROLE;
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;

View File

@ -57,6 +57,35 @@ ERROR: cannot colocate tables nocolo and test
DETAIL: Distribution column types don't match for nocolo and test.
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
ERROR: relation "noexists" does not exist
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select create_distributed_table_concurrently('test','key');
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
ERROR: no worker nodes are available for placing shards
HINT: Add more worker nodes.
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY

View File

@ -0,0 +1,667 @@
Parsed test spec with 3 sessions
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500004|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500006|t | 4
57637|1500008|t | 0
57638|1500005|t | 0
57638|1500007|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-primary-key-observations_with_pk:
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500009|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500011|t | 4
57637|1500013|t | 0
57638|1500010|t | 0
57638|1500012|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_pk:
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500014|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500016|t | 3
57637|1500018|t | 0
57638|1500015|t | 0
57638|1500017|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500019|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500021|t | 3
57637|1500023|t | 0
57638|1500020|t | 0
57638|1500022|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_full_replica_identity:
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500024|t | 2
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500026|t | 2
57637|1500028|t | 0
57638|1500025|t | 0
57638|1500027|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)

View File

@ -1177,7 +1177,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
SHOW citus.version;
 citus.version
---------------------------------------------------------------------
- 11.1devel
+ 11.1.1
(1 row)
-- ensure no unexpected objects were created outside pg_catalog
@ -1521,6 +1521,66 @@ SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenanc
1
(1 row)
-- confirm that we can create a distributed table concurrently on an empty node
DROP EXTENSION citus;
CREATE EXTENSION citus;
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_split TO off;
SELECT create_distributed_table_concurrently('test','x');
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a distributed table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test','x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT create_reference_table('test');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_reference_table
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a local table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
DROP EXTENSION citus;
CREATE EXTENSION citus;
DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension;
ERROR: cannot drop schema multi_extension because other objects depend on it


@ -521,9 +521,9 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' O
 tablename | indexname
---------------------------------------------------------------------
 date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx
- date_partitioned_citus_local_table_361369 | date_partitioned_citus_local_table_measureid_idx_361369
+ date_partitioned_citus_local_table_361377 | date_partitioned_citus_local_table_measureid_idx_361377
 partition_local_table | partition_local_table_measureid_idx
- partition_local_table_361370 | partition_local_table_measureid_idx_361370
+ partition_local_table_361378 | partition_local_table_measureid_idx_361378
(4 rows)
-- creating a single object should only need to trigger fixing the single object
@ -753,7 +753,7 @@ DETAIL: drop cascades to table not_partitioned
 drop cascades to table not_distributed
 drop cascades to table fk_table
 drop cascades to table p
- drop cascades to table date_partitioned_citus_local_table_361369
+ drop cascades to table date_partitioned_citus_local_table_361377
 drop cascades to table date_partitioned_citus_local_table
 drop cascades to table parent_table
SELECT citus_remove_node('localhost', :master_port);


@ -4324,12 +4324,66 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%
(6 rows)
\c - - - :master_port
SET search_path TO partitioning_schema;
-- create parent table
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
-- create partition
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
-- populate table
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
-- create extended statistics
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
-- distribute parent table
SELECT create_distributed_table('stxdinp', 'i');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$partitioning_schema.stxdinp1$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- run select query, works fine
SELECT a, b FROM stxdinp GROUP BY 1, 2;
a | b
---------------------------------------------------------------------
1 | 1
3 | 3
7 | 7
2 | 2
8 | 8
0 | 0
5 | 5
6 | 6
9 | 9
4 | 4
(10 rows)
-- partitions are processed recursively for PG15+
VACUUM ANALYZE stxdinp;
SELECT a, b FROM stxdinp GROUP BY 1, 2;
a | b
---------------------------------------------------------------------
1 | 1
3 | 3
7 | 7
2 | 2
8 | 8
0 | 0
5 | 5
6 | 6
9 | 9
4 | 4
(10 rows)
DROP SCHEMA partitioning_schema CASCADE;
- NOTICE: drop cascades to 4 other objects
+ NOTICE: drop cascades to 5 other objects
- DETAIL: drop cascades to table partitioning_schema."schema-test"
+ DETAIL: drop cascades to table "schema-test"
- drop cascades to table partitioning_schema.another_distributed_table
+ drop cascades to table another_distributed_table
- drop cascades to table partitioning_schema.distributed_parent_table
+ drop cascades to table distributed_parent_table
- drop cascades to table partitioning_schema.part_table_with_very_long_name
+ drop cascades to table part_table_with_very_long_name
+ drop cascades to table stxdinp
RESET search_path;
DROP TABLE IF EXISTS
partitioning_hash_test,


@ -0,0 +1,176 @@
#include "isolation_mx_common.include.spec"
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities
// session s2 - Does concurrent inserts/updates/deletes
// session s3 - Holds advisory locks
setup
{
SET citus.shard_replication_factor TO 1;
CREATE TABLE observations_with_pk (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP,
PRIMARY KEY (tenant_id, measurement_id)
);
CREATE TABLE observations_with_full_replica_identity (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP
);
ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL;
}
teardown
{
DROP TABLE observations_with_pk;
DROP TABLE observations_with_full_replica_identity;
}
session "s1"
step "s1-alter-table"
{
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
}
step "s1-set-factor-1"
{
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
}
step "s1-create-distributed-table-observations_with_pk-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
}
step "s1-create-distributed-table-observations-2-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert-observations_with_pk"
{
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-insert-observations_with_full_replica_identity"
{
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-update-observations_with_pk"
{
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-update-primary-key-observations_with_pk"
{
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
}
step "s2-update-observations_with_full_replica_identity"
{
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-delete-observations_with_pk"
{
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-delete-observations_with_full_replica_identity"
{
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-end"
{
COMMIT;
}
step "s2-print-cluster-1"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
}
step "s2-print-cluster-2"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
}
session "s3"
// this advisory lock with (almost) random values is only used
// for testing purposes. For details, check Citus' logical replication
// source code
step "s3-acquire-advisory-lock"
{
SELECT pg_advisory_lock(44000, 55152);
}
step "s3-release-advisory-lock"
{
SELECT pg_advisory_unlock(44000, 55152);
}
// Concurrent Insert/Update with create_distributed_table_concurrently (with primary key as replica identity) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> the result is reflected in the new shards
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
// Concurrent Insert/Update with create_distributed_table_concurrently (with replica identity full) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> the result is reflected in the new shards
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"


@ -59,6 +59,24 @@ SELECT 1 FROM citus_rebalance_start();
SELECT rebalance_table_shards();
SELECT citus_rebalance_wait();
DROP TABLE t1;
-- make sure a non-super user can stop rebalancing
CREATE USER non_super_user_rebalance WITH LOGIN;
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
SET ROLE non_super_user_rebalance;
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_stop();
RESET ROLE;
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;


@ -38,6 +38,12 @@ select create_distributed_table_concurrently('nocolo','x');
select create_distributed_table_concurrently('test','key', colocate_with := 'nocolo');
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
select create_distributed_table_concurrently('test','key');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);


@ -795,5 +795,39 @@ FROM test.maintenance_worker();
-- confirm that there is only one maintenance daemon
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
-- confirm that we can create a distributed table concurrently on an empty node
DROP EXTENSION citus;
CREATE EXTENSION citus;
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_split TO off;
SELECT create_distributed_table_concurrently('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a distributed table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT create_reference_table('test');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a local table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT citus_add_local_table_to_metadata('test');
DROP TABLE test;
DROP EXTENSION citus;
CREATE EXTENSION citus;
DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension;


@ -2002,6 +2002,30 @@ SELECT tablename, indexname FROM pg_indexes
WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2;
\c - - - :master_port
SET search_path TO partitioning_schema;
-- create parent table
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
-- create partition
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
-- populate table
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
-- create extended statistics
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
-- distribute parent table
SELECT create_distributed_table('stxdinp', 'i');
-- run select query, works fine
SELECT a, b FROM stxdinp GROUP BY 1, 2;
-- partitions are processed recursively for PG15+
VACUUM ANALYZE stxdinp;
SELECT a, b FROM stxdinp GROUP BY 1, 2;
DROP SCHEMA partitioning_schema CASCADE;
RESET search_path;
DROP TABLE IF EXISTS