Merge pull request #4457 from citusdata/cascade-udf

* Add infrastructure to cascade citus table functions on foreign keys
* Add cascade_via_foreign_keys option to undistribute_table
pull/4476/head^2
Onur Tirtir 2021-01-07 15:52:08 +03:00 committed by GitHub
commit 47cd1db209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1281 additions and 48 deletions

View File

@ -0,0 +1,368 @@
/*-------------------------------------------------------------------------
*
* cascade_table_operation_for_connected_relations.c
* Routines to execute citus table functions (e.g undistribute_table,
* create_citus_local_table) by cascading to foreign key connected
* relations.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "catalog/pg_constraint.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/commands.h"
#include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
typedef void (*CascadeOperationFunction)(Oid, bool);
static void EnsureSequentialModeForCitusTableCascadeFunction(List *relationIdList);
static bool RelationIdListHasReferenceTable(List *relationIdList);
static void LockRelationsWithLockMode(List *relationIdList, LOCKMODE lockMode);
static List * RemovePartitionRelationIds(List *relationIdList);
static List * GetFKeyCreationCommandsForRelationIdList(List *relationIdList);
static void DropRelationIdListForeignKeys(List *relationIdList);
static void DropRelationForeignKeys(Oid relationId);
static List * GetRelationDropFkeyCommands(Oid relationId);
static char * GetDropFkeyCascadeCommand(Oid relationId, Oid foreignKeyId);
static void ExecuteCascadeOperationForRelationIdList(List *relationIdList,
CascadeOperationType
cascadeOperationType);
static CascadeOperationFunction GetCascadeOperationFunction(CascadeOperationType
cascadeOperationType);
/*
 * CascadeOperationForConnectedRelations runs the citus table function chosen
 * by cascadeOperationType on every relation found in the foreign key graph
 * of the relation with relationId, the input relation itself included.
 *
 * All connected relations are locked with lockMode. Referencing foreign keys
 * of the non-partition relations are dropped before the operation runs and
 * are recreated once it has finished.
 *
 * Also see CascadeOperationType enum definition for supported
 * citus table functions.
 */
void
CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE lockMode,
									  CascadeOperationType
									  cascadeOperationType)
{
	/*
	 * We are about to walk the foreign key graph, so invalidate it first
	 * to be on the safe side.
	 */
	InvalidateForeignKeyGraph();

	List *connectedRelationIdList = GetForeignKeyConnectedRelationIdList(relationId);

	LockRelationsWithLockMode(connectedRelationIdList, lockMode);

	/*
	 * Partitions are left out: citus table functions already have their own
	 * logic to handle partition relations, so we shouldn't cascade through
	 * foreign keys on them.
	 */
	List *relationIdsToProcess = RemovePartitionRelationIds(connectedRelationIdList);

	/*
	 * Distributed tables in the subgraph might already be modified in the
	 * current transaction. Switch to sequential execution before running any
	 * ddl's so that we don't error out later in this function.
	 */
	EnsureSequentialModeForCitusTableCascadeFunction(relationIdsToProcess);

	/* remember how to recreate the foreign keys before they are dropped */
	List *recreateFKeyCommands =
		GetFKeyCreationCommandsForRelationIdList(relationIdsToProcess);

	/*
	 * Only referencing foreign keys of each relation are dropped here, as
	 * referenced foreign keys are already covered as the referencing foreign
	 * keys of the other relations.
	 */
	DropRelationIdListForeignKeys(relationIdsToProcess);

	ExecuteCascadeOperationForRelationIdList(relationIdsToProcess,
											 cascadeOperationType);

	/* finally put the foreign keys back on the tables */
	ExecuteAndLogDDLCommandList(recreateFKeyCommands);
}
/*
 * LockRelationsWithLockMode acquires lockMode on every relation in
 * relationIdList. The list is sorted with CompareOids first, so locks are
 * requested in the order that comparator defines.
 */
static void
LockRelationsWithLockMode(List *relationIdList, LOCKMODE lockMode)
{
	List *sortedRelationIdList = SortList(relationIdList, CompareOids);

	Oid relationIdToLock = InvalidOid;
	foreach_oid(relationIdToLock, sortedRelationIdList)
	{
		LockRelationOid(relationIdToLock, lockMode);
	}
}
/*
 * RemovePartitionRelationIds builds and returns a new list that holds the
 * relation id's of relationIdList for which PartitionTable returns false,
 * in their original order.
 */
static List *
RemovePartitionRelationIds(List *relationIdList)
{
	List *filteredRelationIdList = NIL;

	Oid candidateRelationId = InvalidOid;
	foreach_oid(candidateRelationId, relationIdList)
	{
		if (!PartitionTable(candidateRelationId))
		{
			filteredRelationIdList = lappend_oid(filteredRelationIdList,
												 candidateRelationId);
		}
	}

	return filteredRelationIdList;
}
/*
 * EnsureSequentialModeForCitusTableCascadeFunction switches the transaction
 * to sequential multi-shard modify mode when relationIdList contains a
 * reference table. Errors out if a parallel query was already executed in
 * the transaction, as the switch is not possible then.
 */
static void
EnsureSequentialModeForCitusTableCascadeFunction(List *relationIdList)
{
	bool subgraphHasReferenceTable = RelationIdListHasReferenceTable(relationIdList);
	if (!subgraphHasReferenceTable)
	{
		/* no reference table in the subgraph, nothing to switch */
		return;
	}

	if (ParallelQueryExecutedInTransaction())
	{
		ereport(ERROR, (errmsg("cannot execute command because there was a parallel "
							   "operation on a distributed table in transaction"),
						errhint("Try re-running the transaction with "
								"\"SET LOCAL citus.multi_shard_modify_mode TO "
								"\'sequential\';\"")));
	}

	ereport(DEBUG1, (errmsg("switching to sequential query execution mode because the "
							"operation cascades into distributed tables with foreign "
							"keys to reference tables")));

	SetLocalMultiShardModifyModeToSequential();
}
/*
 * RelationIdListHasReferenceTable reports whether at least one relation id
 * in relationIdList belongs to a reference table.
 */
static bool
RelationIdListHasReferenceTable(List *relationIdList)
{
	bool referenceTableFound = false;

	Oid candidateRelationId = InvalidOid;
	foreach_oid(candidateRelationId, relationIdList)
	{
		if (IsCitusTableType(candidateRelationId, REFERENCE_TABLE))
		{
			/* one hit is enough, no need to look at the rest */
			referenceTableFound = true;
			break;
		}
	}

	return referenceTableFound;
}
/*
 * GetFKeyCreationCommandsForRelationIdList collects, for every relation in
 * relationIdList, the commands returned by
 * GetReferencingForeignConstaintCommands and returns them as a single list.
 */
static List *
GetFKeyCreationCommandsForRelationIdList(List *relationIdList)
{
	List *allCreationCommands = NIL;

	Oid currentRelationId = InvalidOid;
	foreach_oid(currentRelationId, relationIdList)
	{
		List *commandsForRelation =
			GetReferencingForeignConstaintCommands(currentRelationId);

		allCreationCommands = list_concat(allCreationCommands, commandsForRelation);
	}

	return allCreationCommands;
}
/*
 * DropRelationIdListForeignKeys calls DropRelationForeignKeys for every
 * relation in the given relation id list, one relation at a time.
 */
static void
DropRelationIdListForeignKeys(List *relationIdList)
{
	Oid referencingRelationId = InvalidOid;
	foreach_oid(referencingRelationId, relationIdList)
	{
		DropRelationForeignKeys(referencingRelationId);
	}
}
/*
 * DropRelationForeignKeys drops the foreign keys in which the relation with
 * relationId is the referencing side, by executing the commands produced by
 * GetRelationDropFkeyCommands.
 */
static void
DropRelationForeignKeys(Oid relationId)
{
	ExecuteAndLogDDLCommandList(GetRelationDropFkeyCommands(relationId));
}
/*
 * GetRelationDropFkeyCommands returns one drop command per foreign key for
 * which the relation with relationId is the referencing relation.
 */
static List *
GetRelationDropFkeyCommands(Oid relationId)
{
	/* referencing constraints only, with all table types included */
	int fKeyLookupFlags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
	List *referencingFKeyIdList = GetForeignKeyOids(relationId, fKeyLookupFlags);

	List *dropCommandList = NIL;

	Oid referencingFKeyId = InvalidOid;
	foreach_oid(referencingFKeyId, referencingFKeyIdList)
	{
		char *dropCommand = GetDropFkeyCascadeCommand(relationId, referencingFKeyId);

		dropCommandList = lappend(dropCommandList, dropCommand);
	}

	return dropCommandList;
}
/*
 * GetDropFkeyCascadeCommand returns an "ALTER TABLE .. DROP CONSTRAINT ..
 * CASCADE" command for the foreign key with foreignKeyId defined on the
 * relation with relationId. The relation name is schema-qualified and the
 * constraint name is quoted as an identifier.
 */
static char *
GetDropFkeyCascadeCommand(Oid relationId, Oid foreignKeyId)
{
	char *tableName = generate_qualified_relation_name(relationId);

	char *rawFKeyName = get_constraint_name(foreignKeyId);
	const char *fKeyName = quote_identifier(rawFKeyName);

	StringInfo commandBuffer = makeStringInfo();
	appendStringInfo(commandBuffer, "ALTER TABLE %s DROP CONSTRAINT %s CASCADE;",
					 tableName, fKeyName);

	return commandBuffer->data;
}
/*
 * ExecuteCascadeOperationForRelationIdList executes the citus table function
 * specified by cascadeOperationType for each relation in relationIdList.
 *
 * The function to call depends only on cascadeOperationType, so it is
 * resolved once before iterating instead of once per relation. As a result,
 * an unsupported operation type is reported by GetCascadeOperationFunction
 * before any relation is processed.
 */
static void
ExecuteCascadeOperationForRelationIdList(List *relationIdList,
										 CascadeOperationType
										 cascadeOperationType)
{
	CascadeOperationFunction cascadeOperationFunction =
		GetCascadeOperationFunction(cascadeOperationType);

	/*
	 * Caller already passed the relations that we should operate on,
	 * so we should not cascade here.
	 */
	bool cascadeViaForeignKeys = false;

	Oid relationId = InvalidOid;
	foreach_oid(relationId, relationIdList)
	{
		cascadeOperationFunction(relationId, cascadeViaForeignKeys);
	}
}
/*
 * GetCascadeOperationFunction returns c api for citus table operation according
 * to given CascadeOperationType.
 *
 * Only UNDISTRIBUTE_TABLE is mapped to a function (UndistributeTable); any
 * other value raises an error.
 */
static CascadeOperationFunction
GetCascadeOperationFunction(CascadeOperationType cascadeOperationType)
{
	switch (cascadeOperationType)
	{
		case UNDISTRIBUTE_TABLE:
		{
			return UndistributeTable;
		}

		default:
		{
			/*
			 * This is not expected as other create table functions don't have
			 * cascade option yet. To be on the safe side, error out here.
			 */
			ereport(ERROR, (errmsg("citus table function could not be found")));
		}
	}

	/*
	 * Not reached, as ereport(ERROR) does not return. Kept so that every
	 * path of this non-void function has a return statement even for
	 * compilers that cannot tell ereport(ERROR) never returns.
	 */
	return NULL;
}
/*
 * ExecuteAndLogDDLCommandList runs ExecuteAndLogDDLCommand on each command
 * string of ddlCommandList, in list order.
 */
void
ExecuteAndLogDDLCommandList(List *ddlCommandList)
{
	char *currentCommand = NULL;
	foreach_ptr(currentCommand, ddlCommandList)
	{
		ExecuteAndLogDDLCommand(currentCommand);
	}
}
/*
 * ExecuteAndLogDDLCommand logs the given ddl command at DEBUG4 level, then
 * parses it and hands the parse tree to CitusProcessUtility for execution.
 */
void
ExecuteAndLogDDLCommand(const char *commandString)
{
	ereport(DEBUG4, (errmsg("executing \"%s\"", commandString)));

	Node *parsedCommand = ParseTreeNode(commandString);

	CitusProcessUtility(parsedCommand, commandString, PROCESS_UTILITY_TOPLEVEL,
						NULL, None_Receiver, NULL);
}

View File

@ -54,8 +54,6 @@ static char * GetRenameShardIndexCommand(char *indexName, uint64 shardId);
static void RenameShardRelationNonTruncateTriggers(Oid shardRelationId, uint64 shardId);
static char * GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName,
uint64 shardId);
static void ExecuteAndLogDDLCommandList(List *ddlCommandList);
static void ExecuteAndLogDDLCommand(const char *commandString);
static void DropRelationTruncateTriggers(Oid relationId);
static char * GetDropTriggerCommand(Oid relationId, char *triggerName);
static List * GetExplicitIndexNameList(Oid relationId);
@ -553,36 +551,6 @@ GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName, uint64 shar
}
/*
* ExecuteAndLogDDLCommandList takes a list of ddl commands and calls
* ExecuteAndLogDDLCommand function for each of them.
*/
static void
ExecuteAndLogDDLCommandList(List *ddlCommandList)
{
char *ddlCommand = NULL;
foreach_ptr(ddlCommand, ddlCommandList)
{
ExecuteAndLogDDLCommand(ddlCommand);
}
}
/*
* ExecuteAndLogDDLCommand takes a ddl command and logs it in DEBUG4 log level.
* Then, parses and executes it via CitusProcessUtility.
*/
static void
ExecuteAndLogDDLCommand(const char *commandString)
{
ereport(DEBUG4, (errmsg("executing \"%s\"", commandString)));
Node *parseTree = ParseTreeNode(commandString);
CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
/*
* DropRelationTruncateTriggers drops TRUNCATE triggers that are explicitly
* created on relation with relationId.

View File

@ -86,6 +86,11 @@
*/
#define LOG_PER_TUPLE_AMOUNT 1000000
#define UNDISTRIBUTE_TABLE_CASCADE_HINT \
"Use cascade option to undistribute all the relations involved in " \
"a foreign key relationship with %s by executing SELECT " \
"undistribute_table($$%s$$, cascade_via_foreign_keys=>true)"
/* Replication model to use when creating distributed tables */
int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
@ -124,7 +129,6 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate);
static void UndistributeTable(Oid relationId);
static List * GetViewCreationCommandsOfTable(Oid relationId);
static void ReplaceTable(Oid sourceId, Oid targetId);
@ -289,10 +293,11 @@ Datum
undistribute_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool cascadeViaForeignKeys = PG_GETARG_BOOL(1);
CheckCitusVersion(ERROR);
UndistributeTable(relationId);
UndistributeTable(relationId, cascadeViaForeignKeys);
PG_RETURN_VOID();
}
@ -1552,7 +1557,7 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
* be dropped.
*/
void
UndistributeTable(Oid relationId)
UndistributeTable(Oid relationId, bool cascadeViaForeignKeys)
{
EnsureCoordinator();
EnsureRelationExists(relationId);
@ -1574,16 +1579,37 @@ UndistributeTable(Oid relationId)
errdetail("because the table is not distributed")));
}
if (TableReferencing(relationId))
bool tableReferencing = TableReferencing(relationId);
bool tableReferenced = TableReferenced(relationId);
if (cascadeViaForeignKeys && (tableReferencing || tableReferenced))
{
ereport(ERROR, (errmsg("cannot undistribute table "
"because it has a foreign key")));
CascadeOperationForConnectedRelations(relationId, lockMode, UNDISTRIBUTE_TABLE);
/*
* Undistributed every foreign key connected relation in our foreign key
* subgraph including itself, so return here.
*/
return;
}
if (TableReferenced(relationId))
if (tableReferencing)
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
ereport(ERROR, (errmsg("cannot undistribute table "
"because a foreign key references to it")));
"because it has a foreign key"),
errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT,
qualifiedRelationName,
qualifiedRelationName)));
}
if (tableReferenced)
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
ereport(ERROR, (errmsg("cannot undistribute table "
"because a foreign key references to it"),
errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT,
qualifiedRelationName,
qualifiedRelationName)));
}
char relationKind = get_rel_relkind(relationId);
@ -1642,7 +1668,15 @@ UndistributeTable(Oid relationId)
}
preLoadCommands = lappend(preLoadCommands,
makeTableDDLCommandString(attachPartitionCommand));
UndistributeTable(partitionRelationId);
/*
* Even if we called UndistributeTable with cascade option, we
* shouldn't cascade via foreign keys on partitions. Otherwise,
* we might try to undistribute partitions of other tables in
* our foreign key subgraph more than once.
*/
bool cascadeOnPartitionsViaForeignKeys = false;
UndistributeTable(partitionRelationId, cascadeOnPartitionsViaForeignKeys);
}
}

View File

@ -6,5 +6,6 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
#include "udfs/citus_total_relation_size/10.0-1.sql"
#include "udfs/citus_tables/10.0-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
#include "udfs/undistribute_table/10.0-1.sql"
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"

View File

@ -7,6 +7,8 @@
DROP VIEW public.citus_tables;
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
DROP FUNCTION pg_catalog.undistribute_table(regclass,boolean);
#include "../udfs/citus_total_relation_size/7.0-1.sql"
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"
#include "../udfs/undistribute_table/9.5-1.sql"

View File

@ -0,0 +1,10 @@
-- Replace the single-argument undistribute_table(regclass) with a version that
-- also takes cascade_via_foreign_keys. The new argument defaults to false, so
-- calls that pass only the table name remain valid.
DROP FUNCTION pg_catalog.undistribute_table(regclass);
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
table_name regclass, cascade_via_foreign_keys boolean default false)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$undistribute_table$$;
COMMENT ON FUNCTION pg_catalog.undistribute_table(
table_name regclass, cascade_via_foreign_keys boolean)
IS 'undistributes a distributed table';

View File

@ -1,9 +1,10 @@
DROP FUNCTION pg_catalog.undistribute_table(regclass);
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
table_name regclass)
table_name regclass, cascade_via_foreign_keys boolean default false)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$undistribute_table$$;
COMMENT ON FUNCTION pg_catalog.undistribute_table(
table_name regclass)
table_name regclass, cascade_via_foreign_keys boolean)
IS 'undistributes a distributed table';

View File

@ -387,6 +387,26 @@ extern List * CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerN
const char *queryString);
extern Oid GetTriggerFunctionId(Oid triggerId);
/* cascade_table_operation_for_connected_relations.c */
/*
 * Flags that can be passed to CascadeOperationForConnectedRelations to specify
 * citus table function to be executed in cascading mode.
 *
 * GetCascadeOperationFunction maps only UNDISTRIBUTE_TABLE to a function and
 * raises an error for every other value, INVALID_OPERATION included.
 */
typedef enum CascadeOperationType
{
/* not mapped to any citus table function */
INVALID_OPERATION = 1 << 0,
/* execute UndistributeTable on each relation */
UNDISTRIBUTE_TABLE = 1 << 1,
} CascadeOperationType;
extern void CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE relLockMode,
CascadeOperationType
cascadeOperationType);
extern void ExecuteAndLogDDLCommandList(List *ddlCommandList);
extern void ExecuteAndLogDDLCommand(const char *commandString);
extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);

View File

@ -138,6 +138,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char *colocateWithTableName,
bool viaDeprecatedAPI);
extern void CreateTruncateTrigger(Oid relationId);
extern void UndistributeTable(Oid relationId, bool cascadeViaForeignKeys);
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);

View File

@ -446,6 +446,7 @@ SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_total_relation_size(regclass) |
function undistribute_table(regclass) |
function upgrade_to_reference_table(regclass) |
| access method columnar
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
@ -453,13 +454,14 @@ SELECT * FROM print_extension_changes();
| function citus_internal.columnar_ensure_objects_exist()
| function citus_total_relation_size(regclass,boolean)
| function columnar.columnar_handler(internal)
| function undistribute_table(regclass,boolean)
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(14 rows)
(16 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -446,16 +446,18 @@ SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_total_relation_size(regclass) |
function undistribute_table(regclass) |
function upgrade_to_reference_table(regclass) |
| function citus_internal.columnar_ensure_objects_exist()
| function citus_total_relation_size(regclass,boolean)
| function undistribute_table(regclass,boolean)
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(10 rows)
(12 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -1019,6 +1019,72 @@ SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
---------------------------------------------------------------------
(0 rows)
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
SELECT create_reference_table('reference_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
NOTICE: undistributing the partitions of single_node.partitioned_table_1
NOTICE: creating a new local table for single_node.partitioned_table_1_100_200
NOTICE: Moving the data of single_node.partitioned_table_1_100_200
NOTICE: Dropping the old single_node.partitioned_table_1_100_200
NOTICE: Renaming the new table to single_node.partitioned_table_1_100_200
NOTICE: creating a new local table for single_node.partitioned_table_1_200_300
NOTICE: Moving the data of single_node.partitioned_table_1_200_300
NOTICE: Dropping the old single_node.partitioned_table_1_200_300
NOTICE: Renaming the new table to single_node.partitioned_table_1_200_300
NOTICE: creating a new local table for single_node.partitioned_table_1
NOTICE: Moving the data of single_node.partitioned_table_1
NOTICE: Dropping the old single_node.partitioned_table_1
NOTICE: Renaming the new table to single_node.partitioned_table_1
NOTICE: creating a new local table for single_node.reference_table_1
NOTICE: Moving the data of single_node.reference_table_1
NOTICE: Dropping the old single_node.reference_table_1
NOTICE: Renaming the new table to single_node.reference_table_1
NOTICE: creating a new local table for single_node.distributed_table_1
NOTICE: Moving the data of single_node.distributed_table_1
NOTICE: Dropping the old single_node.distributed_table_1
NOTICE: Renaming the new table to single_node.distributed_table_1
NOTICE: creating a new local table for single_node.citus_local_table_1
NOTICE: Moving the data of single_node.citus_local_table_1
NOTICE: Dropping the old single_node.citus_local_table_1
NOTICE: Renaming the new table to single_node.citus_local_table_1
undistribute_table
---------------------------------------------------------------------
(1 row)
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO test (x) VALUES ($1);

View File

@ -123,8 +123,10 @@ SELECT create_distributed_table('referencing_table', 'id');
INSERT INTO referencing_table VALUES (4, 6, 'cba'), (1, 1, 'dcba'), (2, 3, 'aaa');
SELECT undistribute_table('referenced_table');
ERROR: cannot undistribute table because a foreign key references to it
HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referenced_table by executing SELECT undistribute_table($$undistribute_table.referenced_table$$, cascade_via_foreign_keys=>true)
SELECT undistribute_table('referencing_table');
ERROR: cannot undistribute table because it has a foreign key
HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true)
DROP TABLE referenced_table, referencing_table;
-- test distributed foreign tables
-- we expect errors

View File

@ -0,0 +1,381 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1515000;
SET citus.shard_replication_factor TO 1;
CREATE SCHEMA undistribute_table_cascade;
SET search_path TO undistribute_table_cascade;
SET client_min_messages to ERROR;
-- ensure that coordinator is added to pg_dist_node
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE);
SELECT create_reference_table('reference_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('reference_table_2');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
CREATE TABLE distributed_table_2 (col_1 INT UNIQUE);
CREATE TABLE distributed_table_3 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('distributed_table_2', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('distributed_table_3', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
SELECT create_citus_local_table('citus_local_table_2');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
-- --- ---
-- | | | |
-- | v | v
-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2
-- ^ | ^ |
-- v | | v
-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2
--
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
-- show that all of below fails as we didn't provide cascade=true
SELECT undistribute_table('distributed_table_1');
ERROR: cannot undistribute table because it has a foreign key
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false);
ERROR: cannot undistribute table because it has a foreign key
SELECT undistribute_table('reference_table_2');
ERROR: cannot undistribute table because it has a foreign key
-- In each of below transation blocks, show that we preserve foreign keys.
-- Also show that we don't have any citus tables in current schema after
-- undistribute_table(cascade).
-- So in each transaction, both selects should return true.
BEGIN;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- show that we switch to sequential execution as there are
-- reference tables in our subgraph
show citus.multi_shard_modify_mode;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
sequential
(1 row)
SELECT COUNT(*)=10 FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN;
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT COUNT(*)=10 FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN;
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- print foreign keys only in one of xact blocks not to make tests too verbose
SELECT conname, conrelid::regclass, confrelid::regclass
FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$'
ORDER BY conname;
conname | conrelid | confrelid
---------------------------------------------------------------------
fkey_1 | distributed_table_3 | distributed_table_2
fkey_11 | distributed_table_2 | distributed_table_2
fkey_12 | reference_table_1 | reference_table_1
fkey_2 | distributed_table_2 | distributed_table_3
fkey_3 | distributed_table_2 | distributed_table_1
fkey_4 | distributed_table_1 | reference_table_1
fkey_5 | reference_table_2 | reference_table_1
fkey_6 | citus_local_table_1 | reference_table_1
fkey_7 | reference_table_2 | citus_local_table_2
fkey_8 | distributed_table_1 | distributed_table_3
(10 rows)
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN;
SELECT COUNT(*) FROM distributed_table_1;
count
---------------------------------------------------------------------
0
(1 row)
-- show that we error out as select is executed in parallel mode
-- and there are reference tables in our subgraph
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
ERROR: cannot execute command because there was a parallel operation on a distributed table in transaction
ROLLBACK;
BEGIN;
set citus.multi_shard_modify_mode to 'sequential';
SELECT COUNT(*) FROM distributed_table_1;
count
---------------------------------------------------------------------
0
(1 row)
-- even if there are reference tables in our subgraph, show that
-- we don't error out as we already switched to sequential execution
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4;
BEGIN;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- as we split distributed_table_1, 2 & 3 into a separate subgraph
-- by dropping fkey_4 (the foreign key to reference_table_1), we
-- should not switch to sequential execution
show citus.multi_shard_modify_mode;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
parallel
(1 row)
ROLLBACK;
-- split distributed_table_2 & distributed_table_3 into a separate foreign
-- key subgraph then undistribute them
ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3;
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- should return true as we undistributed those two tables
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade' AND
(tablename='distributed_table_2' OR tablename='distributed_table_3');
?column?
---------------------------------------------------------------------
t
(1 row)
-- other tables should stay as is since we split off only those two tables
SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
?column?
---------------------------------------------------------------------
t
(1 row)
-- test partitioned tables
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_2', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE);
SELECT create_reference_table('reference_table_3');
create_reference_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
BEGIN;
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- show that we preserve foreign keys on partitions too
-- (the constraint names are matched with IN so that the schema filter
-- applies to both of them; AND binds tighter than OR, so a bare
-- "... AND conname = 'fkey_9' OR conname = 'fkey_10'" would also match
-- an fkey_10 that lives in any other schema)
SELECT conname, conrelid::regclass, confrelid::regclass
FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname IN ('fkey_9', 'fkey_10')
ORDER BY 1,2,3;
conname | conrelid | confrelid
---------------------------------------------------------------------
fkey_10 | partitioned_table_2_100_200 | reference_table_3
fkey_10 | partitioned_table_2_200_300 | reference_table_3
fkey_10 | partitioned_table_2 | reference_table_3
fkey_9 | partitioned_table_1_100_200 | reference_table_3
fkey_9 | partitioned_table_1_200_300 | reference_table_3
fkey_9 | partitioned_table_1 | reference_table_3
(6 rows)
ROLLBACK;
-- as pg < 12 doesn't support foreign keys between partitioned tables,
-- define below foreign key conditionally instead of adding another
-- test output
DO $proc$
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN
EXECUTE
$$
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1);
$$;
END IF;
END$proc$;
BEGIN;
-- For pg versions 11, 12 & 13, partitioned_table_1 references to reference_table_3
-- and partitioned_table_2 references to reference_table_3.
-- For pg versions > 11, partitioned_table_1 references to partitioned_table_2 as well.
-- Anyway show that undistribute_table with cascade is fine.
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- now merge partitioned_table_1, 2 and reference_table_3 into right
-- hand-side of the graph
ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
BEGIN;
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- undistributing citus_local_table_1 cascades to partitioned tables too
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade' AND
tablename LIKE 'partitioned_table_%';
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
CREATE SCHEMA "bad!schemaName";
CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE);
CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE);
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!" ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1);
-- test with weird schema, table & constraint names
SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- cleanup at exit
DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE;

View File

@ -0,0 +1,81 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1517000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
CREATE SCHEMA undistribute_table_cascade_mx;
SET search_path TO undistribute_table_cascade_mx;
SET client_min_messages to ERROR;
-- ensure that coordinator is added to pg_dist_node
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
-- ensure that we sync metadata to worker 1 & 2
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
SELECT create_reference_table('reference_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
undistribute_table
---------------------------------------------------------------------
(1 row)
-- both workers should print 0 as we undistributed all relations in this schema
SELECT run_command_on_workers(
$$
SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx'
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- cleanup at exit
DROP SCHEMA undistribute_table_cascade_mx CASCADE;

View File

@ -153,7 +153,7 @@ ORDER BY 1;
function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer)
function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass)
function undistribute_table(regclass,boolean)
function update_distributed_table_colocation(regclass,text)
function worker_append_table_to_shard(text,text,text,integer)
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)

View File

@ -149,7 +149,7 @@ ORDER BY 1;
function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer)
function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass)
function undistribute_table(regclass,boolean)
function update_distributed_table_colocation(regclass,text)
function worker_append_table_to_shard(text,text,text,integer)
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)

View File

@ -42,6 +42,7 @@ test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy
test: undistribute_table_cascade_mx
test: citus_local_tables_mx
test: citus_local_tables_queries_mx
test: multi_mx_transaction_recovery

View File

@ -322,7 +322,7 @@ test: replicate_reference_tables_to_coordinator
test: coordinator_shouldhaveshards
test: local_shard_utility_command_execution
test: citus_local_tables
test: multi_row_router_insert mixed_relkind_tests
test: multi_row_router_insert mixed_relkind_tests undistribute_table_cascade
test: remove_coordinator

View File

@ -576,6 +576,28 @@ ALTER TABLE test DROP CONSTRAINT foreign_key;
SELECT undistribute_table('test_2');
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
SELECT create_reference_table('reference_table_1');
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO test (x) VALUES ($1);

View File

@ -0,0 +1,223 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1515000;
SET citus.shard_replication_factor TO 1;
CREATE SCHEMA undistribute_table_cascade;
SET search_path TO undistribute_table_cascade;
SET client_min_messages to ERROR;
-- ensure that coordinator is added to pg_dist_node
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE);
SELECT create_reference_table('reference_table_1');
SELECT create_reference_table('reference_table_2');
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
CREATE TABLE distributed_table_2 (col_1 INT UNIQUE);
CREATE TABLE distributed_table_3 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
SELECT create_distributed_table('distributed_table_2', 'col_1');
SELECT create_distributed_table('distributed_table_3', 'col_1');
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
SELECT create_citus_local_table('citus_local_table_2');
-- --- ---
-- | | | |
-- | v | v
-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2
-- ^ | ^ |
-- v | | v
-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2
--
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
-- show that all of below fails as we didn't provide cascade=true
SELECT undistribute_table('distributed_table_1');
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false);
SELECT undistribute_table('reference_table_2');
-- In each of below transaction blocks, show that we preserve foreign keys.
-- Also show that we don't have any citus tables in current schema after
-- undistribute_table(cascade).
-- So in each transaction, both selects should return true.
BEGIN;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
-- show that we switch to sequential execution as there are
-- reference tables in our subgraph
show citus.multi_shard_modify_mode;
SELECT COUNT(*)=10 FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$';
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
ROLLBACK;
BEGIN;
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
SELECT COUNT(*)=10 FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$';
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
ROLLBACK;
BEGIN;
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
-- print foreign keys only in one of xact blocks not to make tests too verbose
SELECT conname, conrelid::regclass, confrelid::regclass
FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname ~ '^fkey\_\d+$'
ORDER BY conname;
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
ROLLBACK;
BEGIN;
SELECT COUNT(*) FROM distributed_table_1;
-- show that we error out as select is executed in parallel mode
-- and there are reference tables in our subgraph
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
ROLLBACK;
BEGIN;
set citus.multi_shard_modify_mode to 'sequential';
SELECT COUNT(*) FROM distributed_table_1;
-- even if there are reference tables in our subgraph, show that
-- we don't error out as we already switched to sequential execution
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
ROLLBACK;
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4;
BEGIN;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
-- as we split distributed_table_1, 2 & 3 into a separate subgraph
-- by dropping fkey_4 (the foreign key to reference_table_1), we
-- should not switch to sequential execution
show citus.multi_shard_modify_mode;
ROLLBACK;
-- split distributed_table_2 & distributed_table_3 into a separate foreign
-- key subgraph then undistribute them
ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3;
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8;
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
-- should return true as we undistributed those two tables
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade' AND
(tablename='distributed_table_2' OR tablename='distributed_table_3');
-- other tables should stay as is since we split off only those two tables
SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade';
-- test partitioned tables
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_2', 'col_1');
CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE);
SELECT create_reference_table('reference_table_3');
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
BEGIN;
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
-- show that we preserve foreign keys on partitions too
-- (the constraint names are matched with IN so that the schema filter
-- applies to both of them; AND binds tighter than OR, so a bare
-- "... AND conname = 'fkey_9' OR conname = 'fkey_10'" would also match
-- an fkey_10 that lives in any other schema)
SELECT conname, conrelid::regclass, confrelid::regclass
FROM pg_constraint
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
conname IN ('fkey_9', 'fkey_10')
ORDER BY 1,2,3;
ROLLBACK;
-- as pg < 12 doesn't support foreign keys between partitioned tables,
-- define below foreign key conditionally instead of adding another
-- test output
DO $proc$
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN
EXECUTE
$$
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1);
$$;
END IF;
END$proc$;
BEGIN;
-- For pg versions 11, 12 & 13, partitioned_table_1 references to reference_table_3
-- and partitioned_table_2 references to reference_table_3.
-- For pg versions > 11, partitioned_table_1 references to partitioned_table_2 as well.
-- Anyway show that undistribute_table with cascade is fine.
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
ROLLBACK;
-- now merge partitioned_table_1, 2 and reference_table_3 into right
-- hand-side of the graph
ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
BEGIN;
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
-- undistributing citus_local_table_1 cascades to partitioned tables too
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
WHERE tablename=logicalrelid::regclass::text AND
schemaname='undistribute_table_cascade' AND
tablename LIKE 'partitioned_table_%';
ROLLBACK;
CREATE SCHEMA "bad!schemaName";
CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE);
CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE);
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"');
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"');
ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!" ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1);
-- test with weird schema, table & constraint names
SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true);
-- cleanup at exit
DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE;

View File

@ -0,0 +1,48 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1517000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
CREATE SCHEMA undistribute_table_cascade_mx;
SET search_path TO undistribute_table_cascade_mx;
SET client_min_messages to ERROR;
-- ensure that coordinator is added to pg_dist_node
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
-- ensure that we sync metadata to worker 1 & 2
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
SELECT create_reference_table('reference_table_1');
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_1', 'col_1');
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
SELECT create_citus_local_table('citus_local_table_1');
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
SELECT create_distributed_table('partitioned_table_1', 'col_1');
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
-- both workers should print 0 as we undistributed all relations in this schema
SELECT run_command_on_workers(
$$
SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx'
$$);
-- cleanup at exit
DROP SCHEMA undistribute_table_cascade_mx CASCADE;