From fe5907ad2d803e3e8bbfbe6958b22f21a4875efa Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Thu, 24 Jun 2021 21:23:25 +0300 Subject: [PATCH] Adds propagation of ALTER SEQUENCE and other improvements (#5061) * Alter seq type when we first use the seq in a dist table * Don't allow type changes when seq is used in dist table * ALTER SEQUENCE propagation * Tests for ALTER SEQUENCE propagation * Relocate AlterSequenceType and ensure dependencies for sequence * Support for citus local tables, and other fixes * Final formatting --- .../citus_add_local_table_to_metadata.c | 27 +- .../commands/create_distributed_table.c | 164 ++++++++ .../commands/distribute_object_ops.c | 20 +- src/backend/distributed/commands/rename.c | 10 + src/backend/distributed/commands/sequence.c | 270 ++++++++++--- src/backend/distributed/commands/table.c | 174 +++++++- .../distributed/deparser/citus_ruleutils.c | 145 +------ .../deparser/deparse_sequence_stmts.c | 123 +++++- .../deparser/qualify_sequence_stmt.c | 85 ++++ .../distributed/metadata/metadata_sync.c | 59 +-- src/include/distributed/citus_ruleutils.h | 1 - src/include/distributed/commands.h | 17 +- src/include/distributed/deparser.h | 12 +- src/include/distributed/metadata_utility.h | 7 + .../expected/multi_sequence_default.out | 382 +++++++++++++++++- src/test/regress/expected/multi_table_ddl.out | 2 - .../regress/sql/multi_sequence_default.sql | 143 ++++++- src/test/regress/sql/multi_table_ddl.sql | 1 - 18 files changed, 1345 insertions(+), 297 deletions(-) create mode 100644 src/backend/distributed/deparser/qualify_sequence_stmt.c diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index 3a8e54c68..75b7661dd 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -75,7 +75,7 @@ static void DropDefaultColumnDefinition(Oid relationId, char *columnName); static void TransferSequenceOwnership(Oid ownedSequenceId, Oid targetRelationId, char *columnName); static void InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId); -static void FinalizeCitusLocalTableCreation(Oid relationId); +static void FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList); PG_FUNCTION_INFO_V1(citus_add_local_table_to_metadata); @@ -315,7 +315,18 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys) InsertMetadataForCitusLocalTable(shellRelationId, shardId); - FinalizeCitusLocalTableCreation(shellRelationId); + /* + * Ensure that the sequences used in column defaults of the table + * have proper types + */ + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(shellRelationId, &attnumList, + &dependentSequenceList, 0); + EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList, + attnumList); + + FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList); } @@ -1025,9 +1036,11 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId) * FinalizeCitusLocalTableCreation completes creation of the citus local table * with relationId by performing operations that should be done after creating * the shard and inserting the metadata. + * If the cluster has metadata workers, we ensure proper propagation of the + * sequences dependent with the table. */ static void -FinalizeCitusLocalTableCreation(Oid relationId) +FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList) { /* * If it is a foreign table, then skip creating citus truncate trigger @@ -1040,6 +1053,14 @@ FinalizeCitusLocalTableCreation(Oid relationId) if (ShouldSyncTableMetadata(relationId)) { + if (ClusterHasKnownMetadataWorkers()) + { + /* + * Ensure sequence dependencies and mark them as distributed + * before creating table metadata on workers + */ + MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList); + } CreateTableMetadataOnWorkers(relationId); } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 06bd5096d..111181a0c 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -33,6 +33,7 @@ #include "catalog/pg_trigger.h" #include "commands/defrem.h" #include "commands/extension.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "distributed/commands/multi_copy.h" @@ -64,6 +65,7 @@ #include "executor/executor.h" #include "executor/spi.h" #include "nodes/execnodes.h" +#include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/pg_list.h" #include "parser/parse_expr.h" @@ -465,6 +467,16 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, colocationId, replicationModel); + /* + * Ensure that the sequences used in column defaults of the table + * have proper types + */ + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0); + EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList, + attnumList); + /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) { @@ -498,6 +510,15 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio if (ShouldSyncTableMetadata(relationId)) { + if (ClusterHasKnownMetadataWorkers()) + { + /* + * Ensure sequence dependencies and mark them as distributed + * before creating table metadata on workers + */ + MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList); + } + CreateTableMetadataOnWorkers(relationId); } @@ -544,6 +565,149 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio } +/* + * EnsureSequenceTypeSupported ensures that the type of the column that uses + * a sequence on its DEFAULT is consistent with previous uses (if any) of the + * sequence in distributed tables. + * If any other distributed table uses the input sequence, it checks whether + * the types of the columns using the sequence match. If they don't, it errors out. + * Otherwise, the condition is ensured. + */ +void +EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId) +{ + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); + Oid citusTableId = InvalidOid; + foreach_oid(citusTableId, citusTableIdList) + { + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(citusTableId, &attnumList, + &dependentSequenceList, 0); + ListCell *attnumCell = NULL; + ListCell *dependentSequenceCell = NULL; + forboth(attnumCell, attnumList, dependentSequenceCell, + dependentSequenceList) + { + AttrNumber currentAttnum = lfirst_int(attnumCell); + Oid currentSeqOid = lfirst_oid(dependentSequenceCell); + + /* + * If another distributed table is using the same sequence + * in one of its column defaults, make sure the types of the + * columns match + */ + if (currentSeqOid == seqOid) + { + Oid currentSeqTypId = GetAttributeTypeOid(citusTableId, + currentAttnum); + if (seqTypId != currentSeqTypId) + { + char *sequenceName = generate_qualified_relation_name( + seqOid); + char *citusTableName = + generate_qualified_relation_name(citusTableId); + ereport(ERROR, (errmsg( + "The sequence %s is already used for a different" + " type in column %d of the table %s", + sequenceName, currentAttnum, + citusTableName))); + } + } + } + } +} + + +/* + * AlterSequenceType alters the given sequence's type to the given type. + */ +void +AlterSequenceType(Oid seqOid, Oid typeOid) +{ + Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid); + Oid currentSequenceTypeOid = sequenceData->seqtypid; + if (currentSequenceTypeOid != typeOid) + { + AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt); + char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid)); + char *seqName = get_rel_name(seqOid); + alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1); + Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1); + SetDefElemArg(alterSequenceStatement, "as", asTypeNode); + ParseState *pstate = make_parsestate(NULL); + AlterSequence(pstate, alterSequenceStatement); + } +} + + +/* + * MarkSequenceListDistributedAndPropagateDependencies ensures dependencies + * for the given sequence list exist on all nodes and marks the sequences + * as distributed. + */ +void +MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList) +{ + Oid sequenceOid = InvalidOid; + foreach_oid(sequenceOid, sequenceList) + { + MarkSequenceDistributedAndPropagateDependencies(sequenceOid); + } +} + + +/* + * MarkSequenceDistributedAndPropagateDependencies ensures dependencies + * for the given sequence exist on all nodes and marks the sequence + * as distributed. + */ +void +MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid) +{ + /* get sequence address */ + ObjectAddress sequenceAddress = { 0 }; + ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); + EnsureDependenciesExistOnAllNodes(&sequenceAddress); + MarkObjectDistributed(&sequenceAddress); +} + + +/* + * EnsureDistributedSequencesHaveOneType first ensures that the type of the column + * in which the sequence is used as default is supported for each sequence in input + * dependentSequenceList, and then alters the sequence type if not the same with the column type. + */ +void +EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceList, + List *attnumList) +{ + ListCell *attnumCell = NULL; + ListCell *dependentSequenceCell = NULL; + forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList) + { + AttrNumber attnum = lfirst_int(attnumCell); + Oid sequenceOid = lfirst_oid(dependentSequenceCell); + + /* + * We should make sure that the type of the column that uses + * that sequence is supported + */ + Oid seqTypId = GetAttributeTypeOid(relationId, attnum); + EnsureSequenceTypeSupported(sequenceOid, seqTypId); + + /* + * Alter the sequence's data type in the coordinator if needed. + * A sequence's type is bigint by default and it doesn't change even if + * it's used in an int column. We should change the type if needed, + * and not allow future ALTER SEQUENCE ... TYPE ... commands for + * sequences used as defaults in distributed tables + */ + AlterSequenceType(sequenceOid, seqTypId); + } +} + + /* * GetFKeyCreationCommandsRelationInvolvedWithTableType returns a list of DDL * commands to recreate the foreign keys that relation with relationId is involved diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index e5b8d7e98..a0b0f91ef 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -371,15 +371,22 @@ static DistributeObjectOps Sequence_Alter = { .qualify = NULL, .preprocess = PreprocessAlterSequenceStmt, .postprocess = NULL, - .address = AlterSequenceObjectAddress, + .address = AlterSequenceStmtObjectAddress, }; static DistributeObjectOps Sequence_AlterObjectSchema = { - .deparse = NULL, - .qualify = NULL, + .deparse = DeparseAlterSequenceSchemaStmt, + .qualify = QualifyAlterSequenceSchemaStmt, .preprocess = PreprocessAlterSequenceSchemaStmt, - .postprocess = NULL, + .postprocess = PostprocessAlterSequenceSchemaStmt, .address = AlterSequenceSchemaStmtObjectAddress, }; +static DistributeObjectOps Sequence_AlterOwner = { + .deparse = DeparseAlterSequenceOwnerStmt, + .qualify = QualifyAlterSequenceOwnerStmt, + .preprocess = PreprocessAlterSequenceOwnerStmt, + .postprocess = PostprocessAlterSequenceOwnerStmt, + .address = AlterSequenceOwnerStmtObjectAddress, +}; static DistributeObjectOps Sequence_Drop = { .deparse = DeparseDropSequenceStmt, .qualify = NULL, @@ -787,6 +794,11 @@ GetDistributeObjectOps(Node *node) return &Index_AlterTable; } + case OBJECT_SEQUENCE: + { + return &Sequence_AlterOwner; + } + default: { return &NoDistributeOps; diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c index 98fee7e6c..db1689ef2 100644 --- a/src/backend/distributed/commands/rename.c +++ b/src/backend/distributed/commands/rename.c @@ -16,6 +16,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/metadata_cache.h" #include "nodes/parsenodes.h" +#include "utils/lsyscache.h" /* @@ -62,6 +63,15 @@ PreprocessRenameStmt(Node *node, const char *renameCommand, return NIL; } + /* check whether we are dealing with a sequence here */ + if (get_rel_relkind(objectRelationId) == RELKIND_SEQUENCE) + { + RenameStmt *stmtCopy = copyObject(renameStmt); + stmtCopy->renameType = OBJECT_SEQUENCE; + return PreprocessRenameSequenceStmt((Node *) stmtCopy, renameCommand, + processUtilityContext); + } + /* we have no planning to do unless the table is distributed */ switch (renameStmt->renameType) { diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index d42148b54..94c0867f0 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -24,10 +24,12 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "nodes/parsenodes.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" /* Local functions forward declarations for helper functions */ static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); -static bool ShouldPropagateAlterSequence(const ObjectAddress *address); +static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress); /* @@ -284,9 +286,9 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString, } /* - * managing types can only be done on the coordinator if ddl propagation is on. when + * managing sequences can only be done on the coordinator if ddl propagation is on. when * it is off we will never get here. MX workers don't have a notion of distributed - * types, so we block the call. + * sequences, so we block the call. */ EnsureCoordinator(); @@ -303,14 +305,13 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString, */ DropStmt *stmtCopy = copyObject(stmt); stmtCopy->objects = distributedSequencesList; - stmtCopy->missing_ok = true; const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) dropStmtSql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); } @@ -332,7 +333,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, stmt->missing_ok); - if (!ShouldPropagateAlterSequence(&address)) + if (!ShouldPropagateObject(&address)) { return NIL; } @@ -340,16 +341,12 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility EnsureCoordinator(); QualifyTreeNode((Node *) stmt); - /* this takes care of cases where not all workers have synced metadata */ - RenameStmt *stmtCopy = copyObject(stmt); - stmtCopy->missing_ok = true; - - const char *sql = DeparseTreeNode((Node *) stmtCopy); + const char *sql = DeparseTreeNode((Node *) stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); } @@ -372,40 +369,6 @@ RenameSequenceStmtObjectAddress(Node *node, bool missing_ok) } -/* - * ShouldPropagateAlterSequence returns, based on the address of a sequence, if alter - * statements targeting the function should be propagated. - */ -static bool -ShouldPropagateAlterSequence(const ObjectAddress *address) -{ - if (creating_extension) - { - /* - * extensions should be created separately on the workers, sequences cascading - * from an extension should therefore not be propagated. - */ - return false; - } - - if (!EnableDependencyCreation) - { - /* - * we are configured to disable object propagation, should not propagate anything - */ - return false; - } - - if (!IsObjectDistributed(address)) - { - /* do not propagate alter sequence for non-distributed sequences */ - return false; - } - - return true; -} - - /* * PreprocessAlterSequenceStmt gets called during the planning phase of an ALTER SEQUENCE statement * of one of the following forms: @@ -435,21 +398,77 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString, if (IsObjectDistributed(&address)) { ereport(ERROR, (errmsg( - "This operation is currently not allowed for a distributed sequence."))); + "Altering a distributed sequence is currently not supported."))); } - else + + /* + * error out if the sequence is used in a distributed table + * and this is an ALTER SEQUENCE .. AS .. statement + */ + Oid citusTableId = SequenceUsedInDistributedTable(&address); + if (citusTableId != InvalidOid) { - return NIL; + List *options = stmt->options; + DefElem *defel = NULL; + foreach_ptr(defel, options) + { + if (strcmp(defel->defname, "as") == 0) + { + if (IsCitusTableType(citusTableId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg( + "Altering a sequence used in a local table that" + " is added to metadata is currently not supported."))); + } + ereport(ERROR, (errmsg( + "Altering a sequence used in a distributed" + " table is currently not supported."))); + } + } } + + return NIL; } /* - * AlterSequenceOwnerObjectAddress returns the ObjectAddress of the sequence that is the - * subject of the AlterOwnerStmt. + * SequenceUsedInDistributedTable returns true if the argument sequence + * is used as the default value of a column in a distributed table. + * Returns false otherwise + */ +static Oid +SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress) +{ + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); + Oid citusTableId = InvalidOid; + foreach_oid(citusTableId, citusTableIdList) + { + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(citusTableId, &attnumList, + &dependentSequenceList, 0); + Oid currentSeqOid = InvalidOid; + foreach_oid(currentSeqOid, dependentSequenceList) + { + /* + * This sequence is used in a distributed table + */ + if (currentSeqOid == sequenceAddress->objectId) + { + return citusTableId; + } + } + } + return InvalidOid; +} + + +/* + * AlterSequenceStmtObjectAddress returns the ObjectAddress of the sequence that is the + * subject of the AlterSeqStmt. */ ObjectAddress -AlterSequenceObjectAddress(Node *node, bool missing_ok) +AlterSequenceStmtObjectAddress(Node *node, bool missing_ok) { AlterSeqStmt *stmt = castNode(AlterSeqStmt, node); @@ -466,7 +485,7 @@ AlterSequenceObjectAddress(Node *node, bool missing_ok) * PreprocessAlterSequenceSchemaStmt is executed before the statement is applied to the local * postgres instance. * - * For distributed sequences, this operation will not be allowed for now. + * In this stage we can prepare the commands that need to be run on all workers. */ List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString, @@ -477,17 +496,20 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString, ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, stmt->missing_ok); - - /* error out if the sequence is distributed */ - if (IsObjectDistributed(&address)) - { - ereport(ERROR, (errmsg( - "This operation is currently not allowed for a distributed sequence."))); - } - else + if (!ShouldPropagateObject(&address)) { return NIL; } + + EnsureCoordinator(); + QualifyTreeNode((Node *) stmt); + + const char *sql = DeparseTreeNode((Node *) stmt); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); } @@ -501,6 +523,108 @@ AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok) AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); Assert(stmt->objectType == OBJECT_SEQUENCE); + RangeVar *sequence = stmt->relation; + Oid seqOid = RangeVarGetRelid(sequence, NoLock, true); + + if (seqOid == InvalidOid) + { + /* + * couldn't find the sequence, might have already been moved to the new schema, we + * construct a new sequence name that uses the new schema to search in. + */ + const char *newSchemaName = stmt->newschema; + Oid newSchemaOid = get_namespace_oid(newSchemaName, true); + seqOid = get_relname_relid(sequence->relname, newSchemaOid); + + if (!missing_ok && seqOid == InvalidOid) + { + /* + * if the sequence is still invalid we couldn't find the sequence, error with the same + * message postgres would error with if missing_ok is false (not ok to miss) + */ + const char *quotedSequenceName = + quote_qualified_identifier(sequence->schemaname, sequence->relname); + + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation \"%s\" does not exist", + quotedSequenceName))); + } + } + + ObjectAddress sequenceAddress = { 0 }; + ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid); + + return sequenceAddress; +} + + +/* + * PostprocessAlterSequenceSchemaStmt is executed after the change has been applied locally, + * we can now use the new dependencies of the sequence to ensure all its dependencies + * exist on the workers before we apply the commands remotely. + */ +List * +PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + Assert(stmt->objectType == OBJECT_SEQUENCE); + ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, + stmt->missing_ok); + + if (!ShouldPropagateObject(&address)) + { + return NIL; + } + + /* dependencies have changed (schema) let's ensure they exist */ + EnsureDependenciesExistOnAllNodes(&address); + + return NIL; +} + + +/* + * PreprocessAlterSequenceOwnerStmt is called for change of ownership of sequences before the + * ownership is changed on the local instance. + * + * If the sequence for which the owner is changed is distributed we execute the change on + * all the workers to keep the type in sync across the cluster. + */ +List * +PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + AlterTableStmt *stmt = castNode(AlterTableStmt, node); + Assert(stmt->relkind == OBJECT_SEQUENCE); + + ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateObject(&sequenceAddress)) + { + return NIL; + } + + EnsureCoordinator(); + QualifyTreeNode((Node *) stmt); + + const char *sql = DeparseTreeNode((Node *) stmt); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands); +} + + +/* + * AlterSequenceOwnerStmtObjectAddress returns the ObjectAddress of the sequence that is the + * subject of the AlterOwnerStmt. + */ +ObjectAddress +AlterSequenceOwnerStmtObjectAddress(Node *node, bool missing_ok) +{ + AlterTableStmt *stmt = castNode(AlterTableStmt, node); + Assert(stmt->relkind == OBJECT_SEQUENCE); + RangeVar *sequence = stmt->relation; Oid seqOid = RangeVarGetRelid(sequence, NoLock, missing_ok); ObjectAddress sequenceAddress = { 0 }; @@ -508,3 +632,27 @@ AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok) return sequenceAddress; } + + +/* + * PostprocessAlterSequenceOwnerStmt is executed after the change has been applied locally, + * we can now use the new dependencies of the sequence to ensure all its dependencies + * exist on the workers before we apply the commands remotely. + */ +List * +PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString) +{ + AlterTableStmt *stmt = castNode(AlterTableStmt, node); + Assert(stmt->relkind == OBJECT_SEQUENCE); + + ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateObject(&sequenceAddress)) + { + return NIL; + } + + /* dependencies have changed (owner) let's ensure they exist */ + EnsureDependenciesExistOnAllNodes(&sequenceAddress); + + return NIL; +} diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index f4958b4bc..0b6b2c767 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -14,6 +14,7 @@ #include "access/htup_details.h" #include "access/xact.h" #include "catalog/index.h" +#include "catalog/pg_attrdef.h" #include "catalog/pg_class.h" #include "catalog/pg_constraint.h" #include "catalog/pg_depend.h" @@ -42,6 +43,7 @@ #include "parser/parse_expr.h" #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -92,6 +94,8 @@ static void SetInterShardDDLTaskPlacementList(Task *task, static void SetInterShardDDLTaskRelationShardList(Task *task, ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); +static Oid GetSequenceOid(Oid relationId, AttrNumber attnum); +static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum); /* @@ -463,6 +467,13 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString) */ ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, true); + /* check whether we are dealing with a sequence here */ + if (get_rel_relkind(tableAddress.objectId) == RELKIND_SEQUENCE) + { + stmt->objectType = OBJECT_SEQUENCE; + return PostprocessAlterSequenceSchemaStmt((Node *) stmt, queryString); + } + if (!ShouldPropagate() || !IsCitusTable(tableAddress.objectId)) { return NIL; @@ -503,6 +514,20 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, return NIL; } + /* + * check whether we are dealing with a sequence here + * if yes, it must be ALTER TABLE .. OWNER TO .. command + * since this is the only ALTER command of a sequence that + * passes through an AlterTableStmt + */ + if (get_rel_relkind(leftRelationId) == RELKIND_SEQUENCE) + { + AlterTableStmt *stmtCopy = copyObject(alterTableStatement); + stmtCopy->relkind = OBJECT_SEQUENCE; + return PreprocessAlterSequenceOwnerStmt((Node *) stmtCopy, alterTableCommand, + processUtilityContext); + } + /* * AlterTableStmt applies also to INDEX relations, and we have support for * SET/SET storage parameters in Citus, so we might have to check for @@ -1385,6 +1410,15 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, stmt->missing_ok); Oid relationId = address.objectId; + /* check whether we are dealing with a sequence here */ + if (get_rel_relkind(relationId) == RELKIND_SEQUENCE) + { + AlterObjectSchemaStmt *stmtCopy = copyObject(stmt); + stmtCopy->objectType = OBJECT_SEQUENCE; + return PreprocessAlterSequenceSchemaStmt((Node *) stmtCopy, queryString, + processUtilityContext); + } + /* first check whether a distributed relation is affected */ if (!OidIsValid(relationId) || !IsCitusTable(relationId)) { @@ -1547,6 +1581,19 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) if (relationId != InvalidOid) { + /* + * check whether we are dealing with a sequence here + * if yes, it must be ALTER TABLE .. OWNER TO .. command + * since this is the only ALTER command of a sequence that + * passes through an AlterTableStmt + */ + if (get_rel_relkind(relationId) == RELKIND_SEQUENCE) + { + alterTableStatement->relkind = OBJECT_SEQUENCE; + PostprocessAlterSequenceOwnerStmt((Node *) alterTableStatement, NULL); + return; + } + /* changing a relation could introduce new dependencies */ ObjectAddress tableAddress = { 0 }; ObjectAddressSet(tableAddress, RelationRelationId, relationId); @@ -1629,8 +1676,22 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) { AttrNumber attnum = get_attnum(relationId, columnDefinition->colname); - Oid seqTypId = GetAttributeTypeOid(relationId, attnum); - EnsureSequenceTypeSupported(relationId, attnum, seqTypId); + Oid seqOid = GetSequenceOid(relationId, attnum); + if (seqOid != InvalidOid) + { + EnsureDistributedSequencesHaveOneType(relationId, + list_make1_oid( + seqOid), + list_make1_int( + attnum)); + + if (ShouldSyncTableMetadata(relationId) && + ClusterHasKnownMetadataWorkers()) + { + MarkSequenceDistributedAndPropagateDependencies( + seqOid); + } + } } } } @@ -1650,8 +1711,19 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) if (contain_nextval_expression_walker(expr, NULL)) { AttrNumber attnum = get_attnum(relationId, command->name); - Oid seqTypId = GetAttributeTypeOid(relationId, attnum); - EnsureSequenceTypeSupported(relationId, attnum, seqTypId); + Oid seqOid = GetSequenceOid(relationId, attnum); + if (seqOid != InvalidOid) + { + EnsureDistributedSequencesHaveOneType(relationId, + list_make1_oid(seqOid), + list_make1_int(attnum)); + + if (ShouldSyncTableMetadata(relationId) && + ClusterHasKnownMetadataWorkers()) + { + MarkSequenceDistributedAndPropagateDependencies(seqOid); + } + } } } } @@ -1680,6 +1752,100 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) } +/* + * GetSequenceOid returns the oid of the sequence used as default value + * of the attribute with given attnum of the given table relationId + * If there is no sequence used it returns InvalidOid. + */ +static Oid +GetSequenceOid(Oid relationId, AttrNumber attnum) +{ + /* get attrdefoid from the given relationId and attnum */ + Oid attrdefOid = get_attrdef_oid(relationId, attnum); + + /* retrieve the sequence id of the sequence found in nextval('seq') */ + List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid); + + if (list_length(sequencesFromAttrDef) == 0) + { + /* + * We need this check because sometimes there are cases where the + * dependency between the table and the sequence is not formed + * One example is when the default is defined by + * DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name')) + * In these cases, sequencesFromAttrDef with be empty. + */ + return InvalidOid; + } + + if (list_length(sequencesFromAttrDef) > 1) + { + /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ + if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg( + "More than one sequence in a column default" + " is not supported for adding local tables to metadata"))); + } + ereport(ERROR, (errmsg( + "More than one sequence in a column default" + " is not supported for distribution"))); + } + + return lfirst_oid(list_head(sequencesFromAttrDef)); +} + + +/* + * get_attrdef_oid gets the oid of the attrdef that has dependency with + * the given relationId (refobjid) and attnum (refobjsubid). + * If there is no such attrdef it returns InvalidOid. + * NOTE: we are iterating pg_depend here since this function is used together + * with other functions that iterate pg_depend. Normally, a look at pg_attrdef + * would make more sense. + */ +static Oid +get_attrdef_oid(Oid relationId, AttrNumber attnum) +{ + Oid resultAttrdefOid = InvalidOid; + + ScanKeyData key[3]; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relationId)); + ScanKeyInit(&key[2], + Anum_pg_depend_refobjsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(attnum)); + + SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, attnum ? 3 : 2, key); + + HeapTuple tup; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if (deprec->classid == AttrDefaultRelationId) + { + resultAttrdefOid = deprec->objid; + } + } + + systable_endscan(scan); + table_close(depRel, AccessShareLock); + return resultAttrdefOid; +} + + void ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command, Constraint *constraint) diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 28d83918c..7abba98c0 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -27,7 +27,6 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" -#include "catalog/pg_attrdef.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" #include "catalog/pg_class.h" @@ -47,6 +46,7 @@ #include "distributed/metadata_utility.h" #include "distributed/relay_utility.h" #include "distributed/version_compat.h" +#include "distributed/worker_protocol.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" @@ -77,7 +77,6 @@ static void AppendStorageParametersToString(StringInfo stringBuffer, List *optionList); static void simple_quote_literal(StringInfo buf, const char *val); static char * flatten_reloptions(Oid relid); -static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum); /* @@ -370,16 +369,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults, appendStringInfo(&buffer, " DEFAULT %s", defaultString); } } - - /* - * We should make sure that the type of the column that uses - * that sequence is supported - */ - if (contain_nextval_expression_walker(defaultNode, NULL)) - { - EnsureSequenceTypeSupported(tableRelationId, defaultValue->adnum, - attributeForm->atttypid); - } } /* if this column has a not null constraint, append the constraint */ @@ -498,138 +487,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults, } -/* - * EnsureSequenceTypeSupported ensures that the type of the column that uses - * a sequence on its DEFAULT is consistent with previous uses of the sequence (if any) - * It gets the AttrDefault OID from the given relationId and attnum, extracts the sequence - * id from it, and if any other distributed table uses that same sequence, it checks whether - * the types of the columns using the sequence match. If they don't, it errors out. - * Otherwise, the condition is ensured. - */ -void -EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId) -{ - /* get attrdefoid from the given relationId and attnum */ - Oid attrdefOid = get_attrdef_oid(relationId, attnum); - - /* retrieve the sequence id of the sequence found in nextval('seq') */ - List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid); - - if (list_length(sequencesFromAttrDef) == 0) - { - /* - * We need this check because sometimes there are cases where the - * dependency between the table and the sequence is not formed - * One example is when the default is defined by - * DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name')) - * In these cases, sequencesFromAttrDef with be empty. - */ - return; - } - - if (list_length(sequencesFromAttrDef) > 1) - { - /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ - ereport(ERROR, (errmsg( - "More than one sequence in a column default" - " is not supported for distribution"))); - } - - Oid seqOid = lfirst_oid(list_head(sequencesFromAttrDef)); - - List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - Oid citusTableId = InvalidOid; - foreach_oid(citusTableId, citusTableIdList) - { - List *attnumList = NIL; - List *dependentSequenceList = NIL; - GetDependentSequencesWithRelation(citusTableId, &attnumList, - &dependentSequenceList, 0); - ListCell *attnumCell = NULL; - ListCell *dependentSequenceCell = NULL; - forboth(attnumCell, attnumList, dependentSequenceCell, - dependentSequenceList) - { - AttrNumber currentAttnum = lfirst_int(attnumCell); - Oid currentSeqOid = lfirst_oid(dependentSequenceCell); - - /* - * If another distributed table is using the same sequence - * in one of its column defaults, make sure the types of the - * columns match - */ - if (currentSeqOid == seqOid) - { - Oid currentSeqTypId = GetAttributeTypeOid(citusTableId, - currentAttnum); - if (seqTypId != currentSeqTypId) - { - char *sequenceName = generate_qualified_relation_name( - seqOid); - char *citusTableName = - generate_qualified_relation_name(citusTableId); - ereport(ERROR, (errmsg( - "The sequence %s is already used for a different" - " type in column %d of the table %s", - sequenceName, currentAttnum, - citusTableName))); - } - } - } - } -} - - -/* - * get_attrdef_oid gets the oid of the attrdef that has dependency with - * the given relationId (refobjid) and attnum (refobjsubid). - * If there is no such attrdef it returns InvalidOid. - * NOTE: we are iterating pg_depend here since this function is used together - * with other functions that iterate pg_depend. Normally, a look at pg_attrdef - * would make more sense. - */ -static Oid -get_attrdef_oid(Oid relationId, AttrNumber attnum) -{ - Oid resultAttrdefOid = InvalidOid; - - ScanKeyData key[3]; - - Relation depRel = table_open(DependRelationId, AccessShareLock); - - ScanKeyInit(&key[0], - Anum_pg_depend_refclassid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationRelationId)); - ScanKeyInit(&key[1], - Anum_pg_depend_refobjid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(relationId)); - ScanKeyInit(&key[2], - Anum_pg_depend_refobjsubid, - BTEqualStrategyNumber, F_INT4EQ, - Int32GetDatum(attnum)); - - SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, - NULL, attnum ? 3 : 2, key); - - HeapTuple tup; - while (HeapTupleIsValid(tup = systable_getnext(scan))) - { - Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); - - if (deprec->classid == AttrDefaultRelationId) - { - resultAttrdefOid = deprec->objid; - } - } - - systable_endscan(scan); - table_close(depRel, AccessShareLock); - return resultAttrdefOid; -} - - /* * EnsureRelationKindSupported errors out if the given relation is not supported * as a distributed relation. diff --git a/src/backend/distributed/deparser/deparse_sequence_stmts.c b/src/backend/distributed/deparser/deparse_sequence_stmts.c index ecbcefc20..3fbabf962 100644 --- a/src/backend/distributed/deparser/deparse_sequence_stmts.c +++ b/src/backend/distributed/deparser/deparse_sequence_stmts.c @@ -15,6 +15,7 @@ #include "catalog/namespace.h" #include "distributed/deparser.h" +#include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -23,6 +24,8 @@ static void AppendDropSequenceStmt(StringInfo buf, DropStmt *stmt); static void AppendSequenceNameList(StringInfo buf, List *objects, ObjectType objtype); static void AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt); +static void AppendAlterSequenceSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt); +static void AppendAlterSequenceOwnerStmt(StringInfo buf, AlterTableStmt *stmt); /* * DeparseDropSequenceStmt builds and returns a string representing the DropStmt @@ -138,21 +141,121 @@ AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt) /* - * QualifyRenameSequenceStmt transforms a - * ALTER SEQUENCE .. RENAME TO .. - * statement in place and makes the sequence name fully qualified. + * DeparseAlterSequenceSchemaStmt builds and returns a string representing the AlterObjectSchemaStmt */ -void -QualifyRenameSequenceStmt(Node *node) +char * +DeparseAlterSequenceSchemaStmt(Node *node) { - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_SEQUENCE); + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + Assert(stmt->objectType == OBJECT_SEQUENCE); + + AppendAlterSequenceSchemaStmt(&str, stmt); + + return str.data; +} + + +/* + * AppendAlterSequenceSchemaStmt appends a string representing the AlterObjectSchemaStmt to a buffer + */ +static void +AppendAlterSequenceSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) +{ RangeVar *seq = stmt->relation; - if (seq->schemaname == NULL) + char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname, + seq->relname); + + appendStringInfoString(buf, "ALTER SEQUENCE "); + + if (stmt->missing_ok) { - Oid schemaOid = RangeVarGetCreationNamespace(seq); - seq->schemaname = get_namespace_name(schemaOid); + appendStringInfoString(buf, "IF EXISTS "); + } + + appendStringInfoString(buf, qualifiedSequenceName); + + appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema)); +} + + +/* + * DeparseAlterSequenceOwnerStmt builds and returns a string representing the AlterTableStmt + * consisting of changing the owner of a sequence + */ +char * +DeparseAlterSequenceOwnerStmt(Node *node) +{ + AlterTableStmt *stmt = castNode(AlterTableStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + + Assert(stmt->relkind == OBJECT_SEQUENCE); + + AppendAlterSequenceOwnerStmt(&str, stmt); + + return str.data; +} + + +/* + * AppendAlterSequenceOwnerStmt appends a string representing the AlterTableStmt to a buffer + * consisting of changing the owner of a sequence + */ +static void +AppendAlterSequenceOwnerStmt(StringInfo buf, AlterTableStmt *stmt) +{ + Assert(stmt->relkind == OBJECT_SEQUENCE); + RangeVar *seq = stmt->relation; + char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname, + seq->relname); + appendStringInfoString(buf, "ALTER SEQUENCE "); + + if (stmt->missing_ok) + { + appendStringInfoString(buf, "IF EXISTS "); + } + + appendStringInfoString(buf, qualifiedSequenceName); + + ListCell *cmdCell = NULL; + foreach(cmdCell, stmt->cmds) + { + if (cmdCell != list_head(stmt->cmds)) + { + /* + * normally we shouldn't ever reach this + * because we enter this function after making sure we have only + * one subcommand of the type AT_ChangeOwner + */ + ereport(ERROR, (errmsg("More than one subcommand is not supported " + "for ALTER SEQUENCE"))); + } + + AlterTableCmd *alterTableCmd = castNode(AlterTableCmd, lfirst(cmdCell)); + switch (alterTableCmd->subtype) + { + case AT_ChangeOwner: + { + appendStringInfo(buf, " OWNER TO %s;", get_rolespec_name( + alterTableCmd->newowner)); + break; + } + + default: + { + /* + * normally we shouldn't ever reach this + * because we enter this function after making sure this stmt is of the form + * ALTER SEQUENCE .. OWNER TO .. + */ + ereport(ERROR, (errmsg("unsupported subtype for alter sequence command"), + errdetail("sub command type: %d", + alterTableCmd->subtype))); + } + } } } diff --git a/src/backend/distributed/deparser/qualify_sequence_stmt.c b/src/backend/distributed/deparser/qualify_sequence_stmt.c new file mode 100644 index 000000000..1ad6a9995 --- /dev/null +++ b/src/backend/distributed/deparser/qualify_sequence_stmt.c @@ -0,0 +1,85 @@ +/*------------------------------------------------------------------------- + * + * qualify_sequence_stmt.c + * Functions specialized in fully qualifying all sequence statements. These + * functions are dispatched from qualify.c + * + * Fully qualifying sequence statements consists of adding the schema name + * to the subject of the sequence. + * + * Goal would be that the deparser functions for these statements can + * serialize the statement without any external lookups. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/deparser.h" +#include "parser/parse_func.h" +#include "utils/lsyscache.h" + + +/* + * QualifyAlterSequenceOwnerStmt transforms a + * ALTER SEQUENCE .. OWNER TO .. + * statement in place and makes the sequence name fully qualified. + */ +void +QualifyAlterSequenceOwnerStmt(Node *node) +{ + AlterTableStmt *stmt = castNode(AlterTableStmt, node); + Assert(stmt->relkind == OBJECT_SEQUENCE); + + RangeVar *seq = stmt->relation; + + if (seq->schemaname == NULL) + { + Oid schemaOid = RangeVarGetCreationNamespace(seq); + seq->schemaname = get_namespace_name(schemaOid); + } +} + + +/* + * QualifyAlterSequenceSchemaStmt transforms a + * ALTER SEQUENCE .. SET SCHEMA .. + * statement in place and makes the sequence name fully qualified. + */ +void +QualifyAlterSequenceSchemaStmt(Node *node) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + Assert(stmt->objectType == OBJECT_SEQUENCE); + + RangeVar *seq = stmt->relation; + + if (seq->schemaname == NULL) + { + Oid schemaOid = RangeVarGetCreationNamespace(seq); + seq->schemaname = get_namespace_name(schemaOid); + } +} + + +/* + * QualifyRenameSequenceStmt transforms a + * ALTER SEQUENCE .. RENAME TO .. + * statement in place and makes the sequence name fully qualified. + */ +void +QualifyRenameSequenceStmt(Node *node) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + Assert(stmt->renameType == OBJECT_SEQUENCE); + + RangeVar *seq = stmt->relation; + + if (seq->schemaname == NULL) + { + Oid schemaOid = RangeVarGetCreationNamespace(seq); + seq->schemaname = get_namespace_name(schemaOid); + } +} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 80415bbf0..c54a0bc51 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -31,7 +31,6 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" #include "commands/async.h" -#include "commands/sequence.h" #include "distributed/citus_ruleutils.h" #include "distributed/commands.h" #include "distributed/deparser.h" @@ -48,13 +47,11 @@ #include "distributed/pg_dist_node.h" #include "distributed/remote_commands.h" #include "distributed/worker_manager.h" -#include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "distributed/version_compat.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "miscadmin.h" -#include "nodes/makefuncs.h" #include "nodes/pg_list.h" #include "pgstat.h" #include "postmaster/bgworker.h" @@ -390,6 +387,15 @@ MetadataCreateCommands(void) ObjectAddressSet(tableAddress, RelationRelationId, relationId); EnsureDependenciesExistOnAllNodes(&tableAddress); + /* + * Ensure sequence dependencies and mark them as distributed + */ + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(relationId, &attnumList, + &dependentSequenceList, 0); + MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList); + List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, workerSequenceDDLCommands); @@ -1056,51 +1062,18 @@ SequenceDDLCommandsForTable(Oid relationId) char *ownerName = TableOwner(relationId); - ListCell *attnumCell = NULL; - ListCell *dependentSequenceCell = NULL; - forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList) + Oid sequenceOid = InvalidOid; + foreach_oid(sequenceOid, dependentSequenceList) { - AttrNumber attnum = lfirst_int(attnumCell); - Oid sequenceOid = lfirst_oid(dependentSequenceCell); - char *sequenceDef = pg_get_sequencedef_string(sequenceOid); char *escapedSequenceDef = quote_literal_cstr(sequenceDef); StringInfo wrappedSequenceDef = makeStringInfo(); StringInfo sequenceGrantStmt = makeStringInfo(); char *sequenceName = generate_qualified_relation_name(sequenceOid); Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid); - Oid sequenceTypeOid = GetAttributeTypeOid(relationId, attnum); + Oid sequenceTypeOid = sequenceData->seqtypid; char *typeName = format_type_be(sequenceTypeOid); - /* get sequence address */ - ObjectAddress sequenceAddress = { 0 }; - ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); - EnsureDependenciesExistOnAllNodes(&sequenceAddress); - - /* - * Alter the sequence's data type in the coordinator if needed. - * A sequence's type is bigint by default and it doesn't change even if - * it's used in an int column. However, when distributing the sequence, - * we don't allow incompatible min/max ranges between the coordinator and - * workers, so we determine the sequence type here based on its current usage - * and propagate that same type to the workers as well. - * TODO: move this command to the part where the sequence is - * used in a distributed table: both in create_distributed_table - * and ALTER TABLE commands that include a sequence default - */ - Oid currentSequenceTypeOid = sequenceData->seqtypid; - if (currentSequenceTypeOid != sequenceTypeOid) - { - AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt); - char *seqNamespace = get_namespace_name(get_rel_namespace(sequenceOid)); - char *seqName = get_rel_name(sequenceOid); - alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1); - Node *asTypeNode = (Node *) makeTypeNameFromOid(sequenceTypeOid, -1); - SetDefElemArg(alterSequenceStatement, "as", asTypeNode); - ParseState *pstate = make_parsestate(NULL); - AlterSequence(pstate, alterSequenceStatement); - } - /* create schema if needed */ appendStringInfo(wrappedSequenceDef, WORKER_APPLY_SEQUENCE_COMMAND, @@ -1113,8 +1086,6 @@ SequenceDDLCommandsForTable(Oid relationId) sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data); sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data); - - MarkObjectDistributed(&sequenceAddress); } return sequenceDDLList; @@ -1232,6 +1203,12 @@ GetDependentSequencesWithRelation(Oid relationId, List **attnumList, /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ if (list_length(sequencesFromAttrDef) > 1) { + if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg( + "More than one sequence in a column default" + " is not supported for adding local tables to metadata"))); + } ereport(ERROR, (errmsg("More than one sequence in a column default" " is not supported for distribution"))); } diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 04529e287..27cfb0a4b 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -42,7 +42,6 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern bool contain_nextval_expression_walker(Node *node, void *context); extern char * pg_get_replica_identity_command(Oid tableRelationId); extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier); -extern void EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 1965e7725..178014669 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -345,18 +345,23 @@ extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryStrin extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok); /* sequence.c - forward declarations */ -extern List * PreprocessAlterSequenceStmt(Node *stmt, const char *queryString, +extern List * PreprocessAlterSequenceStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); extern List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern List * PreprocessDropSequenceStmt(Node *stmt, const char *queryString, +extern List * PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString); +extern List * PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext); +extern List * PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString); +extern List * PreprocessDropSequenceStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern List * PreprocessRenameSequenceStmt(Node *stmt, const char *queryString, +extern List * PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern ObjectAddress AlterSequenceObjectAddress(Node *stmt, bool missing_ok); -extern ObjectAddress AlterSequenceSchemaStmtObjectAddress(Node *stmt, bool missing_ok); -extern ObjectAddress RenameSequenceStmtObjectAddress(Node *stmt, bool missing_ok); +extern ObjectAddress AlterSequenceStmtObjectAddress(Node *node, bool missing_ok); +extern ObjectAddress AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok); +extern ObjectAddress AlterSequenceOwnerStmtObjectAddress(Node *node, bool missing_ok); +extern ObjectAddress RenameSequenceStmtObjectAddress(Node *node, bool missing_ok); extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index b5dcf1905..8f9e43311 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -132,8 +132,14 @@ extern char * DeparseAlterExtensionStmt(Node *stmt); extern char * DeparseAlterDatabaseOwnerStmt(Node *node); /* forward declarations for deparse_sequence_stmts.c */ -extern char * DeparseDropSequenceStmt(Node *stmt); -extern char * DeparseRenameSequenceStmt(Node *stmt); -extern void QualifyRenameSequenceStmt(Node *stmt); +extern char * DeparseDropSequenceStmt(Node *node); +extern char * DeparseRenameSequenceStmt(Node *node); +extern char * DeparseAlterSequenceSchemaStmt(Node *node); +extern char * DeparseAlterSequenceOwnerStmt(Node *node); + +/* forward declarations for qualify_sequence_stmt.c */ +extern void QualifyRenameSequenceStmt(Node *node); +extern void QualifyAlterSequenceSchemaStmt(Node *node); +extern void QualifyAlterSequenceOwnerStmt(Node *node); #endif /* CITUS_DEPARSER_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 31a42d476..306d414f9 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -290,4 +290,11 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, uint64 *availableBytes, uint64 *totalBytes); extern void ExecuteQueryViaSPI(char *query, int SPIOK); +extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId); +extern void AlterSequenceType(Oid seqOid, Oid typeOid); +extern void MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList); +extern void MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid); +extern void EnsureDistributedSequencesHaveOneType(Oid relationId, + List *dependentSequenceList, + List *attnumList); #endif /* METADATA_UTILITY_H */ diff --git a/src/test/regress/expected/multi_sequence_default.out b/src/test/regress/expected/multi_sequence_default.out index 378f003b4..915c1bbba 100644 --- a/src/test/regress/expected/multi_sequence_default.out +++ b/src/test/regress/expected/multi_sequence_default.out @@ -8,24 +8,69 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; CREATE SCHEMA sequence_default; SET search_path = sequence_default, public; +-- test both distributed and citus local tables +SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + -- Cannot add a column involving DEFAULT nextval('..') because the table is not empty CREATE SEQUENCE seq_0; +CREATE SEQUENCE seq_0_local_table; +-- check sequence type & other things +\d seq_0 + Sequence "sequence_default.seq_0" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +\d seq_0_local_table + Sequence "sequence_default.seq_0_local_table" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +-- we can change the type of the sequence before using it in distributed tables +ALTER SEQUENCE seq_0 AS smallint; +\d seq_0 + Sequence "sequence_default.seq_0" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + smallint | 1 | 1 | 32767 | 1 | no | 1 + CREATE TABLE seq_test_0 (x int, y int); +CREATE TABLE seq_test_0_local_table (x int, y int); SELECT create_distributed_table('seq_test_0','x'); create_distributed_table --------------------------------------------------------------------- (1 row) +SELECT citus_add_local_table_to_metadata('seq_test_0_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s; +INSERT INTO seq_test_0_local_table SELECT 1, s FROM generate_series(1, 50) s; ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0'); ERROR: cannot add a column involving DEFAULT nextval('..') because the table is not empty HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..') +ALTER TABLE seq_test_0_local_table ADD COLUMN z int DEFAULT nextval('seq_0_local_table'); +ERROR: cannot add a column involving DEFAULT nextval('..') because the table is not empty +HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint +Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..') ALTER TABLE seq_test_0 ADD COLUMN z serial; ERROR: Cannot add a column involving serial pseudotypes because the table is not empty HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..') +ALTER TABLE seq_test_0_local_table ADD COLUMN z serial; +ERROR: Cannot add a column involving serial pseudotypes because the table is not empty +HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint +Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..') -- follow hint ALTER TABLE seq_test_0 ADD COLUMN z int; ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0'); @@ -47,6 +92,63 @@ SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5; y | integer | | | z | integer | | | nextval('seq_0'::regclass) +ALTER TABLE seq_test_0_local_table ADD COLUMN z int; +ALTER TABLE seq_test_0_local_table ALTER COLUMN z SET DEFAULT nextval('seq_0_local_table'); +SELECT * FROM seq_test_0_local_table ORDER BY 1, 2 LIMIT 5; + x | y | z +--------------------------------------------------------------------- + 1 | 1 | + 1 | 2 | + 1 | 3 | + 1 | 4 | + 1 | 5 | +(5 rows) + +\d seq_test_0_local_table + Table "sequence_default.seq_test_0_local_table" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + x | integer | | | + y | integer | | | + z | integer | | | nextval('seq_0_local_table'::regclass) + +-- check sequence type -> since it was used in a distributed table +-- type has changed to the type of the column it was used +-- in this case column z is of type int +\d seq_0 + Sequence "sequence_default.seq_0" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + integer | 1 | 1 | 2147483647 | 1 | no | 1 + +\d seq_0_local_table + Sequence "sequence_default.seq_0_local_table" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + integer | 1 | 1 | 2147483647 | 1 | no | 1 + +-- cannot change the type of a sequence used in a distributed table +-- even if metadata is not synced to workers +ALTER SEQUENCE seq_0 AS bigint; +ERROR: Altering a sequence used in a distributed table is currently not supported. +ALTER SEQUENCE seq_0_local_table AS bigint; +ERROR: Altering a sequence used in a local table that is added to metadata is currently not supported. +-- we can change other things like increment +-- if metadata is not synced to workers +ALTER SEQUENCE seq_0 INCREMENT BY 2; +ALTER SEQUENCE seq_0_local_table INCREMENT BY 2; +\d seq_0 + Sequence "sequence_default.seq_0" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + integer | 1 | 1 | 2147483647 | 2 | no | 1 + +\d seq_0_local_table + Sequence "sequence_default.seq_0_local_table" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + integer | 1 | 1 | 2147483647 | 2 | no | 1 + -- check that we can add serial pseudo-type columns -- when metadata is not yet synced to workers TRUNCATE seq_test_0; @@ -56,11 +158,22 @@ ALTER TABLE seq_test_0 ADD COLUMN w10 serial; ALTER TABLE seq_test_0 ADD COLUMN w11 serial4; ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial; ALTER TABLE seq_test_0 ADD COLUMN w21 serial8; +TRUNCATE seq_test_0_local_table; +ALTER TABLE seq_test_0_local_table ADD COLUMN w00 smallserial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w01 serial2; +ALTER TABLE seq_test_0_local_table ADD COLUMN w10 serial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w11 serial4; +ALTER TABLE seq_test_0_local_table ADD COLUMN w20 bigserial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w21 serial8; -- check alter column type precaution ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint; ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint; ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence +ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint; +ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence +ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint; +ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence -- MX tests -- check that there's not problem with group ID cache CREATE TABLE seq_test_4 (x int, y int); @@ -91,6 +204,19 @@ INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *; 1 | 2 | | 268435457 (1 row) +-- check that we have can properly insert to tables from before metadata sync +INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *; + x | y | z | w00 | w01 | w10 | w11 | w20 | w21 +--------------------------------------------------------------------- + 1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657 +(1 row) + +INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *; + x | y | z | w00 | w01 | w10 | w11 | w20 | w21 +--------------------------------------------------------------------- + 1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657 +(1 row) + \c - - - :master_port SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; @@ -100,7 +226,7 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) --- check sequence type consistency in all nodes +-- check sequence type consistency in all nodes for distributed tables CREATE SEQUENCE seq_1; -- type is bigint by default \d seq_1 @@ -141,10 +267,53 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) +-- check sequence type consistency in all nodes for citus local tables +CREATE SEQUENCE seq_1_local_table; +-- type is bigint by default +\d seq_1_local_table + Sequence "sequence_default.seq_1_local_table" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1 + +CREATE TABLE seq_test_1_local_table (x int, y int); +SELECT citus_add_local_table_to_metadata('seq_test_1_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table'); +-- type is changed to int +\d seq_1_local_table + Sequence "sequence_default.seq_1_local_table" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + integer | 1 | 1 | 2147483647 | 1 | no | 1 + +-- check insertion is within int bounds in the worker +\c - - - :worker_1_port +INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *; + x | y | z +--------------------------------------------------------------------- + 1 | 2 | 268435457 +(1 row) + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + -- check that we cannot add serial pseudo-type columns -- when metadata is synced to workers ALTER TABLE seq_test_1 ADD COLUMN w bigserial; ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers +ALTER TABLE seq_test_1_local_table ADD COLUMN w bigserial; +ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers -- check for sequence type clashes CREATE SEQUENCE seq_2; CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2')); @@ -168,6 +337,8 @@ CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2')); -- shouldn't work SELECT create_distributed_table('seq_test_2_0','x'); ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 2 of the table sequence_default.seq_test_2 +SELECT citus_add_local_table_to_metadata('seq_test_2_0'); +ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 2 of the table sequence_default.seq_test_2 DROP TABLE seq_test_2; DROP TABLE seq_test_2_0; -- should work @@ -183,6 +354,8 @@ CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT -- shouldn't work SELECT create_distributed_table('seq_test_2','x'); ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 3 of the table sequence_default.seq_test_2 +SELECT citus_add_local_table_to_metadata('seq_test_2'); +ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 3 of the table sequence_default.seq_test_2 -- check rename is propagated properly ALTER SEQUENCE seq_2 RENAME TO sequence_2; -- check in the worker @@ -202,6 +375,25 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) +-- check rename is propagated properly when we use ALTER TABLE +ALTER TABLE sequence_2 RENAME TO seq_2; +-- check in the worker +\c - - - :worker_1_port +\d sequence_default.seq_2 + Sequence "sequence_default.seq_2" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1 + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + -- check rename with another schema -- we notice that schema is also propagated as one of the sequence's dependencies CREATE SCHEMA sequence_default_0; @@ -354,7 +546,7 @@ CREATE SCHEMA sequence_default_8; -- can change schema in a sequence not yet distributed ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8; ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA sequence_default; -CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8')); +CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'), z bigserial); SELECT create_distributed_table('seq_test_8', 'x'); create_distributed_table --------------------------------------------------------------------- @@ -363,29 +555,179 @@ SELECT create_distributed_table('seq_test_8', 'x'); -- cannot change sequence specifications ALTER SEQUENCE seq_8 AS bigint; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 INCREMENT BY 2; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 MINVALUE 5 MAXVALUE 5000; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 START WITH 6; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 RESTART WITH 6; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 NO CYCLE; -ERROR: This operation is currently not allowed for a distributed sequence. +ERROR: Altering a distributed sequence is currently not supported. ALTER SEQUENCE seq_8 OWNED BY seq_test_7; -ERROR: This operation is currently not allowed for a distributed sequence. --- cannot change schema in a distributed sequence +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq AS smallint; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq INCREMENT BY 2; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq MINVALUE 5 MAXVALUE 5000; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq START WITH 6; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq RESTART WITH 6; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq NO CYCLE; +ERROR: Altering a distributed sequence is currently not supported. +ALTER SEQUENCE seq_test_8_z_seq OWNED BY seq_test_7; +ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table +-- can change schema in a distributed sequence +-- sequence_default_8 will be created in workers as part of dependencies ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8; -ERROR: This operation is currently not allowed for a distributed sequence. +\c - - - :worker_1_port +\d sequence_default_8.seq_8 + Sequence "sequence_default_8.seq_8" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 268435457 | 268435457 | 536870913 | 1 | no | 1 + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- check we can change the schema when we use ALTER TABLE +ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default; +\c - - - :worker_1_port +\d sequence_default.seq_8 + Sequence "sequence_default.seq_8" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache +--------------------------------------------------------------------- + bigint | 268435457 | 268435457 | 536870913 | 1 | no | 1 + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA sequence_default_8; +SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default_8 CASCADE'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"DROP SCHEMA") + (localhost,57638,t,"DROP SCHEMA") +(2 rows) + -- cannot use more than one sequence in a column default CREATE SEQUENCE seq_9; CREATE SEQUENCE seq_10; CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10')); SELECT create_distributed_table('seq_test_9', 'x'); ERROR: More than one sequence in a column default is not supported for distribution +SELECT citus_add_local_table_to_metadata('seq_test_9'); +ERROR: More than one sequence in a column default is not supported for adding local tables to metadata +ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9'); +SELECT create_distributed_table('seq_test_9', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- we can change the owner role of a sequence +CREATE ROLE seq_role_0; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +CREATE ROLE seq_role_1; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +ALTER SEQUENCE seq_10 OWNER TO seq_role_0; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; + sequencename | sequenceowner +--------------------------------------------------------------------- + seq_10 | seq_role_0 +(1 row) + +SELECT run_command_on_workers('CREATE ROLE seq_role_0'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +SELECT run_command_on_workers('CREATE ROLE seq_role_1'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_10'); +ALTER SEQUENCE seq_10 OWNER TO seq_role_1; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; + sequencename | sequenceowner +--------------------------------------------------------------------- + seq_10 | seq_role_1 +(1 row) + +\c - - - :worker_1_port +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; + sequencename | sequenceowner +--------------------------------------------------------------------- + seq_10 | seq_role_1 +(1 row) + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- check we can change the owner role of a sequence when we use ALTER TABLE +ALTER TABLE seq_10 OWNER TO seq_role_0; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; + sequencename | sequenceowner +--------------------------------------------------------------------- + seq_10 | seq_role_0 +(1 row) + +\c - - - :worker_1_port +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; + sequencename | sequenceowner +--------------------------------------------------------------------- + seq_10 | seq_role_0 +(1 row) + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +DROP SEQUENCE seq_10 CASCADE; +NOTICE: drop cascades to default value for column y of table seq_test_9 +DROP ROLE seq_role_0, seq_role_1; +SELECT run_command_on_workers('DROP ROLE IF EXISTS seq_role_0, seq_role_1'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"DROP ROLE") + (localhost,57638,t,"DROP ROLE") +(2 rows) + -- Check some cases when default is defined by -- DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name')) SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); @@ -426,14 +768,20 @@ ERROR: relation "seq_11" does not exist -- clean up DROP TABLE sequence_default.seq_test_7_par; DROP SCHEMA sequence_default CASCADE; -NOTICE: drop cascades to 23 other objects +NOTICE: drop cascades to 29 other objects DETAIL: drop cascades to sequence sequence_default.seq_0 +drop cascades to sequence sequence_default.seq_0_local_table drop cascades to table sequence_default.seq_test_0 +drop cascades to table sequence_default.seq_test_0_local_table_890004 +drop cascades to table sequence_default.seq_test_0_local_table drop cascades to table sequence_default.seq_test_4 drop cascades to sequence sequence_default.seq_4 drop cascades to sequence sequence_default.seq_1 drop cascades to table sequence_default.seq_test_1 -drop cascades to sequence sequence_default.sequence_2 +drop cascades to sequence sequence_default.seq_1_local_table +drop cascades to table sequence_default.seq_test_1_local_table_102016 +drop cascades to table sequence_default.seq_test_1_local_table +drop cascades to sequence sequence_default.seq_2 drop cascades to table sequence_default.seq_test_2 drop cascades to table sequence_default.seq_test_3 drop cascades to table sequence_default.seq_test_5 @@ -445,10 +793,10 @@ drop cascades to sequence sequence_default.seq_7_par drop cascades to sequence sequence_default.seq_8 drop cascades to table sequence_default.seq_test_8 drop cascades to sequence sequence_default.seq_9 -drop cascades to sequence sequence_default.seq_10 drop cascades to table sequence_default.seq_test_9 drop cascades to sequence sequence_default.seq_11 drop cascades to table sequence_default.seq_test_10 +drop cascades to table sequence_default.seq_test_10_102060 drop cascades to table sequence_default.seq_test_11 SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default CASCADE'); run_command_on_workers @@ -463,4 +811,10 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); (1 row) +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + SET search_path TO public; diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index e645c60bb..f1d23a06d 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -116,9 +116,7 @@ HINT: Use a sequence in a distributed table by specifying a serial column type CREATE SEQUENCE standalone_sequence; ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; -- an edge case, but it's OK to change an owner to the same distributed table --- EDIT: this doesn't work for now for a distributed sequence ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; -ERROR: This operation is currently not allowed for a distributed sequence. -- drop distributed table \c - - - :master_port DROP TABLE testserialtable; diff --git a/src/test/regress/sql/multi_sequence_default.sql b/src/test/regress/sql/multi_sequence_default.sql index a986e5ec8..616edd4ab 100644 --- a/src/test/regress/sql/multi_sequence_default.sql +++ b/src/test/regress/sql/multi_sequence_default.sql @@ -11,18 +11,51 @@ CREATE SCHEMA sequence_default; SET search_path = sequence_default, public; +-- test both distributed and citus local tables +SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0); -- Cannot add a column involving DEFAULT nextval('..') because the table is not empty CREATE SEQUENCE seq_0; +CREATE SEQUENCE seq_0_local_table; +-- check sequence type & other things +\d seq_0 +\d seq_0_local_table +-- we can change the type of the sequence before using it in distributed tables +ALTER SEQUENCE seq_0 AS smallint; +\d seq_0 CREATE TABLE seq_test_0 (x int, y int); +CREATE TABLE seq_test_0_local_table (x int, y int); SELECT create_distributed_table('seq_test_0','x'); +SELECT citus_add_local_table_to_metadata('seq_test_0_local_table'); INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s; +INSERT INTO seq_test_0_local_table SELECT 1, s FROM generate_series(1, 50) s; ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0'); +ALTER TABLE seq_test_0_local_table ADD COLUMN z int DEFAULT nextval('seq_0_local_table'); ALTER TABLE seq_test_0 ADD COLUMN z serial; +ALTER TABLE seq_test_0_local_table ADD COLUMN z serial; -- follow hint ALTER TABLE seq_test_0 ADD COLUMN z int; ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0'); SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5; \d seq_test_0 +ALTER TABLE seq_test_0_local_table ADD COLUMN z int; +ALTER TABLE seq_test_0_local_table ALTER COLUMN z SET DEFAULT nextval('seq_0_local_table'); +SELECT * FROM seq_test_0_local_table ORDER BY 1, 2 LIMIT 5; +\d seq_test_0_local_table +-- check sequence type -> since it was used in a distributed table +-- type has changed to the type of the column it was used +-- in this case column z is of type int +\d seq_0 +\d seq_0_local_table +-- cannot change the type of a sequence used in a distributed table +-- even if metadata is not synced to workers +ALTER SEQUENCE seq_0 AS bigint; +ALTER SEQUENCE seq_0_local_table AS bigint; +-- we can change other things like increment +-- if metadata is not synced to workers +ALTER SEQUENCE seq_0 INCREMENT BY 2; +ALTER SEQUENCE seq_0_local_table INCREMENT BY 2; +\d seq_0 +\d seq_0_local_table -- check that we can add serial pseudo-type columns @@ -35,10 +68,21 @@ ALTER TABLE seq_test_0 ADD COLUMN w11 serial4; ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial; ALTER TABLE seq_test_0 ADD COLUMN w21 serial8; +TRUNCATE seq_test_0_local_table; +ALTER TABLE seq_test_0_local_table ADD COLUMN w00 smallserial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w01 serial2; +ALTER TABLE seq_test_0_local_table ADD COLUMN w10 serial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w11 serial4; +ALTER TABLE seq_test_0_local_table ADD COLUMN w20 bigserial; +ALTER TABLE seq_test_0_local_table ADD COLUMN w21 serial8; + -- check alter column type precaution ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint; ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint; +ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint; +ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint; + -- MX tests @@ -55,13 +99,18 @@ ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4'); -- on worker it should generate high sequence number \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *; + +-- check that we have can properly insert to tables from before metadata sync +INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *; +INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *; + \c - - - :master_port SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); --- check sequence type consistency in all nodes +-- check sequence type consistency in all nodes for distributed tables CREATE SEQUENCE seq_1; -- type is bigint by default \d seq_1 @@ -79,9 +128,28 @@ SET search_path = sequence_default, public; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- check sequence type consistency in all nodes for citus local tables +CREATE SEQUENCE seq_1_local_table; +-- type is bigint by default +\d seq_1_local_table +CREATE TABLE seq_test_1_local_table (x int, y int); +SELECT citus_add_local_table_to_metadata('seq_test_1_local_table'); +ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table'); +-- type is changed to int +\d seq_1_local_table +-- check insertion is within int bounds in the worker +\c - - - :worker_1_port +INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *; +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + + -- check that we cannot add serial pseudo-type columns -- when metadata is synced to workers ALTER TABLE seq_test_1 ADD COLUMN w bigserial; +ALTER TABLE seq_test_1_local_table ADD COLUMN w bigserial; -- check for sequence type clashes @@ -96,6 +164,7 @@ SELECT create_distributed_table('seq_test_2','x'); CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2')); -- shouldn't work SELECT create_distributed_table('seq_test_2_0','x'); +SELECT citus_add_local_table_to_metadata('seq_test_2_0'); DROP TABLE seq_test_2; DROP TABLE seq_test_2_0; -- should work @@ -105,6 +174,7 @@ DROP TABLE seq_test_2; CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT nextval('seq_2')); -- shouldn't work SELECT create_distributed_table('seq_test_2','x'); +SELECT citus_add_local_table_to_metadata('seq_test_2'); -- check rename is propagated properly @@ -116,6 +186,15 @@ ALTER SEQUENCE seq_2 RENAME TO sequence_2; SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- check rename is propagated properly when we use ALTER TABLE +ALTER TABLE sequence_2 RENAME TO seq_2; +-- check in the worker +\c - - - :worker_1_port +\d sequence_default.seq_2 +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- check rename with another schema -- we notice that schema is also propagated as one of the sequence's dependencies CREATE SCHEMA sequence_default_0; @@ -195,7 +274,7 @@ CREATE SCHEMA sequence_default_8; -- can change schema in a sequence not yet distributed ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8; ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA sequence_default; -CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8')); +CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'), z bigserial); SELECT create_distributed_table('seq_test_8', 'x'); -- cannot change sequence specifications ALTER SEQUENCE seq_8 AS bigint; @@ -205,9 +284,32 @@ ALTER SEQUENCE seq_8 START WITH 6; ALTER SEQUENCE seq_8 RESTART WITH 6; ALTER SEQUENCE seq_8 NO CYCLE; ALTER SEQUENCE seq_8 OWNED BY seq_test_7; --- cannot change schema in a distributed sequence +ALTER SEQUENCE seq_test_8_z_seq AS smallint; +ALTER SEQUENCE seq_test_8_z_seq INCREMENT BY 2; +ALTER SEQUENCE seq_test_8_z_seq MINVALUE 5 MAXVALUE 5000; +ALTER SEQUENCE seq_test_8_z_seq START WITH 6; +ALTER SEQUENCE seq_test_8_z_seq RESTART WITH 6; +ALTER SEQUENCE seq_test_8_z_seq NO CYCLE; +ALTER SEQUENCE seq_test_8_z_seq OWNED BY seq_test_7; +-- can change schema in a distributed sequence +-- sequence_default_8 will be created in workers as part of dependencies ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8; +\c - - - :worker_1_port +\d sequence_default_8.seq_8 +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- check we can change the schema when we use ALTER TABLE +ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default; +\c - - - :worker_1_port +\d sequence_default.seq_8 +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); DROP SCHEMA sequence_default_8; +SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default_8 CASCADE'); -- cannot use more than one sequence in a column default @@ -215,6 +317,39 @@ CREATE SEQUENCE seq_9; CREATE SEQUENCE seq_10; CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10')); SELECT create_distributed_table('seq_test_9', 'x'); +SELECT citus_add_local_table_to_metadata('seq_test_9'); +ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9'); +SELECT create_distributed_table('seq_test_9', 'x'); + + +-- we can change the owner role of a sequence +CREATE ROLE seq_role_0; +CREATE ROLE seq_role_1; +ALTER SEQUENCE seq_10 OWNER TO seq_role_0; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; +SELECT run_command_on_workers('CREATE ROLE seq_role_0'); +SELECT run_command_on_workers('CREATE ROLE seq_role_1'); +ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_10'); +ALTER SEQUENCE seq_10 OWNER TO seq_role_1; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; +\c - - - :worker_1_port +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- check we can change the owner role of a sequence when we use ALTER TABLE +ALTER TABLE seq_10 OWNER TO seq_role_0; +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; +\c - - - :worker_1_port +SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2; +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +DROP SEQUENCE seq_10 CASCADE; +DROP ROLE seq_role_0, seq_role_1; +SELECT run_command_on_workers('DROP ROLE IF EXISTS seq_role_0, seq_role_1'); -- Check some cases when default is defined by @@ -234,9 +369,11 @@ SELECT create_distributed_table('seq_test_11', 'col1'); INSERT INTO sequence_default.seq_test_10 VALUES (1); \c - - - :master_port + -- clean up DROP TABLE sequence_default.seq_test_7_par; DROP SCHEMA sequence_default CASCADE; SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default CASCADE'); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT master_remove_node('localhost', :master_port); SET search_path TO public; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index c9eed832e..1b903217b 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -86,7 +86,6 @@ CREATE SEQUENCE standalone_sequence; ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; -- an edge case, but it's OK to change an owner to the same distributed table --- EDIT: this doesn't work for now for a distributed sequence ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; -- drop distributed table