From 2e3e680ba90e100fd5af01f79c4b5e8077f18342 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 29 Dec 2020 16:00:25 +0300 Subject: [PATCH 1/2] Add infra to cascade citus table functions --- ..._table_operation_for_connected_relations.c | 363 ++++++++++++++++++ .../commands/create_citus_local_table.c | 32 -- src/include/distributed/commands.h | 17 + 3 files changed, 380 insertions(+), 32 deletions(-) create mode 100644 src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c diff --git a/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c new file mode 100644 index 000000000..d3d60a9d9 --- /dev/null +++ b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c @@ -0,0 +1,363 @@ +/*------------------------------------------------------------------------- + * + * cascade_table_operation_for_connected_relations.c + * Routines to execute citus table functions (e.g undistribute_table, + * create_citus_local_table) by cascading to foreign key connected + * relations. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "catalog/pg_constraint.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/commands.h" +#include "distributed/foreign_key_relationship.h" +#include "distributed/listutils.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/reference_table_utils.h" +#include "distributed/relation_access_tracking.h" +#include "distributed/worker_protocol.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" + + +typedef void (*CascadeOperationFunction)(Oid, bool); + + +static void EnsureSequentialModeForCitusTableCascadeFunction(List *relationIdList); +static bool RelationIdListHasReferenceTable(List *relationIdList); +static void LockRelationsWithLockMode(List *relationIdList, LOCKMODE lockMode); +static List * RemovePartitionRelationIds(List *relationIdList); +static List * GetFKeyCreationCommandsForRelationIdList(List *relationIdList); +static void DropRelationIdListForeignKeys(List *relationIdList); +static void DropRelationForeignKeys(Oid relationId); +static List * GetRelationDropFkeyCommands(Oid relationId); +static char * GetDropFkeyCascadeCommand(Oid relationId, Oid foreignKeyId); +static void ExecuteCascadeOperationForRelationIdList(List *relationIdList, + CascadeOperationType + cascadeOperationType); +static CascadeOperationFunction GetCascadeOperationFunction(CascadeOperationType + cascadeOperationType); + + +/* + * CascadeOperationForConnectedRelations executes citus table function specified + * by CascadeOperationType argument on each relation that the relation + * with relationId is connected to via its foreign key graph, which includes + * the input relation itself. + * Also see CascadeOperationType enum definition for supported + * citus table functions. 
+ */ +void +CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE lockMode, + CascadeOperationType + cascadeOperationType) +{ + /* + * As we will operate on foreign key connected relations, here we + * invalidate foreign key graph to be on the safe side. + */ + InvalidateForeignKeyGraph(); + + List *fKeyConnectedRelationIdList = GetForeignKeyConnectedRelationIdList(relationId); + LockRelationsWithLockMode(fKeyConnectedRelationIdList, lockMode); + + /* + * We shouldn't cascade through foreign keys on partition tables as citus + * table functions already have their own logic to handle partition relations. + */ + List *nonPartitionRelationIdList = + RemovePartitionRelationIds(fKeyConnectedRelationIdList); + + /* + * Our foreign key subgraph can have distributed tables which might already + * be modified in the current transaction. So switch to sequential execution + * before executing any DDL commands to prevent erroring out later in this function. + */ + EnsureSequentialModeForCitusTableCascadeFunction(nonPartitionRelationIdList); + + /* store foreign key creation commands before dropping them */ + List *fKeyCreationCommands = + GetFKeyCreationCommandsForRelationIdList(nonPartitionRelationIdList); + + /* + * Note that here we only drop referencing foreign keys for each relation. + * This is because referenced foreign keys are already captured as other + * relations' referencing foreign keys. + */ + DropRelationIdListForeignKeys(nonPartitionRelationIdList); + ExecuteCascadeOperationForRelationIdList(nonPartitionRelationIdList, + cascadeOperationType); + + /* now recreate foreign keys on tables */ + ExecuteAndLogDDLCommandList(fKeyCreationCommands); +} + + +/* + * LockRelationsWithLockMode sorts given relationIdList and then acquires + * specified lockMode on those relations. 
+ */ +static void +LockRelationsWithLockMode(List *relationIdList, LOCKMODE lockMode) +{ + Oid relationId; + relationIdList = SortList(relationIdList, CompareOids); + foreach_oid(relationId, relationIdList) + { + LockRelationOid(relationId, lockMode); + } +} + + +/* + * RemovePartitionRelationIds returns a list of relation id's by removing + * partition relation id's from given relationIdList. + */ +static List * +RemovePartitionRelationIds(List *relationIdList) +{ + List *nonPartitionRelationIdList = NIL; + + Oid relationId = InvalidOid; + foreach_oid(relationId, relationIdList) + { + if (PartitionTable(relationId)) + { + continue; + } + + nonPartitionRelationIdList = lappend_oid(nonPartitionRelationIdList, relationId); + } + + return nonPartitionRelationIdList; +} + + +/* + * EnsureSequentialModeForCitusTableCascadeFunction switches to sequential + * execution mode if needed. If it's not possible, then errors out. + */ +static void +EnsureSequentialModeForCitusTableCascadeFunction(List *relationIdList) +{ + if (!RelationIdListHasReferenceTable(relationIdList)) + { + /* + * We don't need to switch to sequential execution if there is no + * reference table in our foreign key subgraph. + */ + return; + } + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot execute command because there was a parallel " + "operation on a distributed table in transaction"), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode because the " + "operation cascades into distributed tables with foreign " + "keys to reference tables"))); + SetLocalMultiShardModifyModeToSequential(); +} + + +/* + * RelationIdListHasReferenceTable returns true if relationIdList has a relation + * id that belongs to a reference table. 
+ */ +static bool +RelationIdListHasReferenceTable(List *relationIdList) +{ + Oid relationId = InvalidOid; + foreach_oid(relationId, relationIdList) + { + if (IsCitusTableType(relationId, REFERENCE_TABLE)) + { + return true; + } + } + + return false; +} + + +/* + * GetFKeyCreationCommandsForRelationIdList returns a list of DDL commands to + * create foreign keys for each relation in relationIdList. + */ +static List * +GetFKeyCreationCommandsForRelationIdList(List *relationIdList) +{ + List *fKeyCreationCommands = NIL; + + Oid relationId = InvalidOid; + foreach_oid(relationId, relationIdList) + { + List *relationFKeyCreationCommands = + GetReferencingForeignConstaintCommands(relationId); + fKeyCreationCommands = list_concat(fKeyCreationCommands, + relationFKeyCreationCommands); + } + + return fKeyCreationCommands; +} + + +/* + * DropRelationIdListForeignKeys drops foreign keys for each relation in given + * relation id list. + */ +static void +DropRelationIdListForeignKeys(List *relationIdList) +{ + Oid relationId = InvalidOid; + foreach_oid(relationId, relationIdList) + { + DropRelationForeignKeys(relationId); + } +} + + +/* + * DropRelationForeignKeys drops foreign keys where the relation with + * relationId is the referencing relation. + */ +static void +DropRelationForeignKeys(Oid relationId) +{ + List *dropFkeyCascadeCommandList = GetRelationDropFkeyCommands(relationId); + ExecuteAndLogDDLCommandList(dropFkeyCascadeCommandList); +} + + +/* + * GetRelationDropFkeyCommands returns a list of DDL commands to drop foreign + * keys where the relation with relationId is the referencing relation. 
+ */ +static List * +GetRelationDropFkeyCommands(Oid relationId) +{ + List *dropFkeyCascadeCommandList = NIL; + + int flag = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES; + List *relationFKeyIdList = GetForeignKeyOids(relationId, flag); + + Oid foreignKeyId; + foreach_oid(foreignKeyId, relationFKeyIdList) + { + char *dropFkeyCascadeCommand = GetDropFkeyCascadeCommand(relationId, + foreignKeyId); + dropFkeyCascadeCommandList = lappend(dropFkeyCascadeCommandList, + dropFkeyCascadeCommand); + } + + return dropFkeyCascadeCommandList; +} + + +/* + * GetDropFkeyCascadeCommand returns DDL command to drop foreign key with + * foreignKeyId. + */ +static char * +GetDropFkeyCascadeCommand(Oid relationId, Oid foreignKeyId) +{ + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + + char *constraintName = get_constraint_name(foreignKeyId); + const char *quotedConstraintName = quote_identifier(constraintName); + + StringInfo dropFkeyCascadeCommand = makeStringInfo(); + appendStringInfo(dropFkeyCascadeCommand, "ALTER TABLE %s DROP CONSTRAINT %s CASCADE;", + qualifiedRelationName, quotedConstraintName); + + return dropFkeyCascadeCommand->data; +} + + +/* + * ExecuteCascadeOperationForRelationIdList executes citus table function + * specified by CascadeOperationType argument for given relation id + * list. + */ +static void +ExecuteCascadeOperationForRelationIdList(List *relationIdList, + CascadeOperationType + cascadeOperationType) +{ + Oid relationId = InvalidOid; + foreach_oid(relationId, relationIdList) + { + CascadeOperationFunction cascadeOperationFunction = + GetCascadeOperationFunction(cascadeOperationType); + + /* + * Caller already passed the relations that we should operate on, + * so we should not cascade here. 
+ */ + bool cascadeViaForeignKeys = false; + cascadeOperationFunction(relationId, cascadeViaForeignKeys); + } +} + + +/* + * GetCascadeOperationFunction returns the C function that implements the citus + * table operation identified by the given CascadeOperationType. + */ +static CascadeOperationFunction +GetCascadeOperationFunction(CascadeOperationType cascadeOperationType) +{ + switch (cascadeOperationType) + { + default: + { + /* + * This is not expected as other create table functions don't have + * cascade option yet. To be on the safe side, error out here. + */ + ereport(ERROR, (errmsg("citus table function could not be found"))); + } + } +} + + +/* + * ExecuteAndLogDDLCommandList takes a list of ddl commands and calls + * ExecuteAndLogDDLCommand function for each of them. + */ +void +ExecuteAndLogDDLCommandList(List *ddlCommandList) +{ + char *ddlCommand = NULL; + foreach_ptr(ddlCommand, ddlCommandList) + { + ExecuteAndLogDDLCommand(ddlCommand); + } +} + + +/* + * ExecuteAndLogDDLCommand takes a ddl command and logs it in DEBUG4 log level. + * Then, parses and executes it via CitusProcessUtility. 
+ */ +void +ExecuteAndLogDDLCommand(const char *commandString) +{ + ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); + + Node *parseTree = ParseTreeNode(commandString); + CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); +} diff --git a/src/backend/distributed/commands/create_citus_local_table.c b/src/backend/distributed/commands/create_citus_local_table.c index 352c3197e..b65859820 100644 --- a/src/backend/distributed/commands/create_citus_local_table.c +++ b/src/backend/distributed/commands/create_citus_local_table.c @@ -54,8 +54,6 @@ static char * GetRenameShardIndexCommand(char *indexName, uint64 shardId); static void RenameShardRelationNonTruncateTriggers(Oid shardRelationId, uint64 shardId); static char * GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName, uint64 shardId); -static void ExecuteAndLogDDLCommandList(List *ddlCommandList); -static void ExecuteAndLogDDLCommand(const char *commandString); static void DropRelationTruncateTriggers(Oid relationId); static char * GetDropTriggerCommand(Oid relationId, char *triggerName); static List * GetExplicitIndexNameList(Oid relationId); @@ -553,36 +551,6 @@ GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName, uint64 shar } -/* - * ExecuteAndLogDDLCommandList takes a list of ddl commands and calls - * ExecuteAndLogDDLCommand function for each of them. - */ -static void -ExecuteAndLogDDLCommandList(List *ddlCommandList) -{ - char *ddlCommand = NULL; - foreach_ptr(ddlCommand, ddlCommandList) - { - ExecuteAndLogDDLCommand(ddlCommand); - } -} - - -/* - * ExecuteAndLogDDLCommand takes a ddl command and logs it in DEBUG4 log level. - * Then, parses and executes it via CitusProcessUtility. 
- */ -static void -ExecuteAndLogDDLCommand(const char *commandString) -{ - ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); - - Node *parseTree = ParseTreeNode(commandString); - CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); -} - - /* * DropRelationTruncateTriggers drops TRUNCATE triggers that are explicitly * created on relation with relationId. diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 9a14f8d43..5882ad6c3 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -387,6 +387,23 @@ extern List * CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerN const char *queryString); extern Oid GetTriggerFunctionId(Oid triggerId); +/* cascade_table_operation_for_connected_relations.c */ + +/* + * Flags that can be passed to CascadeOperationForConnectedRelations to specify + * citus table function to be executed in cascading mode. + */ +typedef enum CascadeOperationType +{ + INVALID_OPERATION = 1 << 0, +} CascadeOperationType; + +extern void CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE relLockMode, + CascadeOperationType + cascadeOperationType); +extern void ExecuteAndLogDDLCommandList(List *ddlCommandList); +extern void ExecuteAndLogDDLCommand(const char *commandString); + extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt); extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand); From f3801143fb14bbb68a8073aad30e7a2aea2d7313 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 7 Jan 2021 13:35:30 +0300 Subject: [PATCH 2/2] Add cascade option to undistribute_table --- ..._table_operation_for_connected_relations.c | 5 + .../commands/create_distributed_table.c | 52 ++- .../distributed/sql/citus--9.5-1--10.0-1.sql | 1 + .../sql/downgrades/citus--10.0-1--9.5-1.sql | 2 + .../sql/udfs/undistribute_table/10.0-1.sql | 10 + .../sql/udfs/undistribute_table/latest.sql | 5 +- 
src/include/distributed/commands.h | 3 + src/include/distributed/metadata_utility.h | 1 + src/test/regress/expected/multi_extension.out | 4 +- .../regress/expected/multi_extension_0.out | 4 +- src/test/regress/expected/single_node.out | 66 +++ .../regress/expected/undistribute_table.out | 2 + .../expected/undistribute_table_cascade.out | 381 ++++++++++++++++++ .../undistribute_table_cascade_mx.out | 81 ++++ .../expected/upgrade_list_citus_objects.out | 2 +- .../expected/upgrade_list_citus_objects_0.out | 2 +- src/test/regress/multi_mx_schedule | 1 + src/test/regress/multi_schedule | 2 +- src/test/regress/sql/single_node.sql | 22 + .../sql/undistribute_table_cascade.sql | 223 ++++++++++ .../sql/undistribute_table_cascade_mx.sql | 48 +++ 21 files changed, 901 insertions(+), 16 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/undistribute_table/10.0-1.sql create mode 100644 src/test/regress/expected/undistribute_table_cascade.out create mode 100644 src/test/regress/expected/undistribute_table_cascade_mx.out create mode 100644 src/test/regress/sql/undistribute_table_cascade.sql create mode 100644 src/test/regress/sql/undistribute_table_cascade_mx.sql diff --git a/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c index d3d60a9d9..15a758526 100644 --- a/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c +++ b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c @@ -321,6 +321,11 @@ GetCascadeOperationFunction(CascadeOperationType cascadeOperationType) { switch (cascadeOperationType) { + case UNDISTRIBUTE_TABLE: + { + return UndistributeTable; + } + default: { /* diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9efd91816..262952481 100644 --- 
a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -86,6 +86,11 @@ */ #define LOG_PER_TUPLE_AMOUNT 1000000 +#define UNDISTRIBUTE_TABLE_CASCADE_HINT \ + "Use cascade option to undistribute all the relations involved in " \ + "a foreign key relationship with %s by executing SELECT " \ + "undistribute_table($$%s$$, cascade_via_foreign_keys=>true)" + /* Replication model to use when creating distributed tables */ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; @@ -124,7 +129,6 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation, DestReceiver *copyDest, TupleTableSlot *slot, EState *estate); -static void UndistributeTable(Oid relationId); static List * GetViewCreationCommandsOfTable(Oid relationId); static void ReplaceTable(Oid sourceId, Oid targetId); @@ -289,10 +293,11 @@ Datum undistribute_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); + bool cascadeViaForeignKeys = PG_GETARG_BOOL(1); CheckCitusVersion(ERROR); - UndistributeTable(relationId); + UndistributeTable(relationId, cascadeViaForeignKeys); PG_RETURN_VOID(); } @@ -1552,7 +1557,7 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, * be dropped. 
*/ void -UndistributeTable(Oid relationId) +UndistributeTable(Oid relationId, bool cascadeViaForeignKeys) { EnsureCoordinator(); EnsureRelationExists(relationId); @@ -1574,16 +1579,37 @@ UndistributeTable(Oid relationId) errdetail("because the table is not distributed"))); } - if (TableReferencing(relationId)) + bool tableReferencing = TableReferencing(relationId); + bool tableReferenced = TableReferenced(relationId); + if (cascadeViaForeignKeys && (tableReferencing || tableReferenced)) { - ereport(ERROR, (errmsg("cannot undistribute table " - "because it has a foreign key"))); + CascadeOperationForConnectedRelations(relationId, lockMode, UNDISTRIBUTE_TABLE); + + /* + * Undistributed every foreign key connected relation in our foreign key + * subgraph including itself, so return here. + */ + return; } - if (TableReferenced(relationId)) + if (tableReferencing) { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); ereport(ERROR, (errmsg("cannot undistribute table " - "because a foreign key references to it"))); + "because it has a foreign key"), + errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT, + qualifiedRelationName, + qualifiedRelationName))); + } + + if (tableReferenced) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + ereport(ERROR, (errmsg("cannot undistribute table " + "because a foreign key references to it"), + errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT, + qualifiedRelationName, + qualifiedRelationName))); } char relationKind = get_rel_relkind(relationId); @@ -1642,7 +1668,15 @@ UndistributeTable(Oid relationId) } preLoadCommands = lappend(preLoadCommands, makeTableDDLCommandString(attachPartitionCommand)); - UndistributeTable(partitionRelationId); + + /* + * Even if we called UndistributeTable with cascade option, we + * shouldn't cascade via foreign keys on partitions. Otherwise, + * we might try to undistribute partitions of other tables in + * our foreign key subgraph more than once. 
+ */ + bool cascadeOnPartitionsViaForeignKeys = false; + UndistributeTable(partitionRelationId, cascadeOnPartitionsViaForeignKeys); } } diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql index 41cfb951f..87c513800 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql @@ -6,5 +6,6 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); #include "udfs/citus_total_relation_size/10.0-1.sql" #include "udfs/citus_tables/10.0-1.sql" #include "udfs/citus_finish_pg_upgrade/10.0-1.sql" +#include "udfs/undistribute_table/10.0-1.sql" #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql index 8b028c280..4d058c55d 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql @@ -7,6 +7,8 @@ DROP VIEW public.citus_tables; DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean); +DROP FUNCTION pg_catalog.undistribute_table(regclass,boolean); #include "../udfs/citus_total_relation_size/7.0-1.sql" #include "../udfs/upgrade_to_reference_table/8.0-1.sql" +#include "../udfs/undistribute_table/9.5-1.sql" diff --git a/src/backend/distributed/sql/udfs/undistribute_table/10.0-1.sql b/src/backend/distributed/sql/udfs/undistribute_table/10.0-1.sql new file mode 100644 index 000000000..410fac8e0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/undistribute_table/10.0-1.sql @@ -0,0 +1,10 @@ +DROP FUNCTION pg_catalog.undistribute_table(regclass); +CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table( + table_name regclass, cascade_via_foreign_keys boolean default false) + RETURNS VOID + LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$undistribute_table$$; + +COMMENT ON FUNCTION 
pg_catalog.undistribute_table( + table_name regclass, cascade_via_foreign_keys boolean) + IS 'undistributes a distributed table'; diff --git a/src/backend/distributed/sql/udfs/undistribute_table/latest.sql b/src/backend/distributed/sql/udfs/undistribute_table/latest.sql index 5cc7165aa..410fac8e0 100644 --- a/src/backend/distributed/sql/udfs/undistribute_table/latest.sql +++ b/src/backend/distributed/sql/udfs/undistribute_table/latest.sql @@ -1,9 +1,10 @@ +DROP FUNCTION pg_catalog.undistribute_table(regclass); CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table( - table_name regclass) + table_name regclass, cascade_via_foreign_keys boolean default false) RETURNS VOID LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$undistribute_table$$; COMMENT ON FUNCTION pg_catalog.undistribute_table( - table_name regclass) + table_name regclass, cascade_via_foreign_keys boolean) IS 'undistributes a distributed table'; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 5882ad6c3..58773a88f 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -396,6 +396,9 @@ extern Oid GetTriggerFunctionId(Oid triggerId); typedef enum CascadeOperationType { INVALID_OPERATION = 1 << 0, + + /* execute UndistributeTable on each relation */ + UNDISTRIBUTE_TABLE = 1 << 1, } CascadeOperationType; extern void CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE relLockMode, diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index a34a6a9ff..32535421d 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -138,6 +138,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, char *colocateWithTableName, bool viaDeprecatedAPI); extern void CreateTruncateTrigger(Oid relationId); +extern void UndistributeTable(Oid relationId, bool cascadeViaForeignKeys); extern void 
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e8817f8a8..4c1d1d510 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -446,6 +446,7 @@ SELECT * FROM print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- function citus_total_relation_size(regclass) | + function undistribute_table(regclass) | function upgrade_to_reference_table(regclass) | | access method columnar | function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) @@ -453,13 +454,14 @@ SELECT * FROM print_extension_changes(); | function citus_internal.columnar_ensure_objects_exist() | function citus_total_relation_size(regclass,boolean) | function columnar.columnar_handler(internal) + | function undistribute_table(regclass,boolean) | schema columnar | sequence columnar.storageid_seq | table columnar.columnar_skipnodes | table columnar.columnar_stripes | table columnar.options | view citus_tables -(14 rows) +(16 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_extension_0.out b/src/test/regress/expected/multi_extension_0.out index 5193a3298..29bb6524e 100644 --- a/src/test/regress/expected/multi_extension_0.out +++ b/src/test/regress/expected/multi_extension_0.out @@ -446,16 +446,18 @@ SELECT * FROM print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- function citus_total_relation_size(regclass) | + function undistribute_table(regclass) | function upgrade_to_reference_table(regclass) | | function citus_internal.columnar_ensure_objects_exist() | function citus_total_relation_size(regclass,boolean) + | 
function undistribute_table(regclass,boolean) | schema columnar | sequence columnar.storageid_seq | table columnar.columnar_skipnodes | table columnar.columnar_stripes | table columnar.options | view citus_tables -(10 rows) +(12 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 53238612a..ee5b71281 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1019,6 +1019,72 @@ SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass; --------------------------------------------------------------------- (0 rows) +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +SELECT create_reference_table('reference_table_1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES 
reference_table_1(col_2); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true); +NOTICE: undistributing the partitions of single_node.partitioned_table_1 +NOTICE: creating a new local table for single_node.partitioned_table_1_100_200 +NOTICE: Moving the data of single_node.partitioned_table_1_100_200 +NOTICE: Dropping the old single_node.partitioned_table_1_100_200 +NOTICE: Renaming the new table to single_node.partitioned_table_1_100_200 +NOTICE: creating a new local table for single_node.partitioned_table_1_200_300 +NOTICE: Moving the data of single_node.partitioned_table_1_200_300 +NOTICE: Dropping the old single_node.partitioned_table_1_200_300 +NOTICE: Renaming the new table to single_node.partitioned_table_1_200_300 +NOTICE: creating a new local table for single_node.partitioned_table_1 +NOTICE: Moving the data of single_node.partitioned_table_1 +NOTICE: Dropping the old single_node.partitioned_table_1 +NOTICE: Renaming the new table to single_node.partitioned_table_1 +NOTICE: creating a new local table for single_node.reference_table_1 +NOTICE: Moving the data of single_node.reference_table_1 +NOTICE: Dropping the old single_node.reference_table_1 +NOTICE: Renaming the new table to single_node.reference_table_1 +NOTICE: creating a new local table for single_node.distributed_table_1 +NOTICE: Moving the data of single_node.distributed_table_1 +NOTICE: Dropping the old single_node.distributed_table_1 +NOTICE: Renaming the new table to single_node.distributed_table_1 +NOTICE: creating a new local 
table for single_node.citus_local_table_1 +NOTICE: Moving the data of single_node.citus_local_table_1 +NOTICE: Dropping the old single_node.citus_local_table_1 +NOTICE: Renaming the new table to single_node.citus_local_table_1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO test (x) VALUES ($1); diff --git a/src/test/regress/expected/undistribute_table.out b/src/test/regress/expected/undistribute_table.out index 54c323cdb..16c16de00 100644 --- a/src/test/regress/expected/undistribute_table.out +++ b/src/test/regress/expected/undistribute_table.out @@ -123,8 +123,10 @@ SELECT create_distributed_table('referencing_table', 'id'); INSERT INTO referencing_table VALUES (4, 6, 'cba'), (1, 1, 'dcba'), (2, 3, 'aaa'); SELECT undistribute_table('referenced_table'); ERROR: cannot undistribute table because a foreign key references to it +HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referenced_table by executing SELECT undistribute_table($$undistribute_table.referenced_table$$, cascade_via_foreign_keys=>true) SELECT undistribute_table('referencing_table'); ERROR: cannot undistribute table because it has a foreign key +HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true) DROP TABLE referenced_table, referencing_table; -- test distributed foreign tables -- we expect errors diff --git a/src/test/regress/expected/undistribute_table_cascade.out b/src/test/regress/expected/undistribute_table_cascade.out new file mode 100644 index 000000000..e0da4ed8a --- /dev/null +++ b/src/test/regress/expected/undistribute_table_cascade.out @@ -0,0 +1,381 @@ +\set VERBOSITY terse +SET 
citus.next_shard_id TO 1515000; +SET citus.shard_replication_factor TO 1; +CREATE SCHEMA undistribute_table_cascade; +SET search_path TO undistribute_table_cascade; +SET client_min_messages to ERROR; +-- ensure that coordinator is added to pg_dist_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE); +SELECT create_reference_table('reference_table_1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('reference_table_2'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); +CREATE TABLE distributed_table_2 (col_1 INT UNIQUE); +CREATE TABLE distributed_table_3 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('distributed_table_2', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('distributed_table_3', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_citus_local_table('citus_local_table_2'); + create_citus_local_table 
+--------------------------------------------------------------------- + +(1 row) + +-- --- --- +-- | | | | +-- | v | v +-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2 +-- ^ | ^ | +-- v | | v +-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2 +-- +ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); +-- show that all of below fails as we didn't provide cascade=true +SELECT undistribute_table('distributed_table_1'); +ERROR: cannot undistribute table because it has a foreign key +SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false); +ERROR: cannot undistribute table because it has a foreign key +SELECT undistribute_table('reference_table_2'); +ERROR: cannot undistribute table because it has a foreign key +-- In each of below transation blocks, show that we preserve foreign keys. 
+-- Also show that we don't have any citus tables in current schema after +-- undistribute_table(cascade). +-- So in each transaction, both selects should return true. +BEGIN; + SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + -- show that we switch to sequential execution as there are + -- reference tables in our subgraph + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + + SELECT COUNT(*)=10 FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN; + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + SELECT COUNT(*)=10 FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN; + SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + -- print foreign keys only in one of xact blocks not to make tests too verbose + SELECT conname, conrelid::regclass, confrelid::regclass + FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$' + ORDER BY conname; + conname | conrelid | confrelid +--------------------------------------------------------------------- + fkey_1 | distributed_table_3 | distributed_table_2 + fkey_11 | distributed_table_2 | distributed_table_2 + fkey_12 | reference_table_1 | reference_table_1 + fkey_2 | distributed_table_2 | distributed_table_3 + fkey_3 | distributed_table_2 | distributed_table_1 + fkey_4 | distributed_table_1 | reference_table_1 + fkey_5 | reference_table_2 | reference_table_1 + fkey_6 | citus_local_table_1 | reference_table_1 + fkey_7 | reference_table_2 | citus_local_table_2 + fkey_8 | distributed_table_1 | distributed_table_3 +(10 rows) + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN; + SELECT COUNT(*) FROM distributed_table_1; + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- show that we error out as select is executed in parallel mode + -- and there are reference tables in our subgraph + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); +ERROR: cannot execute command because there was a parallel operation on a distributed table in transaction +ROLLBACK; +BEGIN; + set citus.multi_shard_modify_mode to 'sequential'; + SELECT COUNT(*) FROM distributed_table_1; + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- even if there are reference tables in our subgraph, show that + -- we don't error out as we already switched to sequential execution + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4; +BEGIN; + SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + -- as we splitted distributed_table_1,2 & 3 into a seperate subgraph + -- by dropping reference_table_1, we should not switch to sequential + -- execution + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +ROLLBACK; +-- split distributed_table_2 & distributed_table_3 into a seperate foreign +-- key subgraph then undistribute them +ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3; +ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8; +SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + 
undistribute_table +--------------------------------------------------------------------- + +(1 row) + +-- should return true as we undistributed those two tables +SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables +WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade' AND + (tablename='distributed_table_2' OR tablename='distributed_table_3'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- other tables should stay as is since we splited those two tables +SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables +WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- test partitioned tables +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_2', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE); +SELECT create_reference_table('reference_table_3'); + create_reference_table 
+--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2); +ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2); +BEGIN; + SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + -- show that we preserve foreign keys on partitions too + SELECT conname, conrelid::regclass, confrelid::regclass + FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname = 'fkey_9' OR conname = 'fkey_10' + ORDER BY 1,2,3; + conname | conrelid | confrelid +--------------------------------------------------------------------- + fkey_10 | partitioned_table_2_100_200 | reference_table_3 + fkey_10 | partitioned_table_2_200_300 | reference_table_3 + fkey_10 | partitioned_table_2 | reference_table_3 + fkey_9 | partitioned_table_1_100_200 | reference_table_3 + fkey_9 | partitioned_table_1_200_300 | reference_table_3 + fkey_9 | partitioned_table_1 | reference_table_3 +(6 rows) + +ROLLBACK; +-- as pg < 12 doesn't support foreign keys between partitioned tables, +-- define below foreign key conditionally instead of adding another +-- test output +DO $proc$ +BEGIN +IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN + EXECUTE + $$ + ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1); + $$; +END IF; +END$proc$; +BEGIN; + -- For pg versions 11, 12 & 13, partitioned_table_1 references to reference_table_3 + -- and partitioned_table_2 references to reference_table_3. + -- For pg versions > 11, partitioned_table_1 references to partitioned_table_2 as well. + -- Anyway show that undistribute_table with cascade is fine. 
+ SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- now merge partitioned_table_1, 2 and reference_table_3 into right +-- hand-side of the graph +ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +BEGIN; + SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + -- undistributing citus_local_table_1 cascades to partitioned tables too + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade' AND + tablename LIKE 'partitioned_table_%'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +CREATE SCHEMA "bad!schemaName"; +CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE); +CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE); +SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!" 
ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1); +-- test with weird schema, table & constraint names +SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true); + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +-- cleanup at exit +DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE; diff --git a/src/test/regress/expected/undistribute_table_cascade_mx.out b/src/test/regress/expected/undistribute_table_cascade_mx.out new file mode 100644 index 000000000..e57b6207c --- /dev/null +++ b/src/test/regress/expected/undistribute_table_cascade_mx.out @@ -0,0 +1,81 @@ +\set VERBOSITY terse +SET citus.next_shard_id TO 1517000; +SET citus.shard_replication_factor TO 1; +SET citus.enable_local_execution TO ON; +CREATE SCHEMA undistribute_table_cascade_mx; +SET search_path TO undistribute_table_cascade_mx; +SET client_min_messages to ERROR; +-- ensure that coordinator is added to pg_dist_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- ensure that we sync metadata to worker 1 & 2 +SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +SELECT create_reference_table('reference_table_1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true); + undistribute_table 
+--------------------------------------------------------------------- + +(1 row) + +-- both workers should print 0 as we undistributed all relations in this schema +SELECT run_command_on_workers( +$$ +SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx' +$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +-- cleanup at exit +DROP SCHEMA undistribute_table_cascade_mx CASCADE; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index c2ebeb2d2..1f47b3323 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -153,7 +153,7 @@ ORDER BY 1; function start_metadata_sync_to_node(text,integer) function stop_metadata_sync_to_node(text,integer) function truncate_local_data_after_distributing_table(regclass) - function undistribute_table(regclass) + function undistribute_table(regclass,boolean) function update_distributed_table_colocation(regclass,text) function worker_append_table_to_shard(text,text,text,integer) function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text) diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out index cfd8ce309..e7348b2c7 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects_0.out +++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out @@ -149,7 +149,7 @@ ORDER BY 1; function start_metadata_sync_to_node(text,integer) function stop_metadata_sync_to_node(text,integer) function truncate_local_data_after_distributing_table(regclass) - function undistribute_table(regclass) + function undistribute_table(regclass,boolean) function update_distributed_table_colocation(regclass,text) function 
worker_append_table_to_shard(text,text,text,integer) function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text) diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index ad69c4668..b19eafc97 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -42,6 +42,7 @@ test: multi_mx_function_call_delegation test: multi_mx_modifications local_shard_execution test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: local_shard_copy +test: undistribute_table_cascade_mx test: citus_local_tables_mx test: citus_local_tables_queries_mx test: multi_mx_transaction_recovery diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 015135a1d..344ee0094 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -322,7 +322,7 @@ test: replicate_reference_tables_to_coordinator test: coordinator_shouldhaveshards test: local_shard_utility_command_execution test: citus_local_tables -test: multi_row_router_insert mixed_relkind_tests +test: multi_row_router_insert mixed_relkind_tests undistribute_table_cascade test: remove_coordinator diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 372b3608d..b7fcd9f2d 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -576,6 +576,28 @@ ALTER TABLE test DROP CONSTRAINT foreign_key; SELECT undistribute_table('test_2'); SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass; +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +SELECT create_reference_table('reference_table_1'); + +CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); + +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, 
col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); + +SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true); + CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO test (x) VALUES ($1); diff --git a/src/test/regress/sql/undistribute_table_cascade.sql b/src/test/regress/sql/undistribute_table_cascade.sql new file mode 100644 index 000000000..3066abc3b --- /dev/null +++ b/src/test/regress/sql/undistribute_table_cascade.sql @@ -0,0 +1,223 @@ +\set VERBOSITY terse + +SET citus.next_shard_id TO 1515000; +SET citus.shard_replication_factor TO 1; + +CREATE SCHEMA undistribute_table_cascade; +SET search_path TO undistribute_table_cascade; + +SET client_min_messages to ERROR; + +-- ensure that coordinator is added to pg_dist_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE); +SELECT create_reference_table('reference_table_1'); +SELECT create_reference_table('reference_table_2'); + +CREATE TABLE distributed_table_1 
(col_1 INT UNIQUE); +CREATE TABLE distributed_table_2 (col_1 INT UNIQUE); +CREATE TABLE distributed_table_3 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); +SELECT create_distributed_table('distributed_table_2', 'col_1'); +SELECT create_distributed_table('distributed_table_3', 'col_1'); + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); +SELECT create_citus_local_table('citus_local_table_2'); + +-- --- --- +-- | | | | +-- | v | v +-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2 +-- ^ | ^ | +-- v | | v +-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2 +-- +ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1); +ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); + +-- show that all of below fails as we didn't provide cascade=true 
+SELECT undistribute_table('distributed_table_1'); +SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false); +SELECT undistribute_table('reference_table_2'); + +-- In each of below transation blocks, show that we preserve foreign keys. +-- Also show that we don't have any citus tables in current schema after +-- undistribute_table(cascade). +-- So in each transaction, both selects should return true. + +BEGIN; + SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + + -- show that we switch to sequential execution as there are + -- reference tables in our subgraph + show citus.multi_shard_modify_mode; + + SELECT COUNT(*)=10 FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$'; + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; +ROLLBACK; + +BEGIN; + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); + + SELECT COUNT(*)=10 FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$'; + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; +ROLLBACK; + +BEGIN; + SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true); + + -- print foreign keys only in one of xact blocks not to make tests too verbose + SELECT conname, conrelid::regclass, confrelid::regclass + FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname ~ '^fkey\_\d+$' + ORDER BY conname; + + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; +ROLLBACK; + +BEGIN; + 
SELECT COUNT(*) FROM distributed_table_1; + -- show that we error out as select is executed in parallel mode + -- and there are reference tables in our subgraph + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); +ROLLBACK; + +BEGIN; + set citus.multi_shard_modify_mode to 'sequential'; + SELECT COUNT(*) FROM distributed_table_1; + -- even if there are reference tables in our subgraph, show that + -- we don't error out as we already switched to sequential execution + SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true); +ROLLBACK; + +ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4; + +BEGIN; + SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + + -- as we split distributed_table_1, 2 & 3 into a separate subgraph + -- by dropping fkey_4 (the foreign key to reference_table_1), we should + -- not switch to sequential execution + show citus.multi_shard_modify_mode; +ROLLBACK; + +-- split distributed_table_2 & distributed_table_3 into a separate foreign +-- key subgraph then undistribute them +ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3; +ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8; +SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true); + +-- should return true as we undistributed those two tables +SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables +WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade' AND + (tablename='distributed_table_2' OR tablename='distributed_table_3'); + +-- other tables should stay as is since we split off only those two tables +SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables +WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade'; + +-- test partitioned tables +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
+CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + +CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_2', 'col_1'); + +CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE); +SELECT create_reference_table('reference_table_3'); + +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2); +ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2); + +BEGIN; + SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true); + + -- show that we preserve foreign keys on partitions too (fkey_9 & fkey_10 of this schema only) + SELECT conname, conrelid::regclass, confrelid::regclass + FROM pg_constraint + WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND + conname IN ('fkey_9', 'fkey_10') + ORDER BY 1,2,3; +ROLLBACK; + +-- as pg < 12 doesn't support foreign keys between partitioned tables, +-- define the foreign key below conditionally instead of adding another +-- test output +DO $proc$ +BEGIN +IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN + EXECUTE + $$ + ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1); + $$; +END IF; +END$proc$; + +BEGIN; + -- For pg versions 11, 12 & 13, partitioned_table_1 references reference_table_3 + -- and partitioned_table_2 references reference_table_3. + -- For pg versions > 11, partitioned_table_1 references partitioned_table_2 as well.
+ -- Either way, show that undistribute_table with cascade works. + SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true); +ROLLBACK; + +-- now merge partitioned_table_1, 2 and reference_table_3 into the +-- right-hand side of the graph +ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); + +BEGIN; + SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true); + + -- undistributing citus_local_table_1 cascades to the partitioned tables too + SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables + WHERE tablename=logicalrelid::regclass::text AND + schemaname='undistribute_table_cascade' AND + tablename LIKE 'partitioned_table_%'; +ROLLBACK; + +CREATE SCHEMA "bad!schemaName"; + +CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE); +CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE); + +SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"'); +SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"'); + +ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!"
ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1); + +-- test with weird schema, table & constraint names +SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true); + +-- cleanup at exit +DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE; diff --git a/src/test/regress/sql/undistribute_table_cascade_mx.sql b/src/test/regress/sql/undistribute_table_cascade_mx.sql new file mode 100644 index 000000000..4bacbaa86 --- /dev/null +++ b/src/test/regress/sql/undistribute_table_cascade_mx.sql @@ -0,0 +1,48 @@ +\set VERBOSITY terse + +SET citus.next_shard_id TO 1517000; +SET citus.shard_replication_factor TO 1; +SET citus.enable_local_execution TO ON; + +CREATE SCHEMA undistribute_table_cascade_mx; +SET search_path TO undistribute_table_cascade_mx; + +SET client_min_messages to ERROR; + +-- ensure that the coordinator is added to pg_dist_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + +-- ensure that we sync metadata to workers 1 & 2 +SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); + +CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); +SELECT create_reference_table('reference_table_1'); + +CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_1', 'col_1'); + +CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); +SELECT create_citus_local_table('citus_local_table_1'); + +CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); +CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); +CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); +SELECT create_distributed_table('partitioned_table_1', 'col_1'); + +ALTER TABLE
citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); +ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); +ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); + +SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true); + +-- both workers should report a table count of 0 as we undistributed all relations in this schema +SELECT run_command_on_workers( +$$ +SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx' +$$); + +-- cleanup at exit +DROP SCHEMA undistribute_table_cascade_mx CASCADE;