Compare commits

...

19 Commits

Author SHA1 Message Date
Onur Tirtir 9b19e41e46 Add changelog entries for 11.1.2 (#6386)
(cherry picked from commit 17cf137c4c)
2022-09-30 13:07:28 +03:00
Onur Tirtir 10be12e4be Bump citus version to 11.1.2 2022-09-30 13:06:12 +03:00
Jelte Fennema 006f6aceaf Reuse connections for Splits and Logical Replication (#6314)
In the Split and Logical Replication logic and in the ShardCleaner we call
`SendCommandListToWorkerOutsideTransaction` and
`SendOptionalCommandListToWorkerOutsideTransaction` frequently. This
opens a new connection for each of those calls, even though we already
have a perfectly good connection lying around.

This PR adds two new APIs
`SendCommandListToWorkerOutsideTransactionWithConnection` and
`SendOptionalCommandListToWorkerOutsideTransactionWithConnection` that
allow sending a list of queries in a transaction over an existing
connection. We also update the callers (Split, ShardCleaner, Logical
Replication) to use these new APIs instead.
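
A minimal sketch of the new caller pattern (the command-list and node variables are hypothetical; the function names and argument shapes follow the call sites shown in the diffs further down this page):

/* open (or reuse) one connection outside a transaction ... */
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
                                                             nodeName, nodePort,
                                                             CurrentUserName(),
                                                             NULL);

/* ... and send several command lists over it instead of opening a new
 * connection per call (ddlCommandList/cleanupCommandList are placeholders) */
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
                                                         ddlCommandList);
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
    connection, cleanupCommandList);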

Co-authored-by: Nitish Upreti <niupre@microsoft.com>
Co-authored-by: Onder Kalaci <onderkalaci@gmail.com>
(cherry picked from commit 24e06af6d2)
2022-09-26 16:53:38 +02:00
Jelte Fennema ebe70adc92 Revert replica identity creation order for shard moves (#6367)
In Citus 11.1.0 we changed the order of doing the initial data copy and
the replica identity creation when doing a non blocking shard move. This
was done to try and increase the speed with which shard moves could be
done. But after doing more extensive performance testing this change
turned out to have a negative impact on the speed of moves on the setups
that I tested.

Looking at the resource usage metrics of the VMs the reason for this
seems to be that these shard moves were bottlenecked by disk bandwidth.
While creating replica identities in bulk after the initial copy will
reduce CPU usage a bit, it does require an additional sequence scan of
the just-written data. So when a VM is bottlenecked on disk, it makes
sense to spend a little bit more CPU to avoid that additional scan,
since PKs are usually simple indexes that don't require lots of CPU to
update, as opposed to e.g. GiST indexes.

This reverts the order change to avoid a regression on shard move speed
in these cases.

For future releases we might consider re-evaluating our index creation
order for other indexes too, and create "simple" indexes before the
copy.
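
Concretely, the same ordering shows up in the non-blocking split path in the diff further down this page: replica identities are created before the snapshotted copy.

/* replica identities (e.g. primary keys) are created first ... */
CreateReplicaIdentities(logicalRepTargetList);

/* ... and only then the initial data copy runs */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
            shardGroupSplitIntervalListList, workersForPlacementList,
            snapshot, distributionColumnOverrides);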

(cherry picked from commit d9a9a3263b)
2022-09-26 16:53:30 +02:00
Onur Tirtir b9e4364acc Not allow ON DELETE/UPDATE SET DEFAULT actions on columns that default to sequences (#6340)
Given that we drop DEFAULT nextval('sequence') expressions from
shard relation columns, allowing `ON DELETE/UPDATE SET DEFAULT`
on such columns might cause NULL values to be inserted as a result
of a delete/update operation.

For this reason, we disallow ON DELETE/UPDATE SET DEFAULT actions
on columns that default to sequences.

DESCRIPTION: Disallows having ON DELETE/UPDATE SET DEFAULT actions on
columns that default to sequences

Fixes #6339.

(cherry picked from commit a868cc049a)

 Conflicts:
	src/test/regress/expected/pg15.out
	src/test/regress/sql/pg15.sql
2022-09-23 13:55:51 +03:00
Onur Tirtir 53ec5abb75 Not drop default col exprs from shard when adding local table to metadata (#6323)
As we did for GENERATED STORED columns in #4613, we should not drop
column default expressions that are not based on sequences from the
shard relation, since such expressions need to exist, e.g., for foreign
key actions.

For the column default expressions that are based on sequences we cannot
do much, so we need to disallow having ON DELETE SET DEFAULT actions on
such columns in a separate PR, see #6339.

Fixes #6318.

DESCRIPTION: Fixes a bug that might cause inserting incorrect DEFAULT
values when applying foreign key actions

(cherry picked from commit de24a3eda5)
2022-09-23 13:53:04 +03:00
Ahmet Gedemenli ecaa0cda6d Fix dropping replication slot (#6359)
DESCRIPTION: Fixes dropping replication slots

As detected by a flaky test, Citus sometimes fails to drop replication
slots at the end of a shard split, possibly due to a race condition.
With this PR, we retry dropping them for up to 20 seconds when we hit an
`OBJECT_IN_USE` error.
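
A rough sketch of the retry idea only; the helper name and sleep interval below are illustrative, not the code from this PR:

int totalSleptMs = 0;
while (!TryDropReplicationSlot(connection, slotName))   /* hypothetical helper */
{
	if (totalSleptMs >= 20 * 1000)
	{
		/* still OBJECT_IN_USE after ~20 seconds, give up */
		ereport(ERROR, (errmsg("could not drop replication slot \"%s\"", slotName)));
	}

	pg_usleep(100 * 1000L);   /* wait 100ms before retrying the drop */
	totalSleptMs += 100;
}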

fixes: #6326
(cherry picked from commit bae4b47c2f)
2022-09-22 11:31:37 +03:00
Nitish Upreti a8e7c2cb09 Shard Split : Add / Update logging (#6336)
DESCRIPTION: Improve logging during shard split and resource cleanup

### DESCRIPTION

This PR makes logging improvements to Shard Split:

1. Updates confusing logging to fix #6312
2. Adds new `ereport(LOG, ...)` calls to make debugging easier as part of the telemetry review.
2022-09-16 10:11:37 -07:00
Onur Tirtir b7ae596fe8 Bump citus version to 11.1.1 (#6356) 2022-09-16 12:33:09 +03:00
Onur Tirtir 6f4324623c Add changelog entries for 11.1.1 (#6354)
(cherry picked from commit 8b5cdaf0e9)
2022-09-16 12:17:25 +03:00
Marco Slot d5db0adc17 Allow create_distributed_table_concurrently on an empty node (#6353)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-16 12:17:25 +03:00
Onur Tirtir 099523452e Add changelog entries for 11.1.0 (#6349)
Created by executing `prepare_changelog.pl citus 11.1.0 2022-03-29`.

(cherry picked from commit 57e354ac91)
2022-09-16 11:20:00 +03:00
Onder Kalaci af448da1a7 Prevent failures on partitioned distributed tables with statistics objects on PG 15
Comment from the code is clear on this:
/*
 * The statistics objects of the distributed table are not relevant
 * for the distributed planning, so we can override it.
 *
 * Normally, we should not need this. However, the combination of
 * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
 * Citus function AdjustPartitioningForDistributedPlanning()
 * forces us to do this. The commit expects statistics objects
 * of partitions to have "inh" flag set properly. Whereas, the
 * function overrides "inh" flag. To avoid Postgres to throw error,
 * we override statlist such that Postgres does not try to process
 * any statistics objects during the standard_planner() on the
 * coordinator. In the end, we do not need the standard_planner()
 * on the coordinator to generate an optimized plan. We call
 * into standard_planner() for other purposes, such as generating the
 * relationRestrictionContext here.
 *
 * AdjustPartitioningForDistributedPlanning() is a hack that we use
 * to prevent Postgres' standard_planner() to expand all the partitions
 * for the distributed planning when a distributed partitioned table
 * is queried. It is required for both correctness and performance
 * reasons. Although we can eliminate the use of the function for
 * the correctness (e.g., make sure that rest of the planner can handle
 * partitions), it's performance implication is hard to avoid. Certain
 * planning logic of Citus (such as router or query pushdown) relies
 * heavily on the relationRestrictionList. If
 * AdjustPartitioningForDistributedPlanning() is removed, all the
 * partitions show up in the relationRestrictionList, causing high planning times for
 * such queries.
 */
2022-09-15 14:37:28 +03:00
Sameer Awasekar acccad9879 Introduce code changes to fix Issue:6303 (#6328)
The PR introduces code changes to fix Issue
[6303](https://github.com/citusdata/citus/issues/6303)

`create_distributed_table_concurrently` following a drop column creates a
buggy situation in the split decoder.
* Consider the below scenario:
  * Session1: Drop column followed by create_distributed_table_concurrently
  * Session2: Concurrent insert workload

The child shards created by `create_distributed_table_concurrently` will
have fewer columns than the source shard because some columns were
dropped. The incoming tuple from session2 will have more columns, as the
writes happened on the source shard. But now the tuple needs to be applied
on the child shard. So we need to format the existing tuple according to the
child schema and skip dropped column values.
The PR fixes this by reformatting the tuple according to the target child
schema.
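
A sketch of that reformatting idea, using standard Postgres tuple APIs; this is illustrative only and not the decoder code from this PR:

#include "postgres.h"
#include "access/htup_details.h"
#include "access/tupdesc.h"

/* Rebuild a tuple from the source shard's layout into the child shard's layout. */
static HeapTuple
ReformatTupleForChildShard(HeapTuple sourceTuple, TupleDesc sourceDesc,
						   TupleDesc childDesc)
{
	Datum *sourceValues = palloc0(sourceDesc->natts * sizeof(Datum));
	bool *sourceNulls = palloc0(sourceDesc->natts * sizeof(bool));
	heap_deform_tuple(sourceTuple, sourceDesc, sourceValues, sourceNulls);

	Datum *childValues = palloc0(childDesc->natts * sizeof(Datum));
	bool *childNulls = palloc0(childDesc->natts * sizeof(bool));

	int childAttrIndex = 0;
	for (int sourceAttrIndex = 0; sourceAttrIndex < sourceDesc->natts; sourceAttrIndex++)
	{
		if (TupleDescAttr(sourceDesc, sourceAttrIndex)->attisdropped)
		{
			/* column was dropped on the source, it has no counterpart on the child */
			continue;
		}

		childValues[childAttrIndex] = sourceValues[sourceAttrIndex];
		childNulls[childAttrIndex] = sourceNulls[sourceAttrIndex];
		childAttrIndex++;
	}

	return heap_form_tuple(childDesc, childValues, childNulls);
}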

Test:
1) isolation_create_distributed_concurrently_after_drop_column - reproduces
the issue and tests the fix.
2022-09-15 09:48:43 +02:00
aykut-bozkurt 77947da17c ensure we have more active nodes than replication factor. (#6341)
DESCRIPTION: Fixes a floating point exception during
create_distributed_table_concurrently.

Fixes #6332.
During create_distributed_table_concurrently, when there is no active
primary node, it fails with a floating point exception. We added a check
similar to the one in create_distributed_table: it now fails with a proper
message if the number of active nodes is less than the replication factor.
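
A sketch of the kind of guard described above (placement and wording are illustrative; the actual change, including an explicit empty-node check, appears in the create_distributed_table_concurrently diff further down this page):

List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (list_length(workerNodeList) < ShardReplicationFactor)
{
	/* error out with a clear message instead of crashing later */
	ereport(ERROR, (errmsg("replication factor (%d) exceeds the number of "
						   "active worker nodes (%d)",
						   ShardReplicationFactor,
						   list_length(workerNodeList))));
}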

(cherry picked from commit 739b91afa6)
2022-09-14 18:22:58 +03:00
Marco Slot 7d56c25e28 Fix escaping in sequence dependency queries (#6345)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-14 16:52:25 +02:00
Marco Slot eba70af7a2 Fix bugs in CheckIfRelationWithSameNameExists (#6343)
Co-authored-by: Marco Slot <marco.slot@gmail.com>
2022-09-14 15:59:43 +02:00
Hanefi Onaldi 3f33390f45 Bump Citus version to 11.1.0 2022-09-14 14:21:43 +03:00
Nils Dijk 7b51f3eee2 Fix: rebalance stop non super user (#6334)
No need for a description; this fixes an issue introduced with a new
feature in 11.1.

Fixes #6333 

Due to Postgres' C API being 0-indexed and Postgres' attributes being
1-indexed, we were reading the wrong Datum as the task owner when
cancelling. Here we add a test to show the error and fix the off-by-one
indexing.
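
The pattern in one line, as shown in the CancelTasksForJob diff further down this page: the values array is 0-indexed, while Anum_* attribute numbers are 1-indexed, hence the "- 1".

/* the values array filled earlier in CancelTasksForJob() is 0-indexed ... */
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]);
/* ... so the 1-indexed Anum_* constant needs the "- 1" when used as an array index */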
2022-09-13 23:20:06 +02:00
51 changed files with 2604 additions and 277 deletions

View File

@ -1,3 +1,146 @@
### citus v11.1.2 (September 30, 2022) ###
* Disallows having `ON DELETE/UPDATE SET DEFAULT` actions on columns that
default to sequences
* Fixes a bug that might cause inserting incorrect `DEFAULT` values when
applying foreign key actions
* Fixes a performance issue related to shard-moves by creating replica
identities before copying shards
* Improves logging during shard-splits and resource cleanup
* Makes sure to reuse connections for shard-splits and logical replication
* Makes sure to try dropping replication slots a few more times after a failure
at the end of the shard-split
### citus v11.1.1 (September 16, 2022) ###
* Fixes a bug that prevents `create_distributed_table_concurrently()` working
on an empty node
### citus v11.1.0 (September 15, 2022) ###
* Adds support for PostgreSQL 15beta4
* Adds ability to run shard rebalancer in the background
* Adds `create_distributed_table_concurrently()` UDF to distribute tables
without interrupting the application
* Adds `citus_split_shard_by_split_points()` UDF that allows
splitting a shard to specified set of nodes without blocking writes
and based on given split points
* Adds support for non-blocking tenant isolation
* Adds support for isolating tenants that use partitioned tables
or columnar tables
* Separates columnar table access method into a separate logical extension
* Adds support for online replication in `replicate_reference_tables()`
* Improves performance of blocking shard moves
* Improves non-blocking shard moves with a faster custom copy logic
* Creates all foreign keys quickly at the end of a shard move
* Limits `get_rebalance_progress()` to show shards in moving state
* Makes `citus_move_shard_placement()` idempotent if shard already exists
on target node
* Shows `citus_copy_shard_placement()` progress in `get_rebalance_progress()`
* Supports changing CPU priorities for backends and shard moves
* Adds the GUC `citus.allow_unsafe_constraints` to allow unique/exclusion/
primary key constraints without distribution column
* Introduces GUC `citus.skip_constraint_validation`
* Introduces `citus_locks` view
* Improves `citus_tables` view by showing local tables added to metadata
* Improves columnar table access method by moving old catalog tables into
an internal schema and introduces more secure & informative views based
on them
* Adds support for `GRANT/REVOKE` on aggregates
* Adds support for `NULLS NOT DISTINCT` clauses for indexes for PG15+
* Adds support for setting relation options for columnar tables using
`ALTER TABLE`
* Adds support for unlogged distributed sequences
* Removes `do_repair` option from `citus_copy_shard_placement()`
* Removes deprecated re-partitioning functions like
`worker_hash_partition_table()`
* Drops support for isolating tenants that use replicated tables
* Checks existence of the shards before insert, delete, and update
* Hides tables owned by extensions from `citus_tables` and `citus_shards`
* Propagates `VACUUM` and `ANALYZE` to worker nodes
* Makes non-partitioned table size calculation quicker
* Improves `create_distributed_table()` by creating new colocation entries when
using `colocate_with => 'none'`
* Ensures that `SELECT .. FOR UPDATE` opens a transaction block when used in
a function call
* Prevents a segfault by disallowing usage of SQL functions referencing a
distributed table
* Prevents creating a new colocation entry when replicating reference tables
* Fixes a bug in query escaping in `undistribute_table()` and
`alter_distributed_table()`
* Fixes a bug preventing the usage of `isolate_tenant_to_new_shard()` with text
column
* Fixes a bug that may cause `GRANT` to propagate within `CREATE EXTENSION`
* Fixes a bug that causes incorrectly marking `metadatasynced` flag for
coordinator
* Fixes a bug that may prevent Citus from creating function in transaction
block properly
* Fixes a bug that prevents promoting read-replicas as primaries
* Fixes a bug that prevents setting colocation group of a partitioned
distributed table to `none`
* Fixes a bug that prevents using `AUTO` option for `VACUUM (INDEX_CLEANUP)`
operation
* Fixes a segfault in `citus_copy_shard_placement()`
* Fixes an issue that can cause logical reference table replication to fail
* Fixes schema name qualification for `RENAME SEQUENCE` statement
* Fixes several small memory leaks
* Fixes the transaction timestamp column of the `get_current_transaction_id()`
on coordinator
* Maps any unused parameters to a generic type in prepared statements
### citus v10.2.8 (August 19, 2022) ###
* Fixes compilation warning caused by latest upgrade script changes

configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 11.1devel.
# Generated by GNU Autoconf 2.69 for Citus 11.1.2.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='11.1devel'
PACKAGE_STRING='Citus 11.1devel'
PACKAGE_VERSION='11.1.2'
PACKAGE_STRING='Citus 11.1.2'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 11.1devel to adapt to many kinds of systems.
\`configure' configures Citus 11.1.2 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 11.1devel:";;
short | recursive ) echo "Configuration of Citus 11.1.2:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 11.1devel
Citus configure 11.1.2
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 11.1devel, which was
It was created by Citus $as_me 11.1.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 11.1devel, which was
This file was extended by Citus $as_me 11.1.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 11.1devel
Citus config.status 11.1.2
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [11.1devel])
AC_INIT([Citus], [11.1.2])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -1604,6 +1604,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
else if (ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
@ -1612,8 +1614,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
quote_qualified_identifier(schemaName, sourceName));
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
@ -1903,11 +1905,17 @@ CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequ
char *sourceSchemaName, char *sourceName,
char *targetSchemaName, char *targetName)
{
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
sequenceName);
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT worker_change_sequence_dependency('%s', '%s', '%s')",
quote_qualified_identifier(sequenceSchemaName, sequenceName),
quote_qualified_identifier(sourceSchemaName, sourceName),
quote_qualified_identifier(targetSchemaName, targetName));
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
quote_literal_cstr(qualifiedSchemaName),
quote_literal_cstr(qualifiedSourceName),
quote_literal_cstr(qualifiedTargetName));
return query->data;
}

View File

@ -87,8 +87,8 @@ static List * ReversedOidList(List *oidList);
static void AppendExplicitIndexIdsToList(Form_pg_index indexForm,
List **explicitIndexIdList,
int flags);
static void DropDefaultExpressionsAndMoveOwnedSequenceOwnerships(Oid sourceRelationId,
Oid targetRelationId);
static void DropNextValExprsAndMoveOwnedSeqOwnerships(Oid sourceRelationId,
Oid targetRelationId);
static void DropDefaultColumnDefinition(Oid relationId, char *columnName);
static void TransferSequenceOwnership(Oid ownedSequenceId, Oid targetRelationId,
char *columnName);
@ -128,6 +128,9 @@ citus_add_local_table_to_metadata_internal(Oid relationId, bool cascadeViaForeig
{
CheckCitusVersion(ERROR);
/* enable citus_add_local_table_to_metadata on an empty node */
InsertCoordinatorIfClusterEmpty();
bool autoConverted = false;
CreateCitusLocalTable(relationId, cascadeViaForeignKeys, autoConverted);
}
@ -363,11 +366,11 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
/*
* Move sequence ownerships from shard table to shell table and also drop
* DEFAULT expressions from shard relation as we should evaluate such columns
* in shell table when needed.
* DEFAULT expressions based on sequences from shard relation as we should
* evaluate such columns in shell table when needed.
*/
DropDefaultExpressionsAndMoveOwnedSequenceOwnerships(shardRelationId,
shellRelationId);
DropNextValExprsAndMoveOwnedSeqOwnerships(shardRelationId,
shellRelationId);
InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted);
@ -1158,14 +1161,15 @@ GetRenameStatsCommandList(List *statsOidList, uint64 shardId)
/*
* DropDefaultExpressionsAndMoveOwnedSequenceOwnerships drops default column
* definitions for relation with sourceRelationId. Also, for each column that
* defaults to an owned sequence, it grants ownership to the same named column
* of the relation with targetRelationId.
* DropNextValExprsAndMoveOwnedSeqOwnerships drops default column definitions
* that are based on sequences for relation with sourceRelationId.
*
* Also, for each such column that owns a sequence, it grants ownership to the
* same named column of the relation with targetRelationId.
*/
static void
DropDefaultExpressionsAndMoveOwnedSequenceOwnerships(Oid sourceRelationId,
Oid targetRelationId)
DropNextValExprsAndMoveOwnedSeqOwnerships(Oid sourceRelationId,
Oid targetRelationId)
{
List *columnNameList = NIL;
List *ownedSequenceIdList = NIL;
@ -1176,9 +1180,28 @@ DropDefaultExpressionsAndMoveOwnedSequenceOwnerships(Oid sourceRelationId,
Oid ownedSequenceId = InvalidOid;
forboth_ptr_oid(columnName, columnNameList, ownedSequenceId, ownedSequenceIdList)
{
DropDefaultColumnDefinition(sourceRelationId, columnName);
/*
* We drop nextval() expressions because Citus currently evaluates
* nextval() on the shell table, not on the shards. Hence, there is
* no reason for keeping nextval(). Also, distributed/reference table
* shards do not have - so be consistent with those.
*
* Note that we keep other kind of DEFAULT expressions on shards
* because we still want to be able to evaluate DEFAULT expressions
* that are not based on sequences on shards, e.g., for foreign key
* - SET DEFAULT actions.
*/
AttrNumber columnAttrNumber = get_attnum(sourceRelationId, columnName);
if (ColumnDefaultsToNextVal(sourceRelationId, columnAttrNumber))
{
DropDefaultColumnDefinition(sourceRelationId, columnName);
}
/* column might not own a sequence */
/*
* Column might own a sequence without having a nextval() expr on it
* --e.g., due to ALTER SEQUENCE OWNED BY .. --, so check if that is
* the case even if the column doesn't have a DEFAULT.
*/
if (OidIsValid(ownedSequenceId))
{
TransferSequenceOwnership(ownedSequenceId, targetRelationId, columnName);

View File

@ -382,7 +382,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
"citus.shard_replication_factor > 1")));
}
EnsureCoordinatorIsInMetadata();
EnsureCitusTableCanBeCreated(relationId);
EnsureValidDistributionColumn(relationId, distributionColumnName);
@ -528,6 +527,14 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId);
}
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}
List *workersForPlacementList;
List *shardSplitPointsList;
@ -555,7 +562,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
/*
* Place shards in a round-robin fashion across all data nodes.
*/
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
}
@ -856,6 +862,8 @@ WorkerNodesForShardList(List *shardList)
static List *
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
{
Assert(workerNodeList != NIL);
List *nodeIdList = NIL;
for (int idx = 0; idx < listLength; idx++)

View File

@ -23,12 +23,14 @@
#include "catalog/pg_type.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/sequence.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/multi_join_order.h"
#include "distributed/namespace_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/utils/array_type.h"
#include "distributed/version_compat.h"
#include "miscadmin.h"
#include "utils/builtins.h"
@ -57,6 +59,8 @@ typedef bool (*CheckRelationFunc)(Oid);
/* Local functions forward declarations */
static void EnsureReferencingTableNotReplicated(Oid referencingTableId);
static void EnsureSupportedFKeyOnDistKey(Form_pg_constraint constraintForm);
static bool ForeignKeySetsNextValColumnToDefault(HeapTuple pgConstraintTuple);
static List * ForeignKeyGetDefaultingAttrs(HeapTuple pgConstraintTuple);
static void EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint
constraintForm,
char
@ -256,6 +260,23 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
referencedReplicationModel = referencingReplicationModel;
}
/*
* Given that we drop DEFAULT nextval('sequence') expressions from
* shard relation columns, allowing ON DELETE/UPDATE SET DEFAULT
* on such columns causes inserting NULL values to referencing relation
* as a result of a delete/update operation on referenced relation.
*
* For this reason, we disallow ON DELETE/UPDATE SET DEFAULT actions
* on columns that default to sequences.
*/
if (ForeignKeySetsNextValColumnToDefault(heapTuple))
{
ereport(ERROR, (errmsg("cannot create foreign key constraint "
"since Citus does not support ON DELETE "
"/ UPDATE SET DEFAULT actions on the "
"columns that default to sequences")));
}
bool referencingIsCitusLocalOrRefTable =
(referencingDistMethod == DISTRIBUTE_BY_NONE);
bool referencedIsCitusLocalOrRefTable =
@ -358,6 +379,104 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
}
/*
* ForeignKeySetsNextValColumnToDefault returns true if at least one of the
* columns specified in ON DELETE / UPDATE SET DEFAULT clauses default to
* nextval().
*/
static bool
ForeignKeySetsNextValColumnToDefault(HeapTuple pgConstraintTuple)
{
Form_pg_constraint pgConstraintForm =
(Form_pg_constraint) GETSTRUCT(pgConstraintTuple);
List *setDefaultAttrs = ForeignKeyGetDefaultingAttrs(pgConstraintTuple);
AttrNumber setDefaultAttr = InvalidAttrNumber;
foreach_int(setDefaultAttr, setDefaultAttrs)
{
if (ColumnDefaultsToNextVal(pgConstraintForm->conrelid, setDefaultAttr))
{
return true;
}
}
return false;
}
/*
* ForeignKeyGetDefaultingAttrs returns a list of AttrNumbers
* might be set to default ON DELETE or ON UPDATE.
*
* For example; if the foreign key has SET DEFAULT clause for
* both actions, then returns a superset of the attributes that
* might be set to DEFAULT on either of those actions.
*/
static List *
ForeignKeyGetDefaultingAttrs(HeapTuple pgConstraintTuple)
{
bool isNull = false;
Datum referencingColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
Anum_pg_constraint_conkey, &isNull);
if (isNull)
{
ereport(ERROR, (errmsg("got NULL conkey from catalog")));
}
List *referencingColumns =
IntegerArrayTypeToList(DatumGetArrayTypeP(referencingColumnsDatum));
Form_pg_constraint pgConstraintForm =
(Form_pg_constraint) GETSTRUCT(pgConstraintTuple);
if (pgConstraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT)
{
/*
* Postgres doesn't allow specifying SET DEFAULT for a subset of
* the referencing columns for ON UPDATE action, so in that case
* we return all referencing columns regardless of what ON DELETE
* action says.
*/
return referencingColumns;
}
if (pgConstraintForm->confdeltype != FKCONSTR_ACTION_SETDEFAULT)
{
return NIL;
}
List *onDeleteSetDefColumnList = NIL;
#if PG_VERSION_NUM >= PG_VERSION_15
Datum onDeleteSetDefColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
Anum_pg_constraint_confdelsetcols,
&isNull);
/*
* confdelsetcols being NULL means that "ON DELETE SET DEFAULT" doesn't
* specify which subset of columns should be set to DEFAULT, so fetching
* NULL from the catalog is also possible.
*/
if (!isNull)
{
onDeleteSetDefColumnList =
IntegerArrayTypeToList(DatumGetArrayTypeP(onDeleteSetDefColumnsDatum));
}
#endif
if (list_length(onDeleteSetDefColumnList) == 0)
{
/*
* That means that all referencing columns need to be set to
* DEFAULT.
*/
return referencingColumns;
}
else
{
return onDeleteSetDefColumnList;
}
}
/*
* EnsureSupportedFKeyBetweenCitusLocalAndRefTable is a helper function that
* takes a foreign key constraint form for a foreign key between two citus

View File

@ -27,6 +27,7 @@
#include "nodes/makefuncs.h"
#include "distributed/worker_create_or_replace.h"
#include "nodes/parsenodes.h"
#include "rewrite/rewriteHandler.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -213,6 +214,29 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList,
}
/*
* ColumnDefaultsToNextVal returns true if the column with attrNumber
* has a default expression that contains nextval().
*/
bool
ColumnDefaultsToNextVal(Oid relationId, AttrNumber attrNumber)
{
AssertArg(AttributeNumberIsValid(attrNumber));
Relation relation = RelationIdGetRelation(relationId);
Node *defExpr = build_column_default(relation, attrNumber);
RelationClose(relation);
if (defExpr == NULL)
{
/* column doesn't have a DEFAULT expression */
return false;
}
return contain_nextval_expression_walker(defExpr, NULL);
}
/*
* PreprocessDropSequenceStmt gets called during the planning phase of a DROP SEQUENCE statement
* and returns a list of DDLJob's that will drop any distributed sequences from the

View File

@ -16,7 +16,7 @@
#include "miscadmin.h"
#include "safe_lib.h"
#include "postmaster/postmaster.h"
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/backend_data.h"
@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
@ -244,6 +243,23 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
}
/*
* GetConnectionForLocalQueriesOutsideTransaction returns a localhost connection for
* subtransaction. To avoid creating excessive connections, we reuse an
* existing connection.
*/
MultiConnection *
GetConnectionForLocalQueriesOutsideTransaction(char *userName)
{
int connectionFlag = OUTSIDE_TRANSACTION;
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber,
userName, get_database_name(MyDatabaseId));
return connection;
}
/*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
*
@ -688,8 +704,8 @@ CloseConnection(MultiConnection *connection)
dlist_delete(&connection->connectionNode);
/* same for transaction state and shard/placement machinery */
CloseRemoteTransaction(connection);
CloseShardPlacementAssociation(connection);
ResetRemoteTransaction(connection);
/* we leave the per-host entry alive */
pfree(connection);
@ -1443,7 +1459,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
/*
* reset healthy session lifespan connections.
*/
ResetConnection(connection);
ResetRemoteTransaction(connection);
UnclaimConnection(connection);
cachedConnectionCount++;
}
@ -1482,24 +1501,6 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
}
/*
* ResetConnection preserves the given connection for later usage by
* resetting its states.
*/
static void
ResetConnection(MultiConnection *connection)
{
/* reset per-transaction state */
ResetRemoteTransaction(connection);
ResetShardPlacementAssociation(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection);
}
/*
* RemoteTransactionIdle function returns true if we manually
* set flag on run_commands_on_session_level_connection_to_node to true to

View File

@ -4022,7 +4022,7 @@ CancelTasksForJob(int64 jobid)
}
/* make sure the current user has the rights to cancel this task */
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner]);
Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]);
if (superuser_arg(taskOwner) && !superuser())
{
/* must be a superuser to cancel tasks owned by superuser */

View File

@ -303,21 +303,40 @@ DropOrphanedShardsForCleanup()
workerNode->workerName,
workerNode->workerPort))
{
if (record->policy == CLEANUP_DEFERRED_ON_SUCCESS)
{
ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d "
"completed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort)));
}
else
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d which "
"was left behind after a failed operation",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort)));
}
/* delete the cleanup record */
DeleteCleanupRecordByRecordId(record->recordId);
removedShardCountForCleanup++;
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountForCleanup++;
}
}
if (failedShardCountForCleanup > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountForCleanup, list_length(
cleanupRecordList))));
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d",
failedShardCountForCleanup,
list_length(cleanupRecordList))));
}
return removedShardCountForCleanup;
@ -396,19 +415,29 @@ DropOrphanedShardsForMove(bool waitForLocks)
shardPlacement->nodeName,
shardPlacement->nodePort))
{
ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d "
"after a move completed",
qualifiedTableName,
shardPlacement->nodeName,
shardPlacement->nodePort)));
/* delete the actual placement */
DeleteShardPlacementRow(placement->placementId);
removedShardCount++;
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardDropCount++;
}
}
if (failedShardDropCount > 0)
{
ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d",
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d",
failedShardDropCount, list_length(shardPlacementList))));
}
@ -436,7 +465,7 @@ RegisterOperationNeedingCleanup(void)
* completion with failure. This will trigger cleanup of appropriate resources.
*/
void
FinalizeOperationNeedingCleanupOnFailure()
FinalizeOperationNeedingCleanupOnFailure(const char *operationName)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call RegisterOperationNeedingCleanup.
@ -454,8 +483,9 @@ FinalizeOperationNeedingCleanupOnFailure()
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
ereport(WARNING, (errmsg(
"Invalid object type %d on failed operation cleanup",
record->objectType)));
continue;
}
@ -473,6 +503,12 @@ FinalizeOperationNeedingCleanupOnFailure()
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation failed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Given the operation is failing and we will abort its transaction, we cannot delete
* records in the current transaction. Delete these records outside of the
@ -483,23 +519,22 @@ FinalizeOperationNeedingCleanupOnFailure()
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountOnComplete++;
}
}
}
if (list_length(currentOperationRecordList) > 0)
if (failedShardCountOnComplete > 0)
{
ereport(LOG, (errmsg("Removed %d orphaned shards out of %d",
removedShardCountOnComplete, list_length(
currentOperationRecordList))));
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountOnComplete, list_length(
currentOperationRecordList))));
}
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d after "
"a %s operation failed",
failedShardCountOnComplete,
list_length(currentOperationRecordList),
operationName)));
}
}
@ -509,7 +544,7 @@ FinalizeOperationNeedingCleanupOnFailure()
* completion with success. This will trigger cleanup of appropriate resources.
*/
void
FinalizeOperationNeedingCleanupOnSuccess()
FinalizeOperationNeedingCleanupOnSuccess(const char *operationName)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call RegisterOperationNeedingCleanup.
@ -527,8 +562,9 @@ FinalizeOperationNeedingCleanupOnSuccess()
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
ereport(WARNING, (errmsg(
"Invalid object type %d on operation cleanup",
record->objectType)));
continue;
}
@ -546,6 +582,12 @@ FinalizeOperationNeedingCleanupOnSuccess()
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation completed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Delete cleanup records outside transaction as:
* The resources are marked as 'CLEANUP_ALWAYS' and should be cleaned no matter
@ -556,6 +598,10 @@ FinalizeOperationNeedingCleanupOnSuccess()
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountOnComplete++;
}
}
@ -570,18 +616,14 @@ FinalizeOperationNeedingCleanupOnSuccess()
}
}
if (list_length(currentOperationRecordList) > 0)
if (failedShardCountOnComplete > 0)
{
ereport(LOG, (errmsg("Removed %d orphaned shards out of %d",
removedShardCountOnComplete, list_length(
currentOperationRecordList))));
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountOnComplete, list_length(
currentOperationRecordList))));
}
ereport(WARNING, (errmsg(
"failed to clean up %d orphaned shards out of %d after "
"a %s operation completed",
failedShardCountOnComplete,
list_length(currentOperationRecordList),
operationName)));
}
}
@ -670,10 +712,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
nodeGroupId,
policy);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
MultiConnection *connection =
GetConnectionForLocalQueriesOutsideTransaction(CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data));
}
@ -691,10 +733,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
PG_DIST_CLEANUP,
recordId);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data));
}
@ -727,18 +769,11 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
* true on success.
*/
static bool
TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName,
char *nodeName, int nodePort)
TryDropShardOutsideTransaction(OperationId operationId,
char *qualifiedTableName,
char *nodeName,
int nodePort)
{
char *operation = (operationId == INVALID_OPERATION_ID) ? "move" : "cleanup";
ereport(LOG, (errmsg("cleaning up %s on %s:%d which was left "
"after a %s",
qualifiedTableName,
nodeName,
nodePort,
operation)));
/* prepare sql query to execute to drop the shard */
StringInfo dropQuery = makeStringInfo();
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
@ -756,10 +791,14 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName
dropQuery->data);
/* remove the shard from the node */
bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName,
nodePort,
NULL,
dropCommandList);
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CurrentUserName(),
NULL);
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
dropCommandList);
return success;
}
@ -800,13 +839,8 @@ GetNextOperationId()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);",
quote_literal_cstr(sequenceName->data));
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
@ -821,7 +855,6 @@ GetNextOperationId()
PQclear(result);
ForgetResults(connection);
CloseConnection(connection);
return operationdId;
}

View File

@ -151,7 +151,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList,
DistributionColumnMap *
distributionColumnOverrides);
static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode);
static void ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection);
static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
targetNodeId,
ShardInterval *shardInterval);
@ -169,6 +169,12 @@ static const char *const SplitOperationName[] =
[ISOLATE_TENANT_TO_NEW_SHARD] = "isolate",
[CREATE_DISTRIBUTED_TABLE] = "create"
};
static const char *const SplitOperationAPIName[] =
{
[SHARD_SPLIT_API] = "citus_split_shard_by_split_points",
[ISOLATE_TENANT_TO_NEW_SHARD] = "isolate_tenant_to_new_shard",
[CREATE_DISTRIBUTED_TABLE] = "create_distributed_table_concurrently"
};
static const char *const SplitTargetName[] =
{
[SHARD_SPLIT_API] = "shard",
@ -469,6 +475,8 @@ SplitShard(SplitMode splitMode,
List *colocatedShardIntervalList,
uint32 targetColocationId)
{
const char *operationName = SplitOperationAPIName[splitOperation];
ErrorIfModificationAndSplitInTheSameTransaction(splitOperation);
ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
@ -526,6 +534,8 @@ SplitShard(SplitMode splitMode,
if (splitMode == BLOCKING_SPLIT)
{
ereport(LOG, (errmsg("performing blocking %s ", operationName)));
BlockingShardSplit(
splitOperation,
splitWorkflowId,
@ -536,6 +546,8 @@ SplitShard(SplitMode splitMode,
}
else
{
ereport(LOG, (errmsg("performing non-blocking %s ", operationName)));
NonBlockingShardSplit(
splitOperation,
splitWorkflowId,
@ -548,7 +560,10 @@ SplitShard(SplitMode splitMode,
PlacementMovedUsingLogicalReplicationInTX = true;
}
FinalizeOperationNeedingCleanupOnSuccess();
/*
* Drop temporary objects that were marked as CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnSuccess(operationName);
}
@ -569,6 +584,8 @@ BlockingShardSplit(SplitOperation splitOperation,
List *workersForPlacementList,
DistributionColumnMap *distributionColumnOverrides)
{
const char *operationName = SplitOperationAPIName[splitOperation];
BlockWritesToShardList(sourceColocatedShardIntervalList);
/* First create shard interval metadata for split children */
@ -583,10 +600,14 @@ BlockingShardSplit(SplitOperation splitOperation,
PG_TRY();
{
ereport(LOG, (errmsg("creating child shards for %s", operationName)));
/* Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* For Blocking split, copy isn't snapshotted */
char *snapshotName = NULL;
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
@ -596,6 +617,10 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Used for testing */
ConflictOnlyWithIsolationTesting();
ereport(LOG, (errmsg(
"creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s",
operationName)));
/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList,
@ -617,10 +642,16 @@ BlockingShardSplit(SplitOperation splitOperation,
*/
if (DeferShardDeleteOnSplit)
{
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
}
else
{
ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s",
operationName)));
DropShardList(sourceColocatedShardIntervalList);
}
@ -635,6 +666,9 @@ BlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
@ -649,7 +683,7 @@ BlockingShardSplit(SplitOperation splitOperation,
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
FinalizeOperationNeedingCleanupOnFailure();
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}
@ -670,10 +704,15 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
AppendShardIdToName(&shardName, shardInterval->shardId);
StringInfo checkShardExistsQuery = makeStringInfo();
/*
* We pass schemaName and shardName without quote_identifier, since
* they are used as strings here.
*/
appendStringInfo(checkShardExistsQuery,
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
schemaName,
shardName);
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = %s AND tablename = %s);",
quote_literal_cstr(schemaName),
quote_literal_cstr(shardName));
int connectionFlags = 0;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
@ -691,11 +730,13 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
ReportResultError(connection, result, ERROR);
}
char *checkExists = PQgetvalue(result, 0, 0);
char *existsString = PQgetvalue(result, 0, 0);
bool tableExists = strcmp(existsString, "t") == 0;
PQclear(result);
ForgetResults(connection);
return strcmp(checkExists, "t") == 0;
return tableExists;
}
@ -1015,11 +1056,13 @@ static void
CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerPlacementNode)
{
char *currentUser = CurrentUserName();
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
workerPlacementNode->workerPort,
currentUser,
objectCreationCommandList);
MultiConnection *connection =
GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
NULL, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
objectCreationCommandList);
}
@ -1487,6 +1530,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DistributionColumnMap *distributionColumnOverrides,
uint32 targetColocationId)
{
const char *operationName = SplitOperationAPIName[splitOperation];
ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction();
char *superUser = CitusExtensionOwnerName();
@ -1529,6 +1574,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Non-Blocking shard split workflow starts here */
PG_TRY();
{
ereport(LOG, (errmsg("creating child shards for %s",
operationName)));
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
@ -1558,6 +1606,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
ereport(LOG, (errmsg(
"creating replication artifacts (publications, replication slots, subscriptions for %s",
operationName)));
/* 4) Create Publications. */
CreatePublications(sourceConnection, publicationInfoHash);
@ -1606,11 +1658,35 @@ NonBlockingShardSplit(SplitOperation splitOperation,
databaseName,
logicalRepTargetList);
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the subscriptions. It might
* seem like a good idea to do it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* 8) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshot, distributionColumnOverrides);
ereport(LOG, (errmsg("replicating changes for %s", operationName)));
/*
* 9) Logically replicate all the changes and do most of the table DDL,
* like index and foreign key creation.
@ -1631,10 +1707,16 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
if (DeferShardDeleteOnSplit)
{
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
}
else
{
ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s",
operationName)));
DropShardList(sourceColocatedShardIntervalList);
}
@ -1683,6 +1765,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreatePartitioningHierarchy(logicalRepTargetList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* 14) Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
@ -1694,7 +1779,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* 15) Release shared memory allocated by worker_split_shard_replication_setup udf
* at source node.
*/
ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
ExecuteSplitShardReleaseSharedMemory(sourceConnection);
/* 16) Close source connection */
CloseConnection(sourceConnection);
@ -1716,7 +1801,11 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
FinalizeOperationNeedingCleanupOnFailure();
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}
@ -1987,19 +2076,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
* shared memory to store split information. This has to be released after split completes(or fails).
*/
static void
ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode)
ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
connectionFlag,
sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser,
databaseName);
StringInfo splitShardReleaseMemoryUDF = makeStringInfo();
appendStringInfo(splitShardReleaseMemoryUDF,
"SELECT pg_catalog.worker_split_shard_release_dsm();");
@ -2214,14 +2292,8 @@ GetNextShardIdForSplitChild()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
"pg_catalog.pg_dist_shardid_seq"));
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result);
@ -2238,7 +2310,8 @@ GetNextShardIdForSplitChild()
}
shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/));
CloseConnection(connection);
PQclear(result);
ForgetResults(connection);
return shardId;
}

View File

@ -43,6 +43,10 @@ static DestReceiver * CreatePartitionedSplitCopyDestReceiver(EState *executor,
List *splitCopyInfoList);
static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray,
ArrayType **maxValueArray);
static char * TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName,
char *sourceShardToCopyPrefix,
char *sourceShardToCopyQualifiedName,
List *splitCopyInfoList);
/*
* worker_split_copy(source_shard_id bigint, splitCopyInfo pg_catalog.split_copy_info[])
@ -93,12 +97,18 @@ worker_split_copy(PG_FUNCTION_ARGS)
Oid sourceShardToCopySchemaOId = get_rel_namespace(
shardIntervalToSplitCopy->relationId);
char *sourceShardToCopySchemaName = get_namespace_name(sourceShardToCopySchemaOId);
char *sourceShardToCopyName = get_rel_name(shardIntervalToSplitCopy->relationId);
char *sourceShardPrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
char *sourceShardToCopyName = pstrdup(sourceShardPrefix);
AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy);
char *sourceShardToCopyQualifiedName = quote_qualified_identifier(
sourceShardToCopySchemaName,
sourceShardToCopyName);
ereport(LOG, (errmsg("%s", TraceWorkerSplitCopyUdf(sourceShardToCopySchemaName,
sourceShardPrefix,
sourceShardToCopyQualifiedName,
splitCopyInfoList))));
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", sourceShardToCopyQualifiedName);
@ -113,6 +123,48 @@ worker_split_copy(PG_FUNCTION_ARGS)
}
/* Trace split copy udf */
static char *
TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName,
char *sourceShardToCopyPrefix,
char *sourceShardToCopyQualifiedName,
List *splitCopyInfoList)
{
StringInfo splitCopyTrace = makeStringInfo();
appendStringInfo(splitCopyTrace, "performing copy from shard %s to [",
sourceShardToCopyQualifiedName);
/* split copy always has at least two destinations */
int index = 1;
int splitWayCount = list_length(splitCopyInfoList);
SplitCopyInfo *splitCopyInfo = NULL;
foreach_ptr(splitCopyInfo, splitCopyInfoList)
{
char *shardNameCopy = pstrdup(sourceShardToCopyPrefix);
AppendShardIdToName(&shardNameCopy, splitCopyInfo->destinationShardId);
char *shardNameCopyQualifiedName = quote_qualified_identifier(
sourceShardToCopySchemaName,
shardNameCopy);
appendStringInfo(splitCopyTrace, "%s (nodeId: %u)", shardNameCopyQualifiedName,
splitCopyInfo->destinationShardNodeId);
pfree(shardNameCopy);
if (index < splitWayCount)
{
appendStringInfo(splitCopyTrace, ", ");
}
index++;
}
appendStringInfo(splitCopyTrace, "]");
return splitCopyTrace->data;
}
/* Parse a single SplitCopyInfo Tuple */
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)

View File

@ -1897,14 +1897,14 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
bool distributedTable = IsCitusTable(rte->relid);
bool isCitusTable = IsCitusTable(rte->relid);
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
relationRestriction->index = restrictionIndex;
relationRestriction->relationId = rte->relid;
relationRestriction->rte = rte;
relationRestriction->relOptInfo = relOptInfo;
relationRestriction->distributedRelation = distributedTable;
relationRestriction->citusTable = isCitusTable;
relationRestriction->plannerInfo = root;
/* see comments on GetVarFromAssignedParam() */
@ -1919,10 +1919,42 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
* We're also keeping track of whether all participant
* tables are reference tables.
*/
if (distributedTable)
if (isCitusTable)
{
cacheEntry = GetCitusTableCacheEntry(rte->relid);
/*
* The statistics objects of the distributed table are not relevant
* for the distributed planning, so we can override it.
*
* Normally, we should not need this. However, the combination of
* Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
* Citus function AdjustPartitioningForDistributedPlanning()
* forces us to do this. The commit expects statistics objects
* of partitions to have "inh" flag set properly. Whereas, the
* function overrides the "inh" flag. To avoid Postgres throwing an error,
* we override statlist such that Postgres does not try to process
* any statistics objects during the standard_planner() on the
* coordinator. In the end, we do not need the standard_planner()
* on the coordinator to generate an optimized plan. We call
* into standard_planner() for other purposes, such as generating the
* relationRestrictionContext here.
*
* AdjustPartitioningForDistributedPlanning() is a hack that we use
* to prevent Postgres' standard_planner() to expand all the partitions
* for the distributed planning when a distributed partitioned table
* is queried. It is required for both correctness and performance
* reasons. Although we can eliminate the use of the function for
* correctness (e.g., make sure that the rest of the planner can handle
* partitions), its performance implication is hard to avoid. Certain
* planning logic of Citus (such as router or query pushdown) relies
* heavily on the relationRestrictionList. If
* AdjustPartitioningForDistributedPlanning() is removed, all the
* partitions show up in the relationRestrictionList, causing high planning times for
* such queries.
*/
relOptInfo->statlist = NIL;
relationRestrictionContext->allReferenceTables &=
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
}

View File

@ -3692,7 +3692,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
newRestriction->index = oldRestriction->index;
newRestriction->relationId = oldRestriction->relationId;
newRestriction->distributedRelation = oldRestriction->distributedRelation;
newRestriction->citusTable = oldRestriction->citusTable;
newRestriction->rte = copyObject(oldRestriction->rte);
/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */

View File

@ -224,7 +224,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
if (!relationRestriction->distributedRelation)
if (!relationRestriction->citusTable)
{
return true;
}

View File

@ -66,6 +66,7 @@
#include "utils/syscache.h"
#define STR_ERRCODE_UNDEFINED_OBJECT "42704"
#define STR_ERRCODE_OBJECT_IN_USE "55006"
#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots"
@ -156,6 +157,10 @@ static void WaitForGroupedLogicalRepTargetsToBecomeReady(
static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
GroupedLogicalRepTargets *
groupedLogicalRepTargets);
static void RecreateGroupedLogicalRepTargetsConnections(
HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
/*
* LogicallyReplicateShards replicates a list of shards from one node to another
@ -233,6 +238,26 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
/* only useful for isolation testing, see the function comment for the details */
ConflictOnlyWithIsolationTesting();
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the subscriptions. It might
* seem like a good idea to do it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
CopyShardsToNode(sourceNode, targetNode, shardList, snapshot);
/*
@ -346,20 +371,6 @@ CompleteNonBlockingShardTransfer(List *shardList,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type)
{
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So we do it right after the initial data COPY, but before enabling the
* subscriptions. We do it at this latest possible moment, because it's
* much cheaper to build an index at once than to create it
* incrementally. So this way we create the primary key index in one go
* for all data from the initial COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
/* Start applying the changes from the replication slots to catch up. */
EnableSubscriptions(logicalRepTargetList);
@ -559,10 +570,10 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
char *databaseName = get_database_name(MyDatabaseId);
/*
* We open new connections to all nodes. The reason for this is that
* operations on subscriptions, publications and replication slots cannot be
* run in a transaction. By forcing a new connection we make sure no
* transaction is active on the connection.
* We need connections that are not currently inside a transaction. The
* reason for this is that operations on subscriptions, publications and
* replication slots cannot be run in a transaction. By forcing a new
* connection we make sure no transaction is active on the connection.
*/
int connectionFlags = FORCE_NEW_CONNECTION;
@ -600,7 +611,9 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
/*
* We close all connections that we opened for the dropping here. That
* way we don't keep these connections open unnecessarily during the
* 'LogicalRepType' operation (which can take a long time).
* 'LogicalRepType' operation (which can take a long time). We might
* need to reopen a few later on, but that seems better than keeping
* many open for no reason for a long time.
*/
CloseConnection(cleanupConnection);
}
@ -1150,11 +1163,14 @@ CreatePartitioningHierarchy(List *logicalRepTargetList)
* parallel, so create them sequentially. Also, attaching a partition
* is a quick operation, so it is fine to execute sequentially.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
tableOwner,
list_make1(attachPartitionCommand));
MultiConnection *connection =
GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
target->superuserConnection->hostname,
target->superuserConnection->port,
tableOwner, NULL);
ExecuteCriticalRemoteCommand(connection, attachPartitionCommand);
MemoryContextReset(localContext);
}
}
@ -1203,10 +1219,8 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList)
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
commandList);
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
commandList);
MemoryContextReset(localContext);
@ -1281,18 +1295,64 @@ DropPublications(MultiConnection *sourceConnection, HTAB *publicationInfoHash)
/*
* DropReplicationSlot drops the replication slot with the given name
* if it exists.
* if it exists. It retries if the command fails with an OBJECT_IN_USE error.
*/
static void
DropReplicationSlot(MultiConnection *connection, char *replicationSlotName)
{
ExecuteCriticalRemoteCommand(
connection,
psprintf(
"select pg_drop_replication_slot(slot_name) from "
REPLICATION_SLOT_CATALOG_TABLE_NAME
" where slot_name = %s",
quote_literal_cstr(replicationSlotName)));
int maxSecondsToTryDropping = 20;
bool raiseInterrupts = true;
PGresult *result = NULL;
/* we'll retry in case of an OBJECT_IN_USE error */
while (maxSecondsToTryDropping >= 0)
{
int querySent = SendRemoteCommand(
connection,
psprintf(
"select pg_drop_replication_slot(slot_name) from "
REPLICATION_SLOT_CATALOG_TABLE_NAME
" where slot_name = %s",
quote_literal_cstr(replicationSlotName))
);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (IsResponseOK(result))
{
/* no error, we are good to go */
break;
}
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) == 0 &&
maxSecondsToTryDropping > 0)
{
/* retry dropping the replication slot after sleeping for one second */
maxSecondsToTryDropping--;
pg_usleep(1000L * 1000L);    /* pg_usleep takes microseconds */
}
else
{
/*
* Report error if:
* - Error code is not 55006 (Object In Use)
* - Or we have already retried the maximum number of times (currently 20) without success
*/
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
PQclear(result);
ForgetResults(connection);
}
@ -1585,11 +1645,11 @@ DropUser(MultiConnection *connection, char *username)
* The DROP USER command should not propagate, so we temporarily disable
* DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
connection->hostname, connection->port, connection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
connection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s",
psprintf("DROP USER IF EXISTS %s;",
quote_identifier(username))));
}
@ -1771,14 +1831,12 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
* This prevents permission escalations.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"CREATE USER %s SUPERUSER IN ROLE %s",
"CREATE USER %s SUPERUSER IN ROLE %s;",
target->subscriptionOwnerName,
GetUserNameFromId(ownerId, false)
)));
@ -1832,14 +1890,12 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* The ALTER ROLE command should not propagate, so we temporarily
* disable DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"ALTER ROLE %s NOSUPERUSER",
"ALTER ROLE %s NOSUPERUSER;",
target->subscriptionOwnerName
)));
}
@ -2001,8 +2057,12 @@ CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
* RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the
* nodes in the groupedLogicalRepTargetsHash where the old connection is broken or
* currently running a query.
*
* IMPORTANT: When it recreates the connection, it doesn't close the existing
* connection. This means that this function should only be called when we know
* we'll throw an error afterwards, otherwise we would leak these connections.
*/
void
static void
RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName)
@ -2012,10 +2072,11 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL;
foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash)
{
if (groupedLogicalRepTargets->superuserConnection &&
PQstatus(groupedLogicalRepTargets->superuserConnection->pgConn) ==
CONNECTION_OK &&
!PQisBusy(groupedLogicalRepTargets->superuserConnection->pgConn)
MultiConnection *superuserConnection =
groupedLogicalRepTargets->superuserConnection;
if (superuserConnection &&
PQstatus(superuserConnection->pgConn) == CONNECTION_OK &&
!PQisBusy(superuserConnection->pgConn)
)
{
continue;
@ -2023,12 +2084,12 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
WorkerNode *targetWorkerNode = FindNodeWithNodeId(
groupedLogicalRepTargets->nodeId,
false);
MultiConnection *superuserConnection =
GetNodeUserDatabaseConnection(connectionFlags,
targetWorkerNode->workerName,
targetWorkerNode->workerPort,
user,
databaseName);
superuserConnection = GetNodeUserDatabaseConnection(
connectionFlags,
targetWorkerNode->workerName,
targetWorkerNode->workerPort,
user,
databaseName);
/*
* Operations on subscriptions cannot run in a transaction block. We

View File

@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName);
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc);
/*
* Postgres uses 'pgoutput' as default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible.
@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
/*
* If any columns from the source relation have been dropped, then the tuple needs to
* be formatted according to the target relation.
*/
TupleDesc sourceRelationDesc = RelationGetDescr(relation);
TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
if (sourceRelationDesc->natts > targetRelationDesc->natts)
{
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
/*
* Format oldtuple according to the target relation. If the column values of the replica
* identity change, then the old tuple is non-null and needs to be formatted according
* to the target relation schema.
*/
if (change->data.tp.oldtuple != NULL)
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple,
sourceRelationDesc,
targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
break;
}
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}
@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
return DatumGetInt32(hashedValueDatum);
}
/*
* GetTupleForTargetSchema returns a tuple with the schema of the target relation.
* If some columns within the source relation are dropped, we would have to reformat
* the tuple to match the schema of the target relation.
*
* Consider the below scenario:
* Session1 : Drop column followed by create_distributed_table_concurrently
* Session2 : Concurrent insert workload
*
* The child shards created by create_distributed_table_concurrently will have fewer columns
* than the source shard because some columns were dropped.
* The incoming tuple from session2 will have more columns because the writes
* happened on the source shard. But the tuple now needs to be applied on a child shard,
* so we need to format it according to the child schema.
*/
static HeapTuple
GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceRelDesc,
TupleDesc targetRelDesc)
{
/* Deform the tuple */
Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
oldNulls);
/* Create new tuple by skipping dropped columns */
int nextAttributeIndex = 0;
Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
for (int i = 0; i < sourceRelDesc->natts; i++)
{
if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
{
continue;
}
newValues[nextAttributeIndex] = oldValues[i];
newNulls[nextAttributeIndex] = oldNulls[i];
nextAttributeIndex++;
}
HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
return targetRelationTuple;
}

View File

@ -751,12 +751,11 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
/*
* CloseRemoteTransaction handles closing a connection that, potentially, is
* part of a coordinated transaction. This should only ever be called from
* connection_management.c, while closing a connection during a transaction.
* ResetRemoteTransaction resets the state of the transaction after the end of
* the main transaction, if the connection is being reused.
*/
void
CloseRemoteTransaction(struct MultiConnection *connection)
ResetRemoteTransaction(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
@ -767,20 +766,14 @@ CloseRemoteTransaction(struct MultiConnection *connection)
dlist_delete(&connection->transactionNode);
}
}
/*
* ResetRemoteTransaction resets the state of the transaction after the end of
* the main transaction, if the connection is being reused.
*/
void
ResetRemoteTransaction(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
/* just reset the entire state, relying on 0 being invalid/false */
memset(transaction, 0, sizeof(*transaction));
ResetShardPlacementAssociation(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
}

View File

@ -340,6 +340,25 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
nodeName, nodePort,
nodeUser, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
commandList);
CloseConnection(workerConnection);
}
/*
* SendCommandListToWorkerOutsideTransactionWithConnection sends the command list
* over the specified connection. This opens a new transaction on the
* connection, so it's important that no transaction is currently open.
* This function is mainly useful to avoid opening and closing
* connections excessively, by allowing a single connection to be reused to send
* multiple separately committing transactions. The function raises an error if
* any of the queries fail.
*/
void
SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection,
List *commandList)
{
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection);
@ -351,7 +370,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
}
RemoteTransactionCommit(workerConnection);
CloseConnection(workerConnection);
ResetRemoteTransaction(workerConnection);
}
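To make the intended call pattern concrete, here is a minimal sketch (not part of this diff) of a hypothetical caller that reuses a single connection for two separately committing batches. The function name and the command strings are invented for illustration; GetNodeUserDatabaseConnection, OUTSIDE_TRANSACTION, list_make1 and CloseConnection are the existing primitives already used elsewhere in this changeset.
/*
 * Hypothetical sketch: reuse one connection for several separately
 * committing command lists instead of opening a new connection per call.
 */
static void
SendTwoBatchesOverOneConnection(const char *nodeName, int32 nodePort,
                                const char *nodeUser)
{
    MultiConnection *connection =
        GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION, nodeName, nodePort,
                                      nodeUser, NULL);
    /* each call begins and commits its own remote transaction, erroring out on failure */
    SendCommandListToWorkerOutsideTransactionWithConnection(
        connection, list_make1("SELECT 1;"));
    SendCommandListToWorkerOutsideTransactionWithConnection(
        connection, list_make1("SELECT 2;"));
    /* the caller keeps ownership of the connection and closes it when done */
    CloseConnection(connection);
}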
@ -430,21 +449,18 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
/*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the
* coordinated transaction. If any of the commands fail, it rolls back the
* transaction, and otherwise commits.
* SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the
* given command list over a specified connection in a single transaction that
* is outside of the coordinated transaction.
*
* If any of the commands fail, it rolls back the transaction, and otherwise commits.
* A successful commit is indicated by returning true, and a failed commit by returning
* false.
*/
bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{
return false;
@ -452,6 +468,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
bool failed = false;
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
@ -471,6 +488,30 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionCommit(workerConnection);
}
ResetRemoteTransaction(workerConnection);
return !failed;
}
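For the optional variant, a minimal best-effort usage sketch (again, not part of this diff) could look like the following; the helper name, the list-of-lists parameter and the WARNING text are hypothetical, while foreach_ptr and ereport follow the patterns already used in this file.
/*
 * Hypothetical sketch: send several command lists over one already-open
 * connection, relying on the boolean return value of the optional variant
 * instead of raising an error when a batch fails.
 */
static void
TrySendCleanupBatches(MultiConnection *connection, List *commandListList)
{
    List *commandList = NULL;
    foreach_ptr(commandList, commandListList)
    {
        bool succeeded =
            SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
                connection, commandList);
        if (!succeeded)
        {
            /* the failed batch was rolled back; continue with the next one */
            ereport(WARNING, (errmsg("skipping a cleanup batch that failed")));
        }
    }
}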
/*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the
* coordinated transaction. If any of the commands fail, it rolls back the
* transaction, and otherwise commits.
*/
bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
bool succeeded = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
commandList);
CloseConnection(workerConnection);
return succeeded;

View File

@ -10,9 +10,11 @@
#ifndef CITUS_SEQUENCE_H
#define CITUS_SEQUENCE_H
#include "access/attnum.h"
#include "nodes/pg_list.h"
extern bool ColumnDefaultsToNextVal(Oid relationId, AttrNumber attrNumber);
extern void ExtractDefaultColumnsAndOwnedSequences(Oid relationId,
List **columnNameList,
List **ownedSequenceIdList);

View File

@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user,
const char *database);
extern MultiConnection * GetConnectionForLocalQueriesOutsideTransaction(char *userName);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname,
int32 port,

View File

@ -56,7 +56,7 @@ typedef struct RelationRestriction
{
Index index;
Oid relationId;
bool distributedRelation;
bool citusTable;
RangeTblEntry *rte;
RelOptInfo *relOptInfo;
PlannerInfo *plannerInfo;

View File

@ -172,10 +172,6 @@ extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList);
extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
extern void RecreateGroupedLogicalRepTargetsConnections(
HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);
extern void CompleteNonBlockingShardTransfer(List *shardList,
MultiConnection *sourceConnection,

View File

@ -130,7 +130,6 @@ extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
* transaction managment code.
*/
extern void CloseRemoteTransaction(struct MultiConnection *connection);
extern void ResetRemoteTransaction(struct MultiConnection *connection);
/* perform handling for all in-progress transactions */

View File

@ -103,13 +103,13 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
* completion on failure. This will trigger cleanup of appropriate resources
* and cleanup records.
*/
extern void FinalizeOperationNeedingCleanupOnFailure(void);
extern void FinalizeOperationNeedingCleanupOnFailure(const char *operationName);
/*
* FinalizeOperationNeedingCleanupOnSuccess is called by an operation to signal
* completion on success. This will trigger cleanup of appropriate resources
* and cleanup records.
*/
extern void FinalizeOperationNeedingCleanupOnSuccess(void);
extern void FinalizeOperationNeedingCleanupOnSuccess(const char *operationName);
#endif /*CITUS_SHARD_CLEANER_H */

View File

@ -12,6 +12,7 @@
#ifndef WORKER_TRANSACTION_H
#define WORKER_TRANSACTION_H
#include "distributed/connection_management.h"
#include "distributed/worker_manager.h"
#include "storage/lockdefs.h"
@ -59,6 +60,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
int32 nodePort,
const char *nodeUser,
List *commandList);
extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *commandList);
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
char *nodeName,
int32 nodePort,
@ -74,6 +79,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList,
const char *

View File

@ -8,5 +8,6 @@ test: isolation_cluster_management
test: isolation_logical_replication_single_shard_commands
test: isolation_logical_replication_multi_shard_commands
test: isolation_non_blocking_shard_split
test: isolation_create_distributed_concurrently_after_drop_column
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
test: isolation_non_blocking_shard_split_fkey

View File

@ -176,5 +176,39 @@ SELECT citus_rebalance_wait();
(1 row)
DROP TABLE t1;
-- make sure a non-super user can stop rebalancing
CREATE USER non_super_user_rebalance WITH LOGIN;
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
SET ROLE non_super_user_rebalance;
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
RESET ROLE;
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;

View File

@ -57,6 +57,35 @@ ERROR: cannot colocate tables nocolo and test
DETAIL: Distribution column types don't match for nocolo and test.
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
ERROR: relation "noexists" does not exist
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select create_distributed_table_concurrently('test','key');
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
ERROR: no worker nodes are available for placing shards
HINT: Add more worker nodes.
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY

View File

@ -41,8 +41,13 @@ SELECT * FROM shards_in_workers;
103 | worker1
(4 rows)
-- failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()');
-- Failure on creating the subscription
-- Failing exactly on CREATE SUBSCRIPTION causes a flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead, fail on the next step (ALTER SUBSCRIPTION), which is also logically required as part of the overall CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -0,0 +1,667 @@
Parsed test spec with 3 sessions
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500004|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500006|t | 4
57637|1500008|t | 0
57638|1500005|t | 0
57638|1500007|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-primary-key-observations_with_pk:
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500009|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500011|t | 4
57637|1500013|t | 0
57638|1500010|t | 0
57638|1500012|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_pk:
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500014|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500016|t | 3
57637|1500018|t | 0
57638|1500015|t | 0
57638|1500017|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500019|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500021|t | 3
57637|1500023|t | 0
57638|1500020|t | 0
57638|1500022|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_full_replica_identity:
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500024|t | 2
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500026|t | 2
57637|1500028|t | 0
57638|1500025|t | 0
57638|1500027|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)

View File

@ -91,7 +91,7 @@ step s1-drop-marked-shards:
<waiting ...>
s1: WARNING: canceling statement due to lock timeout
step s1-drop-marked-shards: <... completed>
s1: WARNING: Failed to drop 1 orphaned shards out of 1
s1: WARNING: failed to clean up 1 orphaned shards out of 1
step s1-commit:
COMMIT;

View File

@ -740,7 +740,7 @@ DETAIL: from localhost:xxxxx
(1 row)
CALL citus_cleanup_orphaned_shards();
LOG: cleaning up public.test_with_pkey_13000042 on localhost:xxxxx which was left after a move
LOG: deferred drop of orphaned shard public.test_with_pkey_13000042 on localhost:xxxxx after a move completed
NOTICE: cleaned up 1 orphaned shards
SET client_min_messages TO DEFAULT;
-- we don't support multiple shard moves in a single transaction

View File

@ -1177,7 +1177,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
SHOW citus.version;
citus.version
---------------------------------------------------------------------
11.1devel
11.1.2
(1 row)
-- ensure no unexpected objects were created outside pg_catalog
@ -1521,6 +1521,66 @@ SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenanc
1
(1 row)
-- confirm that we can create a distributed table concurrently on an empty node
DROP EXTENSION citus;
CREATE EXTENSION citus;
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_split TO off;
SELECT create_distributed_table_concurrently('test','x');
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a distributed table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test','x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT create_reference_table('test');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_reference_table
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a local table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
DROP TABLE test;
DROP EXTENSION citus;
CREATE EXTENSION citus;
DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension;
ERROR: cannot drop schema multi_extension because other objects depend on it

View File

@ -521,9 +521,9 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' O
tablename | indexname
---------------------------------------------------------------------
date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx
date_partitioned_citus_local_table_361369 | date_partitioned_citus_local_table_measureid_idx_361369
date_partitioned_citus_local_table_361377 | date_partitioned_citus_local_table_measureid_idx_361377
partition_local_table | partition_local_table_measureid_idx
partition_local_table_361370 | partition_local_table_measureid_idx_361370
partition_local_table_361378 | partition_local_table_measureid_idx_361378
(4 rows)
-- creating a single object should only need to trigger fixing the single object
@ -753,7 +753,7 @@ DETAIL: drop cascades to table not_partitioned
drop cascades to table not_distributed
drop cascades to table fk_table
drop cascades to table p
drop cascades to table date_partitioned_citus_local_table_361369
drop cascades to table date_partitioned_citus_local_table_361377
drop cascades to table date_partitioned_citus_local_table
drop cascades to table parent_table
SELECT citus_remove_node('localhost', :master_port);

View File

@ -1172,5 +1172,108 @@ SELECT create_distributed_table ('dropfkeytest2', 'x', colocate_with:='none');
(1 row)
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON UPDATE SET DEFAULT
);
-- from distributed / reference to reference, fkey exists before calling the UDFs
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
SELECT create_reference_table('set_on_default_test_referencing');
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 serial, col_2 int, col_3 int, col_4 int
);
SELECT create_reference_table('set_on_default_test_referencing');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- from reference to reference, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 serial, col_3 int, col_4 bigserial
);
SELECT create_reference_table('set_on_default_test_referencing');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- ok since referencing columns are not based on sequences
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
DROP TABLE set_on_default_test_referencing;
CREATE SEQUENCE test_sequence;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 int DEFAULT nextval('test_sequence'), col_4 int
);
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- from distributed to reference, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referenced;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_distributed_table('set_on_default_test_referenced', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 bigserial, col_2 int, col_3 int DEFAULT nextval('test_sequence'), col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT
);
-- from distributed to distributed, fkey exists before calling the UDFs
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int DEFAULT nextval('test_sequence'), col_2 int, col_3 int, col_4 int
);
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- from distributed to distributed, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
-- we no longer need those tables
DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2;
DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2,
set_on_default_test_referenced, set_on_default_test_referencing;

@@ -4324,12 +4324,66 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%'
(6 rows)
\c - - - :master_port
SET search_path TO partitioning_schema;
-- create parent table
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
-- create partition
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
-- populate table
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
-- create extended statistics
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
-- distribute parent table
SELECT create_distributed_table('stxdinp', 'i');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$partitioning_schema.stxdinp1$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- run select query, works fine
SELECT a, b FROM stxdinp GROUP BY 1, 2;
a | b
---------------------------------------------------------------------
1 | 1
3 | 3
7 | 7
2 | 2
8 | 8
0 | 0
5 | 5
6 | 6
9 | 9
4 | 4
(10 rows)
-- partitions are processed recursively for PG15+
VACUUM ANALYZE stxdinp;
SELECT a, b FROM stxdinp GROUP BY 1, 2;
a | b
---------------------------------------------------------------------
1 | 1
3 | 3
7 | 7
2 | 2
8 | 8
0 | 0
5 | 5
6 | 6
9 | 9
4 | 4
(10 rows)
DROP SCHEMA partitioning_schema CASCADE;
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table partitioning_schema."schema-test"
drop cascades to table partitioning_schema.another_distributed_table
drop cascades to table partitioning_schema.distributed_parent_table
drop cascades to table partitioning_schema.part_table_with_very_long_name
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table "schema-test"
drop cascades to table another_distributed_table
drop cascades to table distributed_parent_table
drop cascades to table part_table_with_very_long_name
drop cascades to table stxdinp
RESET search_path;
DROP TABLE IF EXISTS
partitioning_hash_test,

@@ -758,7 +758,7 @@ SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'block_writes');
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 1 shards out of 1
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port
@@ -811,7 +811,7 @@ WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 6 shards out of 6
WARNING: failed to clean up 6 orphaned shards out of 6 after a isolate_tenant_to_new_shard operation failed
ERROR: command DROP TABLE is disabled
\set VERBOSITY default
-- check if metadata is changed

@@ -790,7 +790,7 @@ SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical');
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 1 shards out of 1
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port

@@ -349,6 +349,58 @@ NOTICE: renaming the new table to pg15.tbl2
(1 row)
-- Make sure that we allow foreign key columns on local tables added to
-- metadata to have SET NULL/DEFAULT on a per-column basis.
CREATE TABLE PKTABLE_local (tid int, id int, PRIMARY KEY (tid, id));
CREATE TABLE FKTABLE_local (
tid int, id int,
fk_id_del_set_null int,
fk_id_del_set_default int DEFAULT 0,
FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES PKTABLE_local ON DELETE SET NULL (fk_id_del_set_null),
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES PKTABLE_local ON DELETE SET DEFAULT (fk_id_del_set_default)
);
SELECT citus_add_local_table_to_metadata('FKTABLE_local', cascade_via_foreign_keys=>true);
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- show that the definition is expected
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'FKTABLE_local'::regclass::oid ORDER BY oid;
pg_get_constraintdef
---------------------------------------------------------------------
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable_local(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default)
FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES pktable_local(tid, id) ON DELETE SET NULL (fk_id_del_set_null)
(2 rows)
\c - - - :worker_1_port
SET search_path TO pg15;
-- show that the definition is expected on the worker as well
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'FKTABLE_local'::regclass::oid ORDER BY oid;
pg_get_constraintdef
---------------------------------------------------------------------
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable_local(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default)
FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES pktable_local(tid, id) ON DELETE SET NULL (fk_id_del_set_null)
(2 rows)
-- also, make sure that it works as expected
INSERT INTO PKTABLE_local VALUES (1, 0), (1, 1), (1, 2);
INSERT INTO FKTABLE_local VALUES
(1, 1, 1, NULL),
(1, 2, NULL, 2);
DELETE FROM PKTABLE_local WHERE id = 1 OR id = 2;
SELECT * FROM FKTABLE_local ORDER BY id;
tid | id | fk_id_del_set_null | fk_id_del_set_default
---------------------------------------------------------------------
1 | 1 | |
1 | 2 | | 0
(2 rows)
\c - - - :master_port
SET search_path TO pg15;
SET client_min_messages to ERROR;
DROP TABLE FKTABLE_local, PKTABLE_local;
RESET client_min_messages;
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
@@ -878,6 +930,72 @@ WARNING: not propagating CLUSTER command for partitioned table to worker nodes
HINT: Provide a child partition table names in order to CLUSTER distributed partitioned tables.
-- verify that we can still cluster the partition tables now since replication factor is 1
CLUSTER sale_newyork_repl_factor_1 USING sale_newyork_repl_factor_1_pkey;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_1)
ON UPDATE SET DEFAULT
);
-- should error since col_3 defaults to a sequence
SELECT create_reference_table('set_on_default_test_referencing');
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_1)
);
-- should not error since this doesn't set any sequence based columns to default
SELECT create_reference_table('set_on_default_test_referencing');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO set_on_default_test_referenced (col_1, col_3) VALUES (1, 1);
INSERT INTO set_on_default_test_referencing (col_1, col_3) VALUES (1, 1);
DELETE FROM set_on_default_test_referenced;
SELECT * FROM set_on_default_test_referencing ORDER BY 1,2;
col_1 | col_2 | col_3 | col_4
---------------------------------------------------------------------
| | 1 |
(1 row)
DROP TABLE set_on_default_test_referencing;
SET client_min_messages to ERROR;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
-- should error since col_3 defaults to a sequence
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_3)
);
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Clean up
RESET citus.shard_replication_factor;
\set VERBOSITY terse

@@ -250,6 +250,52 @@ BEGIN;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET DEFAULT);
ERROR: cannot switch local execution status from local execution disabled to local execution enabled since it can cause visibility problems in the current transaction
ROLLBACK;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- from citus local to reference - 1
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON UPDATE SET DEFAULT
);
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
CREATE TABLE set_on_default_test_referencing(
col_1 serial, col_2 int, col_3 int, col_4 int
);
-- from citus local to reference - 2
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
DROP TABLE set_on_default_test_referencing, set_on_default_test_referenced;
NOTICE: executing the command locally: DROP TABLE IF EXISTS ref_citus_local_fkeys.set_on_default_test_referenced_xxxxx CASCADE
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT citus_add_local_table_to_metadata('set_on_default_test_referenced');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- from citus local to citus local
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT
);
ERROR: cannot create foreign key constraint since Citus does not support ON DELETE / UPDATE SET DEFAULT actions on the columns that default to sequences
-- cleanup at exit
DROP SCHEMA ref_citus_local_fkeys CASCADE;
NOTICE: drop cascades to 6 other objects
NOTICE: drop cascades to 8 other objects

@@ -0,0 +1,176 @@
#include "isolation_mx_common.include.spec"
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities
// session s2 - Does concurrent inserts/update/delete
// session s3 - Holds advisory locks
setup
{
SET citus.shard_replication_factor TO 1;
CREATE TABLE observations_with_pk (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP,
PRIMARY KEY (tenant_id, measurement_id)
);
CREATE TABLE observations_with_full_replica_identity (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP
);
ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL;
}
teardown
{
DROP TABLE observations_with_pk;
DROP TABLE observations_with_full_replica_identity;
}
session "s1"
step "s1-alter-table"
{
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
}
step "s1-set-factor-1"
{
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
}
step "s1-create-distributed-table-observations_with_pk-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
}
step "s1-create-distributed-table-observations-2-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert-observations_with_pk"
{
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-insert-observations_with_full_replica_identity"
{
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-update-observations_with_pk"
{
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-update-primary-key-observations_with_pk"
{
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
}
step "s2-update-observations_with_full_replica_identity"
{
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-delete-observations_with_pk"
{
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-delete-observations_with_full_replica_identity"
{
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-end"
{
COMMIT;
}
step "s2-print-cluster-1"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
}
step "s2-print-cluster-2"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
}
session "s3"
// this advisory lock with (almost) random values is only used
// for testing purposes. For details, check Citus' logical replication
// source code
step "s3-acquire-advisory-lock"
{
SELECT pg_advisory_lock(44000, 55152);
}
step "s3-release-advisory-lock"
{
SELECT pg_advisory_unlock(44000, 55152);
}
// Concurrent Insert/Update with create_distributed_table_concurrently(with primary key as replica identity) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
// Concurrent Insert/Update with create_distributed_table_concurrently(with replica identity full) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"

@@ -59,6 +59,24 @@ SELECT 1 FROM citus_rebalance_start();
SELECT rebalance_table_shards();
SELECT citus_rebalance_wait();
DROP TABLE t1;
-- make sure a non-super user can stop rebalancing
CREATE USER non_super_user_rebalance WITH LOGIN;
GRANT ALL ON SCHEMA background_rebalance TO non_super_user_rebalance;
SET ROLE non_super_user_rebalance;
CREATE TABLE non_super_user_t1 (a int PRIMARY KEY);
SELECT create_distributed_table('non_super_user_t1', 'a', shard_count => 4, colocate_with => 'none');
SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_stop();
RESET ROLE;
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;

@@ -38,6 +38,12 @@ select create_distributed_table_concurrently('nocolo','x');
select create_distributed_table_concurrently('test','key', colocate_with := 'nocolo');
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
select create_distributed_table_concurrently('test','key');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);

@@ -32,8 +32,14 @@ INSERT INTO t SELECT x, x+1, MD5(random()::text) FROM generate_series(1,100000)
-- Initial shard placements
SELECT * FROM shards_in_workers;
-- failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()');
-- Failure on creating the subscription
-- Failing exactly on CREATE SUBSCRIPTION causes a flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead, fail on the next step (ALTER SUBSCRIPTION), which is also logically required as part of the overall CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- Verify that the shard is not moved and the number of rows are still 100k

@@ -795,5 +795,39 @@ FROM test.maintenance_worker();
-- confirm that there is only one maintenance daemon
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
-- confirm that we can create a distributed table concurrently on an empty node
DROP EXTENSION citus;
CREATE EXTENSION citus;
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_split TO off;
SELECT create_distributed_table_concurrently('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a distributed table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT create_reference_table('test');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a local table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SELECT citus_add_local_table_to_metadata('test');
DROP TABLE test;
DROP EXTENSION citus;
CREATE EXTENSION citus;
DROP TABLE version_mismatch_table;
DROP SCHEMA multi_extension;

@@ -696,5 +696,87 @@ DROP TABLE dropfkeytest1 CASCADE;
-- this should work
SELECT create_distributed_table ('dropfkeytest2', 'x', colocate_with:='none');
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON UPDATE SET DEFAULT
);
-- from distributed / reference to reference, fkey exists before calling the UDFs
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
SELECT create_reference_table('set_on_default_test_referencing');
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 serial, col_2 int, col_3 int, col_4 int
);
SELECT create_reference_table('set_on_default_test_referencing');
-- from reference to reference, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 serial, col_3 int, col_4 bigserial
);
SELECT create_reference_table('set_on_default_test_referencing');
-- ok since referencing columns are not based on sequences
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
DROP TABLE set_on_default_test_referencing;
CREATE SEQUENCE test_sequence;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 int DEFAULT nextval('test_sequence'), col_4 int
);
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
-- from distributed to reference, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
DROP TABLE set_on_default_test_referenced;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_distributed_table('set_on_default_test_referenced', 'col_1');
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 bigserial, col_2 int, col_3 int DEFAULT nextval('test_sequence'), col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT
);
-- from distributed to distributed, fkey exists before calling the UDFs
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int DEFAULT nextval('test_sequence'), col_2 int, col_3 int, col_4 int
);
SELECT create_distributed_table('set_on_default_test_referencing', 'col_1');
-- from distributed to distributed, fkey doesn't exist before calling the UDFs
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
-- we no longer need those tables
DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2;
DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2,
set_on_default_test_referenced, set_on_default_test_referencing;

@@ -2002,6 +2002,30 @@ SELECT tablename, indexname FROM pg_indexes
WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2;
\c - - - :master_port
SET search_path TO partitioning_schema;
-- create parent table
CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
-- create partition
CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
-- populate table
INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
-- create extended statistics
CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
-- distribute parent table
SELECT create_distributed_table('stxdinp', 'i');
-- run select query, works fine
SELECT a, b FROM stxdinp GROUP BY 1, 2;
-- partitions are processed recursively for PG15+
VACUUM ANALYZE stxdinp;
SELECT a, b FROM stxdinp GROUP BY 1, 2;
DROP SCHEMA partitioning_schema CASCADE;
RESET search_path;
DROP TABLE IF EXISTS

@@ -213,6 +213,47 @@ WHEN MATCHED THEN DELETE;
-- now, both distributed, not works
SELECT undistribute_table('tbl1');
SELECT undistribute_table('tbl2');
-- Make sure that we allow foreign key columns on local tables added to
-- metadata to have SET NULL/DEFAULT on a per-column basis.
CREATE TABLE PKTABLE_local (tid int, id int, PRIMARY KEY (tid, id));
CREATE TABLE FKTABLE_local (
tid int, id int,
fk_id_del_set_null int,
fk_id_del_set_default int DEFAULT 0,
FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES PKTABLE_local ON DELETE SET NULL (fk_id_del_set_null),
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES PKTABLE_local ON DELETE SET DEFAULT (fk_id_del_set_default)
);
SELECT citus_add_local_table_to_metadata('FKTABLE_local', cascade_via_foreign_keys=>true);
-- show that the definition is expected
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'FKTABLE_local'::regclass::oid ORDER BY oid;
\c - - - :worker_1_port
SET search_path TO pg15;
-- show that the definition is expected on the worker as well
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'FKTABLE_local'::regclass::oid ORDER BY oid;
-- also, make sure that it works as expected
INSERT INTO PKTABLE_local VALUES (1, 0), (1, 1), (1, 2);
INSERT INTO FKTABLE_local VALUES
(1, 1, 1, NULL),
(1, 2, NULL, 2);
DELETE FROM PKTABLE_local WHERE id = 1 OR id = 2;
SELECT * FROM FKTABLE_local ORDER BY id;
\c - - - :master_port
SET search_path TO pg15;
SET client_min_messages to ERROR;
DROP TABLE FKTABLE_local, PKTABLE_local;
RESET client_min_messages;
SELECT 1 FROM citus_remove_node('localhost', :master_port);
SELECT create_distributed_table('tbl1', 'x');
@@ -540,6 +581,56 @@ CLUSTER sale_repl_factor_1 USING sale_repl_factor_1_pk;
-- verify that we can still cluster the partition tables now since replication factor is 1
CLUSTER sale_newyork_repl_factor_1 USING sale_newyork_repl_factor_1_pkey;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_1)
ON UPDATE SET DEFAULT
);
-- should error since col_3 defaults to a sequence
SELECT create_reference_table('set_on_default_test_referencing');
DROP TABLE set_on_default_test_referencing;
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_1)
);
-- should not error since this doesn't set any sequence based columns to default
SELECT create_reference_table('set_on_default_test_referencing');
INSERT INTO set_on_default_test_referenced (col_1, col_3) VALUES (1, 1);
INSERT INTO set_on_default_test_referencing (col_1, col_3) VALUES (1, 1);
DELETE FROM set_on_default_test_referenced;
SELECT * FROM set_on_default_test_referencing ORDER BY 1,2;
DROP TABLE set_on_default_test_referencing;
SET client_min_messages to ERROR;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0);
RESET client_min_messages;
-- should error since col_3 defaults to a sequence
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT (col_3)
);
SELECT 1 FROM citus_remove_node('localhost', :master_port);
-- Clean up
RESET citus.shard_replication_factor;
\set VERBOSITY terse

@@ -159,5 +159,44 @@ BEGIN;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET DEFAULT);
ROLLBACK;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT create_reference_table('set_on_default_test_referenced');
-- from citus local to reference - 1
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON UPDATE SET DEFAULT
);
CREATE TABLE set_on_default_test_referencing(
col_1 serial, col_2 int, col_3 int, col_4 int
);
-- from citus local to reference - 2
ALTER TABLE set_on_default_test_referencing ADD CONSTRAINT fkey
FOREIGN KEY(col_1, col_3) REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT;
DROP TABLE set_on_default_test_referencing, set_on_default_test_referenced;
CREATE TABLE set_on_default_test_referenced(
col_1 int, col_2 int, col_3 int, col_4 int,
unique (col_1, col_3)
);
SELECT citus_add_local_table_to_metadata('set_on_default_test_referenced');
-- from citus local to citus local
CREATE TABLE set_on_default_test_referencing(
col_1 int, col_2 int, col_3 serial, col_4 int,
FOREIGN KEY(col_1, col_3)
REFERENCES set_on_default_test_referenced(col_1, col_3)
ON DELETE SET DEFAULT
);
-- cleanup at exit
DROP SCHEMA ref_citus_local_fkeys CASCADE;