Adds propagation of ALTER SEQUENCE and other improvements (#5061)

* Alter seq type when we first use the seq in a dist table

* Don't allow type changes when seq is used in dist table

* ALTER SEQUENCE propagation

* Tests for ALTER SEQUENCE propagation

* Relocate AlterSequenceType and ensure dependencies for sequence

* Support for citus local tables, and other fixes

* Final formatting
pull/5075/head^2
Naisila Puka 2021-06-24 21:23:25 +03:00 committed by GitHub
parent e9bfb8eddd
commit fe5907ad2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1345 additions and 297 deletions

View File

@ -75,7 +75,7 @@ static void DropDefaultColumnDefinition(Oid relationId, char *columnName);
static void TransferSequenceOwnership(Oid ownedSequenceId, Oid targetRelationId,
char *columnName);
static void InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId);
static void FinalizeCitusLocalTableCreation(Oid relationId);
static void FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList);
PG_FUNCTION_INFO_V1(citus_add_local_table_to_metadata);
@ -315,7 +315,18 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
InsertMetadataForCitusLocalTable(shellRelationId, shardId);
FinalizeCitusLocalTableCreation(shellRelationId);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(shellRelationId, &attnumList,
&dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList,
attnumList);
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
}
@ -1025,9 +1036,11 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId)
* FinalizeCitusLocalTableCreation completes creation of the citus local table
* with relationId by performing operations that should be done after creating
* the shard and inserting the metadata.
* If the cluster has metadata workers, we ensure proper propagation of the
* sequences dependent with the table.
*/
static void
FinalizeCitusLocalTableCreation(Oid relationId)
FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
{
/*
* If it is a foreign table, then skip creating citus truncate trigger
@ -1040,6 +1053,14 @@ FinalizeCitusLocalTableCreation(Oid relationId)
if (ShouldSyncTableMetadata(relationId))
{
if (ClusterHasKnownMetadataWorkers())
{
/*
* Ensure sequence dependencies and mark them as distributed
* before creating table metadata on workers
*/
MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
}
CreateTableMetadataOnWorkers(relationId);
}

View File

@ -33,6 +33,7 @@
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/multi_copy.h"
@ -64,6 +65,7 @@
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
@ -465,6 +467,16 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId))
{
@ -498,6 +510,15 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
if (ShouldSyncTableMetadata(relationId))
{
if (ClusterHasKnownMetadataWorkers())
{
/*
* Ensure sequence dependencies and mark them as distributed
* before creating table metadata on workers
*/
MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
}
CreateTableMetadataOnWorkers(relationId);
}
@ -544,6 +565,149 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
}
/*
* EnsureSequenceTypeSupported ensures that the type of the column that uses
* a sequence on its DEFAULT is consistent with previous uses (if any) of the
* sequence in distributed tables.
* If any other distributed table uses the input sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it errors out.
* Otherwise, the condition is ensured.
*/
void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(citusTableId, &attnumList,
&dependentSequenceList, 0);
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell,
dependentSequenceList)
{
AttrNumber currentAttnum = lfirst_int(attnumCell);
Oid currentSeqOid = lfirst_oid(dependentSequenceCell);
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (seqTypId != currentSeqTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
}
}
}
/*
* AlterSequenceType alters the given sequence's type to the given type.
*/
void
AlterSequenceType(Oid seqOid, Oid typeOid)
{
Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid);
Oid currentSequenceTypeOid = sequenceData->seqtypid;
if (currentSequenceTypeOid != typeOid)
{
AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid));
char *seqName = get_rel_name(seqOid);
alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1);
SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
ParseState *pstate = make_parsestate(NULL);
AlterSequence(pstate, alterSequenceStatement);
}
}
/*
* MarkSequenceListDistributedAndPropagateDependencies ensures dependencies
* for the given sequence list exist on all nodes and marks the sequences
* as distributed.
*/
void
MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList)
{
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, sequenceList)
{
MarkSequenceDistributedAndPropagateDependencies(sequenceOid);
}
}
/*
* MarkSequenceDistributedAndPropagateDependencies ensures dependencies
* for the given sequence exist on all nodes and marks the sequence
* as distributed.
*/
void
MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid)
{
/* get sequence address */
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
MarkObjectDistributed(&sequenceAddress);
}
/*
* EnsureDistributedSequencesHaveOneType first ensures that the type of the column
* in which the sequence is used as default is supported for each sequence in input
* dependentSequenceList, and then alters the sequence type if not the same with the column type.
*/
void
EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceList,
List *attnumList)
{
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
{
AttrNumber attnum = lfirst_int(attnumCell);
Oid sequenceOid = lfirst_oid(dependentSequenceCell);
/*
* We should make sure that the type of the column that uses
* that sequence is supported
*/
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(sequenceOid, seqTypId);
/*
* Alter the sequence's data type in the coordinator if needed.
* A sequence's type is bigint by default and it doesn't change even if
* it's used in an int column. We should change the type if needed,
* and not allow future ALTER SEQUENCE ... TYPE ... commands for
* sequences used as defaults in distributed tables
*/
AlterSequenceType(sequenceOid, seqTypId);
}
}
/*
* GetFKeyCreationCommandsRelationInvolvedWithTableType returns a list of DDL
* commands to recreate the foreign keys that relation with relationId is involved

View File

@ -371,15 +371,22 @@ static DistributeObjectOps Sequence_Alter = {
.qualify = NULL,
.preprocess = PreprocessAlterSequenceStmt,
.postprocess = NULL,
.address = AlterSequenceObjectAddress,
.address = AlterSequenceStmtObjectAddress,
};
static DistributeObjectOps Sequence_AlterObjectSchema = {
.deparse = NULL,
.qualify = NULL,
.deparse = DeparseAlterSequenceSchemaStmt,
.qualify = QualifyAlterSequenceSchemaStmt,
.preprocess = PreprocessAlterSequenceSchemaStmt,
.postprocess = NULL,
.postprocess = PostprocessAlterSequenceSchemaStmt,
.address = AlterSequenceSchemaStmtObjectAddress,
};
static DistributeObjectOps Sequence_AlterOwner = {
.deparse = DeparseAlterSequenceOwnerStmt,
.qualify = QualifyAlterSequenceOwnerStmt,
.preprocess = PreprocessAlterSequenceOwnerStmt,
.postprocess = PostprocessAlterSequenceOwnerStmt,
.address = AlterSequenceOwnerStmtObjectAddress,
};
static DistributeObjectOps Sequence_Drop = {
.deparse = DeparseDropSequenceStmt,
.qualify = NULL,
@ -787,6 +794,11 @@ GetDistributeObjectOps(Node *node)
return &Index_AlterTable;
}
case OBJECT_SEQUENCE:
{
return &Sequence_AlterOwner;
}
default:
{
return &NoDistributeOps;

View File

@ -16,6 +16,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
#include "utils/lsyscache.h"
/*
@ -62,6 +63,15 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
return NIL;
}
/* check whether we are dealing with a sequence here */
if (get_rel_relkind(objectRelationId) == RELKIND_SEQUENCE)
{
RenameStmt *stmtCopy = copyObject(renameStmt);
stmtCopy->renameType = OBJECT_SEQUENCE;
return PreprocessRenameSequenceStmt((Node *) stmtCopy, renameCommand,
processUtilityContext);
}
/* we have no planning to do unless the table is distributed */
switch (renameStmt->renameType)
{

View File

@ -24,10 +24,12 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static bool ShouldPropagateAlterSequence(const ObjectAddress *address);
static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress);
/*
@ -284,9 +286,9 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* managing sequences can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
* sequences, so we block the call.
*/
EnsureCoordinator();
@ -303,14 +305,13 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
*/
DropStmt *stmtCopy = copyObject(stmt);
stmtCopy->objects = distributedSequencesList;
stmtCopy->missing_ok = true;
const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
}
@ -332,7 +333,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateAlterSequence(&address))
if (!ShouldPropagateObject(&address))
{
return NIL;
}
@ -340,16 +341,12 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
/* this takes care of cases where not all workers have synced metadata */
RenameStmt *stmtCopy = copyObject(stmt);
stmtCopy->missing_ok = true;
const char *sql = DeparseTreeNode((Node *) stmtCopy);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
}
@ -372,40 +369,6 @@ RenameSequenceStmtObjectAddress(Node *node, bool missing_ok)
}
/*
* ShouldPropagateAlterSequence returns, based on the address of a sequence, if alter
* statements targeting the function should be propagated.
*/
static bool
ShouldPropagateAlterSequence(const ObjectAddress *address)
{
if (creating_extension)
{
/*
* extensions should be created separately on the workers, sequences cascading
* from an extension should therefore not be propagated.
*/
return false;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
if (!IsObjectDistributed(address))
{
/* do not propagate alter sequence for non-distributed sequences */
return false;
}
return true;
}
/*
* PreprocessAlterSequenceStmt gets called during the planning phase of an ALTER SEQUENCE statement
* of one of the following forms:
@ -435,21 +398,77 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
if (IsObjectDistributed(&address))
{
ereport(ERROR, (errmsg(
"This operation is currently not allowed for a distributed sequence.")));
"Altering a distributed sequence is currently not supported.")));
}
else
/*
* error out if the sequence is used in a distributed table
* and this is an ALTER SEQUENCE .. AS .. statement
*/
Oid citusTableId = SequenceUsedInDistributedTable(&address);
if (citusTableId != InvalidOid)
{
return NIL;
List *options = stmt->options;
DefElem *defel = NULL;
foreach_ptr(defel, options)
{
if (strcmp(defel->defname, "as") == 0)
{
if (IsCitusTableType(citusTableId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg(
"Altering a sequence used in a local table that"
" is added to metadata is currently not supported.")));
}
ereport(ERROR, (errmsg(
"Altering a sequence used in a distributed"
" table is currently not supported.")));
}
}
}
return NIL;
}
/*
* AlterSequenceOwnerObjectAddress returns the ObjectAddress of the sequence that is the
* subject of the AlterOwnerStmt.
* SequenceUsedInDistributedTable returns true if the argument sequence
* is used as the default value of a column in a distributed table.
* Returns false otherwise
*/
static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(citusTableId, &attnumList,
&dependentSequenceList, 0);
Oid currentSeqOid = InvalidOid;
foreach_oid(currentSeqOid, dependentSequenceList)
{
/*
* This sequence is used in a distributed table
*/
if (currentSeqOid == sequenceAddress->objectId)
{
return citusTableId;
}
}
}
return InvalidOid;
}
/*
* AlterSequenceStmtObjectAddress returns the ObjectAddress of the sequence that is the
* subject of the AlterSeqStmt.
*/
ObjectAddress
AlterSequenceObjectAddress(Node *node, bool missing_ok)
AlterSequenceStmtObjectAddress(Node *node, bool missing_ok)
{
AlterSeqStmt *stmt = castNode(AlterSeqStmt, node);
@ -466,7 +485,7 @@ AlterSequenceObjectAddress(Node *node, bool missing_ok)
* PreprocessAlterSequenceSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* For distributed sequences, this operation will not be allowed for now.
* In this stage we can prepare the commands that need to be run on all workers.
*/
List *
PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
@ -477,17 +496,20 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
/* error out if the sequence is distributed */
if (IsObjectDistributed(&address))
{
ereport(ERROR, (errmsg(
"This operation is currently not allowed for a distributed sequence.")));
}
else
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
}
@ -501,6 +523,108 @@ AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok)
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_SEQUENCE);
RangeVar *sequence = stmt->relation;
Oid seqOid = RangeVarGetRelid(sequence, NoLock, true);
if (seqOid == InvalidOid)
{
/*
* couldn't find the sequence, might have already been moved to the new schema, we
* construct a new sequence name that uses the new schema to search in.
*/
const char *newSchemaName = stmt->newschema;
Oid newSchemaOid = get_namespace_oid(newSchemaName, true);
seqOid = get_relname_relid(sequence->relname, newSchemaOid);
if (!missing_ok && seqOid == InvalidOid)
{
/*
* if the sequence is still invalid we couldn't find the sequence, error with the same
* message postgres would error with if missing_ok is false (not ok to miss)
*/
const char *quotedSequenceName =
quote_qualified_identifier(sequence->schemaname, sequence->relname);
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
quotedSequenceName)));
}
}
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
return sequenceAddress;
}
/*
* PostprocessAlterSequenceSchemaStmt is executed after the change has been applied locally,
* we can now use the new dependencies of the sequence to ensure all its dependencies
* exist on the workers before we apply the commands remotely.
*/
List *
PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_SEQUENCE);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PreprocessAlterSequenceOwnerStmt is called for change of ownership of sequences before the
* ownership is changed on the local instance.
*
* If the sequence for which the owner is changed is distributed we execute the change on
* all the workers to keep the type in sync across the cluster.
*/
List *
PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
Assert(stmt->relkind == OBJECT_SEQUENCE);
ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&sequenceAddress))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
}
/*
* AlterSequenceOwnerStmtObjectAddress returns the ObjectAddress of the sequence that is the
* subject of the AlterOwnerStmt.
*/
ObjectAddress
AlterSequenceOwnerStmtObjectAddress(Node *node, bool missing_ok)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
Assert(stmt->relkind == OBJECT_SEQUENCE);
RangeVar *sequence = stmt->relation;
Oid seqOid = RangeVarGetRelid(sequence, NoLock, missing_ok);
ObjectAddress sequenceAddress = { 0 };
@ -508,3 +632,27 @@ AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok)
return sequenceAddress;
}
/*
* PostprocessAlterSequenceOwnerStmt is executed after the change has been applied locally,
* we can now use the new dependencies of the sequence to ensure all its dependencies
* exist on the workers before we apply the commands remotely.
*/
List *
PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
Assert(stmt->relkind == OBJECT_SEQUENCE);
ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&sequenceAddress))
{
return NIL;
}
/* dependencies have changed (owner) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
return NIL;
}

View File

@ -14,6 +14,7 @@
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
@ -42,6 +43,7 @@
#include "parser/parse_expr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@ -92,6 +94,8 @@ static void SetInterShardDDLTaskPlacementList(Task *task,
static void SetInterShardDDLTaskRelationShardList(Task *task,
ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
static Oid GetSequenceOid(Oid relationId, AttrNumber attnum);
static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum);
/*
@ -463,6 +467,13 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
*/
ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
/* check whether we are dealing with a sequence here */
if (get_rel_relkind(tableAddress.objectId) == RELKIND_SEQUENCE)
{
stmt->objectType = OBJECT_SEQUENCE;
return PostprocessAlterSequenceSchemaStmt((Node *) stmt, queryString);
}
if (!ShouldPropagate() || !IsCitusTable(tableAddress.objectId))
{
return NIL;
@ -503,6 +514,20 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
return NIL;
}
/*
* check whether we are dealing with a sequence here
* if yes, it must be ALTER TABLE .. OWNER TO .. command
* since this is the only ALTER command of a sequence that
* passes through an AlterTableStmt
*/
if (get_rel_relkind(leftRelationId) == RELKIND_SEQUENCE)
{
AlterTableStmt *stmtCopy = copyObject(alterTableStatement);
stmtCopy->relkind = OBJECT_SEQUENCE;
return PreprocessAlterSequenceOwnerStmt((Node *) stmtCopy, alterTableCommand,
processUtilityContext);
}
/*
* AlterTableStmt applies also to INDEX relations, and we have support for
* SET/SET storage parameters in Citus, so we might have to check for
@ -1385,6 +1410,15 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
stmt->missing_ok);
Oid relationId = address.objectId;
/* check whether we are dealing with a sequence here */
if (get_rel_relkind(relationId) == RELKIND_SEQUENCE)
{
AlterObjectSchemaStmt *stmtCopy = copyObject(stmt);
stmtCopy->objectType = OBJECT_SEQUENCE;
return PreprocessAlterSequenceSchemaStmt((Node *) stmtCopy, queryString,
processUtilityContext);
}
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsCitusTable(relationId))
{
@ -1547,6 +1581,19 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
if (relationId != InvalidOid)
{
/*
* check whether we are dealing with a sequence here
* if yes, it must be ALTER TABLE .. OWNER TO .. command
* since this is the only ALTER command of a sequence that
* passes through an AlterTableStmt
*/
if (get_rel_relkind(relationId) == RELKIND_SEQUENCE)
{
alterTableStatement->relkind = OBJECT_SEQUENCE;
PostprocessAlterSequenceOwnerStmt((Node *) alterTableStatement, NULL);
return;
}
/* changing a relation could introduce new dependencies */
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
@ -1629,8 +1676,22 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
{
AttrNumber attnum = get_attnum(relationId,
columnDefinition->colname);
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(relationId, attnum, seqTypId);
Oid seqOid = GetSequenceOid(relationId, attnum);
if (seqOid != InvalidOid)
{
EnsureDistributedSequencesHaveOneType(relationId,
list_make1_oid(
seqOid),
list_make1_int(
attnum));
if (ShouldSyncTableMetadata(relationId) &&
ClusterHasKnownMetadataWorkers())
{
MarkSequenceDistributedAndPropagateDependencies(
seqOid);
}
}
}
}
}
@ -1650,8 +1711,19 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
if (contain_nextval_expression_walker(expr, NULL))
{
AttrNumber attnum = get_attnum(relationId, command->name);
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(relationId, attnum, seqTypId);
Oid seqOid = GetSequenceOid(relationId, attnum);
if (seqOid != InvalidOid)
{
EnsureDistributedSequencesHaveOneType(relationId,
list_make1_oid(seqOid),
list_make1_int(attnum));
if (ShouldSyncTableMetadata(relationId) &&
ClusterHasKnownMetadataWorkers())
{
MarkSequenceDistributedAndPropagateDependencies(seqOid);
}
}
}
}
}
@ -1680,6 +1752,100 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
}
/*
* GetSequenceOid returns the oid of the sequence used as default value
* of the attribute with given attnum of the given table relationId
* If there is no sequence used it returns InvalidOid.
*/
static Oid
GetSequenceOid(Oid relationId, AttrNumber attnum)
{
/* get attrdefoid from the given relationId and attnum */
Oid attrdefOid = get_attrdef_oid(relationId, attnum);
/* retrieve the sequence id of the sequence found in nextval('seq') */
List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);
if (list_length(sequencesFromAttrDef) == 0)
{
/*
* We need this check because sometimes there are cases where the
* dependency between the table and the sequence is not formed
* One example is when the default is defined by
* DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name'))
* In these cases, sequencesFromAttrDef with be empty.
*/
return InvalidOid;
}
if (list_length(sequencesFromAttrDef) > 1)
{
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg(
"More than one sequence in a column default"
" is not supported for adding local tables to metadata")));
}
ereport(ERROR, (errmsg(
"More than one sequence in a column default"
" is not supported for distribution")));
}
return lfirst_oid(list_head(sequencesFromAttrDef));
}
/*
* get_attrdef_oid gets the oid of the attrdef that has dependency with
* the given relationId (refobjid) and attnum (refobjsubid).
* If there is no such attrdef it returns InvalidOid.
* NOTE: we are iterating pg_depend here since this function is used together
* with other functions that iterate pg_depend. Normally, a look at pg_attrdef
* would make more sense.
*/
static Oid
get_attrdef_oid(Oid relationId, AttrNumber attnum)
{
Oid resultAttrdefOid = InvalidOid;
ScanKeyData key[3];
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId));
ScanKeyInit(&key[2],
Anum_pg_depend_refobjsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(attnum));
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, attnum ? 3 : 2, key);
HeapTuple tup;
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId)
{
resultAttrdefOid = deprec->objid;
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return resultAttrdefOid;
}
void
ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command,
Constraint *constraint)

View File

@ -27,7 +27,6 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
@ -47,6 +46,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/relay_utility.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
@ -77,7 +77,6 @@ static void AppendStorageParametersToString(StringInfo stringBuffer,
List *optionList);
static void simple_quote_literal(StringInfo buf, const char *val);
static char * flatten_reloptions(Oid relid);
static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum);
/*
@ -370,16 +369,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
appendStringInfo(&buffer, " DEFAULT %s", defaultString);
}
}
/*
* We should make sure that the type of the column that uses
* that sequence is supported
*/
if (contain_nextval_expression_walker(defaultNode, NULL))
{
EnsureSequenceTypeSupported(tableRelationId, defaultValue->adnum,
attributeForm->atttypid);
}
}
/* if this column has a not null constraint, append the constraint */
@ -498,138 +487,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
}
/*
* EnsureSequenceTypeSupported ensures that the type of the column that uses
* a sequence on its DEFAULT is consistent with previous uses of the sequence (if any)
* It gets the AttrDefault OID from the given relationId and attnum, extracts the sequence
* id from it, and if any other distributed table uses that same sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it errors out.
* Otherwise, the condition is ensured.
*/
void
EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId)
{
/* get attrdefoid from the given relationId and attnum */
Oid attrdefOid = get_attrdef_oid(relationId, attnum);
/* retrieve the sequence id of the sequence found in nextval('seq') */
List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);
if (list_length(sequencesFromAttrDef) == 0)
{
/*
* We need this check because sometimes there are cases where the
* dependency between the table and the sequence is not formed
* One example is when the default is defined by
* DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name'))
* In these cases, sequencesFromAttrDef with be empty.
*/
return;
}
if (list_length(sequencesFromAttrDef) > 1)
{
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
ereport(ERROR, (errmsg(
"More than one sequence in a column default"
" is not supported for distribution")));
}
Oid seqOid = lfirst_oid(list_head(sequencesFromAttrDef));
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(citusTableId, &attnumList,
&dependentSequenceList, 0);
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell,
dependentSequenceList)
{
AttrNumber currentAttnum = lfirst_int(attnumCell);
Oid currentSeqOid = lfirst_oid(dependentSequenceCell);
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (seqTypId != currentSeqTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
}
}
}
/*
* get_attrdef_oid gets the oid of the attrdef that has dependency with
* the given relationId (refobjid) and attnum (refobjsubid).
* If there is no such attrdef it returns InvalidOid.
* NOTE: we are iterating pg_depend here since this function is used together
* with other functions that iterate pg_depend. Normally, a look at pg_attrdef
* would make more sense.
*/
static Oid
get_attrdef_oid(Oid relationId, AttrNumber attnum)
{
Oid resultAttrdefOid = InvalidOid;
ScanKeyData key[3];
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId));
ScanKeyInit(&key[2],
Anum_pg_depend_refobjsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(attnum));
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, attnum ? 3 : 2, key);
HeapTuple tup;
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId)
{
resultAttrdefOid = deprec->objid;
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return resultAttrdefOid;
}
/*
* EnsureRelationKindSupported errors out if the given relation is not supported
* as a distributed relation.

View File

@ -15,6 +15,7 @@
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -23,6 +24,8 @@
static void AppendDropSequenceStmt(StringInfo buf, DropStmt *stmt);
static void AppendSequenceNameList(StringInfo buf, List *objects, ObjectType objtype);
static void AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt);
static void AppendAlterSequenceSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt);
static void AppendAlterSequenceOwnerStmt(StringInfo buf, AlterTableStmt *stmt);
/*
* DeparseDropSequenceStmt builds and returns a string representing the DropStmt
@ -138,21 +141,121 @@ AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt)
/*
* QualifyRenameSequenceStmt transforms a
* ALTER SEQUENCE .. RENAME TO ..
* statement in place and makes the sequence name fully qualified.
* DeparseAlterSequenceSchemaStmt builds and returns a string representing the AlterObjectSchemaStmt
*/
void
QualifyRenameSequenceStmt(Node *node)
char *
DeparseAlterSequenceSchemaStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SEQUENCE);
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_SEQUENCE);
AppendAlterSequenceSchemaStmt(&str, stmt);
return str.data;
}
/*
* AppendAlterSequenceSchemaStmt appends a string representing the AlterObjectSchemaStmt to a buffer
*/
static void
AppendAlterSequenceSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
RangeVar *seq = stmt->relation;
if (seq->schemaname == NULL)
char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname,
seq->relname);
appendStringInfoString(buf, "ALTER SEQUENCE ");
if (stmt->missing_ok)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
appendStringInfoString(buf, "IF EXISTS ");
}
appendStringInfoString(buf, qualifiedSequenceName);
appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema));
}
/*
* DeparseAlterSequenceOwnerStmt builds and returns a string representing the AlterTableStmt
* consisting of changing the owner of a sequence
*/
char *
DeparseAlterSequenceOwnerStmt(Node *node)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->relkind == OBJECT_SEQUENCE);
AppendAlterSequenceOwnerStmt(&str, stmt);
return str.data;
}
/*
* AppendAlterSequenceOwnerStmt appends a string representing the AlterTableStmt to a buffer
* consisting of changing the owner of a sequence
*/
static void
AppendAlterSequenceOwnerStmt(StringInfo buf, AlterTableStmt *stmt)
{
Assert(stmt->relkind == OBJECT_SEQUENCE);
RangeVar *seq = stmt->relation;
char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname,
seq->relname);
appendStringInfoString(buf, "ALTER SEQUENCE ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
appendStringInfoString(buf, qualifiedSequenceName);
ListCell *cmdCell = NULL;
foreach(cmdCell, stmt->cmds)
{
if (cmdCell != list_head(stmt->cmds))
{
/*
* normally we shouldn't ever reach this
* because we enter this function after making sure we have only
* one subcommand of the type AT_ChangeOwner
*/
ereport(ERROR, (errmsg("More than one subcommand is not supported "
"for ALTER SEQUENCE")));
}
AlterTableCmd *alterTableCmd = castNode(AlterTableCmd, lfirst(cmdCell));
switch (alterTableCmd->subtype)
{
case AT_ChangeOwner:
{
appendStringInfo(buf, " OWNER TO %s;", get_rolespec_name(
alterTableCmd->newowner));
break;
}
default:
{
/*
* normally we shouldn't ever reach this
* because we enter this function after making sure this stmt is of the form
* ALTER SEQUENCE .. OWNER TO ..
*/
ereport(ERROR, (errmsg("unsupported subtype for alter sequence command"),
errdetail("sub command type: %d",
alterTableCmd->subtype)));
}
}
}
}

View File

@ -0,0 +1,85 @@
/*-------------------------------------------------------------------------
*
* qualify_sequence_stmt.c
* Functions specialized in fully qualifying all sequence statements. These
* functions are dispatched from qualify.c
*
* Fully qualifying sequence statements consists of adding the schema name
* to the subject of the sequence.
*
* Goal would be that the deparser functions for these statements can
* serialize the statement without any external lookups.
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/deparser.h"
#include "parser/parse_func.h"
#include "utils/lsyscache.h"
/*
* QualifyAlterSequenceOwnerStmt transforms a
* ALTER SEQUENCE .. OWNER TO ..
* statement in place and makes the sequence name fully qualified.
*/
void
QualifyAlterSequenceOwnerStmt(Node *node)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
Assert(stmt->relkind == OBJECT_SEQUENCE);
RangeVar *seq = stmt->relation;
if (seq->schemaname == NULL)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
}
}
/*
* QualifyAlterSequenceSchemaStmt transforms a
* ALTER SEQUENCE .. SET SCHEMA ..
* statement in place and makes the sequence name fully qualified.
*/
void
QualifyAlterSequenceSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_SEQUENCE);
RangeVar *seq = stmt->relation;
if (seq->schemaname == NULL)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
}
}
/*
* QualifyRenameSequenceStmt transforms a
* ALTER SEQUENCE .. RENAME TO ..
* statement in place and makes the sequence name fully qualified.
*/
void
QualifyRenameSequenceStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SEQUENCE);
RangeVar *seq = stmt->relation;
if (seq->schemaname == NULL)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
}
}

View File

@ -31,7 +31,6 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
@ -48,13 +47,11 @@
#include "distributed/pg_dist_node.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
@ -390,6 +387,15 @@ MetadataCreateCommands(void)
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
/*
* Ensure sequence dependencies and mark them as distributed
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);
@ -1056,51 +1062,18 @@ SequenceDDLCommandsForTable(Oid relationId)
char *ownerName = TableOwner(relationId);
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
AttrNumber attnum = lfirst_int(attnumCell);
Oid sequenceOid = lfirst_oid(dependentSequenceCell);
char *sequenceDef = pg_get_sequencedef_string(sequenceOid);
char *escapedSequenceDef = quote_literal_cstr(sequenceDef);
StringInfo wrappedSequenceDef = makeStringInfo();
StringInfo sequenceGrantStmt = makeStringInfo();
char *sequenceName = generate_qualified_relation_name(sequenceOid);
Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid);
Oid sequenceTypeOid = GetAttributeTypeOid(relationId, attnum);
Oid sequenceTypeOid = sequenceData->seqtypid;
char *typeName = format_type_be(sequenceTypeOid);
/* get sequence address */
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
/*
* Alter the sequence's data type in the coordinator if needed.
* A sequence's type is bigint by default and it doesn't change even if
* it's used in an int column. However, when distributing the sequence,
* we don't allow incompatible min/max ranges between the coordinator and
* workers, so we determine the sequence type here based on its current usage
* and propagate that same type to the workers as well.
* TODO: move this command to the part where the sequence is
* used in a distributed table: both in create_distributed_table
* and ALTER TABLE commands that include a sequence default
*/
Oid currentSequenceTypeOid = sequenceData->seqtypid;
if (currentSequenceTypeOid != sequenceTypeOid)
{
AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
char *seqNamespace = get_namespace_name(get_rel_namespace(sequenceOid));
char *seqName = get_rel_name(sequenceOid);
alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
Node *asTypeNode = (Node *) makeTypeNameFromOid(sequenceTypeOid, -1);
SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
ParseState *pstate = make_parsestate(NULL);
AlterSequence(pstate, alterSequenceStatement);
}
/* create schema if needed */
appendStringInfo(wrappedSequenceDef,
WORKER_APPLY_SEQUENCE_COMMAND,
@ -1113,8 +1086,6 @@ SequenceDDLCommandsForTable(Oid relationId)
sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data);
sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data);
MarkObjectDistributed(&sequenceAddress);
}
return sequenceDDLList;
@ -1232,6 +1203,12 @@ GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
if (list_length(sequencesFromAttrDef) > 1)
{
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg(
"More than one sequence in a column default"
" is not supported for adding local tables to metadata")));
}
ereport(ERROR, (errmsg("More than one sequence in a column default"
" is not supported for distribution")));
}

View File

@ -42,7 +42,6 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
extern void EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -345,18 +345,23 @@ extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryStrin
extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok);
/* sequence.c - forward declarations */
extern List * PreprocessAlterSequenceStmt(Node *stmt, const char *queryString,
extern List * PreprocessAlterSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropSequenceStmt(Node *stmt, const char *queryString,
extern List * PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessDropSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameSequenceStmt(Node *stmt, const char *queryString,
extern List * PreprocessRenameSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern ObjectAddress AlterSequenceObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress AlterSequenceSchemaStmtObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress RenameSequenceStmtObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress AlterSequenceStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterSequenceOwnerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress RenameSequenceStmtObjectAddress(Node *node, bool missing_ok);
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);

View File

@ -132,8 +132,14 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
/* forward declarations for deparse_sequence_stmts.c */
extern char * DeparseDropSequenceStmt(Node *stmt);
extern char * DeparseRenameSequenceStmt(Node *stmt);
extern void QualifyRenameSequenceStmt(Node *stmt);
extern char * DeparseDropSequenceStmt(Node *node);
extern char * DeparseRenameSequenceStmt(Node *node);
extern char * DeparseAlterSequenceSchemaStmt(Node *node);
extern char * DeparseAlterSequenceOwnerStmt(Node *node);
/* forward declarations for qualify_sequence_stmt.c */
extern void QualifyRenameSequenceStmt(Node *node);
extern void QualifyAlterSequenceSchemaStmt(Node *node);
extern void QualifyAlterSequenceOwnerStmt(Node *node);
#endif /* CITUS_DEPARSER_H */

View File

@ -290,4 +290,11 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes,
uint64 *totalBytes);
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList);
extern void MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid);
extern void EnsureDistributedSequencesHaveOneType(Oid relationId,
List *dependentSequenceList,
List *attnumList);
#endif /* METADATA_UTILITY_H */

View File

@ -8,24 +8,69 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE SCHEMA sequence_default;
SET search_path = sequence_default, public;
-- test both distributed and citus local tables
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Cannot add a column involving DEFAULT nextval('..') because the table is not empty
CREATE SEQUENCE seq_0;
CREATE SEQUENCE seq_0_local_table;
-- check sequence type & other things
\d seq_0
Sequence "sequence_default.seq_0"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1
\d seq_0_local_table
Sequence "sequence_default.seq_0_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1
-- we can change the type of the sequence before using it in distributed tables
ALTER SEQUENCE seq_0 AS smallint;
\d seq_0
Sequence "sequence_default.seq_0"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
smallint | 1 | 1 | 32767 | 1 | no | 1
CREATE TABLE seq_test_0 (x int, y int);
CREATE TABLE seq_test_0_local_table (x int, y int);
SELECT create_distributed_table('seq_test_0','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('seq_test_0_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s;
INSERT INTO seq_test_0_local_table SELECT 1, s FROM generate_series(1, 50) s;
ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0');
ERROR: cannot add a column involving DEFAULT nextval('..') because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
ALTER TABLE seq_test_0_local_table ADD COLUMN z int DEFAULT nextval('seq_0_local_table');
ERROR: cannot add a column involving DEFAULT nextval('..') because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
ALTER TABLE seq_test_0 ADD COLUMN z serial;
ERROR: Cannot add a column involving serial pseudotypes because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
ALTER TABLE seq_test_0_local_table ADD COLUMN z serial;
ERROR: Cannot add a column involving serial pseudotypes because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
-- follow hint
ALTER TABLE seq_test_0 ADD COLUMN z int;
ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0');
@ -47,6 +92,63 @@ SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5;
y | integer | | |
z | integer | | | nextval('seq_0'::regclass)
ALTER TABLE seq_test_0_local_table ADD COLUMN z int;
ALTER TABLE seq_test_0_local_table ALTER COLUMN z SET DEFAULT nextval('seq_0_local_table');
SELECT * FROM seq_test_0_local_table ORDER BY 1, 2 LIMIT 5;
x | y | z
---------------------------------------------------------------------
1 | 1 |
1 | 2 |
1 | 3 |
1 | 4 |
1 | 5 |
(5 rows)
\d seq_test_0_local_table
Table "sequence_default.seq_test_0_local_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
x | integer | | |
y | integer | | |
z | integer | | | nextval('seq_0_local_table'::regclass)
-- check sequence type -> since it was used in a distributed table
-- type has changed to the type of the column it was used
-- in this case column z is of type int
\d seq_0
Sequence "sequence_default.seq_0"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 1 | no | 1
\d seq_0_local_table
Sequence "sequence_default.seq_0_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 1 | no | 1
-- cannot change the type of a sequence used in a distributed table
-- even if metadata is not synced to workers
ALTER SEQUENCE seq_0 AS bigint;
ERROR: Altering a sequence used in a distributed table is currently not supported.
ALTER SEQUENCE seq_0_local_table AS bigint;
ERROR: Altering a sequence used in a local table that is added to metadata is currently not supported.
-- we can change other things like increment
-- if metadata is not synced to workers
ALTER SEQUENCE seq_0 INCREMENT BY 2;
ALTER SEQUENCE seq_0_local_table INCREMENT BY 2;
\d seq_0
Sequence "sequence_default.seq_0"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 2 | no | 1
\d seq_0_local_table
Sequence "sequence_default.seq_0_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 2 | no | 1
-- check that we can add serial pseudo-type columns
-- when metadata is not yet synced to workers
TRUNCATE seq_test_0;
@ -56,11 +158,22 @@ ALTER TABLE seq_test_0 ADD COLUMN w10 serial;
ALTER TABLE seq_test_0 ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0 ADD COLUMN w21 serial8;
TRUNCATE seq_test_0_local_table;
ALTER TABLE seq_test_0_local_table ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0_local_table ADD COLUMN w10 serial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0_local_table ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w21 serial8;
-- check alter column type precaution
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
-- MX tests
-- check that there's not problem with group ID cache
CREATE TABLE seq_test_4 (x int, y int);
@ -91,6 +204,19 @@ INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *;
1 | 2 | | 268435457
(1 row)
-- check that we have can properly insert to tables from before metadata sync
INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *;
x | y | z | w00 | w01 | w10 | w11 | w20 | w21
---------------------------------------------------------------------
1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657
(1 row)
INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *;
x | y | z | w00 | w01 | w10 | w11 | w20 | w21
---------------------------------------------------------------------
1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
@ -100,7 +226,7 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
-- check sequence type consistency in all nodes
-- check sequence type consistency in all nodes for distributed tables
CREATE SEQUENCE seq_1;
-- type is bigint by default
\d seq_1
@ -141,10 +267,53 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
-- check sequence type consistency in all nodes for citus local tables
CREATE SEQUENCE seq_1_local_table;
-- type is bigint by default
\d seq_1_local_table
Sequence "sequence_default.seq_1_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1
CREATE TABLE seq_test_1_local_table (x int, y int);
SELECT citus_add_local_table_to_metadata('seq_test_1_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table');
-- type is changed to int
\d seq_1_local_table
Sequence "sequence_default.seq_1_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 1 | no | 1
-- check insertion is within int bounds in the worker
\c - - - :worker_1_port
INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *;
x | y | z
---------------------------------------------------------------------
1 | 2 | 268435457
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check that we cannot add serial pseudo-type columns
-- when metadata is synced to workers
ALTER TABLE seq_test_1 ADD COLUMN w bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
ALTER TABLE seq_test_1_local_table ADD COLUMN w bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
-- check for sequence type clashes
CREATE SEQUENCE seq_2;
CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2'));
@ -168,6 +337,8 @@ CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2_0','x');
ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 2 of the table sequence_default.seq_test_2
SELECT citus_add_local_table_to_metadata('seq_test_2_0');
ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 2 of the table sequence_default.seq_test_2
DROP TABLE seq_test_2;
DROP TABLE seq_test_2_0;
-- should work
@ -183,6 +354,8 @@ CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT
-- shouldn't work
SELECT create_distributed_table('seq_test_2','x');
ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 3 of the table sequence_default.seq_test_2
SELECT citus_add_local_table_to_metadata('seq_test_2');
ERROR: The sequence sequence_default.seq_2 is already used for a different type in column 3 of the table sequence_default.seq_test_2
-- check rename is propagated properly
ALTER SEQUENCE seq_2 RENAME TO sequence_2;
-- check in the worker
@ -202,6 +375,25 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
-- check rename is propagated properly when we use ALTER TABLE
ALTER TABLE sequence_2 RENAME TO seq_2;
-- check in the worker
\c - - - :worker_1_port
\d sequence_default.seq_2
Sequence "sequence_default.seq_2"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check rename with another schema
-- we notice that schema is also propagated as one of the sequence's dependencies
CREATE SCHEMA sequence_default_0;
@ -354,7 +546,7 @@ CREATE SCHEMA sequence_default_8;
-- can change schema in a sequence not yet distributed
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA sequence_default;
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'));
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'), z bigserial);
SELECT create_distributed_table('seq_test_8', 'x');
create_distributed_table
---------------------------------------------------------------------
@ -363,29 +555,179 @@ SELECT create_distributed_table('seq_test_8', 'x');
-- cannot change sequence specifications
ALTER SEQUENCE seq_8 AS bigint;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 INCREMENT BY 2;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 MINVALUE 5 MAXVALUE 5000;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 START WITH 6;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 RESTART WITH 6;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 NO CYCLE;
ERROR: This operation is currently not allowed for a distributed sequence.
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_8 OWNED BY seq_test_7;
ERROR: This operation is currently not allowed for a distributed sequence.
-- cannot change schema in a distributed sequence
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq AS smallint;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq INCREMENT BY 2;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq MINVALUE 5 MAXVALUE 5000;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq START WITH 6;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq RESTART WITH 6;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq NO CYCLE;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_test_8_z_seq OWNED BY seq_test_7;
ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table
-- can change schema in a distributed sequence
-- sequence_default_8 will be created in workers as part of dependencies
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ERROR: This operation is currently not allowed for a distributed sequence.
\c - - - :worker_1_port
\d sequence_default_8.seq_8
Sequence "sequence_default_8.seq_8"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 268435457 | 268435457 | 536870913 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check we can change the schema when we use ALTER TABLE
ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default;
\c - - - :worker_1_port
\d sequence_default.seq_8
Sequence "sequence_default.seq_8"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 268435457 | 268435457 | 536870913 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SCHEMA sequence_default_8;
SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default_8 CASCADE');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
-- cannot use more than one sequence in a column default
CREATE SEQUENCE seq_9;
CREATE SEQUENCE seq_10;
CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10'));
SELECT create_distributed_table('seq_test_9', 'x');
ERROR: More than one sequence in a column default is not supported for distribution
SELECT citus_add_local_table_to_metadata('seq_test_9');
ERROR: More than one sequence in a column default is not supported for adding local tables to metadata
ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9');
SELECT create_distributed_table('seq_test_9', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- we can change the owner role of a sequence
CREATE ROLE seq_role_0;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
CREATE ROLE seq_role_1;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
ALTER SEQUENCE seq_10 OWNER TO seq_role_0;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
sequencename | sequenceowner
---------------------------------------------------------------------
seq_10 | seq_role_0
(1 row)
SELECT run_command_on_workers('CREATE ROLE seq_role_0');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
SELECT run_command_on_workers('CREATE ROLE seq_role_1');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_10');
ALTER SEQUENCE seq_10 OWNER TO seq_role_1;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
sequencename | sequenceowner
---------------------------------------------------------------------
seq_10 | seq_role_1
(1 row)
\c - - - :worker_1_port
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
sequencename | sequenceowner
---------------------------------------------------------------------
seq_10 | seq_role_1
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check we can change the owner role of a sequence when we use ALTER TABLE
ALTER TABLE seq_10 OWNER TO seq_role_0;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
sequencename | sequenceowner
---------------------------------------------------------------------
seq_10 | seq_role_0
(1 row)
\c - - - :worker_1_port
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
sequencename | sequenceowner
---------------------------------------------------------------------
seq_10 | seq_role_0
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SEQUENCE seq_10 CASCADE;
NOTICE: drop cascades to default value for column y of table seq_test_9
DROP ROLE seq_role_0, seq_role_1;
SELECT run_command_on_workers('DROP ROLE IF EXISTS seq_role_0, seq_role_1');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP ROLE")
(localhost,57638,t,"DROP ROLE")
(2 rows)
-- Check some cases when default is defined by
-- DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name'))
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
@ -426,14 +768,20 @@ ERROR: relation "seq_11" does not exist
-- clean up
DROP TABLE sequence_default.seq_test_7_par;
DROP SCHEMA sequence_default CASCADE;
NOTICE: drop cascades to 23 other objects
NOTICE: drop cascades to 29 other objects
DETAIL: drop cascades to sequence sequence_default.seq_0
drop cascades to sequence sequence_default.seq_0_local_table
drop cascades to table sequence_default.seq_test_0
drop cascades to table sequence_default.seq_test_0_local_table_890004
drop cascades to table sequence_default.seq_test_0_local_table
drop cascades to table sequence_default.seq_test_4
drop cascades to sequence sequence_default.seq_4
drop cascades to sequence sequence_default.seq_1
drop cascades to table sequence_default.seq_test_1
drop cascades to sequence sequence_default.sequence_2
drop cascades to sequence sequence_default.seq_1_local_table
drop cascades to table sequence_default.seq_test_1_local_table_102016
drop cascades to table sequence_default.seq_test_1_local_table
drop cascades to sequence sequence_default.seq_2
drop cascades to table sequence_default.seq_test_2
drop cascades to table sequence_default.seq_test_3
drop cascades to table sequence_default.seq_test_5
@ -445,10 +793,10 @@ drop cascades to sequence sequence_default.seq_7_par
drop cascades to sequence sequence_default.seq_8
drop cascades to table sequence_default.seq_test_8
drop cascades to sequence sequence_default.seq_9
drop cascades to sequence sequence_default.seq_10
drop cascades to table sequence_default.seq_test_9
drop cascades to sequence sequence_default.seq_11
drop cascades to table sequence_default.seq_test_10
drop cascades to table sequence_default.seq_test_10_102060
drop cascades to table sequence_default.seq_test_11
SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default CASCADE');
run_command_on_workers
@ -463,4 +811,10 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
SELECT master_remove_node('localhost', :master_port);
master_remove_node
---------------------------------------------------------------------
(1 row)
SET search_path TO public;

View File

@ -116,9 +116,7 @@ HINT: Use a sequence in a distributed table by specifying a serial column type
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table
-- EDIT: this doesn't work for now for a distributed sequence
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
ERROR: This operation is currently not allowed for a distributed sequence.
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;

View File

@ -11,18 +11,51 @@ CREATE SCHEMA sequence_default;
SET search_path = sequence_default, public;
-- test both distributed and citus local tables
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0);
-- Cannot add a column involving DEFAULT nextval('..') because the table is not empty
CREATE SEQUENCE seq_0;
CREATE SEQUENCE seq_0_local_table;
-- check sequence type & other things
\d seq_0
\d seq_0_local_table
-- we can change the type of the sequence before using it in distributed tables
ALTER SEQUENCE seq_0 AS smallint;
\d seq_0
CREATE TABLE seq_test_0 (x int, y int);
CREATE TABLE seq_test_0_local_table (x int, y int);
SELECT create_distributed_table('seq_test_0','x');
SELECT citus_add_local_table_to_metadata('seq_test_0_local_table');
INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s;
INSERT INTO seq_test_0_local_table SELECT 1, s FROM generate_series(1, 50) s;
ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0');
ALTER TABLE seq_test_0_local_table ADD COLUMN z int DEFAULT nextval('seq_0_local_table');
ALTER TABLE seq_test_0 ADD COLUMN z serial;
ALTER TABLE seq_test_0_local_table ADD COLUMN z serial;
-- follow hint
ALTER TABLE seq_test_0 ADD COLUMN z int;
ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0');
SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5;
\d seq_test_0
ALTER TABLE seq_test_0_local_table ADD COLUMN z int;
ALTER TABLE seq_test_0_local_table ALTER COLUMN z SET DEFAULT nextval('seq_0_local_table');
SELECT * FROM seq_test_0_local_table ORDER BY 1, 2 LIMIT 5;
\d seq_test_0_local_table
-- check sequence type -> since it was used in a distributed table
-- type has changed to the type of the column it was used
-- in this case column z is of type int
\d seq_0
\d seq_0_local_table
-- cannot change the type of a sequence used in a distributed table
-- even if metadata is not synced to workers
ALTER SEQUENCE seq_0 AS bigint;
ALTER SEQUENCE seq_0_local_table AS bigint;
-- we can change other things like increment
-- if metadata is not synced to workers
ALTER SEQUENCE seq_0 INCREMENT BY 2;
ALTER SEQUENCE seq_0_local_table INCREMENT BY 2;
\d seq_0
\d seq_0_local_table
-- check that we can add serial pseudo-type columns
@ -35,10 +68,21 @@ ALTER TABLE seq_test_0 ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0 ADD COLUMN w21 serial8;
TRUNCATE seq_test_0_local_table;
ALTER TABLE seq_test_0_local_table ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0_local_table ADD COLUMN w10 serial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0_local_table ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w21 serial8;
-- check alter column type precaution
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint;
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;
-- MX tests
@ -55,13 +99,18 @@ ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4');
-- on worker it should generate high sequence number
\c - - - :worker_1_port
INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *;
-- check that we have can properly insert to tables from before metadata sync
INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *;
INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check sequence type consistency in all nodes
-- check sequence type consistency in all nodes for distributed tables
CREATE SEQUENCE seq_1;
-- type is bigint by default
\d seq_1
@ -79,9 +128,28 @@ SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check sequence type consistency in all nodes for citus local tables
CREATE SEQUENCE seq_1_local_table;
-- type is bigint by default
\d seq_1_local_table
CREATE TABLE seq_test_1_local_table (x int, y int);
SELECT citus_add_local_table_to_metadata('seq_test_1_local_table');
ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table');
-- type is changed to int
\d seq_1_local_table
-- check insertion is within int bounds in the worker
\c - - - :worker_1_port
INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check that we cannot add serial pseudo-type columns
-- when metadata is synced to workers
ALTER TABLE seq_test_1 ADD COLUMN w bigserial;
ALTER TABLE seq_test_1_local_table ADD COLUMN w bigserial;
-- check for sequence type clashes
@ -96,6 +164,7 @@ SELECT create_distributed_table('seq_test_2','x');
CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2_0','x');
SELECT citus_add_local_table_to_metadata('seq_test_2_0');
DROP TABLE seq_test_2;
DROP TABLE seq_test_2_0;
-- should work
@ -105,6 +174,7 @@ DROP TABLE seq_test_2;
CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2','x');
SELECT citus_add_local_table_to_metadata('seq_test_2');
-- check rename is propagated properly
@ -116,6 +186,15 @@ ALTER SEQUENCE seq_2 RENAME TO sequence_2;
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check rename is propagated properly when we use ALTER TABLE
ALTER TABLE sequence_2 RENAME TO seq_2;
-- check in the worker
\c - - - :worker_1_port
\d sequence_default.seq_2
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check rename with another schema
-- we notice that schema is also propagated as one of the sequence's dependencies
CREATE SCHEMA sequence_default_0;
@ -195,7 +274,7 @@ CREATE SCHEMA sequence_default_8;
-- can change schema in a sequence not yet distributed
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA sequence_default;
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'));
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'), z bigserial);
SELECT create_distributed_table('seq_test_8', 'x');
-- cannot change sequence specifications
ALTER SEQUENCE seq_8 AS bigint;
@ -205,9 +284,32 @@ ALTER SEQUENCE seq_8 START WITH 6;
ALTER SEQUENCE seq_8 RESTART WITH 6;
ALTER SEQUENCE seq_8 NO CYCLE;
ALTER SEQUENCE seq_8 OWNED BY seq_test_7;
-- cannot change schema in a distributed sequence
ALTER SEQUENCE seq_test_8_z_seq AS smallint;
ALTER SEQUENCE seq_test_8_z_seq INCREMENT BY 2;
ALTER SEQUENCE seq_test_8_z_seq MINVALUE 5 MAXVALUE 5000;
ALTER SEQUENCE seq_test_8_z_seq START WITH 6;
ALTER SEQUENCE seq_test_8_z_seq RESTART WITH 6;
ALTER SEQUENCE seq_test_8_z_seq NO CYCLE;
ALTER SEQUENCE seq_test_8_z_seq OWNED BY seq_test_7;
-- can change schema in a distributed sequence
-- sequence_default_8 will be created in workers as part of dependencies
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
\c - - - :worker_1_port
\d sequence_default_8.seq_8
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check we can change the schema when we use ALTER TABLE
ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default;
\c - - - :worker_1_port
\d sequence_default.seq_8
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
DROP SCHEMA sequence_default_8;
SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default_8 CASCADE');
-- cannot use more than one sequence in a column default
@ -215,6 +317,39 @@ CREATE SEQUENCE seq_9;
CREATE SEQUENCE seq_10;
CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10'));
SELECT create_distributed_table('seq_test_9', 'x');
SELECT citus_add_local_table_to_metadata('seq_test_9');
ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9');
SELECT create_distributed_table('seq_test_9', 'x');
-- we can change the owner role of a sequence
CREATE ROLE seq_role_0;
CREATE ROLE seq_role_1;
ALTER SEQUENCE seq_10 OWNER TO seq_role_0;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
SELECT run_command_on_workers('CREATE ROLE seq_role_0');
SELECT run_command_on_workers('CREATE ROLE seq_role_1');
ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_10');
ALTER SEQUENCE seq_10 OWNER TO seq_role_1;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
\c - - - :worker_1_port
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check we can change the owner role of a sequence when we use ALTER TABLE
ALTER TABLE seq_10 OWNER TO seq_role_0;
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
\c - - - :worker_1_port
SELECT sequencename, sequenceowner FROM pg_sequences WHERE sequencename = 'seq_10' ORDER BY 1, 2;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
DROP SEQUENCE seq_10 CASCADE;
DROP ROLE seq_role_0, seq_role_1;
SELECT run_command_on_workers('DROP ROLE IF EXISTS seq_role_0, seq_role_1');
-- Check some cases when default is defined by
@ -234,9 +369,11 @@ SELECT create_distributed_table('seq_test_11', 'col1');
INSERT INTO sequence_default.seq_test_10 VALUES (1);
\c - - - :master_port
-- clean up
DROP TABLE sequence_default.seq_test_7_par;
DROP SCHEMA sequence_default CASCADE;
SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default CASCADE');
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT master_remove_node('localhost', :master_port);
SET search_path TO public;

View File

@ -86,7 +86,6 @@ CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table
-- EDIT: this doesn't work for now for a distributed sequence
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- drop distributed table